|
from dotenv import load_dotenv |
|
load_dotenv() |
|
import json, subprocess |
|
import psutil |
|
import atexit |
|
# Install redis and celery on demand when they are not importable, then
# import them so the rest of the module can use `Celery` and `redis`.
try:
    from celery import Celery
    import redis
except ImportError:
    import importlib
    import sys

    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "redis", "celery"]
    )

    # Packages installed after interpreter start-up may not be visible to the
    # import system until its finder caches are invalidated.
    importlib.invalidate_caches()

    # Retry the imports: without this the names `Celery` and `redis` would
    # remain unbound and the module would fail on its first use of them.
    from celery import Celery
    import redis
|
|
|
import time |
|
import sys, os |
|
sys.path.insert( |
|
0, os.path.abspath("../../..") |
|
) |
|
import litellm |
|
|
|
|
|
# Shared Redis connection pool, configured from the REDIS_* environment
# variables and capped at five connections on database 0.
pool = redis.ConnectionPool(
    host=os.environ.get("REDIS_HOST"),
    port=os.environ.get("REDIS_PORT"),
    password=os.environ.get("REDIS_PASSWORD"),
    db=0,
    max_connections=5,
)

# Redis client that draws its connections from the shared pool.
redis_client = redis.Redis(connection_pool=pool)
|
|
|
|
|
# Celery application named 'tasks'. The same Redis URL, assembled from the
# REDIS_* environment variables, is used for both the broker and the result
# backend.
# NOTE(review): the password is interpolated into the URL without
# URL-escaping -- confirm it can never contain reserved characters such as
# '@', ':' or '/'.
celery_app = Celery(
    'tasks',
    broker=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}",
    backend=f"redis://default:{os.getenv('REDIS_PASSWORD')}@{os.getenv('REDIS_HOST')}:{os.getenv('REDIS_PORT')}",
)

# Pass the module-level `pool` through the broker and result-backend
# transport options, and set `broker_pool_limit` to None (documented by
# Celery as disabling the broker connection pool).
celery_app.conf.update(
    broker_pool_limit=None,
    broker_transport_options={'connection_pool': pool},
    result_backend_transport_options={'connection_pool': pool},
)
|
|
|
|
|
|
|
@celery_app.task(name='process_job', max_retries=3)
def process_job(*args, **kwargs):
    """Run one completion request through a freshly built ``litellm.Router``.

    The keyword argument ``llm_model_list`` is removed from ``kwargs`` and
    used as the router's ``model_list``; every remaining positional and
    keyword argument is forwarded unchanged to ``Router.completion``.

    Returns:
        For a ``litellm.ModelResponse``, the response serialized to JSON and
        parsed back into plain Python data. For any other response type,
        ``str(response)``.

    Raises:
        KeyError: if ``llm_model_list`` is not present in ``kwargs``.
        Exception: any error raised while building the router or running the
            completion propagates to the caller unchanged.
    """
    # NOTE(review): `max_retries=3` is declared on the task, but nothing in
    # this body calls `retry()` -- confirm whether retries were intended.
    llm_router: litellm.Router = litellm.Router(
        model_list=kwargs.pop("llm_model_list")
    )
    response = llm_router.completion(*args, **kwargs)

    if isinstance(response, litellm.ModelResponse):
        # Round-trip through JSON to obtain plain dicts/lists/scalars.
        return json.loads(response.model_dump_json())

    return str(response)
|
|
|
|
|
def cleanup():
    """Terminate every running process whose name is exactly ``celery``.

    Registered as an ``atexit`` hook, so it is best-effort: errors are
    printed instead of raised, and a failure on one process does not prevent
    the remaining ones from being terminated.

    NOTE(review): the match is by process name across everything
    ``psutil.process_iter`` reports, not only workers started by this
    module -- confirm that is the intended scope.
    """
    try:
        for process in psutil.process_iter(attrs=['pid', 'name']):
            if process.info['name'] != 'celery':
                continue

            print(f"Terminating Celery worker with PID {process.info['pid']}")
            try:
                # Use the Process object yielded by the iterator rather than
                # building a new one from the bare PID.
                process.terminate()
            except psutil.Error as e:
                # Handled per process: a worker that already exited or that
                # we may not signal must not abort the rest of the sweep.
                print(f"Error during cleanup: {e}")
    except Exception as e:
        # Last-resort guard so an exit hook never raises.
        print(f"Error during cleanup: {e}")


# Run the sweep automatically when the interpreter shuts down.
atexit.register(cleanup)