How to use asyncio queues in Sanic?

I’m working on an app that reads an input stream from a serial port and uses websockets to display the data on a web page. My plan was to create two asynchronous tasks, one for reading and writing to the serial port, the other to run the web server, and pass data between them using asyncio queues.

The problem I’m having is that I can’t see how to pass the queue as input to the Sanic server. Creating a server using the recommended method for asynchronous I/O does not permit passing keyword arguments like app.run() does:

import asyncio
from signal import SIGINT, signal

import uvloop
from sanic import Sanic

app = Sanic(__name__)

asyncio.set_event_loop(uvloop.new_event_loop())
loop = asyncio.get_event_loop()
task_queue = asyncio.Queue(loop=loop, maxsize=10)
# this produces "unexpected keyword argument"
# server = app.create_server(host="0.0.0.0", port=8081, task_queue=task_queue)
server = app.create_server(host="0.0.0.0", port=8081)
asyncio.ensure_future(read_serial_port(task_queue))
# can't pass it here either, as the server coroutine is not callable
# asyncio.ensure_future(server(task_queue=task_queue))
asyncio.ensure_future(server)
signal(SIGINT, lambda s, f: loop.stop())
try:
    loop.run_forever()
finally:
    loop.stop()

Any help would be greatly appreciated.

I’m not at a computer now, but I’ll send you a snippet later this evening.

I think you could use a listener here: https://sanic.readthedocs.io/en/latest/sanic/middleware.html#listeners

@app.listener('after_server_start')
def create_task_queue(app, loop):
    app.task_queue = asyncio.Queue(loop=loop, maxsize=10)

I used @abuckenheimer’s idea of using a listener and implemented a working solution.

import asyncio
from sanic import Sanic
from sanic.response import text

from random import randint

app = Sanic(__name__)
MAXSIZE = 10


def fill_queue(queue, amount=MAXSIZE):
    for _ in range(amount):
        queue.put_nowait(randint(1, 7))


async def worker(name, queue):
    while True:
        job = await queue.get()
        size = queue.qsize()
        print(f"{name} is sleeping on the job for {job}. {size} remaining")
        await asyncio.sleep(job)


@app.get('/')
async def checker(request):
    size = request.app.queue.qsize()
    fill = MAXSIZE - size
    fill_queue(request.app.queue, fill)
    return text(f"You have {size} item(s) in your queue. Refilling with {fill}.")


@app.listener('after_server_start')
def create_task_queue(app, loop):
    app.queue = asyncio.Queue(loop=loop, maxsize=MAXSIZE)
    fill_queue(app.queue)

    for x in range(2):
        app.add_task(worker(f"Worker-{x}", app.queue))


app.run(port=7777, debug=True)

Run this in one terminal, and in another:

curl localhost:7777 -i

Take a look at what is happening in create_task_queue. We create the Queue and then use Sanic’s built-in add_task utility to spin up some workers. Those workers run on the same loop as Sanic, so there is no need to create a loop yourself or to call create_server yourself. I would suggest sticking with app.run here.

The benefit of doing it here is that you know you have access to both the app instance (so you can assign the Queue as a property on it) and the loop.

Once the Queue is a property of the app, you have access to it on the request object, as seen in the checker handler. From that handler, you can query it, push, pull, and do anything else you need.

Thanks, @ahopkins and @abuckenheimer, for responding so quickly. That was a big help and took care of my problem.

Aiohttp provides a method of passing a web socket response to background tasks, allowing background tasks to send messages directly over the socket without a queue. Is this possible using Sanic?

Yes, this is what app.add_task does.

But how would the background task access the web socket instances?

As I understand it, in Aiohttp you create an array to hold each web socket instance when the app is started. Then, when a web socket is opened, an instance of the response object is appended to the array. The background task(s) pull from that array.

I suppose the same could be done in Sanic?