Error in unit test: AttributeError: __aenter__

Hi guys,

Thank you in advance.

I am getting the following error when running the unit test:

test_endpoint.py [2021-12-31 18:46:12 +0000] [33939] [INFO] Goin' Fast @ http://127.0.0.1:62918
[2021-12-31 18:46:12 +0000] [33939] [INFO] Goin' Fast @ http://127.0.0.1:62919
[2021-12-31 18:46:12 +0000] [33939] [ERROR] Exception occurred while handling uri: 'http://127.0.0.1:62919/predict'
Traceback (most recent call last):
  File "/Users/vedantruparelia/workspace/tensorflow-test/env/lib/python3.9/site-packages/sanic/app.py", line 770, in handle_request
    response = await response
  File "/Users/vedantruparelia/workspace/python_projects/tf_sanic_example/request_batching_server.py", line 106, in predict
    output = await style_transfer_runner.process_input(data)
  File "/Users/vedantruparelia/workspace/python_projects/tf_sanic_example/request_batching_server.py", line 48, in process_input
    async with self.queue_lock:
AttributeError: __aenter__
[2021-12-31 18:46:12 +0000] - (sanic.access)[INFO][127.0.0.1:62920]: POST http://127.0.0.1:62919/predict  500 735
<Response [500 Internal Server Error]>
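
The failure seems to point at the async with self.queue_lock: line in process_input(). For what it is worth, the same AttributeError: __aenter__ shows up on Python 3.9 whenever async with is used on None, so I suspect queue_lock is still None when the handler runs. A minimal sketch (not my real test code):

import asyncio

async def main():
    queue_lock = None  # stand-in for ModelRunner.queue_lock before model_runner() has run
    async with queue_lock:  # raises AttributeError: __aenter__ on Python 3.9
        pass

asyncio.run(main())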

I am using the following code, which I have taken from https://github.com/deep-learning-with-pytorch/dlwpt-code/blob/d6c0210143daa133bbdeddaffc8993b1e17b5174/p3ch15/request_batching_server.py:


import asyncio
import functools
import tensorflow as tf
from sanic import Sanic
from sanic.response import json
from sanic.log import logger
import numpy as np

app = Sanic("test-server")


MAX_QUEUE_SIZE = 3  # we accept a backlog of MAX_QUEUE_SIZE before handing out "Too busy" errors
MAX_BATCH_SIZE = 2  # we put at most MAX_BATCH_SIZE things in a single batch
MAX_WAIT = 1        # we wait at most MAX_WAIT seconds for more inputs to arrive before running a batch

class HandlingError(Exception):
    def __init__(self, msg, code=500):
        super().__init__()
        self.handling_code = code
        self.handling_msg = msg

class ModelRunner:
    def __init__(self):
        self.queue = []

        self.queue_lock = None
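        # note: queue_lock stays None until model_runner() replaces it with a real asyncio.Lock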

        self.model = tf.keras.models.load_model("best_model_HDF5_format.h5")

        self.needs_processing = None

        self.needs_processing_timer = None

    def schedule_processing_if_needed(self):
        if len(self.queue) >= MAX_BATCH_SIZE:
            logger.debug("next batch ready when processing a batch")
            self.needs_processing.set()
        elif self.queue:
            logger.debug("queue nonempty when processing a batch, setting next timer")
            self.needs_processing_timer = app.loop.call_at(self.queue[0]["time"] + MAX_WAIT, self.needs_processing.set)

    async def process_input(self, input):
        our_task = {"done_event": asyncio.Event(loop=app.loop),
                    "input": input,
                    "time": app.loop.time()}
        async with self.queue_lock:
            if len(self.queue) >= MAX_QUEUE_SIZE:
                raise HandlingError("I'm too busy", code=503)
            self.queue.append(our_task)
            logger.debug("enqueued task. new queue size {}".format(len(self.queue)))
            self.schedule_processing_if_needed()

        await our_task["done_event"].wait()
        return our_task["output"]

    def run_model(self, batch):  # runs in other thread
        return self.model.predict_on_batch(batch['a'])

    async def model_runner(self):
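        # queue_lock and needs_processing are only created once this background task actually runs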
        self.queue_lock = asyncio.Lock(loop=app.loop)
        self.needs_processing = asyncio.Event(loop=app.loop)
        while True:
            await self.needs_processing.wait()
            self.needs_processing.clear()
            if self.needs_processing_timer is not None:
                self.needs_processing_timer.cancel()
                self.needs_processing_timer = None
            async with self.queue_lock:
                if self.queue:
                    longest_wait = app.loop.time() - self.queue[0]["time"]
                else:  # oops
                    longest_wait = None
                logger.debug("launching processing. queue size: {}. longest wait: {}".format(len(self.queue), longest_wait))
                to_process = self.queue[:MAX_BATCH_SIZE]
                del self.queue[:len(to_process)]
                self.schedule_processing_if_needed()
            if not to_process:
                continue
            # so here we copy, it would be neater to avoid this
            batch = {}
            for t in to_process:
                for key, val in t["input"].items():
                    batch[key] = val
            
            batch = {
                k: np.array(v).reshape(len(v),) for k, v in batch.items()
            }
            # we could delete inputs here...

            result = await app.loop.run_in_executor(
                None, functools.partial(self.run_model, batch)
            )
            for t, r in zip(to_process, result):
                t["output"] = r
                t["done_event"].set()
            del to_process

style_transfer_runner = ModelRunner()

@app.route('/predict', methods=['POST'])
async def predict(request):
    
    data = request.json
    output = await style_transfer_runner.process_input(data)
    return json({"prediction": str(output)})
    
app.add_task(style_transfer_runner.model_runner())
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)

Sanic version: 21.6.0
pytest-sanic: latest version
TensorFlow version: 2.7.0
best_model_HDF5_format.h5 - TensorFlow Keras model saved in HDF5 format (happy to send this file if required)

I would be highly obliged if you could please look into this. Thank you.

This does not look like a problem with Sanic, and I'm not sure what the question is. Have you tried simplifying your code? There is a lot going on.
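
If the issue is just that queue_lock is still None when /predict is hit (the model_runner() task you add at module level may not have started before the test sends its first request), one option is to create the lock and the event in a before_server_start listener so they exist before any request is handled. A rough sketch against your posted code, untested:

import asyncio

@app.listener("before_server_start")
async def setup_model_runner(app, loop):
    # create the synchronization primitives before the server accepts requests,
    # so the /predict handler never sees queue_lock == None
    style_transfer_runner.queue_lock = asyncio.Lock()
    style_transfer_runner.needs_processing = asyncio.Event()
    app.add_task(style_transfer_runner.model_runner())

With that, you would drop the two asyncio.Lock/asyncio.Event lines at the top of model_runner(). Note that the loop= argument they currently pass is deprecated on Python 3.9 and removed in Python 3.10 anyway.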