# Spaces: Running
import asyncio
import base64
import json
import os
import time

import cv2
import imutils
import numpy as np
import requests
import websockets
class CaesarSendWeb:
    """Client that ships camera frames to CaesarAI HTTP / WebSocket endpoints.

    HTTP endpoints receive a JSON body ``{"frame": <base64 of the raw pixel
    bytes>, "shape": [rows, cols]}``.  Replies that carry an image use the
    same layout and are rebuilt here as ``(rows, cols, 3)`` ``uint8`` arrays.
    WebSocket endpoints receive PNG-encoded frames as binary messages.
    """

    # Directory that send_image_recieve_image() scans and writes to when
    # ``saveimage`` is set.
    _FACE_DIR = "CaesarFaceDetection/FaceDataPoints/"

    @staticmethod
    def _frame_payload(image):
        """Return the JSON-serialisable request body for one frame."""
        # tobytes() always yields a C-ordered copy, so this also works for
        # non-contiguous views, which base64.b64encode(image) would reject.
        return {
            "frame": base64.b64encode(image.tobytes()).decode(),
            "shape": [image.shape[0], image.shape[1]],
        }

    @staticmethod
    def _frame_from_payload(payload):
        """Rebuild a ``(rows, cols, 3)`` uint8 array from a server reply."""
        rows, cols = payload["shape"][0], payload["shape"][1]
        raw = base64.b64decode(payload["frame"])
        return np.frombuffer(raw, dtype="uint8").reshape(rows, cols, 3)

    @staticmethod
    def _unused_filename(taken, stem="croppedimage", suffix=".png"):
        """Return the first of stem.png, stem0.png, stem1.png, ... not in *taken*."""
        candidate = f"{stem}{suffix}"
        count = 0
        while candidate in taken:
            candidate = f"{stem}{count}{suffix}"
            count += 1
        return candidate

    @staticmethod
    def _capture_frame():
        """Grab a single frame from camera 0 and always release the device.

        Raises:
            RuntimeError: if the camera did not deliver a frame.
        """
        cap = cv2.VideoCapture(0)
        try:
            ok, image = cap.read()
        finally:
            cap.release()
        if not ok or image is None:
            raise RuntimeError("could not read a frame from camera 0")
        return image

    def send_video_https(self, uri="http://192.168.0.10:7860/caesarobjectdetect"):
        """Stream camera 0 to *uri* frame by frame and display each reply.

        Every frame is POSTed as JSON; the reply is expected to contain a
        processed frame in the same ``frame``/``shape`` layout.  The loop
        ends when ``q`` is pressed in the preview window or the camera stops
        delivering frames.
        """
        cap = cv2.VideoCapture(0)
        try:
            while True:
                ok, image = cap.read()
                if not ok or image is None:
                    break
                response = requests.post(uri, json=CaesarSendWeb._frame_payload(image))
                processed = CaesarSendWeb._frame_from_payload(response.json())
                cv2.imshow("image", processed)
                # Mask to the low byte: some platforms set high bits on the key code.
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Release the device and close the window even if a request fails.
            cap.release()
            cv2.destroyAllWindows()

    def send_video_websocket(
        self,
        uri='wss://palondomus-caesarai.hf.space/caesarobjectdetectws',
        jsondata=None,
        phone=False,
        phone_uri="http://192.168.0.14:8080/shot.jpg",
    ):
        """Stream PNG-encoded frames to a WebSocket endpoint and handle replies.

        Args:
            uri: WebSocket endpoint to connect to.
            jsondata: optional object sent as a JSON text message after
                every frame.
            phone: when falsy, frames come from camera 0; when truthy, each
                frame is fetched as a snapshot from *phone_uri*.
            phone_uri: HTTP URL that returns one encoded image per GET.

        Binary replies are decoded and shown in a window; text replies are
        parsed as JSON and printed.  The loop ends when a frame cannot be
        obtained.
        """
        camera = None if phone else cv2.VideoCapture(0)

        async def main():
            async with websockets.connect(uri) as ws:
                while True:
                    if phone:
                        snapshot = requests.get(phone_uri)
                        encoded = np.frombuffer(snapshot.content, dtype=np.uint8)
                        frame = cv2.imdecode(encoded, -1)
                        # imdecode returns None for an undecodable body.
                        success = frame is not None
                        if success:
                            frame = imutils.resize(frame, width=500, height=600)
                    else:
                        success, frame = camera.read()
                    if not success:
                        break

                    _, buffer = cv2.imencode('.png', frame)
                    await ws.send(buffer.tobytes())
                    if jsondata:
                        await ws.send(json.dumps(jsondata))

                    contents = await ws.recv()
                    if isinstance(contents, bytes):
                        arr = np.frombuffer(contents, np.uint8)
                        frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
                        if frameobj is not None:
                            cv2.imshow('frame', frameobj)
                            cv2.waitKey(1)
                    elif isinstance(contents, str):
                        print(json.loads(contents))

        try:
            asyncio.run(main())
        finally:
            # Release the device and close the preview on any exit path.
            if camera is not None:
                camera.release()
            cv2.destroyAllWindows()

    def send_image_recieve_text(self, uri="http://192.168.0.10:7860/caesarocr", showimage=False):
        """POST one camera frame to *uri* and return the decoded JSON reply.

        Args:
            uri: HTTP endpoint that accepts the ``frame``/``shape`` body.
            showimage: when truthy, show the captured frame and block until
                a key is pressed.

        Raises:
            RuntimeError: if no frame could be captured.
        """
        image = CaesarSendWeb._capture_frame()
        response = requests.post(uri, json=CaesarSendWeb._frame_payload(image))
        messageresp = response.json()
        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        return messageresp

    def send_image_recieve_image(
        self,
        uri="http://192.168.0.10:7860/caesarfacesnap",
        showimage=False,
        saveimage=False,
    ):
        """POST one camera frame to *uri* and return the image it replies with.

        When the reply's ``frame`` is the string ``"no face was detected."``
        a notice is printed and the captured frame itself is returned.

        Args:
            uri: HTTP endpoint that accepts the ``frame``/``shape`` body.
            showimage: when truthy, show the returned image and block until
                a key is pressed.
            saveimage: when truthy and the server returned an image, write
                it into ``CaesarFaceDetection/FaceDataPoints/`` under the
                first unused name of ``croppedimage.png``,
                ``croppedimage0.png``, ``croppedimage1.png``, ...

        Raises:
            RuntimeError: if no frame could be captured.
        """
        image = CaesarSendWeb._capture_frame()
        response = requests.post(uri, json=CaesarSendWeb._frame_payload(image))
        valresp = response.json()

        face_found = valresp["frame"] != "no face was detected."
        if face_found:
            image = CaesarSendWeb._frame_from_payload(valresp)
        else:
            print("No face was detected.")

        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

        # Only server-returned images are stored; without a detection
        # ``image`` is still the full camera frame, not a face crop.
        if saveimage and face_found:
            os.makedirs(CaesarSendWeb._FACE_DIR, exist_ok=True)
            # List the directory once instead of on every candidate name.
            taken = set(os.listdir(CaesarSendWeb._FACE_DIR))
            filename = CaesarSendWeb._unused_filename(taken)
            cv2.imwrite(f"{CaesarSendWeb._FACE_DIR}{filename}", image)
        return image
