Task to be run in one thread

Hi folks,

So, I need to run a task that runs forever, but I don’t want it to be run by the workers because it would then run nb_of_workers times.
I tried using add_task, but the task is run by every worker. I need it to be run by just 1 process/thread, in parallel with the workers.

If I have something like this:

async def my_task(app):
    print(f"TASK - PID: {os.getpid()}")
    await asyncio.sleep(2)
    app.add_task(my_task)

And I add the task using app.add_task(my_task), in a run with 2 workers I obtain:

TASK - PID: 760
TASK - PID: 761
TASK - PID: 760
TASK - PID: 761
TASK - PID: 760
TASK - PID: 761
...

I hope I explained myself clearly.

I think that you did.

I think what you want is main_process_start.

If you run app.add_task inside that listener, then it should only run once per server instance. LMK if this works for you.

Thanks for the fast answer @ahopkins

First Approach
I tried doing this:

@app.main_process_start
async def my_task(app, loop):
    print(f"MY TASK - PID: {os.getpid()}")
    await asyncio.sleep(2)
    app.add_task(my_task(app, loop))

But it runs twice and then nothing happens.
When I kill the process I get this:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<my_task() done, defined at /path/to/file/asgi.py:28> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f9bd5fbe760>()]>>

I tried also doing this:

async def my_task(app):
    print(f"MY TASK - PID: {os.getpid()}")
    await asyncio.sleep(2)
    app.add_task(my_task(app))


@app.main_process_start
async def my_outer_task(app, loop):
    app.add_task(my_task(app))

What I get is that it runs once, and I see the same message when killing the process.

What I need is that my_task runs every 2 seconds “forever”.

Second Approach
Another approach that I tried was to select only one worker to run the background task by storing its pid in app.ctx.pid. On the first run, worker 1 sets app.ctx.pid to its own pid. On every later run, the task checks whether the current worker's pid matches the stored one: if it does, it executes normally; if another worker (with a different pid) runs it, it does nothing and returns. I tried this but it did not work: app.ctx.pid changed constantly. I checked whether there were different instances of app in each worker and no, it was the same instance.

It’s something like this:

async def task(app):
    await asyncio.sleep(2)
    print(f"TASK - PID: {os.getpid()}")
    if not hasattr(app.ctx, "pid"):
        app.ctx.pid = os.getpid()
    elif app.ctx.pid != os.getpid():
        return
    else:
        pass
    app.add_task(task)

Try this:

async def do():
    try:
        while True:
            print(os.getpid())
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        ...


@app.main_process_start
async def start(*_):
    app.add_task(do(), name="do-foo")

I tried it and it is executed just once (or at least I saw the PID printed only once).
I also had to remove the argument name="do-foo" because add_task did not accept it.

About the Second Approach in my previous post: I thought that app.ctx.pid would be shared by all the workers, and it seemed it should be, because the id of the app instance was the same when I did the test. But it was set twice. :thinking:

Ahh, you must be on an older version of Sanic then.

I guess it depends where and how you are setting it.

Thanks a lot for your answers so far.

I am setting app.ctx.pid after the server starts, like this:

@app.after_server_start
async def task(app, loop):
    await asyncio.sleep(2)
    print(f"TASK\tPID = {os.getpid()}")

    if not hasattr(app.ctx, "pid"):
        app.ctx.pid = os.getpid()
        print(f"SET\tPID: {os.getpid()}")
    elif app.ctx.pid != os.getpid():
        print(f"app.ctx.pid != pid\tPID: {os.getpid()}\tCTX.PID: {app.ctx.pid}")
        return
    else:
        print(f"app.ctx.pid == pid\tPID: {os.getpid()}\tCTX.PID: {app.ctx.pid}")
        pass

    app.add_task(task(app, loop))

Running with 2 workers I get:

[2022-03-31 21:31:12 +0000] [1832] [INFO] Goin' Fast @ http://0.0.0.0:8000
[2022-03-31 21:31:12 +0000] [1832] [DEBUG] Sanic auto-reload: enabled
[2022-03-31 21:31:12 +0000] [1832] [DEBUG] Sanic debug mode: enabled
TASK    PID = 1835
TASK    PID = 1834
SET     PID: 1834
SET     PID: 1835
[2022-03-31 21:31:15 +0000] [1834] [INFO] Starting worker [1834]
[2022-03-31 21:31:15 +0000] [1835] [INFO] Starting worker [1835]
TASK    PID = 1834
TASK    PID = 1835
app.ctx.pid == pid      PID: 1834       CTX.PID: 1834
app.ctx.pid == pid      PID: 1835       CTX.PID: 1835
TASK    PID = 1835
TASK    PID = 1834
app.ctx.pid == pid      PID: 1834       CTX.PID: 1834
app.ctx.pid == pid      PID: 1835       CTX.PID: 1835
TASK    PID = 1834
TASK    PID = 1835
app.ctx.pid == pid      PID: 1834       CTX.PID: 1834
app.ctx.pid == pid      PID: 1835       CTX.PID: 1835
...

I would expect that only 1 worker would be able to set app.ctx.pid.
Any clue about what might be happening?

Sorry for the late response, I have been travelling. Each worker process has its own instance of the application, so each one starts with no pid set and then sets its own. They do not share state across workers.
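
To make that concrete, here is a rough sketch using only the standard library, not Sanic itself. The names ctx, worker, and run_workers are made up for illustration, and ctx is just a stand-in for app.ctx. Assuming the workers are separate OS processes, each one only sees its own copy of the object, so the "is the pid already set?" check comes back False in every worker:

```python
import multiprocessing as mp
import os
from types import SimpleNamespace

# Hypothetical stand-in for app.ctx; this is not Sanic's own object.
ctx = SimpleNamespace()


def worker(queue):
    # Runs in a child process, which only has its own copy of ctx.
    already_set = hasattr(ctx, "pid")
    ctx.pid = os.getpid()  # changes this process's copy only
    queue.put((already_set, ctx.pid))


def run_workers(n=2):
    # Assumption: prefer "fork" where the platform offers it, else the default.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    mp_ctx = mp.get_context(method)
    queue = mp_ctx.Queue()
    procs = [mp_ctx.Process(target=worker, args=(queue,)) for _ in range(n)]
    for p in procs:
        p.start()
    # Drain the queue before joining so the children can exit cleanly.
    results = [queue.get(timeout=10) for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    for already_set, pid in run_workers():
        print(f"already_set={already_set} pid={pid}")
```

Every worker reports that the pid was not set yet, and the parent's ctx never gets a pid at all, which is the same pattern as the SET lines in the log above.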

Just curious, what are you using this for? What are you trying to achieve?

Sorry for the very late response.

I am designing a system that runs in one (and only one) worker that periodically makes requests to another API. I need it to be only one worker to avoid making duplicated (or n-plicated) requests.

Basically, I have a task function that, when run, performs the request to the other API, waits N seconds/minutes/hours/days, and then calls itself to run again.
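
For reference, that periodic pattern can also be written as a single long-lived loop, in the spirit of the do() suggestion earlier in the thread, instead of a task that re-adds itself. This is only a sketch with plain asyncio: run_periodically, fake_request, and max_runs are hypothetical names, and max_runs exists only so the demo terminates.

```python
import asyncio


async def run_periodically(job, interval, max_runs=None):
    """Await job() every `interval` seconds until cancelled.

    max_runs is a hypothetical knob so the demo can stop; leave it as
    None to loop "forever".
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        await job()
        runs += 1
        # Cancelling the task interrupts this sleep and ends the loop.
        await asyncio.sleep(interval)
    return runs


async def main():
    calls = []

    async def fake_request():
        # Placeholder for the real request to the other API.
        calls.append(len(calls))

    await run_periodically(fake_request, interval=0.01, max_runs=3)
    return calls


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The coroutine stays alive between runs, so there is a single task to cancel on shutdown rather than a new one being scheduled on every iteration.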

Thanks for all the help