PyCon IL 2021 - Liberate your API

I am giving a talk at this year’s PyCon IL.

Liberate your API: Building a task manager inside Sanic

2021-05-03, 10:00–10:25 UTC+3

The talk is mainly about different strategies for pushing work off to the background, since how to do that is a fairly common question. As a fun little experiment, I built a scalable proof-of-concept (POC) background worker that is managed entirely by Sanic.

It spins up background worker processes to execute tasks, which keeps your main worker processes free to handle requests.

Meet SAJE (Sanic Asynchronous Job Executor):

from __future__ import annotations
import ujson as json
from abc import abstractmethod, ABC
import os
import asyncio
from queue import Empty
from typing import Any, Dict, List, Type
from sanic.log import logger
from multiprocessing import Queue, Process
from uuid import UUID, uuid4
from dataclasses import dataclass, asdict
from datetime import datetime
from asyncio import Task as AsyncTask

# Sentinel message that Runner.stop() puts on the shared job queue for the workers.
STOP: str = "__STOP__"
# Seconds a worker (see manage) allows in-flight jobs to finish while shutting down.
TIMEOUT: int = 3


@dataclass
class JobDetails:
    """Snapshot of a job's most recent status record.

    ``FileBackend.fetch`` rebuilds one of these from the newest stored line
    for a job.
    """

    uid: UUID
    name: str
    complete: bool
    timestamp: datetime
    kwargs: Dict[str, Any]

    def __json__(self):
        """Serialize this record to a JSON string.

        ``uid`` and ``timestamp`` are converted to strings first because they
        are not JSON-native types. NOTE(review): presumably this hook is what
        lets ujson encode the object when ``sanic.response.json`` is handed a
        JobDetails; confirm.
        """
        serializable = {
            **asdict(self),
            "uid": str(self.uid),
            "timestamp": self.timestamp.isoformat(),
        }
        return json.dumps(serializable)


class Backend(ABC):
    """Abstract storage interface for job status records.

    Concrete backends override all four coroutines and choose their own
    argument lists; the abstract versions accept anything and return None.
    """

    @abstractmethod
    async def store(self, *_, **__):
        """Persist one record."""

    @abstractmethod
    async def fetch(self, *_, **__):
        """Look up a previously stored record."""

    @abstractmethod
    async def start(self, *_, **__):
        """Hook awaited when a job is entered (see ``Job.__aenter__``)."""

    @abstractmethod
    async def stop(self, *_, **__):
        """Hook awaited when a job is exited (see ``Job.__aexit__``)."""


