Task was destroyed but it is pending on ws.recv()

I am getting the following error:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/code/api.py", line 1306, in <module>
    app.run(host="0.0.0.0", debug=debug, port=PORT, workers=WORKERS, access_log=False)
  File "/usr/local/lib/python3.9/site-packages/sanic/app.py", line 1053, in run
    serve_multiple(server_settings, workers)
  File "/usr/local/lib/python3.9/site-packages/sanic/server/runners.py", line 254, in serve_multiple
    process.start()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.9/site-packages/sanic/server/runners.py", line 150, in serve
    loop.run_forever()
  File "/code/api.py", line 1108, in handle_events
    input_json = await ws.recv()
  File "/usr/local/lib/python3.9/site-packages/sanic/server/websockets/impl.py", line 521, in recv
    done, pending = await asyncio.wait(
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 411, in wait
    fs = {ensure_future(f, loop=loop) for f in fs}
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 411, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in fs}
task: <Task pending name='Task-147' coro=<WebsocketFrameAssembler.get() running at /usr/local/lib/python3.9/site-packages/sanic/server/websockets/frame.py:117> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f56a5e90bb0>()] created at /usr/local/lib/python3.9/asyncio/locks.py:223> created at /usr/local/lib/python3.9/asyncio/tasks.py:411>

It apparently comes from the await ws.recv() line.

Package versions

sanic==21.9.3 
websockets==10.1

The endpoint looks like this:

@app.websocket('/v1/websocket')
async def handle_events(request, ws):
    access_token = await ws.recv()
    # First piece of data is JSON access token
    auth_resp = {'status': 200, 'type': 'auth'}
    # Get user from the auth token
    user_id = await get_user_from_db(access_token)
    if not user_id:
        auth_resp = {'status': 400, "error": {"message": 'Invalid access token provided'}, 'type': 'auth'}
    # Send auth response
    await ws.send(json.dumps(auth_resp))

    while True:
        # Wait for input
        input_json = await ws.recv()
        events = json.loads(input_json)['events']

        # Store diffs into DB
        for event in events:
            # < process event here >
            await ws.send(json.dumps({'status': 200, 'type': 'sync'}))

I would venture to guess that the problem is that your loop is sitting and waiting for a message from the client when it disconnects, so the ws.recv coroutine is never completed or handled properly. FWIW, you would usually want to put your send and recv into separate tasks so that they can operate asynchronously and not be blocked by each other.
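As a rough illustration of the separate-task idea, here is a minimal sketch that runs without a server. FakeWS is a hypothetical stand-in for the websocket object (it is not Sanic's API), and using None to mark a disconnect is an assumption made only for this sketch. The point is the shape of handle(): one task receives, one task sends, and whatever is still pending gets cancelled and awaited before the handler returns.

```python
import asyncio
import json


class FakeWS:
    """Hypothetical stand-in for the websocket object; not Sanic's API."""

    def __init__(self, incoming):
        self._incoming = asyncio.Queue()
        for item in incoming:
            self._incoming.put_nowait(item)
        self.sent = []

    async def recv(self):
        item = await self._incoming.get()
        if item is None:  # assumption: None marks a client disconnect here
            raise ConnectionError("client disconnected")
        return item

    async def send(self, data):
        self.sent.append(data)


async def receiver(ws, outbox):
    # Reads client messages and queues one acknowledgement per event.
    while True:
        events = json.loads(await ws.recv())["events"]
        for _ in events:
            await outbox.put({"status": 200, "type": "sync"})


async def sender(ws, outbox):
    # Drains the outbox independently of the receiver.
    while True:
        await ws.send(json.dumps(await outbox.get()))


async def handle(ws):
    outbox = asyncio.Queue()
    tasks = [
        asyncio.create_task(receiver(ws, outbox)),
        asyncio.create_task(sender(ws, outbox)),
    ]
    # Stop as soon as either side finishes or fails.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Await every task so none is left pending when the handler returns.
    await asyncio.gather(*tasks, return_exceptions=True)
    return tasks


async def main():
    ws = FakeWS([json.dumps({"events": [1, 2]}), None])
    tasks = await handle(ws)
    return ws.sent, tasks


sent, tasks = asyncio.run(main())
```

In a real handler the receiver and sender would talk to the actual ws object; the cancel-then-gather step at the end is what keeps a task from being destroyed while still pending.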

You can see an example of this (albeit with more logic to allow for sharing state across multiple worker instances).


Thank you @ahopkins, I will try that out.
I observed that this issue was not present with

sanic==21.6.2
websockets==9.1

However, with sanic v21.6.2, it was giving some other "connection lost" errors.

That may be the case. There was a pretty significant change to how websockets were handled between 21.6 and 21.9, and some bugs were corrected. The update from websockets 9 to 10 introduced a new API layer that lets frameworks like Sanic handle the I/O independently of websockets. This would account for the difference you are seeing between 21.6 and 21.9.

Most importantly, we want to get your code working properly with the new version, since it is the more "correct" way to handle websockets.

Thank you @ahopkins for sharing this. I have tried it out and it is working great for me.
My clients are JS/Java.

However, I had to make a couple of changes to make it work for me.

1- Catching a couple more exceptions, in addition to PubSubError, in the following block:

- aioredis.exceptions.ConnectionError
- ValueError

try:
    raw = await self.pubsub.get_message(
        ignore_subscribe_messages=True, timeout=1.0
    )
except PubSubError as e:
    logger.info(f"PUBSUB closed <{self.name}>")
    break

2- Killing the connection if no event is received from the client side in the first 5s

@app.websocket('/v1/websocket')
async def handle_events(request, ws):
    query_params = dict(request.get_query_args())
    access_token = query_params.get('token')
    # Get user from the auth token
    user_id = await fetch_result(UsersQ.get_id_by_access_token, {"access_token": access_token})

    if not user_id:
        return await ws.send(json.dumps({'status': 401, "error": {"message": 'Invalid access token provided'}, 'type': 'auth'}))

    auth_resp = {'status': 200, 'type': 'auth'}
    await ws.send(json.dumps(auth_resp))

    # Close the connection if nothing is received in first 5s after the auth is done
    try:
        if not await ws.recv(5.0):
            return
    except Exception:
        return

    redis = request.app.ctx.redis
    pubsub = redis.pubsub()
    channel, is_existing = await Channel.get(
        pubsub, redis, user_id)

Otherwise, every time a live user made a connection, it was connecting, immediately destroying the channel, and raising a Redis ConnectionError on await self.pubsub.get_message.

3- Modified the client receiver to kill an idle connection after 5 minutes.

KILL_AFTER = 5 * 60

async def receiver(self):
   while True:
       message = await self.protocol.recv(KILL_AFTER)
       if not message:
           break
       await self.redis.publish(self.channel_name, message)

Reason: if a client remained idle for more than 10 minutes, I was getting this warning:

[WARNING] Websocket timed out waiting for pong

After this warning, making an HTTP request and calling an async task was causing the "Task was destroyed but it is pending" error.
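For anyone who wants to try the idle-timeout receiver from item 3 without a server or Redis, here is a small sketch. FakeProtocol and publish are hypothetical stand-ins: FakeProtocol.recv only imitates a recv(timeout) that returns None when nothing arrives in time, and publish takes the place of self.redis.publish. KILL_AFTER is shortened so the sketch finishes quickly.

```python
import asyncio

KILL_AFTER = 0.05  # seconds; shortened from 5 * 60 so the sketch ends quickly


class FakeProtocol:
    """Hypothetical stand-in for the websocket protocol object."""

    def __init__(self, messages):
        self._queue = asyncio.Queue()
        for message in messages:
            self._queue.put_nowait(message)

    async def recv(self, timeout=None):
        # Imitates a recv() that gives up and returns None after `timeout` seconds.
        try:
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            return None


async def receiver(protocol, publish):
    while True:
        message = await protocol.recv(KILL_AFTER)
        if not message:
            break  # nothing arrived within KILL_AFTER: leave the loop
        await publish(message)


async def main():
    published = []

    async def publish(message):  # stands in for redis.publish(channel, message)
        published.append(message)

    await receiver(FakeProtocol(["a", "b"]), publish)
    return published


published = asyncio.run(main())
```

Both queued messages are forwarded, and the loop exits on its own once the connection has been idle for KILL_AFTER seconds instead of waiting forever.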

I am on sanic v21.12.0 by the way and my app is deployed to Heroku.

The aioredis docs helped in creating the pubsub: Recipes - aioredis

The following warning sounds minor, but I am getting it on every new connection:

/app/.heroku/python/lib/python3.9/site-packages/sanic/server/websockets/impl.py:521: DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.
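For context, this warning is about handing bare coroutine objects to asyncio.wait(). The call in question lives inside Sanic's impl.py, so it is presumably something to be fixed there rather than in application code. The general pattern that avoids it is to wrap the coroutine in a task first. Below is a minimal sketch of that pattern; get_frame and recv_with_timeout are hypothetical names for illustration, not Sanic functions.

```python
import asyncio


async def get_frame(delay, value):
    # Hypothetical stand-in for a coroutine that waits for the next message.
    await asyncio.sleep(delay)
    return value


async def recv_with_timeout(delay, timeout):
    # Wrap the coroutine in a Task so asyncio.wait() receives a Task,
    # not a bare coroutine object.
    task = asyncio.create_task(get_frame(delay, "hello"))
    done, pending = await asyncio.wait({task}, timeout=timeout)
    if task in done:
        return task.result()
    # Timed out: cancel and await the task so it is not left pending.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return None


async def main():
    fast = await recv_with_timeout(delay=0.01, timeout=1.0)
    slow = await recv_with_timeout(delay=1.0, timeout=0.01)
    return fast, slow


fast, slow = asyncio.run(main())
```

The first call completes within its timeout and returns the message; the second times out, and its task is cancelled and awaited rather than dropped.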

Thanks for sharing! This is good feedback.