if __name__ == "__main__":
    # The methods take ``self``, so they must be called on an instance;
    # calling them on the class with keyword arguments only raises TypeError.
    client = CaesarSendWeb()

    # Alternative endpoints, kept as usage examples:
    # yolo_uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws'
    # client.send_video_websocket(uri=yolo_uri)
    # video_uri = 'wss://palondomus-caesarai.hf.space/sendvideows'
    # client.send_video_websocket(uri=video_uri)
    # extraction_uri = "wss://palondomus-caesarai.hf.space/caesarocrextractionws"
    # client.send_video_websocket(uri=extraction_uri, jsondata={"target_words": ["your", "brain", "power"]})
    # ocrws_uri = "wss://palondomus-caesarai.hf.space/caesarocrws"
    # client.send_video_websocket(uri=ocrws_uri)
    # ocr_uri = "http://192.168.0.10:7860/caesarocr"
    # message = client.send_image_recieve_text(uri=ocr_uri, showimage=True)
    # print(message)

    # Stream phone-camera snapshots to the face-detection WebSocket endpoint.
    facedetect_uri = "wss://palondomus-caesarai.hf.space/caesarfacedetectws"
    client.send_video_websocket(uri=facedetect_uri, phone=True)

    # facesnap_uri = "http://192.168.0.10:7860/caesarfacesnap"
    # cropped_image = client.send_image_recieve_image(uri=facesnap_uri, saveimage=True)
    # print(cropped_image)
    # https_uri = 'http://192.168.0.10:7860/caesarobjectdetect'
    # client.send_video_https(uri=https_uri)