# NOTE: header captured from the web file viewer; it is not part of the program.
# Space status: Running. File size: 8,903 bytes. Revisions: ca91016, d2e5b70.
import logging
import logging.config
import os
import time
from collections import Counter, defaultdict
from datetime import datetime

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO
# Ensure the logs directory exists before the file log handler opens its file.
# exist_ok=True makes this a single idempotent call, so it cannot fail when
# another process creates the directory between a check and the creation.
os.makedirs("logs", exist_ok=True)
# ASGI application object; the route decorators further down register on it.
app = FastAPI()
# Load the exported ONNX model once at import time so every request reuses it.
# NOTE(review): task="detect" is passed explicitly, presumably because the task
# cannot be inferred from an ONNX export; confirm against the Ultralytics docs.
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")
# Single log-line layout shared by every formatter below.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Logging configuration in the logging.config.dictConfig schema.
#
# * The root logger ("") writes to stdout and to a dated file under logs/.
# * "uvicorn.access" and "ultralytics" get their own handlers and do not
#   propagate, so their records are not emitted a second time by the root.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": _LOG_FORMAT,
        },
        "access": {
            "format": _LOG_FORMAT,
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            # The name is computed once, when this module is imported, so a
            # long-running process keeps appending to its start date's file.
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
            # Pin the encoding so log lines containing non-ASCII text (for
            # example uploaded file names) do not depend on the host locale.
            "encoding": "utf-8",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}
# Apply the logging configuration.
# This needs the `logging.config` submodule to be imported; a plain
# `import logging` does not load it.
logging.config.dictConfig(LOGGING_CONFIG)
# def parse_detection(detections):
# parsed_rows = []
# for i in range(len(detections.xyxy)):
# x_min = float(detections.xyxy[i][0])
# y_min = float(detections.xyxy[i][1])
# x_max = float(detections.xyxy[i][2])
# y_max = float(detections.xyxy[i][3])
# width = int(x_max - x_min)
# height = int(y_max - y_min)
# row = {
# "top": int(y_min),
# "left": int(x_min),
# "width": width,
# "height": height,
# "class_id": ""
# if detections.class_id is None
# else int(detections.class_id[i]),
# "confidence": ""
# if detections.confidence is None
# else float(detections.confidence[i]),
# "tracker_id": ""
# if detections.tracker_id is None
# else int(detections.tracker_id[i]),
# }
# if hasattr(detections, "data"):
# for key, value in detections.data.items():
# row[key] = (
# str(value[i])
# if hasattr(value, "__getitem__") and value.ndim != 0
# else str(value)
# )
# parsed_rows.append(row)
# return parsed_rows
# # Run inference
# def callback(image_slice: np.ndarray) -> sv.Detections:
# # logging.info("Running callback for image slice")
# results = onnx_model(image_slice)[0]
# return sv.Detections.from_ultralytics(results)
# def infer(image):
# start_time = time.time()
# image_arr = np.frombuffer(image, np.uint8)
# image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
# image = cv2.resize(image, (1920, 1920))
# results = onnx_model(image)[0]
# # detections = sv.Detections.from_ultralytics(results)
# slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
# detections = slicer(image=image)
# logging.info("Completed slicing and detection")
# parsed_rows = parse_detection(detections)
# # Count the occurrences of each class
# class_counts = defaultdict(int)
# for detection in parsed_rows:
# class_name = detection.get("class_name", "Unknown")
# class_counts[class_name] += 1
# summary_info = ", ".join(
# [f"{count} {class_name}" for class_name, count in class_counts.items()]
# )
# logging.info(f"Summary info: {summary_info}")
# logging.info(f"Run time: {time.time() - start_time:.2f} seconds")
# # label_annotator = sv.LabelAnnotator(text_color=sv.Color.BLACK)
# bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
# annotated_image = image.copy()
# annotated_image = bounding_box_annotator.annotate(scene=annotated_image, detections=detections)
# # annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections)
# # logging.info("Annotated image")
# return annotated_image, parsed_rows
def _stringify_data_value(value, index):
    """Return the string form of one ``detections.data`` value for row *index*.

    Subscriptable per-detection containers (arrays with at least one
    dimension, lists, tuples) are indexed by *index*. Zero-dimensional
    arrays, plain strings/bytes and non-subscriptable objects are treated as
    applying to every row and are stringified whole.
    """
    # Containers without an ``ndim`` attribute (e.g. lists) must not be asked
    # for one; strings are subscriptable but indexing them would return a
    # single character rather than the value.
    applies_to_all_rows = (
        isinstance(value, (str, bytes)) or getattr(value, "ndim", None) == 0
    )
    if not applies_to_all_rows and hasattr(value, "__getitem__"):
        return str(value[index])
    return str(value)