class FileBackend(Backend):
    """Job-status backend that appends records to a single local text file.

    Each record is one line:
    ``<key>|<task>|<complete 0/1>|<ISO timestamp>|<kwargs JSON>``.
    ``start`` appends a line with complete flag 0 and ``stop`` appends one with
    flag 1. Because the file is append-only, the newest line for a key is its
    current status, so ``fetch`` searches from the end of the file backwards.

    NOTE(review): all file I/O here is synchronous, so each call briefly blocks
    the event loop of whichever process awaits it.
    """

    def __init__(self, path):
        self.path = path

    async def store(self, key, value):
        """Append ``key|value`` to the file as a new line."""
        # Explicit encoding so writes agree with fetch()'s UTF-8 decode
        # regardless of the platform's default locale encoding.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(f"{key}|{value}\n")

    async def fetch(self, key, chunk=4096):
        """Return the most recent record stored for ``key`` as a JobDetails.

        ``chunk`` is the number of bytes read per step while scanning
        backwards from the end of the file.

        Returns None when the file does not exist yet or contains no line
        for ``key``.
        """
        try:
            size = os.stat(self.path).st_size
        except FileNotFoundError:
            # Nothing has been stored yet, so there is nothing to find.
            return None

        # Match the whole key field, not merely a prefix of some other key.
        prefix = f"{key}|".encode()
        count = 1
        value = None
        with open(self.path, "rb") as f:
            if size > chunk:
                f.seek(-1 * chunk * count, os.SEEK_END)

            data = f.read(chunk).split(b"\n")
            if not data[-1]:
                # Drop the empty piece after the file's trailing newline.
                data = data[:-1]

            while value is None:
                # data[0] may be a line cut in half by the chunk boundary. Once
                # it is the only candidate left, prepend the preceding chunk
                # until the start of the file has been read.
                while len(data) == 1 and ((count * chunk) < size):
                    count = count + 1
                    line = data[0]
                    try:
                        f.seek(-1 * chunk * count, os.SEEK_END)
                        data = (f.read(chunk) + line).split(b"\n")
                    except OSError:
                        # Seeking before byte 0 fails: read the remaining head.
                        f.seek(0)
                        pos = size - (chunk * (count - 1))
                        data = (f.read(pos) + line).split(b"\n")

                if len(data) == 0:
                    return None

                line = data[-1]

                if line.startswith(prefix):
                    value = line
                else:
                    data.pop()

        # maxsplit=4: the kwargs JSON is the last field and may contain "|".
        uid, name, complete, timestamp, kwargs = (
            value.strip().decode("utf-8").split("|", 4)
        )
        return JobDetails(
            UUID(uid),
            name,
            bool(int(complete)),
            datetime.fromisoformat(timestamp),
            json.loads(kwargs),
        )

    async def start(self, job):
        """Append a record for ``job`` with the complete flag set to 0."""
        await self._record(job, complete=False)

    async def stop(self, job):
        """Append a record for ``job`` with the complete flag set to 1."""
        await self._record(job, complete=True)

    async def _record(self, job, complete):
        """Append one status line for ``job`` stamped with the current UTC time."""
        await self.store(
            job.uid,
            "|".join(
                [
                    job.task,
                    str(int(complete)),
                    # Naive UTC timestamp. NOTE: datetime.utcnow() is deprecated
                    # as of Python 3.12.
                    datetime.utcnow().isoformat(),
                    json.dumps(job.kwargs),
                ]
            ),
        )


class Task(ABC):
    """Base class for background tasks; subclasses register themselves by ``name``.

    ``Job`` looks task classes up in ``__registry__`` using the task name
    carried in each queued message.
    """

    # Task name -> Task subclass, shared by the whole hierarchy.
    __registry__: Dict[str, Type[Task]] = {}
    # Registry key; concrete subclasses are expected to override it.
    name = ""

    def __init_subclass__(cls, **kwargs) -> None:
        # Chain to the next class in the MRO so mixins (or other bases) that
        # implement __init_subclass__ or take class keywords keep working.
        super().__init_subclass__(**kwargs)
        # NOTE(review): a subclass that does not set ``name`` inherits its
        # parent's and replaces the parent's registry entry.
        cls.__registry__[cls.name] = cls

    @abstractmethod
    async def run(self, **_):
        """Do the task's work; keyword arguments come from the job's kwargs."""
        raise NotImplementedError


class HelloWorld(Task):
    """Demo task that logs a numbered greeting once per second, twenty times."""

    name = "hello"

    async def run(self, name: str = "world", **_):
        """Greet ``name``; any extra keyword arguments are ignored."""
        for i in range(20):
            message = f"> {str(i).rjust(2)}) Hello, {name}."
            logger.info(message)
            await asyncio.sleep(1)


