# (Removed web-scrape artifact lines: "Spaces:" / "Running" / "Running" —
# not part of the program source.)
import logging
import logging.config
import os
import time
from collections import defaultdict
from datetime import datetime

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO
# Ensure the logs directory exists before the file log handler is configured.
# exist_ok avoids the check-then-create race of os.path.exists + makedirs.
os.makedirs("logs", exist_ok=True)

app = FastAPI()

# Load the exported ONNX model once at startup; task="detect" skips task
# auto-detection, which is unreliable for ONNX weights.
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")
# Logging configuration: console + one log file per day for the app itself,
# uvicorn access logs, and ultralytics output.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
        "access": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            # Filename is fixed at import time; a long-running process keeps
            # writing to the same dated file after midnight.
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}

# Apply the logging configuration. Requires `import logging.config`
# explicitly -- `import logging` alone does not load the submodule.
logging.config.dictConfig(LOGGING_CONFIG)
# NOTE: removed a dead commented-out draft of parse_detection/callback/infer
# (an earlier version without coordinate rescaling, superseded by the live
# definitions below; recover from version control if ever needed).
def parse_detection(detections, scale_x, scale_y):
    """Convert a supervision Detections object to JSON-serializable dicts.

    Box coordinates are divided by (scale_x, scale_y) to map them from the
    resized inference image back to the original image's pixel space.

    Args:
        detections: supervision Detections (xyxy, class_id, confidence,
            tracker_id, data).
        scale_x: horizontal scale factor (resized width / original width).
        scale_y: vertical scale factor (resized height / original height).

    Returns:
        List of dicts with top/left/width/height, class_id/confidence/
        tracker_id ("" when the field is absent), plus any extra fields
        found in ``detections.data``.
    """
    parsed_rows = []
    for i in range(len(detections.xyxy)):
        # Rescale the corners to match the original image size.
        x_min = float(detections.xyxy[i][0]) / scale_x
        y_min = float(detections.xyxy[i][1]) / scale_y
        x_max = float(detections.xyxy[i][2]) / scale_x
        y_max = float(detections.xyxy[i][3]) / scale_y
        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": ""
            if detections.class_id is None
            else int(detections.class_id[i]),
            "confidence": ""
            if detections.confidence is None
            else float(detections.confidence[i]),
            "tracker_id": ""
            if detections.tracker_id is None
            else int(detections.tracker_id[i]),
        }
        if hasattr(detections, "data"):
            for key, value in detections.data.items():
                # Per-detection arrays contribute their i-th element; 0-d
                # arrays and non-indexable values are stringified whole.
                # getattr(..., "ndim", 1) also accepts plain lists, which
                # have no .ndim attribute (the old code crashed on them).
                if hasattr(value, "__getitem__") and getattr(value, "ndim", 1) != 0:
                    row[key] = str(value[i])
                else:
                    row[key] = str(value)
        parsed_rows.append(row)
    return parsed_rows
def callback(image_slice: np.ndarray) -> sv.Detections:
    """Detect objects in one tile produced by the inference slicer."""
    prediction = onnx_model(image_slice)[0]
    return sv.Detections.from_ultralytics(prediction)
def infer(image):
    """Decode raw image bytes, run sliced YOLO detection, and annotate.

    Args:
        image: raw encoded image bytes (e.g. the contents of a JPEG/PNG
            upload).

    Returns:
        Tuple ``(annotated_image, parsed_rows)``: the BGR image with boxes
        drawn, resized back to the original resolution, and the detection
        dicts from parse_detection with coordinates in original-image space.

    Raises:
        ValueError: if the bytes cannot be decoded as an image.
    """
    start_time = time.time()
    image_arr = np.frombuffer(image, np.uint8)
    image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imdecode returns None (rather than raising) on corrupt input.
        raise ValueError("Could not decode the uploaded bytes as an image")
    # Original dimensions, used to map detections back after resizing.
    original_height, original_width = image.shape[:2]
    # Resize to a fixed square so the 640x640 slices tile it evenly.
    target_size = 1920
    image = cv2.resize(image, (target_size, target_size))
    # Scale factors from original space to the resized inference space.
    scale_x = target_size / original_width
    scale_y = target_size / original_height
    # Sliced inference: the slicer invokes the model tile-by-tile through
    # `callback`. (The old code also ran a discarded full-image inference
    # here, roughly doubling latency for no benefit.)
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
    detections = slicer(image=image)
    logging.info("Completed slicing and detection")
    # Parse detections, rescaling coordinates to the original image size.
    parsed_rows = parse_detection(detections, scale_x, scale_y)
    # Count per-class occurrences for a one-line summary log.
    class_counts = defaultdict(int)
    for detection in parsed_rows:
        class_name = detection.get("class_name", "Unknown")
        class_counts[class_name] += 1
    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info(f"Summary info: {summary_info}")
    logging.info(f"Run time: {time.time() - start_time:.2f} seconds")
    # Annotate the resized image, then scale it back to the original size.
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=image.copy(), detections=detections
    )
    annotated_image = cv2.resize(annotated_image, (original_width, original_height))
    return annotated_image, parsed_rows
# NOTE(review): the original definition carried no route decorator, so the
# endpoint was never registered on `app`; restored here — confirm the path.
@app.post("/process-image")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Run detection on an uploaded image.

    Args:
        image: the uploaded image file.
        draw_boxes: when true, return the annotated JPEG instead of JSON.

    Returns:
        A JPEG ``Response`` with bounding boxes drawn when ``draw_boxes`` is
        set, otherwise a ``JSONResponse`` with the parsed detection rows.
    """
    filename = image.filename
    # Fixed: the old log message was a placeholder-less f-string that always
    # printed "(unknown)" instead of the captured filename.
    logging.info(f"Received process-image request for file: {filename}")
    image_data = await image.read()
    annotated_image, results = infer(image_data)
    if draw_boxes:
        _, img_encoded = cv2.imencode('.jpg', annotated_image)
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")
    logging.info("Returning JSON results")
    return JSONResponse(content=results)
# NOTE(review): no route decorator was present, leaving this handler
# unregistered; restored at "/" — confirm the intended path.
@app.get("/")
def hello_world():
    """Trivial health-check handler returning a constant greeting."""
    return 'Hello, World!'
if __name__ == "__main__":
    # Development entry point. "main:app" assumes this file is main.py --
    # TODO confirm; reload=True (auto-restart on edit) is for dev only.
    uvicorn.run("main:app", port=8001, reload=True)