def parse_detection(detections, scale_x: float, scale_y: float) -> list:
    """Convert detections into JSON-friendly rows in original-image pixels.

    Args:
        detections: Object exposing ``xyxy`` (one ``[x_min, y_min, x_max,
            y_max]`` box per detection), ``class_id``, ``confidence`` and
            ``tracker_id`` (each indexable per detection, or ``None``), and
            optionally a ``data`` mapping of extra per-detection values.
        scale_x: Horizontal factor the image was scaled by before detection;
            box x-coordinates are divided by it.
        scale_y: Vertical factor the image was scaled by before detection;
            box y-coordinates are divided by it.

    Returns:
        One dict per detection with integer ``top``, ``left``, ``width`` and
        ``height``; ``class_id``, ``confidence`` and ``tracker_id`` (an empty
        string when the corresponding attribute is ``None``); plus one
        stringified entry per key of ``detections.data``.
    """
    # Whether the object carries extra data does not change between rows.
    has_extra_data = hasattr(detections, "data")

    parsed_rows = []
    for i, box in enumerate(detections.xyxy):
        # Rescale the coordinates to match the original image size.
        x_min = float(box[0]) / scale_x
        y_min = float(box[1]) / scale_y
        x_max = float(box[2]) / scale_x
        y_max = float(box[3]) / scale_y

        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": ""
            if detections.class_id is None
            else int(detections.class_id[i]),
            "confidence": ""
            if detections.confidence is None
            else float(detections.confidence[i]),
            "tracker_id": ""
            if detections.tracker_id is None
            else int(detections.tracker_id[i]),
        }

        if has_extra_data:
            for key, value in detections.data.items():
                row[key] = _stringify_data_value(value, i)

        parsed_rows.append(row)
    return parsed_rows
# Run inference
def callback(image_slice: np.ndarray) -> sv.Detections:
    """Run the detector on one image slice and return its detections.

    This is the function handed to ``sv.InferenceSlicer`` in ``infer``.

    Args:
        image_slice: Pixel array of the slice to run the model on.

    Returns:
        The model output for the slice converted to ``sv.Detections``.
    """
    # The model call returns a sequence of results; only the first entry is
    # used because a single image is passed in.
    results = onnx_model(image_slice)[0]
    return sv.Detections.from_ultralytics(results)
def _decode_image(image: bytes) -> np.ndarray:
    """Decode encoded image bytes into a colour pixel array or raise ValueError."""
    image_arr = np.frombuffer(image, np.uint8)
    # Checked up front so an empty payload is reported as a clear ValueError
    # instead of being handed to the decoder.
    if image_arr.size == 0:
        raise ValueError("Image payload is empty")
    decoded = cv2.imdecode(image_arr, cv2.IMREAD_COLOR)
    # imdecode signals undecodable data by returning None rather than raising.
    if decoded is None:
        raise ValueError("Image payload could not be decoded as an image")
    return decoded


