How to close an event loop for a child thread?

When the main thread is closed,I’m trying to close the event loop,but there’s a problem.

That’s the errors

Traceback (most recent call last):
  File "/Users/liuzhichao/liuzhichao/trunk/code/TrunkPort/TrunkPortGateway/main.py", line 65, in <module>
    app.stop()
  File "/Users/liuzhichao/liuzhichao/trunk/code/TrunkPort/TrunkPortGateway/app.py", line 40, in stop
    self.__api_service.stop()
  File "/Users/liuzhichao/liuzhichao/trunk/code/TrunkPort/TrunkPortGateway/web/__init__.py", line 53, in stop
    self.__loop.run_until_complete(close_task)
  File "uvloop/loop.pyx", line 1450, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1443, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1351, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 480, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

That’s my code

import uvloop
import asyncio
import logging
from functools import partial

from threading import Thread

from conf import settings

from .app import app


class ApiService:

    def __init__(self, host, port):
        self.host = host
        self.port = port

        self.__server = None
        self.__loop = None

    def run(self):
        self.__loop = uvloop.new_event_loop()
        thread = Thread(target=partial(self._thread_main, self.__loop), name="api-service")
        thread.setDaemon(True)
        thread.start()

    def _thread_main(self, loop):
        logging.info("ApiService Listening on http://{}:{}".format(self.host, self.port))
        asyncio.set_event_loop(loop)
        serv_coro = app.create_server(
            host=self.host, port=self.port, return_asyncio_server=True, debug=settings.DEBUG
        )

        serv_task = asyncio.ensure_future(serv_coro, loop=self.__loop)
        self.__server = self.__loop.run_until_complete(serv_task)
        self.__server.after_start()
        self.__loop.run_forever()

    def stop(self):
        if self.__loop is None or self.__server is None:
            return

        self.__loop.stop()
        self.__server.before_stop()

        # Wait for server to close
        close_task = self.__server.close()
        self.__loop.run_until_complete(close_task)

        # Complete all tasks on the loop
        for connection in self.__server.connections:
            connection.close_if_idle()
        self.__server.after_stop()

and I’m trying to closed event loop,e.g loop.closed(),but there‘s still a problem.

Thank you very much ~

I’m trying to understand what you are trying to achieve. What’s your goal with threads and what do you want to be able to do?

Because I have multithreaded service, main thread to check all service

I would not suggest running Sanic this way. You really should run it in its own process.

Emm, should start a child process?

Yes, I think that would be much better. Here is a pretty simple working version.

import uvloop
import asyncio
import multiprocessing
import os
import time
from signal import SIGTERM
from .app import app


class ApiService:
    def __init__(self, host, port):
        self.host = host
        self.port = port

        self._server = None
        self._loop = None
        self._process = None

    def run(self):
        mp = multiprocessing.get_context("fork")
        self._process = mp.Process(target=self._run_server)
        self._process.daemon = True
        self._process.start()

    def _run_server(self):
        print("Running server")
        self._loop = uvloop.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._server = app.create_server(
            host=self.host, port=self.port, return_asyncio_server=True
        )
        self._loop = asyncio.get_event_loop()
        self._task = asyncio.ensure_future(self._server)
        self._loop.run_forever()

    def stop(self):
        os.kill(self._process.pid, SIGTERM)
        print("Stopping server")
        self._process.join()
        self._process.terminate()


if __name__ == "__main__":
    print(f"Main pid: {os.getpid()}")
    service = ApiService("localhost", 3000)
    service.run()

    time.sleep(3)
    print("Closing server in 3 seconds")
    time.sleep(3)

    service.stop()

I want to trigger the SANIC signal when it’s off .
For example:before_server_stopafter_server_stop

These signals are triggered to close some database connections.

e.g

    @app.listener("after_server_stop")
    async def close_orm(app, loop):  # pylint: disable=W0612
        await Tortoise.close_connections()
        logging.info("Tortoise-ORM shutdown")

The child thread has no problem accessing the SANIC service
I just want to shut down some connections when I shut down,e.g mysql、redis …

The easiest way would be to use Sanic.run.

# server.py
from sanic import Sanic
from sanic.response import text

app = Sanic("__BASE__")


@app.get("/")
async def get1(request):
    return text("Done.")


@app.listener("before_server_start")
async def listener_before_server_start(*args, **kwargs):
    print("before_server_start")


@app.listener("after_server_start")
async def listener_after_server_start(*args, **kwargs):
    print("after_server_start")


@app.listener("before_server_stop")
async def listener_before_server_stop(*args, **kwargs):
    print("before_server_stop")


@app.listener("after_server_stop")
async def listener_after_server_stop(*args, **kwargs):
    print("after_server_stop")
# main.py
import multiprocessing
import os
import time
from signal import SIGTERM
from server import app


class ApiService:
    def __init__(self, host, port):
        self.host = host
        self.port = port

        self._server = None
        self._loop = None
        self._process = None

    def run(self):
        mp = multiprocessing.get_context("fork")
        self._process = mp.Process(target=self._run_server)
        self._process.daemon = True
        self._process.start()

    def _run_server(self):
        print("Running server")
        app.run(host=self.host, port=self.port)

    def stop(self):
        os.kill(self._process.pid, SIGTERM)
        print("Stopping server")
        self._process.join()
        self._process.terminate()


if __name__ == "__main__":
    print(f"Main pid: {os.getpid()}")
    service = ApiService("localhost", 3000)
    service.run()

    time.sleep(3)
    print("Closing server in 3 seconds")
    time.sleep(3)

    service.stop()
$ python main.py                                                                                                                                                (env: sanic) 
Main pid: 15621
Running server
[2020-11-23 16:53:14 +0200] [15623] [INFO] Goin' Fast @ http://localhost:3000
before_server_start
after_server_start
[2020-11-23 16:53:14 +0200] [15623] [INFO] Starting worker [15623]
[2020-11-23 16:53:16 +0200] - (sanic.access)[INFO][127.0.0.1:45666]: GET http://localhost:3000/  200 5
Closing server in 3 seconds
Stopping server
[2020-11-23 16:53:20 +0200] [15623] [INFO] Stopping worker [15623]
before_server_stop
after_server_stop
[2020-11-23 16:53:20 +0200] [15623] [INFO] Server Stopped

I have got it ~ Thanks

1 Like