# vumichien's picture
# Update main.py
# d2e5b70 verified
import logging
import logging.config
import os
import time
from collections import defaultdict
from datetime import datetime

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO
# Ensure the logs directory exists before the file log handler is configured.
# exist_ok=True replaces the check-then-create pair, which could raise
# FileExistsError if another process created the directory in between.
os.makedirs("logs", exist_ok=True)
# Application instance; the route decorators further down register handlers on it.
app = FastAPI()
# Load the exported ONNX detection model once at import time; the inference
# functions below reference this module-level instance.
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")
# Logging configuration in `logging.config.dictConfig` schema.
# One message layout is shared by every formatter.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# The log file name is derived from the date at import time.
_LOG_FILE = f"logs/{datetime.now().strftime('%Y-%m-%d')}.log"


def _stdout_handler(formatter_name):
    """Build a fresh stdout StreamHandler entry bound to *formatter_name*."""
    return {
        "formatter": formatter_name,
        "class": "logging.StreamHandler",
        "stream": "ext://sys.stdout",
    }


LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        name: {"format": _LOG_FORMAT} for name in ("default", "access")
    },
    "handlers": {
        "default": _stdout_handler("default"),
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            "filename": _LOG_FILE,
            "mode": "a",
        },
        "access": _stdout_handler("access"),
    },
    "loggers": {
        # Root logger: console plus the dated log file.
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        # These two loggers do not propagate, so records are not emitted a
        # second time through the root logger's handlers.
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}
# Apply the logging configuration.
# NOTE: `logging.config` is a submodule that `import logging` does not load;
# it must be imported explicitly (see the import block at the top of the file)
# rather than relying on another package having imported it first.
logging.config.dictConfig(LOGGING_CONFIG)
# def parse_detection(detections):
# parsed_rows = []
# for i in range(len(detections.xyxy)):
# x_min = float(detections.xyxy[i][0])
# y_min = float(detections.xyxy[i][1])
# x_max = float(detections.xyxy[i][2])
# y_max = float(detections.xyxy[i][3])
# width = int(x_max - x_min)
# height = int(y_max - y_min)
# row = {
# "top": int(y_min),
# "left": int(x_min),
# "width": width,
# "height": height,
# "class_id": ""
# if detections.class_id is None
# else int(detections.class_id[i]),
# "confidence": ""
# if detections.confidence is None
# else float(detections.confidence[i]),
# "tracker_id": ""
# if detections.tracker_id is None
# else int(detections.tracker_id[i]),
# }
# if hasattr(detections, "data"):
# for key, value in detections.data.items():
# row[key] = (
# str(value[i])
# if hasattr(value, "__getitem__") and value.ndim != 0
# else str(value)
# )
# parsed_rows.append(row)
# return parsed_rows
# # Run inference
# def callback(image_slice: np.ndarray) -> sv.Detections:
# # logging.info("Running callback for image slice")
# results = onnx_model(image_slice)[0]
# return sv.Detections.from_ultralytics(results)
# def infer(image):
# start_time = time.time()
# image_arr = np.frombuffer(image, np.uint8)
# image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
# image = cv2.resize(image, (1920, 1920))
# results = onnx_model(image)[0]
# # detections = sv.Detections.from_ultralytics(results)
# slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
# detections = slicer(image=image)
# logging.info("Completed slicing and detection")
# parsed_rows = parse_detection(detections)
# # Count the occurrences of each class
# class_counts = defaultdict(int)
# for detection in parsed_rows:
# class_name = detection.get("class_name", "Unknown")
# class_counts[class_name] += 1
# summary_info = ", ".join(
# [f"{count} {class_name}" for class_name, count in class_counts.items()]
# )
# logging.info(f"Summary info: {summary_info}")
# logging.info(f"Run time: {time.time() - start_time:.2f} seconds")
# # label_annotator = sv.LabelAnnotator(text_color=sv.Color.BLACK)
# bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
# annotated_image = image.copy()
# annotated_image = bounding_box_annotator.annotate(scene=annotated_image, detections=detections)
# # annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections)
# # logging.info("Annotated image")
# return annotated_image, parsed_rows
def _detection_field(value, index):
    """Return the string form of a per-detection extra field at *index*."""
    # Plain sequences carry one entry per detection but have no `ndim`.
    if isinstance(value, (list, tuple)):
        return str(value[index])
    # Array-likes are indexed unless they are 0-dimensional; anything without
    # `ndim` that is not a list/tuple (e.g. a str) is treated as one value.
    if hasattr(value, "__getitem__") and getattr(value, "ndim", 0) != 0:
        return str(value[index])
    return str(value)


def parse_detection(detections, scale_x, scale_y):
    """Convert a detections object into a list of plain-dict rows.

    Box coordinates are divided by the scale factors so that they refer to
    the original image rather than the resized one used for inference.

    Args:
        detections: Object exposing ``xyxy`` (one ``x_min, y_min, x_max,
            y_max`` box per detection), ``class_id``, ``confidence`` and
            ``tracker_id`` (each either ``None`` or indexable per detection),
            and optionally ``data`` (mapping of extra field name to values).
        scale_x: Horizontal resize factor (resized width / original width).
        scale_y: Vertical resize factor (resized height / original height).

    Returns:
        One dict per detection with integer ``top``, ``left``, ``width`` and
        ``height``; ``class_id``, ``confidence`` and ``tracker_id`` (an empty
        string when the corresponding attribute is ``None``); and every
        ``data`` entry rendered as a string.
    """
    class_ids = detections.class_id
    confidences = detections.confidence
    tracker_ids = detections.tracker_id
    # A missing or None `data` attribute simply means no extra fields.
    extra_fields = getattr(detections, "data", None) or {}

    parsed_rows = []
    for i, box in enumerate(detections.xyxy):
        # Rescale the coordinates to match the original image size.
        x_min = float(box[0]) / scale_x
        y_min = float(box[1]) / scale_y
        x_max = float(box[2]) / scale_x
        y_max = float(box[3]) / scale_y

        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": "" if class_ids is None else int(class_ids[i]),
            "confidence": "" if confidences is None else float(confidences[i]),
            "tracker_id": "" if tracker_ids is None else int(tracker_ids[i]),
        }
        for key, value in extra_fields.items():
            row[key] = _detection_field(value, i)
        parsed_rows.append(row)
    return parsed_rows
