I am doing a talk at this year’s PyCon IL.
Liberate your API: Building a task manager inside Sanic
2021-05-03, 10:00–10:25 UTC+3
The talk is mainly about different strategies for pushing work off to the background, since this is a fairly common question. As a fun little experiment, I built a scalable proof-of-concept background worker that is managed entirely by Sanic.
It spins up dedicated worker processes to execute tasks in the background, keeping your main worker processes free to handle requests.
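Before the full listing, here is the rough shape of the moving parts (a sketch using the Runner defined in the code below):

from saje import Runner  # the module built below

runner = Runner(workers=1)  # forks a SAJE worker process with its own event loop
runner.start()

# Enqueue a job by name; the payload is JSON-serialized across the process boundary
runner.send({"task": "hello", "kwargs": {"name": "PyCon"}})

runner.stop()  # broadcast STOP, drain in-flight jobs, join the workers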
Meet SAJE (Sanic Asynchronous Job Executor):
from __future__ import annotations

import asyncio
import os
from abc import ABC, abstractmethod
from asyncio import Task as AsyncTask
from dataclasses import asdict, dataclass
from datetime import datetime
from multiprocessing import Process, Queue
from queue import Empty
from typing import Any, Dict, List, Type
from uuid import UUID, uuid4

import ujson as json
from sanic.log import logger

STOP = "__STOP__"
TIMEOUT = 3
@dataclass
class JobDetails:
    uid: UUID
    name: str
    complete: bool
    timestamp: datetime
    kwargs: Dict[str, Any]

    def __json__(self):
        # ujson picks this method up when serializing, so JobDetails can be
        # passed straight to a Sanic json response
        output = asdict(self)
        output["uid"] = str(self.uid)
        output["timestamp"] = self.timestamp.isoformat()
        return json.dumps(output)
class Backend(ABC):
    @abstractmethod
    async def store(self, *_, **__):
        ...

    @abstractmethod
    async def fetch(self, *_, **__):
        ...

    @abstractmethod
    async def start(self, *_, **__):
        ...

    @abstractmethod
    async def stop(self, *_, **__):
        ...
class FileBackend(Backend):
    def __init__(self, path):
        self.path = path

    async def store(self, key, value):
        # Append-only log: one pipe-delimited line per state change
        # (naive POC format; assumes payloads contain no pipes)
        with open(self.path, "a") as f:
            f.write(f"{key}|{value}\n")

    async def fetch(self, key, chunk=4096):
        # Read the file backwards in chunks until we find the most recent
        # line that starts with the given key
        size = os.stat(self.path).st_size
        count = 1
        value = None
        with open(self.path, "rb") as f:
            if size > chunk:
                f.seek(-1 * chunk * count, 2)
            data = f.read(chunk).split(b"\n")
            if not data[-1]:
                data = data[:-1]
            while value is None:
                # If the chunk holds no complete line, widen the window
                while len(data) == 1 and ((count * chunk) < size):
                    count = count + 1
                    line = data[0]
                    try:
                        f.seek(-1 * chunk * count, 2)
                        data = (f.read(chunk) + line).split(b"\n")
                    except IOError:
                        f.seek(0)
                        pos = size - (chunk * (count - 1))
                        data = (f.read(pos) + line).split(b"\n")
                if len(data) == 0:
                    return None
                line = data[-1]
                if line.startswith(str(key).encode()):
                    value = line
                else:
                    data.pop()
        raw = list(value.strip().decode("utf-8").split("|"))
        raw[0] = UUID(raw[0])
        raw[2] = bool(int(raw[2]))
        raw[3] = datetime.fromisoformat(raw[3])
        raw[4] = json.loads(raw[4])
        return JobDetails(*raw)

    async def start(self, job):
        # Record the job as started (complete=0)
        await self.store(
            job.uid,
            "|".join(
                [
                    job.task,
                    str(0),
                    datetime.utcnow().isoformat(),
                    json.dumps(job.kwargs),
                ]
            ),
        )

    async def stop(self, job):
        # Record the job as complete (complete=1)
        await self.store(
            job.uid,
            "|".join(
                [
                    job.task,
                    str(1),
                    datetime.utcnow().isoformat(),
                    json.dumps(job.kwargs),
                ]
            ),
        )
class Task(ABC):
    # Every subclass registers itself here by name, so a task can be looked
    # up from the string sent over the queue
    __registry__: Dict[str, Type[Task]] = {}
    name = ""

    def __init_subclass__(cls) -> None:
        cls.__registry__[cls.name] = cls

    @abstractmethod
    async def run(self, **_):
        raise NotImplementedError


class HelloWorld(Task):
    name = "hello"

    async def run(self, name: str = "world", **_):
        for i in range(20):
            logger.info(f"> {str(i).rjust(2)}) Hello, {name}.")
            await asyncio.sleep(1)
class Job:
    def __init__(self, task: str, backend, uid=None, kwargs=None) -> None:
        self.task = task
        self.uid = uid or uuid4()
        self.backend = backend
        self.kwargs = kwargs or {}

    async def execute(self, task: Task):
        logger.info(f"Executing {self.task}")
        await task.run(**self.kwargs)

    async def __aenter__(self):
        # Entering the context marks the job as started in the backend;
        # exiting marks it complete
        task_class = Task.__registry__.get(self.task)
        if task_class:
            task = task_class()
            await self.backend.start(self)
            return task
        else:
            raise Exception(f"No task named {self.task}")

    async def __aexit__(self, *_):
        await self.backend.stop(self)

    @classmethod
    async def create(cls, job: str, backend):
        data = json.loads(job)
        task = data.pop("task")
        instance = cls(task, backend, **data)
        async with instance as task:
            await instance.execute(task)
class Scheduler:
    def __init__(self, queue, loop, backend) -> None:
        self.queue = queue
        self.tasks: List[AsyncTask] = []
        self._loop = loop
        self.backend = backend
        self._running = False

    async def run(self):
        logger.info("> Starting job manager")
        self._running = True
        while self._running:
            # queue.get blocks, so run it in a thread to keep the loop free
            await self._loop.run_in_executor(None, self.consumer)
            await asyncio.sleep(0)
        # Let run_forever() in manage() return so the worker can drain and exit
        self._loop.stop()

    def consumer(self):
        try:
            job = self.queue.get(timeout=1)
        except Empty:
            return
        if job == STOP:
            logger.info("> Stopping consumer")
            # Put STOP back so sibling workers see it too, then shut down
            self.queue.put_nowait(job)
            self._running = False
        else:
            logger.info(f"> Job requested: {job=}")
            self.execute(job)

    def execute(self, job: str):
        task = self._loop.create_task(Job.create(job, self.backend))
        self.tasks.append(task)
def manage(queue):
    logger.info(f"Starting SAJE worker [{os.getpid()}]")
    loop = asyncio.new_event_loop()
    backend = FileBackend("./db")
    scheduler = Scheduler(queue, loop, backend)
    try:
        loop.create_task(scheduler.run())
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down")
    finally:
        # Give in-flight jobs a grace period to finish before exiting
        if scheduler.tasks:
            drain = asyncio.wait_for(
                asyncio.gather(*scheduler.tasks),
                timeout=TIMEOUT,
            )
            loop.run_until_complete(drain)
        logger.info(f"Stopping SAJE worker [{os.getpid()}]. Goodbye")
class Runner:
    def __init__(self, workers=1) -> None:
        logger.info(f"Initializing SAJE with {workers=}")
        self.queue: Queue[str] = Queue()
        self.workers = [
            Process(
                target=manage,
                args=(self.queue,),
            )
            for _ in range(workers)
        ]

    def start(self):
        for worker in self.workers:
            worker.start()

    def stop(self):
        # A single STOP is enough: each worker re-queues it for the next
        self.send(STOP)
        for worker in self.workers:
            worker.join()
            worker.close()

    def send(self, message):
        # Serialize dict payloads so only strings cross the process boundary
        if not isinstance(message, str):
            message = json.dumps(message)
        self.queue.put_nowait(message)
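Because Task.__init_subclass__ registers every subclass by name, adding a new background job is just a matter of defining one. For instance, a hypothetical task (not part of the listing above) dropped into the same module:

class Countdown(Task):
    name = "countdown"

    async def run(self, start: int = 5, **_):
        # Any awaitable work can go here; this one just logs a countdown
        for i in range(start, 0, -1):
            logger.info(f"> {i}...")
            await asyncio.sleep(1)

It becomes addressable immediately by sending {"task": "countdown", "kwargs": {"start": 10}} through the Runner.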
And our server…
from uuid import uuid4

from sanic import Sanic, text
from sanic.response import json

from saje import Runner, FileBackend

app = Sanic(__name__)
WORKERS = 2


@app.get("/<uid:uuid>")
async def handler(request, uid):
    data = await request.app.ctx.jobs.fetch(uid)
    return json(data)


@app.post("/")
async def start_job(request):
    uid = str(uuid4())
    request.app.ctx.saje.send(
        {
            "task": "hello",
            "uid": uid,
            "kwargs": {"name": "Adam"},
        }
    )
    return text(uid)


@app.main_process_start
def start_saje(app, _):
    # The Runner lives in the main process only; Sanic workers just enqueue
    app.ctx.saje = Runner(workers=WORKERS)
    app.ctx.saje.start()


@app.main_process_stop
def stop_saje(app, _):
    app.ctx.saje.stop()


@app.after_server_start
async def setup_job_fetch(app, _):
    # Each Sanic worker gets its own read handle on the job store
    app.ctx.jobs = FileBackend("./db")


app.run(port=9999, workers=WORKERS)
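With the server running, you can kick off a job and then poll its status by uid. A hypothetical client session using httpx (any HTTP client works the same way):

import httpx

uid = httpx.post("http://localhost:9999/").text
print(httpx.get(f"http://localhost:9999/{uid}").text)

The GET should report "complete": false until the job's stop record is written, after which the most recent line in the file backend wins.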