Spaces:
Build error
Build error
File size: 3,550 Bytes
60929fd 0d2ffa9 60929fd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
from datetime import datetime, timedelta, timezone
import tiktoken
from modules.chat_processor.helpers import update_user_info, convert_to_dict
def get_time():
    """Return the current time in UTC as an ISO 8601 formatted string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
async def check_user_cooldown(
    user_info, current_time, COOLDOWN_TIME, TOKENS_LEFT, REGEN_TIME
):
    """Report whether the user is still cooling down; regenerate tokens if not.

    Args:
        user_info: User record exposing a ``metadata`` mapping as an attribute.
            After the quick check it is passed through ``convert_to_dict``.
        current_time: ISO 8601 string for the moment being checked.
        COOLDOWN_TIME: Cooldown length in seconds, measured from the
            ``last_message_time`` stored in the metadata.
        TOKENS_LEFT: Forwarded unchanged to ``reset_tokens_for_user``.
        REGEN_TIME: Forwarded unchanged to ``reset_tokens_for_user``.

    Returns:
        ``(True, cooldown_end_iso)`` while the cooldown is still running,
        where ``cooldown_end_iso`` is the UTC end time in ISO 8601 format;
        ``(False, None)`` otherwise.
    """

    def _to_utc(iso_string):
        # Naive timestamps are taken to be UTC. Aware ones are *converted*:
        # replace(tzinfo=...) would only relabel them and shift the instant.
        parsed = datetime.fromisoformat(iso_string)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    # Fast path: tokens available and no cooldown flag set.
    tokens_left = user_info.metadata.get("tokens_left", 0)
    if tokens_left > 0 and not user_info.metadata.get("in_cooldown", False):
        return False, None

    user_info = convert_to_dict(user_info)

    # NOTE(review): if "last_message_time" is absent this is None and
    # fromisoformat raises TypeError -- confirm callers always set it.
    last_message_time = _to_utc(user_info["metadata"].get("last_message_time"))
    now = _to_utc(current_time)

    elapsed_seconds = (now - last_message_time).total_seconds()

    # The cooldown window is anchored at the last message.
    if elapsed_seconds < COOLDOWN_TIME:
        cooldown_end_time = last_message_time + timedelta(seconds=COOLDOWN_TIME)
        return True, cooldown_end_time.isoformat()

    # Cooldown is over: clear the flag and let the tokens regenerate.
    user_info["metadata"]["in_cooldown"] = False
    await reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME)
    return False, None
async def reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME):
    """Regenerate the user's tokens for the time since their last message.

    The balance recorded at the last message (``tokens_left_at_last_message``,
    capped at ``TOKENS_LEFT``) grows linearly at a rate that would refill the
    gap between a non-negative balance and ``max_tokens`` in ``REGEN_TIME``
    seconds. The result is capped at ``max_tokens``, stored under
    ``metadata["tokens_left"]`` and saved with ``update_user_info``.

    Args:
        user_info: User record; passed through ``convert_to_dict`` first.
        TOKENS_LEFT: Cap for the starting balance and the default for
            ``max_tokens`` when the metadata does not define it.
        REGEN_TIME: Seconds needed to refill the whole deficit.
    """

    def _to_utc(iso_string):
        # Naive timestamps are taken to be UTC. Aware ones are *converted*:
        # replace(tzinfo=...) would only relabel them and shift the instant.
        parsed = datetime.fromisoformat(iso_string)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    user_info = convert_to_dict(user_info)
    metadata = user_info["metadata"]

    # NOTE(review): if "last_message_time" is absent this is None and
    # fromisoformat raises TypeError -- confirm callers always set it.
    last_message_time = _to_utc(metadata.get("last_message_time"))
    now = _to_utc(get_time())

    # Clamp at zero so a timestamp ahead of the clock cannot produce
    # negative "regeneration" that would take tokens away.
    elapsed_seconds = max((now - last_message_time).total_seconds(), 0.0)

    # Balance at the last message (can be negative), capped at TOKENS_LEFT.
    current_tokens = min(metadata.get("tokens_left_at_last_message", 0), TOKENS_LEFT)
    max_tokens = metadata.get("max_tokens", TOKENS_LEFT)

    if current_tokens < max_tokens:
        # The emptier the (non-negative) balance, the faster it refills.
        regeneration_rate_per_second = (
            max_tokens - max(current_tokens, 0)
        ) / REGEN_TIME
        tokens_to_regenerate = int(elapsed_seconds * regeneration_rate_per_second)
        # Never exceed the maximum.
        metadata["tokens_left"] = min(
            current_tokens + tokens_to_regenerate, max_tokens
        )

    # NOTE(review): the flattened source does not show whether this write sat
    # inside the branch above. It runs unconditionally here so that metadata
    # changes made by the caller (check_user_cooldown clears "in_cooldown"
    # right before calling) are saved even when no tokens are added.
    await update_user_info(user_info)
def get_num_tokens(text, model):
    """Return how many tokens ``text`` encodes to with ``model``'s tiktoken encoding."""
    return len(tiktoken.encoding_for_model(model).encode(text))
|