# Run inference
def callback(image_slice: np.ndarray) -> sv.Detections:
    """Run the ONNX model on one image slice and wrap the result.

    Passed to the slicer in `infer`. Only the first element of the model
    output is converted into an `sv.Detections` object.
    """
    first_result = onnx_model(image_slice)[0]
    slice_detections = sv.Detections.from_ultralytics(first_result)
    return slice_detections
def infer(image, *, target_size=1920, slice_size=640):
    """Detect objects in an encoded image using sliced inference.

    The image is decoded, resized to a ``target_size`` square, and passed
    through ``sv.InferenceSlicer`` with ``callback`` as the per-slice
    detector. Detections are reported in original-image coordinates and the
    annotated frame is resized back to the original dimensions.

    Args:
        image: Encoded image bytes (e.g. the raw content of an upload).
        target_size: Side length, in pixels, of the square the image is
            resized to before detection.
        slice_size: Side length, in pixels, of each square slice.

    Returns:
        A ``(annotated_image, parsed_rows)`` tuple: the annotated image at
        the original width and height, and the list of detection dicts
        produced by ``parse_detection``.

    Raises:
        ValueError: If the payload is empty or cannot be decoded as an image.
    """
    start_time = time.perf_counter()

    # Fail early with a clear error instead of crashing inside OpenCV or on
    # `None.shape` further down.
    if not image:
        raise ValueError("Image payload is empty.")
    image_arr = np.frombuffer(image, np.uint8)
    frame = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    if frame is None:
        raise ValueError("Image payload could not be decoded.")

    # Get original dimensions
    original_height, original_width = frame.shape[:2]

    # Resize image for detection
    resized = cv2.resize(frame, (target_size, target_size))

    # Compute scale factors (resized / original) used to map boxes back.
    scale_x = target_size / original_width
    scale_y = target_size / original_height

    # Detection is done exclusively through the slicer. The former direct
    # full-frame model call was removed: its result was never used, so it
    # only added a redundant inference pass per request.
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=(slice_size, slice_size))
    detections = slicer(image=resized)
    logging.info("Completed slicing and detection")

    # Parse detections and adjust coordinates to original size
    parsed_rows = parse_detection(detections, scale_x, scale_y)

    # Count the occurrences of each class
    class_counts = defaultdict(int)
    for detection in parsed_rows:
        class_counts[detection.get("class_name", "Unknown")] += 1
    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info("Summary info: %s", summary_info)
    logging.info("Run time: %.2f seconds", time.perf_counter() - start_time)

    # Annotate the resized image (detections are in resized coordinates).
    # NOTE(review): newer supervision releases appear to rename
    # BoundingBoxAnnotator to BoxAnnotator -- confirm against the pinned version.
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=resized.copy(), detections=detections
    )

    # Resize the annotated image back to original dimensions
    annotated_image = cv2.resize(annotated_image, (original_width, original_height))

    return annotated_image, parsed_rows
@app.post("/process-image/")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Run detection on an uploaded image.

    Args:
        image: The uploaded image file.
        draw_boxes: When true, respond with the annotated image as JPEG;
            otherwise respond with the detection rows as JSON.

    Returns:
        A JPEG ``Response`` or a ``JSONResponse`` with the detections. An
        empty upload yields a 400 JSON error; a failure to encode the
        annotated image yields a 500 JSON error.
    """
    filename = image.filename
    logging.info(f"Received process-image request for file: {filename}")
    image_data = await image.read()

    # An empty body cannot be decoded; answer with a client error instead of
    # letting the decoder fail with an unhandled exception.
    if not image_data:
        logging.warning("Rejected empty upload for file: %s", filename)
        return JSONResponse(
            status_code=400, content={"detail": "Uploaded file is empty."}
        )

    annotated_image, results = infer(image_data)

    if not draw_boxes:
        logging.info("Returning JSON results")
        return JSONResponse(content=results)

    # cv2.imencode reports success through its first return value.
    encoded_ok, img_encoded = cv2.imencode('.jpg', annotated_image)
    if not encoded_ok:
        logging.error("Failed to encode annotated image for file: %s", filename)
        return JSONResponse(
            status_code=500,
            content={"detail": "Failed to encode annotated image."},
        )
    logging.info("Returning annotated image")
    return Response(content=img_encoded.tobytes(), media_type="image/jpeg")
@app.get("/")
def hello_world():
    """Return the fixed greeting served at the root path."""
    greeting = 'Hello, World!'
    return greeting
if __name__ == "__main__":
    # Direct execution: start uvicorn on port 8001 with auto-reload enabled.
    # The app is given as the import string "main:app" rather than as an object.
    uvicorn.run(
        "main:app",
        port=8001,
        reload=True,
    )