OMG / inference /enterprise /device_manager /container_service.py
Fucius's picture
Upload 422 files
df6c67d verified
import base64
import time
from dataclasses import dataclass
from datetime import datetime
import requests
import docker
from inference.core.cache import cache
from inference.core.env import METRICS_INTERVAL
from inference.core.logger import logger
from inference.core.utils.image_utils import load_image_rgb
from inference.enterprise.device_manager.helpers import get_cache_model_items
@dataclass
class InferServerContainer:
    """
    Handle on a single inference server Docker container.

    Bundles the Docker SDK container object with the details collected for it
    (status, id, host, port, version, startup time) and exposes lifecycle and
    introspection helpers. The action helpers (kill, restart, stop, start,
    inspect, snapshot) log failures and return a ``(success, payload)`` tuple
    instead of raising.
    """

    status: str
    id: str
    port: int
    host: str
    startup_time: float
    version: str

    def __init__(self, docker_container, details):
        """
        Build the wrapper from a Docker container and its details dict.
        Args:
            docker_container (any): A container object from the Docker SDK.
            details (dict): Container details. Keys read here: "status",
                "uuid", "port", "host", "version" and "startup_time_ts"
                (a "%Y-%m-%dT%H:%M:%S" timestamp, optionally followed by
                fractional seconds and/or a trailing "Z").
        """
        self.container = docker_container
        self.status = details.get("status")
        self.id = details.get("uuid")
        self.port = details.get("port")
        self.host = details.get("host")
        self.version = details.get("version")
        raw_startup = details.get("startup_time_ts")
        if raw_startup:
            # Drop fractional seconds and a bare trailing "Z" so that the
            # fixed strptime format below matches.
            t = raw_startup.split(".")[0].rstrip("Z")
            # NOTE(review): the parsed datetime is naive, so .timestamp()
            # interprets it in the host's local timezone - confirm whether the
            # incoming timestamp is expressed in UTC.
            self.startup_time = datetime.strptime(t, "%Y-%m-%dT%H:%M:%S").timestamp()
        else:
            # No startup timestamp available: fall back to the current time.
            self.startup_time = datetime.now().timestamp()

    def _run_container_action(self, action):
        """
        Call the named no-argument method on the Docker container.
        Args:
            action (str): Name of the container method to call (e.g. "kill").
        Returns:
            tuple: (True, None) on success, (False, None) if anything raised.
        """
        try:
            getattr(self.container, action)()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def kill(self):
        """Kill the container. Returns (success, None)."""
        return self._run_container_action("kill")

    def restart(self):
        """Restart the container. Returns (success, None)."""
        return self._run_container_action("restart")

    def stop(self):
        """Stop the container. Returns (success, None)."""
        return self._run_container_action("stop")

    def start(self):
        """Start the container. Returns (success, None)."""
        return self._run_container_action("start")

    def inspect(self):
        """
        Fetch the JSON served at the container's /info endpoint.
        Returns:
            tuple: (True, info) on success, (False, None) on any failure.
        """
        try:
            # Bounded wait (same 3 s used during container discovery) so an
            # unresponsive server cannot block the caller indefinitely.
            info = requests.get(
                f"http://{self.host}:{self.port}/info", timeout=3
            ).json()
            return True, info
        except Exception as e:
            logger.error(e)
            return False, None

    def snapshot(self):
        """
        Collect the latest inferred images and tag them with this container's id.
        Returns:
            tuple: (True, snapshot) where snapshot is the dictionary from
                get_latest_inferred_images() plus a "container_id" entry,
                or (False, None) on any failure.
        """
        try:
            snapshot = self.get_latest_inferred_images()
            snapshot.update({"container_id": self.id})
            return True, snapshot
        except Exception as e:
            logger.error(e)
            return False, None

    def get_latest_inferred_images(self, max=4):
        """
        Retrieve the latest inferred images and associated information for this container.
        This method fetches inferred images cached within the last METRICS_INTERVAL seconds.
        Args:
            max (int, optional): The maximum number of inferred images to retrieve
                in total, across all models. Defaults to 4.
        Returns:
            dict: A dictionary where each key represents a model ID associated with this
                container, and the corresponding value is a list of dictionaries containing
                information about the inferred images. Each dictionary has the following keys:
                - "image" (str): The base64-encoded image data.
                - "dimensions" (dict): The "image" entry of the cached response.
                - "predictions" (list): The "predictions" entry of the cached response.
        Notes:
            - This method uses the global constant METRICS_INTERVAL to specify the time interval.
            - Images whose type is not "base64" are loaded with load_image_rgb and
              the raw bytes of the loaded image are base64-encoded.
        """
        now = time.time()
        start = now - METRICS_INTERVAL
        # Look the container up once; the result maps api_key -> model ids.
        models_by_api_key = get_cache_model_items().get(self.id, dict())
        model_ids = []
        for mids in models_by_api_key.values():
            model_ids.extend(mids)
        num_images = 0
        latest_inferred_images = dict()
        for model_id in model_ids:
            if num_images >= max:
                break
            latest_reqs = cache.zrangebyscore(
                f"inference:{self.id}:{model_id}", min=start, max=now
            )
            for req in latest_reqs:
                images = req.get("request", {}).get("image")
                image_dims = req.get("response", {}).get("image", dict())
                predictions = req.get("response", {}).get("predictions", [])
                if images is None or len(images) == 0:
                    continue
                if not isinstance(images, list):
                    images = [images]
                for image in images:
                    # Enforce the limit per image, not only per model, so a
                    # single busy model cannot return more than `max` images.
                    if num_images >= max:
                        return latest_inferred_images
                    if image["type"] == "base64":
                        value = image["value"]
                    else:
                        loaded_image = load_image_rgb(image)
                        image_bytes = loaded_image.tobytes()
                        value = base64.b64encode(image_bytes).decode("utf-8")
                    inference = dict(
                        image=value, dimensions=image_dims, predictions=predictions
                    )
                    latest_inferred_images.setdefault(model_id, []).append(inference)
                    num_images += 1
        return latest_inferred_images

    def get_startup_config(self):
        """
        Get the startup configuration for this container.
        Returns:
            dict: A dictionary containing the startup configuration for this container,
                with keys "env" (list of "NAME=value" strings), "port_bindings",
                "detach", "image", "privileged" and "labels", all read from the
                Docker container attributes.
        """
        attrs = self.container.attrs
        config = attrs.get("Config", {})
        host_config = attrs.get("HostConfig", {})
        # "Env" may be present but null; treat that the same as no variables.
        env_vars = config.get("Env") or []
        port_bindings = host_config.get("PortBindings", {})
        detached = host_config.get("Detached", False)
        image = config.get("Image", "")
        privileged = host_config.get("Privileged", False)
        labels = config.get("Labels", {})
        # Entries are already "NAME=value" strings. Copy them verbatim instead
        # of splitting and re-joining, which failed on values containing "=".
        env = list(env_vars)
        return {
            "env": env,
            "port_bindings": port_bindings,
            "detach": detached,
            "image": image,
            "privileged": privileged,
            "labels": labels,
            # TODO: add device requests
        }
