Flask
Flask is a micro web framework written in Python. It is easy and fast to implement. How is it relevant to AI? Sometimes it is necessary to run models on a server or in the cloud, and the usual way to do so is to wrap the model in a web application.
The Request, containing the data to predict on and the model parameters, is sent to the API hosting the model, which runs the prediction and returns a Response containing the results. Flask is the most popular library for such a task.
Simple Flask App
Below is a simple Flask app to serve an ML model's predictions. Assuming the app is named serve_http.py, we can launch it locally via python serve_http.py, and the API can then be accessed at http://localhost:5000/.
It is important to set host="0.0.0.0" so that the app binds to all network interfaces of the container and can be called from the outside.
"""flask app for model prediction"""
import traceback
from flask import Flask, request
from predict import detectObj
from utils_serve import array2json, from_base64
app = Flask(__name__)
@app.route("/", methods=["POST"])
def get_predictions():
"""Returns pred output in json"""
try:
req_json = request.json
# get image array
encodedImage = req_json["requests"][0]["image"]["content"]
decodedImage = from_base64(encodedImage)
# get input arguments
features = req_json["requests"][0]["features"][0]
min_height = features.get("min_height") or 0.03
min_width = features.get("min_width") or 0.03
maxResults = features.get("maxResults") or 20
score_th = features.get("score_th") or 0.3
nms_iou = features.get("nms_iou") # using .get will return None if request does not include key-value
# get pred-output
pred_bbox = detectObj(decodedImage,
min_height,
min_width,
maxResults,
score_th,
nms_iou)
# format to response json output
json_output = array2json(pred_bbox, class_mapper)
return json_output
except Exception as e:
tb = traceback.format_exc()
app.logger.error(tb)
return {"errorMessages": tb.replace("\n","")}
if __name__ == '__main__':
app.run(host="0.0.0.0")
Async
From Flask>=2.0 onwards, async syntax is supported after installing it with pip install Flask[async]. Note that async should only be used for I/O-bound tasks that take no more than a few seconds to process; an excellent description can be found here. It works similarly to, and with roughly the same response time as, multi-threading.
Synchronous
Take for example this synchronous REST app that requests data from multiple URLs. The response time is 1.024s.
import json

import requests
from flask import Flask, request

app = Flask(__name__)

api_urls = \
    {"sml": "http://localhost:5001/test",
     "asc": "http://localhost:5002/test",
     "trd": "http://localhost:5003/test",
     "psn": "http://localhost:5004/test"}


def call_recommender_sync(rre, api_urls):
    """call individual recommenders & get predictions"""
    url = api_urls[rre]
    data = {"resultSize": 1}
    prediction = requests.post(url, json=data).content
    prediction = json.loads(prediction)
    return (prediction, rre)


@app.post("/recommendation")
def fusion_api():
    # synchronous, 1.024s ---------
    concat_list = []
    for rre in api_urls.keys():
        predictions = call_recommender_sync(rre, api_urls)
        concat_list.append(predictions)
    return {"result": str(concat_list)}


if __name__ == "__main__":
    app.run(port=5000, debug=True)
Asynchronous
To send multiple requests asynchronously, we use aiohttp in place of requests, and asyncio.gather to collect the results into a list. aiohttp is a non-blocking HTTP client, which allows other tasks to keep running while a request is waiting for its response. The appropriate async and await syntax needs to be added too. The response time is around 0.356s, about x2.87 faster.
import asyncio

import aiohttp
from flask import Flask, request

app = Flask(__name__)

api_urls = \
    {"sml": "http://localhost:5001/test",
     "asc": "http://localhost:5002/test",
     "trd": "http://localhost:5003/test",
     "psn": "http://localhost:5004/test"}


async def call_recommender_async(rre, api_urls):
    """call individual recommenders & get predictions"""
    url = api_urls[rre]
    data = {"resultSize": 1}
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as resp:
            prediction = await resp.json()
    return (prediction, rre)


@app.post("/recommendation")
async def fusion_api():
    # asynchronous, 0.356s, x2.87 ---------
    concat_list = []
    for rre in api_urls.keys():
        predictions = asyncio.create_task(call_recommender_async(rre, api_urls))
        concat_list.append(predictions)
    concat_list = await asyncio.gather(*concat_list)
    return {"result": str(concat_list)}


if __name__ == "__main__":
    app.run(port=5000)
Multi-Threading
We can use concurrent.futures to send the requests with multi-threading. The response time is similar: 0.359s, x2.85 faster.
This is because of Python's Global Interpreter Lock (GIL), whereby only one thread can run at a time. Python switches between threads so that another task can start while one is waiting, which makes multi-threading a concurrent (async-like) rather than a truly parallel process.
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from flask import Flask, request

app = Flask(__name__)

api_urls = \
    {"sml": "http://localhost:5001/test",
     "asc": "http://localhost:5002/test",
     "trd": "http://localhost:5003/test",
     "psn": "http://localhost:5004/test"}


def call_recommender(rre, api_urls):
    """call individual recommenders & get predictions"""
    url = api_urls[rre]
    data = {"resultSize": 1}
    prediction = requests.post(url, json=data).content
    prediction = json.loads(prediction)
    return (prediction, rre)


def multithread(api_urls):
    futures = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        for rre in api_urls.keys():
            futures.append(executor.submit(call_recommender, rre, api_urls))
    return [future.result() for future in as_completed(futures)]


@app.post("/recommendation")
def fusion_api():
    # multi-threading, 0.359s, x2.85 ---------
    concat_list = multithread(api_urls)
    return {"result": str(concat_list)}


if __name__ == "__main__":
    app.run(port=5000, debug=True)
Gunicorn
Flask's built-in server is meant for development only, as it reminds you every time you launch it with the message WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
It has very limited options for managing the server, but luckily wrappers are available to connect Flask to a feature-rich web server. One of the best is Gunicorn, a mature, fully featured server and process manager. It allows automated worker creation and management through simple configurations.
Command
# gunicorn --bind <flask-ip>:<flask-port> <flask-script>:<flask-app>
gunicorn --bind 0.0.0.0:5000 serve_http:app
Config File
Rather than entering all the configs when launching Gunicorn, we can hard-code some of them in a config file gunicorn.conf.py. With this, we can adjust the number of workers based on the machine's cores; Gunicorn's documentation recommends setting the number of workers to (total-cpu * 2) + 1. One of the most important configs besides workers is preload: it loads your model into memory once and shares it among all the workers. Without it, each worker loads the model separately and consumes much, if not all, of the RAM on the machine.
# gunicorn.conf.py
# to see all flask stdout, we can change the log level to debug
import multiprocessing

workers = (multiprocessing.cpu_count() * 2) + 1
preload_app = True
loglevel = "info"
timeout = 10
We can also use the gevent or gthread worker classes to implement concurrency if there are significant I/O-blocking bottlenecks. For gevent, this is done by "monkey patching" the code, mainly replacing blocking parts with compatible cooperative counterparts from the gevent package. See this article for more information.
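As a rough sketch, and assuming the gevent package is installed (pip install gevent), the worker class can be selected in the same gunicorn.conf.py; the worker counts below are illustrative.
# gunicorn.conf.py (sketch; worker_class and threads are standard Gunicorn settings)
worker_class = "gevent"     # cooperative workers, suited to I/O-bound apps
# worker_class = "gthread"  # alternatively, thread-based workers
# threads = 8               # only used by the gthread worker class
workers = 4                 # illustrative; adjust to (total-cpu * 2) + 1 as above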
Logging
Output from the default logging library does not automatically appear in the Gunicorn logs. To get a message into those logs, use print("<something>", flush=True) instead.
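For example, a minimal sketch (the endpoint name is illustrative) of printing from inside a route so the message shows up when the app runs under Gunicorn:
from flask import Flask

app = Flask(__name__)

@app.route("/health")
def health():
    # flush=True pushes the message out immediately, so it appears in the gunicorn logs
    print("health check received", flush=True)
    return {"status": "ok"}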
Testing
Python Requests
The Python requests library provides a convenient way to test your model API. The call is basically a one-liner, e.g. response = requests.post(url, headers=token, json=json_data), but below is a complete script showing how to use it with a JSON request that sends an image together with model parameters.
"""python template to send request to ai-microservice"""
import base64
import json
import requests
json_template = \
{
"requests": [
{
"features": [
{
"score_th": None,
"nms_iou": None
}
],
"image": {
"content": None
}
}
]
}
def send2api(url, token, image, score_th=0.35, nms_iou=0.40):
"""Sends JSON request to AI-microservice and recieve a JSON response"""
base64_bytes = base64.b64encode(image.read()).decode("utf-8")
json_template["requests"][0]["image"]["content"] = base64_bytes
json_template["requests"][0]["features"][0]["score_th"] = score_th
json_template["requests"][0]["features"][0]["nms_iou"] = nms_iou
token = {"X-Api-Token": token}
response = requests.post(url, headers=token, json=json_template)
json_response = response.content
j = json.loads(json_response)
return j
if __name__ == "__main__":
url = "http://localhost:5000/"
token = "xxx"
image_path = "sample_images/20200312_171718.jpg"
image = open(image_path, 'rb')
j = send2api(url, token, image)
print(j)
Postman
Postman is a popular GUI to easily send requests and see the responses.
CURL
We can also use curl to send requests from the command line.
Here's a simple test to check that the API works, without sending any data.
curl --request POST localhost:5000/api
Here’s one complete request with data
curl --header "Content-Type: application/json" \
--request POST \
--data '{"username":"xyz","password":"xyz"}' \
http://localhost:5000/api
To run multiple requests in parallel for stress testing
curl --header "Content-Type: application/json" \
--request POST \
--data '{"username":"xyz","password":"xyz"}' \
http://localhost:5000/api &
curl --header "Content-Type: application/json" \
--request POST \
--data '{"username":"xyz","password":"xyz"}' \
http://localhost:5000/api &
curl --header "Content-Type: application/json" \
--request POST \
--data '{"username":"xyz","password":"xyz"}' \
http://localhost:5000/api &
wait
OpenAPI
OpenAPI is a standard API documentation specification in YAML or JSON format, which originated from Swagger. It usually comes with a user interface, the most popular being SwaggerUI.
It provides all the information required about the API, along with the ability to test the API itself.
There are three ways to go about this.
- generate it separately as a single YAML file, and host it using connexion
- generate it from docstrings or an individual YAML file for each endpoint using flasgger
- auto-generate it from defined schemas, adding additional info within the Flask app using Flask-Pydantic-Spec
Personally, I believe the first is the most realistic, as the API specs are usually defined before the Flask app is created, and the doc can be sent to others for verification without creating a Flask app.
Below is an example script for point 1.
import connexion
from flask import request

from predict import prediction

app = connexion.App(__name__, specification_dir='.')


@app.route("/predict", methods=["POST"])
def predict():
    JScontent = request.json
    img = JScontent["image"]
    response = prediction(img)
    return response


if __name__ == "__main__":
    app.add_api('openapi.yml')
    app.run(host="0.0.0.0")
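For reference, a minimal sketch of what such a separately written openapi.yml might contain; the title, path, request schema, and operationId module name are illustrative assumptions, not the actual spec.
openapi: "3.0.0"
info:
  title: Object Detection API   # illustrative
  version: "1.0.0"
paths:
  /predict:
    post:
      operationId: app.predict  # assumed module.function; point this at wherever predict() is defined
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                image:
                  type: string
                  description: base64-encoded image
      responses:
        "200":
          description: prediction result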
And point 3.
"""flask server with pydantic validation & openapi integration"""
from typing import List
from flask import Flask, request
from flask_pydantic_spec import FlaskPydanticSpec, Request, Response
from pydantic import BaseModel, Field, confloat
from predict import prediction
app = Flask(__name__)
api = FlaskPydanticSpec("flask", title="Objection Detection", version="v1.0.0")
class RequestSchema(BaseModel):
maxResults: int = Field(None, example=20, description="Maximum detection result to return")
min_height: float = Field(None, example=0.3, description="Score")
min_width: float = Field(None, example=0.3, description="Score")
score_th: float = Field(None, example=0.3, description="Score")
nms_iou: float = Field(..., example=0.4, description="Non-max suppression, intersection over union")
type: str = Field(..., example="safetycone", description="name of object to detect")
image: str = Field(..., description="base64-encoded-image")
class _normalizedVertices(BaseModel):
x: float = Field(..., example=5.12, description="X-coordinate")
y: float = Field(..., example=20.56, description="Y-coordinate")
width: int = Field(..., example=500, description="width in pixel")
height: int = Field(..., example=600, description="height in pixel")
score: confloat(gt=0.0, lt=1.0) = Field(..., example=0.79, description="confidence score")
class ResponseSchema(BaseModel):
normalizedVertices: List[_normalizedVertices]
@app.route("/predict", methods=["POST"])
@api.validate(
body=Request(RequestSchema),
resp=Response(HTTP_200=ResponseSchema),
tags=["API Name"]
)
def get_predictions():
"""Short description of endpoint
Long description of endpoint"""
JScontent = request.json
img = JScontent["image"]
response = prediction(img)
return response
if __name__ == "__main__":
api.register(app)
app.run(host="0.0.0.0")
You can refer to my repo for the full example.