class Job:
    """One unit of work: which registered Task to run, and with what kwargs.

    Used as an async context manager. Entering records the start with the
    backend and yields a fresh Task instance; exiting records the stop.
    """

    def __init__(self, task: str, backend, uid=None, kwargs=None) -> None:
        """Build a job.

        ``uid`` may be a UUID, a UUID string (as it arrives in a JSON queue
        message), or falsy to generate a new one. Raises ValueError if ``uid``
        is a string that is not a valid UUID.
        """
        self.task = task
        if not uid:
            uid = uuid4()
        elif isinstance(uid, str):
            # Normalise to UUID so the stored key is always str(UUID(...)),
            # the same canonical form used for uuid4() and JobDetails.uid.
            uid = UUID(uid)
        self.uid = uid
        self.backend = backend
        # A message without "kwargs" would otherwise make ``**self.kwargs`` fail.
        self.kwargs = {} if kwargs is None else kwargs

    async def execute(self, task: Task):
        """Run ``task`` with this job's keyword arguments."""
        logger.info(f"Executing {self.task}")
        await task.run(**self.kwargs)

    async def __aenter__(self):
        """Instantiate the registered Task and record the start in the backend.

        Raises:
            Exception: if no Task subclass is registered under ``self.task``.
        """
        task_class = Task.__registry__.get(self.task)
        if task_class is None:
            raise Exception(f"No task named {self.task}")
        task = task_class()
        await self.backend.start(self)
        return task

    async def __aexit__(self, *_):
        """Record the stop in the backend.

        This runs whether or not the task raised; returning None lets any
        exception propagate to the caller.
        """
        await self.backend.stop(self)

    @classmethod
    async def create(cls, job: str, backend):
        """Parse a JSON job message and run it to completion.

        The message must contain "task"; every other key (e.g. "uid",
        "kwargs") is forwarded to ``__init__`` as a keyword argument.
        """
        data = json.loads(job)
        task_name = data.pop("task")
        instance = cls(task_name, backend, **data)
        async with instance as task:
            await instance.execute(task)


class Scheduler:
    """Feed job messages from a shared multiprocessing queue into an event loop.

    ``queue`` is shared by every worker process, ``loop`` is the event loop
    jobs are scheduled on, and ``backend`` is handed to every ``Job`` for
    status bookkeeping.
    """

    def __init__(self, queue, loop, backend) -> None:
        self.queue = queue
        # Jobs still in flight; finished ones remove themselves via _forget.
        self.tasks: List[AsyncTask] = []
        self._loop = loop
        self.backend = backend

    async def run(self):
        """Poll the queue until the STOP sentinel arrives, then stop the loop."""
        logger.info("> Starting job manager")
        while True:
            # queue.get() blocks, so wait for it on the default executor and
            # keep this loop free to drive the jobs that are already running.
            stopping = await self._loop.run_in_executor(None, self.consumer)
            if stopping:
                break
            await asyncio.sleep(0)
        # Makes loop.run_forever() return in whoever is driving this loop.
        self._loop.stop()

    def consumer(self):
        """Take one message off the queue, waiting at most 1 second, and dispatch it.

        Runs on an executor thread. Returns True if the message was the STOP
        sentinel, otherwise False.
        """
        try:
            job = self.queue.get(timeout=1)
        except Empty:
            return False

        if job == STOP:
            logger.info("> Stopping consumer")
            # Put the sentinel back so the other workers sharing the queue see it.
            self.queue.put_nowait(job)
            return True

        logger.info(f"> Job requested: {job=}")
        # asyncio objects are not thread-safe; have the loop's own thread
        # create the task instead of calling create_task from this thread.
        self._loop.call_soon_threadsafe(self.execute, job)
        return False

    def execute(self, job: str):
        """Start ``job`` as a task on this scheduler's loop and track it."""
        task = self._loop.create_task(Job.create(job, self.backend))
        self.tasks.append(task)
        task.add_done_callback(self._forget)

    def _forget(self, task: AsyncTask) -> None:
        """Drop a finished task from ``self.tasks`` and log it if it failed."""
        self.tasks.remove(task)
        if not task.cancelled() and task.exception() is not None:
            # Retrieving the exception also prevents asyncio's
            # "Task exception was never retrieved" warning.
            logger.error("> Job failed", exc_info=task.exception())


