I am designing an API to handle huge streaming requests, and it requires processing a series of commands in the stream, separated by newlines. I dug into the innards of Sanic to find out how it handles streaming requests. As far as I understand, Sanic's request streaming uses an asyncio queue: it fills the queue with incoming data and stops once the queue reaches its maximum size. But I could not find a way to read the request stream line by line. I would appreciate it if someone could point me in the right direction.
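To make the queue behaviour described above concrete, here is a minimal sketch of a bounded asyncio queue between a producer and a consumer. It is plain asyncio, not Sanic's actual code; the names producer, consumer and demo, the queue size of 2, and the None end-of-stream sentinel are all made up for illustration.

```python
import asyncio


async def producer(queue, chunks):
    # put() suspends while the queue is full, so the producer
    # cannot run further ahead than maxsize chunks.
    for chunk in chunks:
        await queue.put(chunk)
    await queue.put(None)  # hypothetical end-of-stream sentinel


async def consumer(queue):
    received = []
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        received.append(chunk)
    return received


async def demo():
    queue = asyncio.Queue(maxsize=2)  # bounded: at most 2 chunks buffered
    chunks = [b"set a 1\nse", b"t b 2\n", b"del a\n"]
    producer_task = asyncio.create_task(producer(queue, chunks))
    received = await consumer(queue)
    await producer_task
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the chunks arrive in the order they were sent, but a chunk boundary can fall in the middle of a line, which is why the consumer side has to do its own line buffering.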
Not sure if this is still relevant, but I would think the easiest place to handle this is inside your request handler. Rather than modifying how the request is parsed and chunked, I would suggest tackling it in how you consume those chunks.
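from sanic.response import text

@app.post('/stream', stream=True)
async def handler(request):
    chunk = b""  # request.stream.read() returns bytes, so buffer bytes
    while True:
        body = await request.stream.read()
        if body is None:
            break
        chunk += body
        if b"\n" in chunk:
            # split() leaves the unfinished tail last (b"" after a trailing newline)
            parts = chunk.split(b"\n")
            for part in parts[:-1]:
                do_something_with_part(part.decode())
            chunk = parts[-1]
    if chunk:
        # The last line did not end with a newline
        do_something_with_part(chunk.decode())
    return text("done")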
This is untested code.
Another alternative is modifying body_append, but I think this is more prone to error.
Since the body parts arrive as bytes, the buffer should be bytes too. A more compact version (still untested code):
from sanic.response import text

@app.post('/stream', stream=True)
async def handler(request):
    chunk = b""
    while True:
        body = await request.stream.read()
        if body is None:
            break
        chunk += body
        # Everything before the last newline is complete; keep the tail buffered
        *parts, chunk = chunk.split(b"\n")
        for part in parts:
            do_something_with_part(part.decode())
    if chunk:
        # Final line didn't end with newline
        do_something_with_part(chunk.decode())
    return text("done")
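If it helps, the buffering can also be pulled out of the handler into a small async generator, which makes it testable without a running server. This is only a sketch: iter_lines, FakeStream and collect are names made up here, and the only assumption about the stream is that read() returns bytes, or None at the end of the stream, as in the handlers above.

```python
import asyncio


async def iter_lines(read, sep=b"\n"):
    """Yield complete lines from an async read() that returns bytes, or None at the end."""
    buffer = b""
    while True:
        data = await read()
        if data is None:
            break
        buffer += data
        # The last element is the unfinished tail (b"" if data ended with sep)
        *lines, buffer = buffer.split(sep)
        for line in lines:
            yield line
    if buffer:
        # Final line without a trailing separator
        yield buffer


class FakeStream:
    """Hypothetical stand-in for request.stream, for testing only."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def read(self):
        return self._chunks.pop(0) if self._chunks else None


async def collect(chunks):
    stream = FakeStream(chunks)
    return [line.decode() async for line in iter_lines(stream.read)]


if __name__ == "__main__":
    # The second command is split across two chunks on purpose
    print(asyncio.run(collect([b"set a 1\nse", b"t b 2\n", b"del a"])))
```

In the handler, the loop would then shrink to an async for over iter_lines(request.stream.read), calling do_something_with_part(line.decode()) for each line. A request that never sends a newline is still buffered in full, so for very large inputs a cap on the buffer size may be worth adding.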