Graceful coroutine exit upon KeyboardInterrupt

Hello everyone,

Does someone have an example, or can point me to how to implement a graceful exit when the user issues a Ctrl-C to shut down the Sanic server?

Rgs!

You can always try…except KeyboardInterrupt, or you can add_signal_handler to the event loop for SIGHUP, SIGTERM and SIGINT and handle your shutdown sequence in its callback.
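
For the signal-handler route, a minimal sketch (Unix-only, since add_signal_handler is not available on the Windows event loops; the shutdown function is just an illustration):

```
import asyncio
import signal

def shutdown(loop: asyncio.AbstractEventLoop) -> None:
    # Run your shutdown sequence here (flush queues, close
    # connections, ...), then stop the loop.
    print('Going down...')
    loop.stop()

loop = asyncio.get_event_loop()
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(sig, shutdown, loop)
# Ctrl-C now invokes shutdown() instead of raising KeyboardInterrupt.
loop.run_forever()
```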


Thank you! I have added the KeyboardInterrupt handling to both coroutine methods that are started via add_task(). Would that be the correct approach? An example would be highly appreciated.

I would suggest wrapping the main entry point of your code in a try…except block, i.e. where you call add_task in the first place.

I am not completely sure about the following (and would like someone else's opinion as well), but context switching between asyncio coroutines running concurrently is based on cooperative multitasking: a coroutine voluntarily yields control of the event loop (at an await) so that some other coroutine can use it, and that coroutine does the same in turn. This means there are points at runtime where control of the event loop is in transition and not yet held by any coroutine, so if the user happens to press Ctrl-C at exactly such a point (a window that in practice may be very narrow, on the order of milliseconds), the KeyboardInterrupt is raised outside any coroutine's try…except and your program may fall through without a graceful exit. Just something to keep in mind while designing your asyncio programs.
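
A tiny sketch of that hand-off, for illustration (worker is a made-up name):

```
import asyncio

async def worker(name: str) -> None:
    for i in range(3):
        print(f'{name} step {i}')
        # The await is the voluntary yield point: control goes back to
        # the event loop, which may resume the other worker first.
        await asyncio.sleep(0)

loop = asyncio.get_event_loop()
# Prints the a/b steps interleaved.
loop.run_until_complete(asyncio.gather(worker('a'), worker('b')))
```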

Turns out I was wrong about wrapping the whole entry point around add_task. If the coroutine throws an exception, it won't get bubbled up past the add_task call. So your idea of adding try…except around every coroutine will work fine. If your project has a large number of coroutines, adding try…except to each one might be a hassle, so I will list a few solutions for anyone else who might stumble onto this thread. One way of doing this would be to attach a global exception handler to the event loop with

loop.set_exception_handler(handler_func)

Another way of doing this would be to make a wrapper coroutine that takes your coroutine as an argument, awaits it inside a try…except that handles exceptions, and pass this wrapper coroutine to add_task. That way you can have different exception handlers for different coroutines.
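
A minimal hypothetical sketch of that wrapper (supervised and the handler argument are made-up names):

```
import logging

logger = logging.getLogger(__name__)

async def supervised(coro, handler=None):
    # Await the wrapped coroutine and route any exception to a
    # per-coroutine handler instead of letting it disappear.
    try:
        await coro
    except Exception as e:
        if handler is not None:
            handler(e)
        else:
            logger.exception('wrapped coroutine crashed')

# Hypothetical usage:
# app.add_task(supervised(my_worker(), handler=my_worker_handler))
```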


@syfluqs Apart from wrapping each coroutine, I like the idea of using a decorator to wrap each individual coroutine to get rid of clutter, and also setting loop.set_exception_handler(handler_func).
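
That decorator could look something like this sketch (graceful is a hypothetical name):

```
import functools
import logging

logger = logging.getLogger(__name__)

def graceful(coro_func):
    # Decorator variant of the wrapper idea: applied once at the
    # definition instead of at every add_task call site.
    @functools.wraps(coro_func)
    async def wrapper(*args, **kwargs):
        try:
            return await coro_func(*args, **kwargs)
        except Exception:
            logger.exception('%s exited ungracefully', coro_func.__name__)
    return wrapper
```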

My experience so far is that sometimes the try/except wrapping each individual coroutine works as expected, but other times it does not and I get a VERY DISGRACEFUL exit.

Will reply back with the results of this.

Thank you for your help!

Hi, I tried adding the exception handler but got an error:

Traceback (most recent call last):
  File "/home/fbenavides/.cache/pypoetry/virtualenvs/sanic-queues-6qajkZ6g-py3.8/lib/python3.8/site-packages/sanic/server.py", line 895, in serve
    http_server = loop.run_until_complete(server_coroutine)
  File "uvloop/loop.pyx", line 1450, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1443, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1351, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 484, in uvloop.loop.Loop._run
RuntimeError: Cannot run the event loop while another loop is running

Here is the handler:

def exception_handler(loop: asyncio.AbstractEventLoop, context):
    exception = context.get("exception")
    if isinstance(exception, KeyboardInterrupt):
        print("Going down...")
        loop.stop()

And here is the main function:

def main(host, port):
    server = app.create_server(host=host, port=port)
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)
    asyncio.ensure_future(server, loop=loop)
    loop.run_forever()

What could be wrong?

You need to add the return_asyncio_server=True keyword argument to the app.create_server call, otherwise Sanic assumes that the webserver is standalone and automatically starts the event loop with only the webserver coroutine.
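
Applied to the main() above (reusing app and the exception_handler from the earlier snippet), that would look roughly like this:

```
def main(host, port):
    # return_asyncio_server=True makes create_server return the server
    # coroutine instead of starting its own event loop.
    server = app.create_server(host=host, port=port,
                               return_asyncio_server=True)
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)
    asyncio.ensure_future(server, loop=loop)
    loop.run_forever()
```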

Here’s the code:

Looks like your exception is in the task_distributor module, which is looking for an instance member distributing on the app object. There is no such member in the Sanic sources, so the error might be somewhere on your side of the implementation. Maybe share the task_distributor module code and we can figure out why the exception happens.

@syfluqs Here is the code for the task_distributor module:

```

import logging
from typing import Any
from asyncio import sleep
from asyncio import Queue
from asyncio import Lock
from asyncio import QueueEmpty
from asyncio import QueueFull
from asyncio import AbstractEventLoop
from asyncio import get_event_loop

import sanic.response
from sanic import Sanic
from sanic import response
from sanic.views import HTTPMethodView
from sanic.exceptions import ServerError

from config import ACTIVE_OPERATOR
from asb_queues import AzureServiceBusQueue
from task_executor import TaskExecutor
from .constants import QUEUE_MAXSIZE
from .constants import SLEEP_WAIT

logger = logging.getLogger(__name__)
app = Sanic(__name__)


def exception_handler(current_loop: AbstractEventLoop, context):
    """ Exception Handler for Ctrl-C/KeyboardInterrpt """
    exception = context.get('exception')
    if isinstance(exception, KeyboardInterrupt):
        logger.info('Going down...')
        current_loop.stop()


loop = get_event_loop()
loop.set_exception_handler(exception_handler)


class TaskDistributorException(Exception):
    """ TaskDistributor generic exception handler """
    pass


class TaskDistributor:
    """ AsyncIO Queue Task Distributor """

    __slots__ = ('_test_environment',
                 '_task_queue',
                 '_task_lock',
                 '_results_queue',
                 '_results_lock',
                 '_asb_task_queue',
                 '_asb_task_lock')

    def __init__(self, sanic_loop: Sanic.loop, test_environment: bool = True) -> None:
        """ Constructor

        :param sanic_loop: Current loop being used
        :param test_environment: Run against the test environment if True
        """
        self._test_environment = test_environment
        logger.info(f'[[{("PROD", "TEST")[self._test_environment]}]] Task Distributor')

        # The queue implements multi-producer, multi-consumer queues.
        self._task_queue = Queue(maxsize=QUEUE_MAXSIZE, loop=sanic_loop)
        self._results_queue = Queue(maxsize=QUEUE_MAXSIZE, loop=sanic_loop)
        self._asb_task_queue = AzureServiceBusQueue(sanic_loop, self._test_environment)
        # The lock implements a Mutex for asynchronous tasks
        self._task_lock = Lock(loop=sanic_loop)
        self._results_lock = Lock(loop=sanic_loop)
        self._asb_task_lock = Lock(loop=sanic_loop)
        logger.info(f'AIO [{", ".join(ACTIVE_OPERATOR)}] ISO(s)')

    def __del__(self) -> None:
        """ Destructor """
        if not self._task_empty:
            logger.warning(f'AIO Task Queue [pending] {self._task_qsize}')
        if not self._results_empty:
            logger.warning(f'AIO Results Queue [pending] {self._results_qsize}')
        if self._task_queue:
            del self._task_queue
        if self._task_lock:
            del self._task_lock
        if self._results_queue:
            del self._results_queue
        if self._results_lock:
            del self._results_lock
        if self._asb_task_queue:
            del self._asb_task_queue
        if self._asb_task_lock:
            del self._asb_task_lock

    @property
    def _task_qsize(self) -> int:
        """ Get task queue size

        :return: Queue size
        """
        return self._task_queue.qsize()

    @property
    def _results_qsize(self) -> int:
        """ Get results queue size

        :return: Queue size
        """
        return self._results_queue.qsize()

    @property
    def _task_empty(self) -> bool:
        """ Define if task queue is empty or not

        :return: True if task queue is empty; otherwise, False
        """
        return self._task_queue.empty()

    @property
    def _results_empty(self) -> bool:
        """ Define if results queue is empty or not

        :return: True if results queue is empty; otherwise, False
        """
        return self._results_queue.empty()

    async def _acquire(self, queue: Queue, lock: Lock) -> Any:
        """ Get latest object from the given queue """
        async with lock:
            try:
                return queue.get_nowait()

            except QueueEmpty as e:
                logger.warning(f'AIO Queue [empty] {str(e)}')
                return None

            except Exception as e:
                err = f'AIO Queue [acquire] {str(e)}'
                logger.error(err)
                raise ServerError(err)

    async def _execute_task(self, task: dict) -> sanic.response:
        """ Execute the given task

        :param task: Task object to be processed
        """

        task_executor = TaskExecutor(
            self._results_queue,
            self._results_lock,
            task)

        results = await task_executor.process_task()
        if results:
            logger.info('AIO [results]')
        return results

    async def deposit_task(self, task: dict) -> bool:
        """ Add a given task object to the task queue

        :param task: Incoming task request object
        """
        async with self._task_lock:
            try:
                self._task_queue.put_nowait(task)
                return True

            except QueueFull as e:
                logger.warning(f'AIO Task Queue [full] {str(e)}')
                return False

            except Exception as e:
                err = f'AIO Task Queue [store] {str(e)}'
                logger.error(err)
                raise ServerError(err)

    async def aqueue_results_manager(self) -> response.stream:
        """ Query the results queue to reply back the results object, if any """
        logger.info('Starting AIO Queue Results Manager...')

        try:
            async def stream_results(response):
                while True:

                    # logger.debug('AIO Results Queue')
                    if not self._results_empty:
                        results = await self._acquire(self._results_queue, self._results_lock)
                        await response.write(results)
                        # logger.debug('AIO Results Queue [replied]')
                        if ('error' in results) or ('end' in results):
                            break

                    await sleep(SLEEP_WAIT)

            return response.stream(stream_results, content_type='application/json')

        except KeyboardInterrupt as e:
            err = f'Going down... {e}'
            logger.debug(err)
            raise TaskDistributorException(err)

    async def aqueue_task_manager(self) -> None:
        """ Query the task queue to process a given task object, if any """
        logger.info('Starting AIO Queue Task Manager...')

        try:
            while True:

                # logger.debug('AIO Task Queue')
                if not self._task_empty:
                    task = await self._acquire(self._task_queue, self._task_lock)
                    logger.debug('AIO Task Queue [acquired]')
                    await self._execute_task(task)

                await sleep(SLEEP_WAIT)

        except KeyboardInterrupt as e:
            err = f'Going down... {e}'
            logger.debug(err)
            raise TaskDistributorException(err)

    async def asbqueue_task_manager(self) -> None:
        """ Query the Azure Service Bus Queue to process a given task object, if any """
        logger.info('Starting ASB Queue Task Manager...')

        try:
            while True:

                # logger.debug('ASB Queue')
                tasks = await self._asb_task_queue.get()
                for task in tasks:
                    logger.debug('ASB Queue [acquired]')
                    await self._execute_task(task)

                await sleep(SLEEP_WAIT)

        except KeyboardInterrupt as e:
            err = f'Going down... {e}'
            logger.debug(err)
            raise TaskDistributorException(err)


class TaskDistributorApi(HTTPMethodView):
    """ Sanic AsyncIO API """

    async def get(self, request: sanic.request) -> response.json:
        """ Take a given task object from the FIFO queue, if not empty

        :param request: Incoming client request for latest available given task object, if any
        :return: A given task object
        """
        logger.info('PING')
        return response.json({'ping': 'ISOlib ok'})

    async def post(self, request: sanic.request):
        """ Add a given task object into the FIFO queue, if not full

        :param request: Incoming client request that contains a given task object
        :return: True if task object was added to task queue; otherwise, False
        """
        if request.app.distributing:
            task = request.json
            if await request.app.distributor.deposit_task(task):
                logger.info(f'AIO [tasks] {task}')
                return await request.app.distributor.aqueue_results_manager()


@app.middleware('request')
def check_task_json(request: sanic.request) -> None:
    """ Verify incoming request is valid JSON

    :param request: Incoming user request
    """
    try:
        _ = request.json

    except Exception as e:
        err = f'AIO Invalid JSON Format {e}'
        logger.error(err)

        return response.json({'error': err})


@app.listener('after_server_start')
def create_task_queue(application: Sanic, sanic_loop: Sanic.loop) -> None:
    """ Start background worker processes

    :param application: Application instance
    :param sanic_loop: Current loop handler to be used
    """
    application.distributor = TaskDistributor(sanic_loop=sanic_loop)
    application.distributing = True
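    # NOTE: these add_task calls only run if this listener fires; with
    # the async server started via create_server it does not (see the
    # workaround discussed below).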
    application.add_task(application.distributor.asbqueue_task_manager())
    application.add_task(application.distributor.aqueue_task_manager())


app.add_route(TaskDistributorApi.as_view(), methods=['GET', 'POST'], uri='/')

```

The after_server_start listener (@app.listener('after_server_start')) does not fire if the Sanic server is started with the async server. See the note here (https://sanic.readthedocs.io/en/latest/sanic/deploying.html#asynchronous-support-and-sharing-the-loop) for the workaround.

@syfluqs THANK YOU!!! It worked like a charm :smiley: A completely clean, graceful exit, two times!!! I will continue testing and come back with confirmation.

@syfluqs Thank you again, the graceful clean exit works just great! :smiley: Yet the other coroutines added with add_task are not executed :frowning: Any pointers?

The server.after_start() call fails to trigger the coroutines previously added.

The worker tasks are now added with add_task prior to running create_server, and the following sequence allows the worker coroutines to be properly propagated and executed, while also allowing a graceful, clean shutdown (any further improvements are welcome).
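
A sketch of that sequence, following the pattern from the linked deployment docs (app, TaskDistributor, and exception_handler are the ones from the snippets above; the exact AsyncioServer hooks may vary between Sanic versions):

```
def main(host, port):
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)

    # Register the workers before the server is created, so they do
    # not depend on the after_server_start listener firing.
    app.distributor = TaskDistributor(sanic_loop=loop)
    app.distributing = True
    app.add_task(app.distributor.asbqueue_task_manager())
    app.add_task(app.distributor.aqueue_task_manager())

    serv_coro = app.create_server(host=host, port=port,
                                  return_asyncio_server=True)
    serv_task = asyncio.ensure_future(serv_coro, loop=loop)
    server = loop.run_until_complete(serv_task)
    # Manually trigger the after_server_start listeners, since the
    # async server does not.
    server.after_start()
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()
    finally:
        server.before_stop()
        # Wait for the server to close, then finish the remaining
        # connections before firing the after_stop listeners.
        loop.run_until_complete(server.close())
        for connection in server.connections:
            connection.close_if_idle()
        server.after_stop()
```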