def manage(queue):
    """Run one SAJE worker: a Scheduler on a fresh event loop.

    ``queue`` is the multiprocessing queue shared with ``Runner``. When the
    loop stops or a KeyboardInterrupt arrives, unfinished jobs get up to
    ``TIMEOUT`` seconds to complete before the loop is closed.
    """
    logger.info(f"Starting SAJE worker [{os.getpid()}]")
    loop = asyncio.new_event_loop()
    backend = FileBackend("./db")
    scheduler = Scheduler(queue, loop, backend)
    try:
        loop.create_task(scheduler.run())
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down")
    finally:
        pending = [task for task in scheduler.tasks if not task.done()]
        try:
            if pending:
                # return_exceptions=True: one failed job must not abort the
                # drain or escape from this finally block.
                drain = asyncio.wait_for(
                    asyncio.gather(*pending, return_exceptions=True),
                    timeout=TIMEOUT,
                )
                loop.run_until_complete(drain)
        except asyncio.TimeoutError:
            # wait_for has already cancelled the jobs that did not finish.
            logger.warning(
                f"SAJE worker [{os.getpid()}] gave up on unfinished jobs "
                f"after {TIMEOUT}s"
            )
        finally:
            loop.close()
    logger.info(f"Stopping SAJE worker [{os.getpid()}]. Goodbye")


class Runner:
    """Own the SAJE worker processes and the queue used to hand them jobs.

    Every worker process runs ``manage`` with the same ``self.queue``.
    """

    def __init__(self, workers=1) -> None:
        logger.info(f"Initializing SAJE with {workers=}")
        self.queue: Queue[str] = Queue()
        # Processes are only created here; start() launches them.
        self.workers = [
            Process(target=manage, args=(self.queue,)) for _ in range(workers)
        ]

    def start(self):
        """Launch every worker process."""
        for process in self.workers:
            process.start()

    def stop(self):
        """Send the STOP sentinel, then join and close each worker.

        join() has no timeout, so this blocks until each worker process exits.
        """
        self.send(STOP)
        for process in self.workers:
            process.join()
            process.close()

    def send(self, message):
        """Queue ``message`` for the workers; non-strings are JSON-encoded first."""
        payload = message if isinstance(message, str) else json.dumps(message)
        self.queue.put_nowait(payload)

And here is our server, which imports the module above as `saje`:

from sanic import Sanic, text
from sanic.response import json
from saje import Runner, FileBackend
from uuid import uuid4

# Used for both the Sanic HTTP workers (app.run) and the SAJE job runner.
WORKERS = 2
app = Sanic(__name__)


@app.get("/<uid:uuid>")
async def handler(request, uid):
    """Return job ``uid``'s latest status record as JSON, or 404 if it has none."""
    data = await request.app.ctx.jobs.fetch(uid)
    if data is None:
        # fetch() found no record for this uid: report that instead of a 200 "null".
        return json({"error": f"No job with uid {uid}"}, status=404)
    return json(data)


@app.post("/")
async def start_job(request):
    """Queue a demo "hello" job and respond with its uid as plain text."""
    uid = str(uuid4())
    message = {
        "task": "hello",
        "uid": uid,
        "kwargs": {"name": "Adam"},
    }
    request.app.ctx.saje.send(message)
    return text(uid)


@app.main_process_start
def start_saje(app, _):
    """Create the SAJE runner on ``app.ctx.saje`` and launch its worker processes."""
    runner = app.ctx.saje = Runner(workers=WORKERS)
    runner.start()


@app.main_process_stop
def stop_saje(app, _):
    """Stop the SAJE runner created by the main-process start listener."""
    runner = app.ctx.saje
    runner.stop()


@app.after_server_start
async def setup_job_fetch(app, _):
    """Attach the FileBackend that ``handler`` reads job status from."""
    # Same "./db" path the SAJE worker processes write to.
    backend = FileBackend("./db")
    app.ctx.jobs = backend


if __name__ == "__main__":
    # Guarded so that importing this module (including the re-import done by
    # child processes under the "spawn" start method) does not start a server.
    app.run(port=9999, workers=WORKERS)
2 Likes

Can we get a recording for this talk?