Task was destroyed but it is pending!

Hi there!

I’m having a hard time trying to avoid the following issue:

Task was destroyed but it is pending!
task: <Task pending name=‘Task-15’ coro=<WebsocketFrameAssembler.get() running at /home/cx42/www/fernand/api/env/lib64/python3.8/site-packages/sanic/server/websockets/frame.py:117> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2566d42160>()] created at /usr/lib64/python3.8/asyncio/locks.py:306> created at /usr/lib64/python3.8/asyncio/tasks.py:424>

This is related to the websocket, and I’ve followed the guide here.

The write system is done on background task, and the recv is on an infinite loop, like in the gist:

    try:
        await client.receiver()
    finally:
        await channel.unregister(client)

I’ve added a signal even for shutdown, to stop the connections :

@app.signal(Event.SERVER_SHUTDOWN_BEFORE)
async def unregister_clients(app, loop):
    try:
        await app.cancel_task('stream_keepalive')  # Kills the keepalive task
    except SanicException:
        # That function was never created
        pass

    await Channel.close()  # Call all the clients close

The close for each clients calls await self.ws.close()

But I always have the above error, I can’t understand why.

I noticed that recv can accept a timeout value, but even a value set at 5 seconds for instance, causes an issue.

I tried to create a Task around the recv method and cancel it in the close call, but when doing so, I get a new error:

[2022-01-07 17:21:16 +0100] [3490794] [ERROR] Experienced exception while trying to serve
Traceback (most recent call last):
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/app.py”, line 1204, in run
serve_single(server_settings)
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/server/runners.py”, line 206, in serve_single
serve(**server_settings)
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/server/runners.py”, line 160, in serve
loop.run_until_complete(app._server_event(“shutdown”, “before”))
File “uvloop/loop.pyx”, line 1501, in uvloop.loop.Loop.run_until_complete
asyncio.exceptions.CancelledError
Traceback (most recent call last):
File “server.py”, line 29, in
app.run(**params)
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/app.py”, line 1204, in run
serve_single(server_settings)
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/server/runners.py”, line 206, in serve_single
serve(**server_settings)
File “/home/user/www/project/api/env/lib64/python3.8/site-packages/sanic/server/runners.py”, line 160, in serve
loop.run_until_complete(app._server_event(“shutdown”, “before”))
File “uvloop/loop.pyx”, line 1501, in uvloop.loop.Loop.run_until_complete
asyncio.exceptions.CancelledError

I don’t know how I can avoid this issue when I restarting the server.

Thank you for the help!

So sorry for not responding earlier. Is this the problem that you mentioned in the Gist comment that you solved?

No, even though it’s the same part.

When I restart the server (hot reload for instance), I frequently get this WebsocketFrameAssembler.get() issue because of the ws.recv() code.

I tried to mitigate it by surrounding it with a task I can cancel when closing it, but it still has errors.

Here’s the alternative I’ve implemetend in the Client :

async def read(self):
    self.task = asyncio.create_task(self._read())
    try:
        await self.task
    except CancelledError:
        pass

async def _read(self):
    while True:
        message = await self.ws.recv()

        if not message:
            break

        try:
            event = json.loads(message)
            # Do something with the event
        except json.decoder.JSONDecodeError:
            continue

And when closing, the close method is called like this :

async def close(self, force=False):
    if self.task and not self.task.done():
        self.task.cancel()
        try:
            await self.task
        except Exception as e:
            pass

        self.task = None

    code = 1006 if force else 1000
    try:
        await self.ws.close(code)
    except Exception:
        print('WS Close Exception')

Despite this, I still have that above error with the WebsocketFrameAssembler.get() issue. I can’t get past it.

Okay… let me take another swing at this and see if I can figure out how to clean that up. It actually might be something we an leverage the new task cleanup methods for.

1 Like

I’m circling back here because I’m still facing the issue.

Here’s how I read any incoming messages from the websocket:

    async def read(self):
        self.task = asyncio.create_task(self._read())
        try:
            await self.task
        except CancelledError:
            pass

    async def _read(self):
        while True:
            message = await self.ws.recv()

            if not message:
                break

            if message == 'ping':
                await self.write('pong')

            # ... process the data

I wrapped the _read method in a Cancelled because when the server restart (or any other reason), the connection can get closed, and ws.recv should raise a CancelledError.

But when I restart the server (update code with auto reload enabled), I get these:

Task was destroyed but it is pending!
task: <Task pending name=‘stream_keepalive’ coro=<WebsocketImplProtocol.keepalive_ping() running at /home/user/www/project/api/env/lib64/python3.10/site-packages/sanic/server/websockets/impl.py:222> wait_for=>

One for each active websocket connections.

I tried to wrap my read method with two task : recv and task_cancel, an asyncio.Future that I cancel, and return at the first one that ends, but I still have the issue.

I don’t know what to do to properly close it.

(I’m using Sanic v. 22.6.2)

I have not tried your specific use case, but in general if you use app.add_task with a name, then it puts it into a location where Sanic knows how to clean it up on server shutdown. Otherwise, you should handle closing child tasks on your own.

That’s odd. It’s what I’ve done:

In the websocket request (@app.websocket('') function), I call the following:

request.app.add_task(ws.keepalive_ping(), name="stream_keepalive")

And I have even added the following:

@app.signal(Event.SERVER_SHUTDOWN_BEFORE)
async def unregister_clients(app, loop):
    try:
        await app.cancel_task('stream_keepalive')
    except SanicException:
        # That function was never created
        pass

    await Channel.close()

This code was present before my message here: the exception is still thrown.

Also, I forgot to mention yesterday that I have indeed 2 exceptions per websocket connections : The keepalive one, and the “WebsocketFrameAssembler.get()” one:

Task was destroyed but it is pending!
task: <Task pending name='Task-15' coro=<WebsocketFrameAssembler.get() running at /home/user/www/project/api/env/lib64/python3.10/site-packages/sanic/server/websockets/frame.py:117> wait_for=<Future pending cb=[Task.task_wakeup()]>>

(To be more transparent, I made a mistake pasting the exception yesterday. The code I shared with the await self.ws.recv() code is causing the WebsocketFrameAssembler.get() exception error, not the WebsocketImplProtocol.keepalive_ping() one).

Happy to provide anything that can help!

Sorry for not getting back yet. Will take a closer look at this tomorrow to see if I can recreate it.