Spaces:
Runtime error
Runtime error
File size: 1,373 Bytes
942dcac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
import threading
import time
import uuid
class TaskManager:
    """Run callables on background threads and track their progress.

    Every task gets a UUID string id and a record in ``self.tasks`` holding
    at least ``'progress'`` and ``'status'``; a failed task additionally
    carries ``'error'``. All reads and writes of ``self.tasks`` performed by
    this class happen while holding ``self.lock``.
    """

    def __init__(self):
        # task_id -> {'progress': ..., 'status': ..., optionally 'error': ...}
        self.tasks = {}
        # Guards every access to self.tasks; worker threads write to it too.
        self.lock = threading.Lock()

    def handle_completion(self, task_id: str) -> None:
        """Print a completion notice and mark ``task_id`` as completed.

        Raises:
            KeyError: if ``task_id`` has no record in ``self.tasks``.
        """
        print(f"Task {task_id} completed")
        # Take the lock like every other mutator so this write cannot
        # interleave with a worker thread touching the same record.
        with self.lock:
            self.tasks[task_id]['status'] = 'completed'

    # Alias kept so callers can use either name.
    on_task_complete = handle_completion

    def create_task(self, target, *args) -> str:
        """Start ``target`` on a new thread and return the new task id.

        ``target`` is invoked as
        ``target(task_id, *args, progress_callback=<callable>)``; calling the
        callback with a value stores that value as the task's progress.
        """
        task_id = str(uuid.uuid4())
        worker = threading.Thread(
            target=self._run_task,
            args=(task_id, target, *args),
        )
        # Register the record before starting the thread so the worker (and
        # its progress callback) always finds an existing entry.
        with self.lock:
            self.tasks[task_id] = {'progress': "0", 'status': 'running'}
        worker.start()
        return task_id

    def _run_task(self, task_id, target, *args) -> None:
        """Thread body: run ``target`` and record 'completed' or 'failed'."""
        try:
            target(
                task_id,
                *args,
                progress_callback=lambda progress: self._update_progress(
                    task_id, progress
                ),
            )
            with self.lock:
                self.tasks[task_id]['status'] = 'completed'
        except Exception as e:
            # Thread boundary: keep the failure on the task record so it can
            # be read back through get_progress().
            with self.lock:
                self.tasks[task_id]['status'] = 'failed'
                self.tasks[task_id]['error'] = str(e)

    def _update_progress(self, task_id, progress) -> None:
        """Store ``progress`` for ``task_id``; unknown ids are ignored."""
        with self.lock:
            if task_id in self.tasks:
                self.tasks[task_id]['progress'] = progress

    def get_progress(self, task_id) -> dict:
        """Return a snapshot of the record for ``task_id``.

        Returns ``{'status': 'not found'}`` when the id is unknown. The
        result is a shallow copy taken under the lock, so later updates by
        worker threads do not change it and mutating it does not affect the
        manager's own state.
        """
        with self.lock:
            record = self.tasks.get(task_id)
            if record is None:
                return {'status': 'not found'}
            return dict(record)
|