def is_inference_server_container(container):
    """
    Tell whether a Docker container runs an inference server image
    Args:
        container (any): A container object from the Docker SDK
    Returns:
        boolean: True if any tag of the container's image starts with
            "roboflow/roboflow-inference-server", False otherwise
    """
    return any(
        tag.startswith("roboflow/roboflow-inference-server")
        for tag in container.image.tags
    )
def get_inference_containers():
    """
    Discovers inference server containers running on the host
    and parses their information into a list of InferServerContainer objects
    Each candidate's /info endpoint is queried (3 second timeout); a failed
    query is logged and the container is still returned with whatever
    details could be parsed from Docker.
    Returns:
        list: InferServerContainer objects, at most one per known container id.
            Containers whose id could not be determined are all kept.
    """
    client = docker.from_env()
    inference_containers = []
    seen_ids = set()
    for c in client.containers.list():
        if not is_inference_server_container(c):
            continue
        details = parse_container_info(c)
        info = {}
        try:
            info = requests.get(
                f"http://{details['host']}:{details['port']}/info", timeout=3
            ).json()
        except Exception as e:
            logger.error(f"Failed to get info from container {c.id} {details} {e}")
        details.update(info)
        infer_container = InferServerContainer(c, details)
        # De-duplicate on the server-reported id. An unknown id (None) is not
        # treated as a duplicate, so unreachable containers are not collapsed
        # into a single entry.
        if infer_container.id is not None:
            if infer_container.id in seen_ids:
                continue
            seen_ids.add(infer_container.id)
        inference_containers.append(infer_container)
    return inference_containers
def parse_container_info(c):
    """
    Parses the container information into a dictionary
    Args:
        c (any): Docker SDK Container object
    Returns:
        dict: A dictionary containing the container information. Always has
            "container_id", "port" (default 9001) and "host" (default
            "0.0.0.0"); "status", "container_name_on_host" and
            "startup_time_ts" are added when present in the container attributes.
    """
    # "Env" may be present but null; treat that the same as no variables.
    env = c.attrs.get("Config", {}).get("Env") or []
    info = {"container_id": c.id, "port": 9001, "host": "0.0.0.0"}
    for var in env:
        # Split on the first "=" only so values that contain "=" stay intact.
        name, sep, value = var.partition("=")
        if not sep:
            continue
        if name == "PORT":
            # Keep the type consistent with the integer default; fall back to
            # the raw string if the variable is not numeric.
            try:
                info["port"] = int(value)
            except ValueError:
                info["port"] = value
        elif name == "HOST":
            info["host"] = value
    state = c.attrs.get("State", {})
    status = state.get("Status")
    if status:
        info["status"] = status
    container_name = c.attrs.get("Name")
    if container_name:
        info["container_name_on_host"] = container_name
    startup_time = state.get("StartedAt")
    if startup_time:
        info["startup_time_ts"] = startup_time
    return info
def get_container_by_id(id):
    """
    Looks up a discovered inference server container by id
    Args:
        id (string): The id of the container
    Returns:
        container: The first discovered container whose id equals the given
            id, or None when there is no match
    """
    matches = (
        candidate for candidate in get_inference_containers() if candidate.id == id
    )
    return next(matches, None)
def get_container_ids():
    """
    Lists the ids of all discovered inference server containers
    Returns:
        list: The id of each container, in discovery order
    """
    return [container.id for container in get_inference_containers()]