Confusion with streaming responses in Sanic

This is my simple code (on Ubuntu):

import json
import time

from sanic import Request

async def gen(lis):  # An async generator that simulates a streaming source
    for t in lis:
        time.sleep(0.01)  # blocking sleep between chunks
        yield t + "\n\n"

async def chat_completions(request: Request):
    request_body = json.loads(request.body)  # not used
    response = await request.respond()
    lis = ["qweryoiuqwpr" + str(i) for i in range(1000)]
    g = gen(lis)  # simulated stream of chunks

    async for r in g:
        await response.send(r)  # send each chunk as it is produced
    await response.eof()

All the responses were buffered and sent only once there were no more response.send() calls.
But when I changed the function gen() to:

import asyncio

async def gen(lis):
    for t in lis:
        await asyncio.sleep(0.01)
        # asyncio.sleep(0) also works.
        # This interrupts the continuous run of the current task.
        yield t + "\n\n"

it works as expected.

So it seems that await response.send(r) produces one continuous run of 'write' operations, and asyncio.sleep(0.01) can break it up. (asyncio.sleep(0) can also break it up.)

Can someone tell me how to resolve this problem without using asyncio.sleep(0)? Calling it this frequently can add non-negligible CPU overhead.
Thank you.

It is because your loop is greedy and never yields back to the event loop. Therefore, the event loop never gets an opportunity to drain the buffer.
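
To make the "greedy loop" point concrete, here is a minimal sketch in plain asyncio (no Sanic involved). The names greedy_gen, cooperative_gen, and count_other_task_runs are made up for this illustration. A background task stands in for whatever work the event loop needs to do to flush data, and the sketch counts how often that task gets to run while the async for loop is consuming each generator.

```python
import asyncio
import time


async def greedy_gen(items):
    # Hypothetical stand-in for the first gen(): time.sleep blocks the
    # thread and never hands control back to the event loop.
    for item in items:
        time.sleep(0.001)
        yield item


async def cooperative_gen(items):
    # Hypothetical stand-in for the second gen(): asyncio.sleep(0)
    # suspends the task once per item so other tasks can run.
    for item in items:
        await asyncio.sleep(0)
        yield item


async def count_other_task_runs(gen_func, n=20):
    """Return how many times a background task ran during the loop."""
    ticks = 0
    done = False

    async def background():
        nonlocal ticks
        while not done:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(background())
    async for _ in gen_func(range(n)):
        pass
    ticks_during_loop = ticks  # snapshot before letting the task finish
    done = True
    await task
    return ticks_during_loop


greedy_ticks = asyncio.run(count_other_task_runs(greedy_gen))
cooperative_ticks = asyncio.run(count_other_task_runs(cooperative_gen))
print("greedy:", greedy_ticks, "cooperative:", cooperative_ticks)
```

With the blocking generator the background task does not run at all until the loop is finished, which is the same starvation that keeps the buffered data from going out. With asyncio.sleep(0) it gets a turn on every iteration.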

Thank you, your analysis helps a lot.
I still have questions, such as: what is preventing the buffer from draining?
Is it that response.send keeps writing without checking the buffer, while the buffer just keeps waiting for the data from response.send to end? So when I create a new coroutine, the buffer thinks the 'writing' of the previous coroutine is over. If so, how can I force the buffer to drain directly? Creating a coroutine for each item in a streaming response seems extravagant.


Maybe I should create a pipe, a queue, or something else as a transfer station, so that the response can continuously retrieve the data it needs to send from there.
I'm just not sure whether this plan would cost less than the coroutine one.
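
One way the "transfer station" idea could look is sketched below, assuming the real data source is blocking (here simulated with time.sleep). A worker thread pushes chunks into an asyncio.Queue, and the coroutine awaits queue.get() before sending each chunk. Everything here is hypothetical: blocking_source, stream_via_queue, and fake_send are made-up names, and fake_send merely stands in for response.send. Whether this is cheaper than asyncio.sleep(0) would have to be measured.

```python
import asyncio
import threading
import time

_DONE = object()  # sentinel marking the end of the stream


def blocking_source(items):
    # Hypothetical blocking producer, e.g. a slow upstream call.
    for item in items:
        time.sleep(0.001)
        yield item + "\n\n"


async def stream_via_queue(items, send):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def producer():
        # Runs in a worker thread, so blocking here does not stall the
        # event loop. call_soon_threadsafe hands each chunk to the loop.
        for chunk in blocking_source(items):
            loop.call_soon_threadsafe(queue.put_nowait, chunk)
        loop.call_soon_threadsafe(queue.put_nowait, _DONE)

    threading.Thread(target=producer, daemon=True).start()

    while True:
        chunk = await queue.get()  # suspends until a chunk is available
        if chunk is _DONE:
            break
        await send(chunk)


sent = []


async def fake_send(chunk):
    # Stand-in for response.send; it only records what would be sent.
    sent.append(chunk)


asyncio.run(stream_via_queue(["a", "b", "c"], fake_send))
print(sent)
```

Because the coroutine really suspends on queue.get() whenever the queue is empty, the event loop gets control between chunks without an explicit asyncio.sleep(0).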

What kind of spikes are you seeing? And is that actually a problem? If you had a never-ending loop with sleep(0), I can see how that would be a problem, but to clear a handler seems like a different matter.

I need to process a large number of streaming responses through Sanic and return them to my client, which displays the returned data on a page. If the data is buffered, the client waits for a long time and then prints everything at once, which takes too long. I want each word to be shown on the client as soon as it is returned.