def infer(image, *, target_size: int = 1920, slice_wh: tuple = (640, 640)):
    """Detect objects in an encoded image using sliced inference.

    The image is stretched to a ``target_size`` x ``target_size`` square,
    processed slice by slice through ``callback``, and the resulting boxes
    are mapped back to the original resolution. Detection runs only through
    the slicer; no separate full-frame model pass is made.

    Args:
        image: Encoded image bytes (anything ``cv2.imdecode`` can read).
        target_size: Side length, in pixels, of the square the image is
            resized to before detection.
        slice_wh: ``(width, height)`` of each slice given to
            ``sv.InferenceSlicer``.

    Returns:
        A tuple ``(annotated_image, parsed_rows)``: the image with bounding
        boxes drawn, resized back to the original dimensions, and the rows
        produced by ``parse_detection`` in original-image coordinates.

    Raises:
        ValueError: If ``image`` is empty or cannot be decoded.
    """
    start_time = time.perf_counter()

    image = _decode_image(image)
    original_height, original_width = image.shape[:2]

    # The resize does not preserve the aspect ratio, so each axis needs its
    # own factor to map boxes back onto the original image.
    image = cv2.resize(image, (target_size, target_size))
    scale_x = target_size / original_width
    scale_y = target_size / original_height

    slicer = sv.InferenceSlicer(callback=callback, slice_wh=slice_wh)
    detections = slicer(image=image)
    logging.info("Completed slicing and detection")

    # Parse detections and adjust coordinates to the original size.
    parsed_rows = parse_detection(detections, scale_x, scale_y)

    # Count the occurrences of each class for the summary log line.
    class_counts = Counter(
        row.get("class_name", "Unknown") for row in parsed_rows
    )
    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info("Summary info: %s", summary_info)
    logging.info("Run time: %.2f seconds", time.perf_counter() - start_time)

    # Boxes are in resized-image coordinates, so draw on the resized image
    # first and only then scale the picture back to the original dimensions.
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=image.copy(), detections=detections
    )
    annotated_image = cv2.resize(
        annotated_image, (original_width, original_height)
    )
    return annotated_image, parsed_rows
@app.post("/process-image/")
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Run detection on an uploaded image.

    Args:
        image: The uploaded image file.
        draw_boxes: When true, respond with the annotated image as JPEG;
            otherwise respond with the detection rows as JSON.

    Returns:
        A JPEG ``Response`` or a ``JSONResponse`` with the detection rows.
        An empty upload, or a ``ValueError`` raised while processing it, is
        answered with HTTP 400; a failed JPEG encode with HTTP 500.
    """
    filename = image.filename
    logging.info("Received process-image request for file: %s", filename)
    image_data = await image.read()

    # Reject empty uploads early instead of letting the decoder fail on them.
    if not image_data:
        logging.warning("Rejected empty upload: %s", filename)
        return JSONResponse(
            status_code=400, content={"detail": "Uploaded file is empty"}
        )

    try:
        annotated_image, results = infer(image_data)
    except ValueError:
        # Treated as a problem with the uploaded data; the traceback is kept
        # in the log and a generic message is returned to the client.
        logging.warning(
            "Could not process upload: %s", filename, exc_info=True
        )
        return JSONResponse(
            status_code=400,
            content={"detail": "Uploaded file could not be processed as an image"},
        )

    if draw_boxes:
        # imencode reports failure through its first return value.
        encoded_ok, img_encoded = cv2.imencode('.jpg', annotated_image)
        if not encoded_ok:
            logging.error("Failed to encode annotated image for: %s", filename)
            return JSONResponse(
                status_code=500,
                content={"detail": "Failed to encode annotated image"},
            )
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")

    logging.info("Returning JSON results")
    return JSONResponse(content=results)
@app.get("/")
def hello_world() -> str:
    """Return a fixed greeting for requests to the root path."""
    return 'Hello, World!'
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on port 8001 with
    # auto-reload enabled. The app is passed as the import string "main:app".
    # NOTE(review): this assumes the file is importable as module `main`;
    # confirm the file name.
    uvicorn.run("main:app", port=8001, reload=True)
|