Here is a working solution I came up with.
First, let’s get a running instance of Redis:
$ docker run -p 6379:6379 redis:alpine
Next, here is my basic app structure (omitting all the logic not needed for my explanation):
import asyncio
import logging
import aioredis
from sanic import Sanic
from sanic.log import logger
from sanic.response import text
# Sanic application that the routes and listeners below are registered on.
app = Sanic("dynamic-logging")
# Redis pub/sub channel name used to broadcast the requested logging level.
CHANNEL_NAME = "logging-level"
# Background receiver task; cleanup() cancels it on shutdown when it is set.
task = None
# Every connection handed out by get_connection(), so cleanup() can close them.
connections = []
async def get_connection():
    """Open a new Redis connection and register it for shutdown cleanup.

    Returns:
        The connection created by ``aioredis.create_redis``. It is also
        appended to the module-level ``connections`` list.
    """
    redis = await aioredis.create_redis(
        encoding="utf-8",
        address="redis://localhost:6379/0",
    )
    # Track the connection so the shutdown listener can close it later.
    connections.append(redis)
    return redis
@app.get("/")
async def handler(request):
    """Log "foobar" at INFO and DEBUG level, then reply with plain text.

    Which of the two records is actually emitted depends on the level
    currently set on ``logger``.
    """
    logger.info("foobar")
    logger.debug("foobar")
    response = text("Done.")
    return response
@app.post("/setlevel")
async def set_handler(request):
    """Endpoint that publishes a new logging level (body shown further below)."""
    ...
@app.listener("after_server_start")
async def run_logging_listener(app, loop):
    """Startup listener that launches the receiver task (body shown further below)."""
    ...
@app.listener("before_server_stop")
async def cleanup(app, loop):
    """Cancel the receiver task and close every tracked Redis connection.

    Reads the module-level ``task`` and ``connections``; neither name is
    rebound here, so no ``global`` declaration is required.
    """
    listener_active = bool(task) and not task.cancelled()
    if listener_active:
        task.cancel()

    # Close the connections one at a time, waiting for each to finish.
    for open_conn in connections:
        print(f"closing {open_conn}")
        open_conn.close()
        await open_conn.wait_closed()
if __name__ == "__main__":
    # Start the server with four worker processes in debug mode.
    app.run(workers=4, debug=True)
We are going to run run_logging_listener
on startup. This will launch a task that will act as our receiver and listen to any messages on the pubsub
channel.
Separately, we will have set_handler
that will be the endpoint that publishes messages to our workers.
First, let’s build the setter. All we need to do is to obtain a connection to our redis instance, and publish to the channel what we want the new level to be.
@app.post("/setlevel")
async def set_handler(request):
    """Publish the requested logging level to all workers.

    The level is read from the ``level`` key of the JSON request body and
    defaults to ``"INFO"`` when the key, or a JSON object body, is absent.
    It is published on ``CHANNEL_NAME`` so that every subscribed worker
    receives it.

    Returns:
        A plain-text response echoing the level that was published.
    """
    # request.json may be None (e.g. an empty body) or a non-object JSON
    # value; calling .get() on those would raise and yield a 500 response.
    payload = request.json
    level = payload.get("level", "INFO") if isinstance(payload, dict) else "INFO"
    print(f"Requested to change logging to {level}")

    conn = await get_connection()
    try:
        await conn.publish(CHANNEL_NAME, level)
    finally:
        # This connection exists only for this one publish. Close it now
        # rather than accumulating one open connection per request until
        # the server shuts down.
        conn.close()
        await conn.wait_closed()
        if conn in connections:
            connections.remove(conn)
    return text(f"Set level to {level}")
But, how does each worker know what to do? At startup, it launches a task that will act as a receiver.
@app.listener("after_server_start")
async def run_logging_listener(app, loop):
    """Subscribe to the logging channel and start the receiver task.

    Runs once per worker after the server has started. The created task is
    stored in the module-level ``task`` so the shutdown listener can cancel
    it.
    """
    # Without this declaration the assignment at the bottom would bind a
    # local name and the module-level ``task`` would stay None forever.
    global task

    conn = await get_connection()

    async def receiver(channel):
        print(f"Listening on {channel}")
        ...

    (channel,) = await conn.subscribe(CHANNEL_NAME)
    task = loop.create_task(receiver(channel))
But, what does receiver
do?
It will just hang around and wait until there is an incoming message. When there is, it will call logger.setLevel with the level named in the message.
async def receiver(channel):
    """Apply every logging level name received on ``channel``.

    Waits for messages until the channel reports that none will follow.
    Each message is treated as the name of a ``logging`` level constant
    (case-insensitive); anything that does not resolve to one falls back
    to ``logging.INFO``.

    Args:
        channel: Subscribed pub/sub channel providing ``wait_message()``
            and ``get()``.
    """
    print(f"Listening on {channel}")
    while await channel.wait_message():
        try:
            message = await channel.get(encoding="utf-8")
        except aioredis.ChannelClosedError:
            print("Channel has closed")
            break

        if not isinstance(message, str):
            # Nothing usable was delivered; go back to waiting.
            continue

        # A bare getattr() can return any attribute of the logging module
        # (e.g. "debug" -> the function logging.debug), which would make
        # setLevel() raise and kill this task. Accept only real integer
        # level constants and fall back to INFO for everything else.
        level = getattr(logging, message.upper(), logging.INFO)
        if isinstance(level, bool) or not isinstance(level, int):
            level = logging.INFO
        logger.setLevel(level)
        print(f"Changing logging to {message}")
