Asynchronous background refresh

Hello,

I am trying to develop a Sanic web server that has dynamic context which needs to be refreshed periodically. I came up with this snippet:

import asyncio
import os
import pprint
from multiprocessing import Process, Queue

import aiohttp
from sanic import Sanic
from sanic.request import Request
from sanic.response import json

REFRESH_PERIOD_SECONDS = 60  # how often to refresh the context


class Client:
    @classmethod
    async def create(cls, loop):
        self = Client()

        ssl = os.getenv("DEBUG", "True") == "True"
        self._tcp_connector = aiohttp.TCPConnector(verify_ssl=ssl)
        self._client_session = aiohttp.ClientSession(
            connector=self._tcp_connector, loop=loop
        )  # Must call self._client_session.close() at the end

        return self

    async def get(self, url):
        async with self._client_session.get(url) as response:
            return await response.json()

    async def fetch_cats_facts(self):
        return await self.get("https://cat-fact.herokuapp.com/facts")


class Server:
    def __init__(self):
        self._app = Sanic(__name__)

        @self._app.listener("before_server_start")
        async def before_server_start(app, loop):
            self._client = await Client.create(loop)
            self._app.add_task(self.refresh_context())

        @self._app.route("/v1/init")
        def init(request: Request):
            return json({"hey": "you"})

    async def refresh_context(self):
        def wrapper(q: Queue):
            async def refresh_facts():
                loop = asyncio.get_running_loop()
                client = await Client.create(loop)
                facts = await client.fetch_cats_facts()
                q.put(facts)
            asyncio.run(refresh_facts())

        while True:
            q = Queue()
            p = Process(target=wrapper, args=(q,))
            p.start()

            pprint.pprint(q.get())

            p.join()
            await asyncio.sleep(REFRESH_PERIOD_SECONDS)

    def start(self):
        # Do not change the worker count: sharing state between Sanic worker processes is not straightforward, so I am sticking to a single worker for now
        self._app.run(workers=1, port=8080)


if __name__ == "__main__":
    server = Server()

    server.start()

The problem is that the context refresh stops the server from responding. I tried a Thread-based approach (https://stackoverflow.com/a/5210951/4561258), but I can’t get it to work because of a TypeError: Pickling an AuthenticationString object is disallowed for security reasons.

It feels like I am not looking at the problem correctly. Is my approach valid? To be more precise: the context I fetch periodically is loaded into memory and influences the server’s responses. That is why I want it refreshed frequently, but loaded without interrupting the service.

Thanks in advance for your help.

Alexis

Hi @alexisdurieux. I think that in general you can skip the multiprocessing module. You can accomplish something similar by creating a background task (see add_task), and using an asyncio.Queue.

Take a look at this answer. While not exactly the same, it is similar. Start a background task that kicks off the update process periodically and push to the Queue, which can be accessed in the response.
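Something along these lines (an untested sketch with plain asyncio stand-ins; in Sanic you would pass the producer coroutine to `app.add_task` and read from the queue inside a handler, names here are placeholders):

```python
import asyncio


async def producer(q: asyncio.Queue):
    # Background task: periodically fetch new context and publish it.
    # In real code this would be `while True:` around an aiohttp call.
    for i in range(3):
        await q.put({"version": i})
        await asyncio.sleep(0.01)


async def consumer(q: asyncio.Queue):
    # A request handler can await the queue; awaiting yields control
    # to the event loop instead of blocking it like Queue.get() does.
    seen = []
    for _ in range(3):
        seen.append(await q.get())
    return seen


async def main():
    q = asyncio.Queue()
    asyncio.create_task(producer(q))  # Sanic equivalent: app.add_task(producer(q))
    return await consumer(q)


items = asyncio.run(main())
print(items)
```

The important difference from the multiprocessing version is that everything runs on the one event loop, so the server stays responsive between awaits.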

Hope this helps.

Thanks @ahopkins.
It works! I managed to make it work following your remarks. In fact I did not use a queue; the add_task method alone did the trick. I thought I had tried that before, but I probably did not do it correctly. All the asyncio and event loop machinery is hard to grasp at first.
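For reference, what I ended up with boils down to roughly this (simplified sketch with a placeholder fetch and a short demo interval instead of the real aiohttp call and refresh period):

```python
import asyncio

REFRESH_PERIOD_SECONDS = 0.01  # placeholder interval for the demo


async def fetch_cat_facts():
    # placeholder for the real aiohttp request
    return ["cats purr"]


async def refresh_context(context: dict):
    # Runs on the same event loop as the request handlers, so the
    # server keeps responding while this sleeps between refreshes.
    while True:
        context["facts"] = await fetch_cat_facts()
        await asyncio.sleep(REFRESH_PERIOD_SECONDS)


async def main():
    context = {}
    # Sanic equivalent: self._app.add_task(refresh_context(context))
    task = asyncio.create_task(refresh_context(context))
    await asyncio.sleep(0.03)  # simulate the server running for a while
    task.cancel()
    return context


context = asyncio.run(main())
print(context["facts"])
```

Handlers just read `context` from memory, so responses always use the latest fetched data without any multiprocessing.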

Thanks again for your quick response! Take care.

Happy to help. :sunglasses: