"Help me Please" or how to use AsyncIOScheduler


#1

I want to use AsyncIOScheduler in Sanic like this:

> from apscheduler.schedulers.asyncio import AsyncIOScheduler
> 
> @app.listener('before_server_start')
> async def initialize_scheduler(app, loop):
>     scheduler = AsyncIOScheduler()
>     scheduler.add_job(example_function, 'interval', seconds=20, max_instances=1, coalesce=True)
>     scheduler.start()
> 
> if __name__ == "__main__":
>     app.run(host="127.0.0.1", port=8081, workers=17, debug=True, backlog=300)

This works, but unfortunately the scheduler runs on all 17 workers. I want it to run on just one.
How can I do this?

#2

So I understand… you want the scheduler on one worker but Sanic on 17? This sounds like a complex setup, since you would need to pass data from any of the 17 processes back to the one running the scheduler. If that’s the case, I’d suggest using a broker in between to pass messages. Perhaps I misunderstood?


#3

Sorry for my English, I’m from Russia.
Yes, you understood me correctly, but I don’t want to add a broker like Celery or something like that, Prometheus … The scheduler already exists in Sanic, and maybe it can grab one of the 17 workers and work correctly.


#4

This is not a trivial request. Under the hood, Sanic spins up a new process for each worker. Are you familiar with the Python multiprocessing module? The only way I know of to do this without a broker or something like Celery would be with multiprocessing.Pipe. And, to be able to get the pipe working with Sanic… :thinking: I’d have to sit on that. I’m not sure it’s possible. When I am at a proper computer later I will take a look. I think the answer is that a MUCH more stable and scalable solution would be Celery.


#5

I used APScheduler in the same way. You can call multiprocessing.current_process() to get the name and PID of the current process (worker). A worker’s name looks like Process-1. You can then use an if statement to tell the workers apart and have only one of them (say, Process-1) run APScheduler.

Here is an example:

import multiprocessing
from apscheduler.schedulers.asyncio import AsyncIOScheduler

@app.listener('before_server_start')
async def initialize_scheduler(app, loop):
    worker = multiprocessing.current_process()
    if worker.name == 'Process-1':
        scheduler = AsyncIOScheduler()
        scheduler.add_job(example_function, 'interval', seconds=20, max_instances=1, coalesce=True)
        scheduler.start()
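
If it helps, the name check above could be pulled into a small helper so it can be tried without starting Sanic. This is only a sketch: `should_start_scheduler`, `designated`, and `current_name` are hypothetical names of mine, and the 'Process-1' default is simply the name from the example above. Worker process names are an implementation detail and may differ depending on the Sanic version and on how the workers are started, so check what current_process().name actually returns in your setup.

```python
import multiprocessing


def should_start_scheduler(designated="Process-1", current_name=None):
    """Return True only in the process whose name matches `designated`.

    `current_name` exists only to make the check easy to try by hand;
    by default the name of the calling process is used.
    """
    if current_name is None:
        # Name of the process this code is running in, e.g. 'MainProcess'
        # or 'Process-1' for a child started via multiprocessing.Process.
        current_name = multiprocessing.current_process().name
    return current_name == designated


# Simulated worker names: only the designated one would start the scheduler.
for name in ("Process-1", "Process-2", "Process-3"):
    print(name, should_start_scheduler(current_name=name))
```

Inside the before_server_start listener you would then write `if should_start_scheduler():` around the three scheduler lines.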

That said, using a broker or moving the scheduler into a separate application would be a better approach, because APScheduler still has some problems with asyncio, and in my case this sometimes breaks the scheduler.
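
For the "separate application" option, here is a rough sketch of what a standalone periodic runner could look like. Note that this deliberately does not use APScheduler; it is a plain asyncio loop from the standard library, and `run_periodically`, `interval_seconds`, and `max_runs` are names I made up for illustration. `example_function` stands in for the job from the first post and is assumed to be a coroutine here. It does not replicate APScheduler features such as coalesce or max_instances.

```python
import asyncio


async def run_periodically(job, interval_seconds, max_runs=None):
    """Await job() repeatedly, sleeping interval_seconds between runs.

    Runs forever when max_runs is None; otherwise stops after max_runs
    calls and returns how many times the job ran.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        await job()
        runs += 1
        # Skip the final sleep when a run limit has been reached.
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return runs


async def example_function():
    # Placeholder for the real periodic work.
    print("job ran")


# Short bounded run so the sketch finishes quickly when tried out.
asyncio.run(run_periodically(example_function, interval_seconds=0.01, max_runs=2))
```

As its own script, started once and independently of the 17 Sanic workers, the last line would instead be something like `asyncio.run(run_periodically(example_function, interval_seconds=20))`.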