For the sake of ease, I have made rather liberal use of global variables. You might not want to follow that pattern. But, this should give you an idea of some of the basic patterns and considerations you might want to employ.
Here is the full code:
import asyncio
import logging
import aioredis
from sanic import Sanic
from sanic.log import logger
from sanic.response import text
# Sanic application that the routes and listeners below are registered on.
app = Sanic("dynamic-logging")
# Redis pub/sub channel name used to broadcast the requested logging level.
CHANNEL_NAME = "logging-level"
# Background receiver task; cleanup() cancels it on shutdown when it is set.
task = None
# Every connection handed out by get_connection(), so cleanup() can close them.
connections = []
async def get_connection():
    """Open a new Redis connection and register it for shutdown cleanup.

    Returns:
        The connection created by ``aioredis.create_redis``. It is also
        appended to the module-level ``connections`` list.
    """
    redis = await aioredis.create_redis(
        encoding="utf-8",
        address="redis://localhost:6379/0",
    )
    # Track the connection so the shutdown listener can close it later.
    connections.append(redis)
    return redis
@app.get("/")
async def handler(request):
    """Log "foobar" at INFO and DEBUG level, then reply with plain text.

    Which of the two records is actually emitted depends on the level
    currently set on ``logger``.
    """
    logger.info("foobar")
    logger.debug("foobar")
    response = text("Done.")
    return response
@app.post("/setlevel")
async def set_handler(request):
    """Publish the requested logging level to all workers.

    The level is read from the ``level`` key of the JSON request body and
    defaults to ``"INFO"`` when the key, or a JSON object body, is absent.
    It is published on ``CHANNEL_NAME`` so that every subscribed worker
    receives it.

    Returns:
        A plain-text response echoing the level that was published.
    """
    # request.json may be None (e.g. an empty body) or a non-object JSON
    # value; calling .get() on those would raise and yield a 500 response.
    payload = request.json
    level = payload.get("level", "INFO") if isinstance(payload, dict) else "INFO"
    print(f"Requested to change logging to {level}")

    conn = await get_connection()
    try:
        await conn.publish(CHANNEL_NAME, level)
    finally:
        # This connection exists only for this one publish. Close it now
        # rather than accumulating one open connection per request until
        # the server shuts down.
        conn.close()
        await conn.wait_closed()
        if conn in connections:
            connections.remove(conn)
    return text(f"Set level to {level}")
@app.listener("after_server_start")
async def run_logging_listener(app, loop):
    """Subscribe to the logging channel and start the receiver task.

    Runs once per worker after the server has started. The created task is
    stored in the module-level ``task`` so the shutdown listener can cancel
    it.
    """
    # Without this declaration the assignment at the bottom would bind a
    # local name and the module-level ``task`` would stay None forever.
    global task

    conn = await get_connection()

    async def receiver(channel):
        """Apply every logging level name received on ``channel``."""
        print(f"Listening on {channel}")
        while await channel.wait_message():
            try:
                message = await channel.get(encoding="utf-8")
            except aioredis.ChannelClosedError:
                print("Channel has closed")
                break

            if not isinstance(message, str):
                # Nothing usable was delivered; go back to waiting.
                continue

            # A bare getattr() can return any attribute of the logging
            # module (e.g. "debug" -> the function logging.debug), which
            # would make setLevel() raise and kill this task. Accept only
            # real integer level constants; fall back to INFO otherwise.
            level = getattr(logging, message.upper(), logging.INFO)
            if isinstance(level, bool) or not isinstance(level, int):
                level = logging.INFO
            logger.setLevel(level)
            print(f"Changing logging to {message}")

    (channel,) = await conn.subscribe(CHANNEL_NAME)
    task = loop.create_task(receiver(channel))
@app.listener("before_server_stop")
async def cleanup(app, loop):
    """Cancel the receiver task and close every tracked Redis connection.

    Reads the module-level ``task`` and ``connections``; neither name is
    rebound here, so no ``global`` declaration is required.
    """
    listener_active = bool(task) and not task.cancelled()
    if listener_active:
        task.cancel()

    # Close the connections one at a time, waiting for each to finish.
    for open_conn in connections:
        print(f"closing {open_conn}")
        open_conn.close()
        await open_conn.wait_closed()
if __name__ == "__main__":
    # Start the server with four worker processes in debug mode.
    app.run(workers=4, debug=True)