Hi guys,
Thank you in advance.
I'm getting the following error:
test_endpoint.py [2021-12-31 18:46:12 +0000] [33939] [INFO] Goin' Fast @ http://127.0.0.1:62918
[2021-12-31 18:46:12 +0000] [33939] [INFO] Goin' Fast @ http://127.0.0.1:62919
[2021-12-31 18:46:12 +0000] [33939] [ERROR] Exception occurred while handling uri: 'http://127.0.0.1:62919/predict'
Traceback (most recent call last):
File "/Users/vedantruparelia/workspace/tensorflow-test/env/lib/python3.9/site-packages/sanic/app.py", line 770, in handle_request
response = await response
File "/Users/vedantruparelia/workspace/python_projects/tf_sanic_example/request_batching_server.py", line 106, in predict
output = await style_transfer_runner.process_input(data)
File "/Users/vedantruparelia/workspace/python_projects/tf_sanic_example/request_batching_server.py", line 48, in process_input
async with self.queue_lock:
AttributeError: __aenter__
[2021-12-31 18:46:12 +0000] - (sanic.access)[INFO][127.0.0.1:62920]: POST http://127.0.0.1:62919/predict 500 735
<Response [500 Internal Server Error]>
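If I'm reading the traceback right, the handler reaches `async with self.queue_lock` while `queue_lock` is still `None` (it is only assigned inside `model_runner()`), and on Python 3.9 entering `async with None` fails with exactly this error. A minimal sketch that reproduces the same AttributeError, assuming the lock is still unset:

import asyncio

async def handler():
    queue_lock = None  # same state as ModelRunner.queue_lock before model_runner() has run
    async with queue_lock:  # raises AttributeError: __aenter__ on Python 3.9
        pass

asyncio.run(handler())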
I'm using the following code, which I took from https://github.com/deep-learning-with-pytorch/dlwpt-code/blob/d6c0210143daa133bbdeddaffc8993b1e17b5174/p3ch15/request_batching_server.py:
import asyncio
import functools

import numpy as np
import tensorflow as tf
from sanic import Sanic
from sanic.log import logger
from sanic.response import json

app = Sanic("test-server")

MAX_QUEUE_SIZE = 3  # we accept a backlog of MAX_QUEUE_SIZE before handing out "Too busy" errors
MAX_BATCH_SIZE = 2  # we put at most MAX_BATCH_SIZE things in a single batch
MAX_WAIT = 1  # we wait at most MAX_WAIT seconds before running for more inputs to arrive in batching


class HandlingError(Exception):
    def __init__(self, msg, code=500):
        super().__init__()
        self.handling_code = code
        self.handling_msg = msg


class ModelRunner:
    def __init__(self):
        self.queue = []
        self.queue_lock = None
        self.model = tf.keras.models.load_model("best_model_HDF5_format.h5")
        self.needs_processing = None
        self.needs_processing_timer = None

    def schedule_processing_if_needed(self):
        if len(self.queue) >= MAX_BATCH_SIZE:
            logger.debug("next batch ready when processing a batch")
            self.needs_processing.set()
        elif self.queue:
            logger.debug("queue nonempty when processing a batch, setting next timer")
            self.needs_processing_timer = app.loop.call_at(
                self.queue[0]["time"] + MAX_WAIT, self.needs_processing.set
            )

    async def process_input(self, input):
        our_task = {
            "done_event": asyncio.Event(loop=app.loop),
            "input": input,
            "time": app.loop.time(),
        }
        async with self.queue_lock:
            if len(self.queue) >= MAX_QUEUE_SIZE:
                raise HandlingError("I'm too busy", code=503)
            self.queue.append(our_task)
            logger.debug("enqueued task. new queue size {}".format(len(self.queue)))
            self.schedule_processing_if_needed()
        await our_task["done_event"].wait()
        return our_task["output"]

    def run_model(self, batch):  # runs in other thread
        return self.model.predict_on_batch(batch["a"])

    async def model_runner(self):
        self.queue_lock = asyncio.Lock(loop=app.loop)
        self.needs_processing = asyncio.Event(loop=app.loop)
        while True:
            await self.needs_processing.wait()
            self.needs_processing.clear()
            if self.needs_processing_timer is not None:
                self.needs_processing_timer.cancel()
                self.needs_processing_timer = None
            async with self.queue_lock:
                if self.queue:
                    longest_wait = app.loop.time() - self.queue[0]["time"]
                else:  # oops
                    longest_wait = None
                logger.debug("launching processing. queue size: {}. longest wait: {}".format(len(self.queue), longest_wait))
                to_process = self.queue[:MAX_BATCH_SIZE]
                del self.queue[:len(to_process)]
                self.schedule_processing_if_needed()
            if not to_process:
                continue
            # so here we copy, it would be neater to avoid this
            batch = {}
            for t in to_process:
                for key, val in t["input"].items():
                    batch[key] = val
            batch = {k: np.array(v).reshape(len(v),) for k, v in batch.items()}
            # we could delete inputs here...
            result = await app.loop.run_in_executor(
                None, functools.partial(self.run_model, batch)
            )
            for t, r in zip(to_process, result):
                t["output"] = r
                t["done_event"].set()
            del to_process


style_transfer_runner = ModelRunner()


@app.route("/predict", methods=["POST"])
async def predict(request):
    data = request.json
    output = await style_transfer_runner.process_input(data)
    return json({"prediction": str(output)})


app.add_task(style_transfer_runner.model_runner())

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)
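For reference, the test that produces the output above follows the usual pytest-sanic pattern; the fixture names and the payload below are illustrative rather than my exact test:

import pytest
from request_batching_server import app

@pytest.fixture
def test_cli(loop, sanic_client):
    # pytest-sanic's sanic_client fixture starts the app on an unused port
    return loop.run_until_complete(sanic_client(app))

async def test_predict(test_cli):
    resp = await test_cli.post("/predict", json={"a": [1.0, 2.0, 3.0]})
    assert resp.status == 200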
Sanic version: 21.6.0
pytest-sanic: latest version
TensorFlow version: 2.7.0
best_model_HDF5_format.h5
- a TensorFlow Keras model saved in HDF5 format (happy to send this file if required)
I would be highly obliged if you could please look into this. Thank you.