import gradio as gr
import time
import threading
from queue import Queue

result_queue = Queue()

def fun():
    print(f'thread id {threading.get_ident()}')
    time.sleep(5)
    result_queue.put(f'demo {threading.get_ident()}')

def greet(name, intensity):
    thread = threading.Thread(target=fun)
    thread.start()
    return result_queue.get()  # blocks here until fun() puts its result

if __name__ == "__main__":
    demo = gr.Interface(
        fn=greet,
        inputs=["text", "slider"],
        outputs=["text"],
    )
    demo.launch(server_name='0.0.0.0', max_threads=20)
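One catch in this version: result_queue is global, so two users submitting at the same time can pick up each other's results, and greet blocks on result_queue.get() regardless. A minimal sketch of a per-request queue that keeps results separated (same behavior otherwise):

def greet(name, intensity):
    local_queue = Queue()  # one queue per request, not shared across users

    def worker():
        time.sleep(5)  # stand-in for the real work
        local_queue.put(f'demo {threading.get_ident()}')

    threading.Thread(target=worker).start()
    return local_queue.get()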
When I click Submit, I want it to create a new thread to process the result, like this:
import random
import time
from dataclasses import dataclass
from functools import partial
from threading import Lock
from typing import List, Optional

import gradio as gr
import numpy as np

# init_session, llm_prefill, llm_run, emb_data, emb_dim, vocab_size come
# from my own C++ library bound via ctypes (see the note at the end).

@dataclass
class HandlerStatus:
    handler: int
    is_busy: bool = False
    last_used: float = 0

class HandlerPool:
    def __init__(self, num_handlers=5):
        self.handlers: List[HandlerStatus] = []
        self.lock = Lock()
        for _ in range(num_handlers):
            handler = init_session()
            self.handlers.append(HandlerStatus(handler=handler))
        print('handler pool created successfully')

    def get_available_handler(self, timeout=10) -> Optional[HandlerStatus]:
        # Poll until a handler frees up or the timeout expires.
        start_time = time.time()
        while time.time() - start_time < timeout:
            with self.lock:
                available = [h for h in self.handlers if not h.is_busy]
                if available:
                    handler_status = random.choice(available)
                    handler_status.is_busy = True
                    handler_status.last_used = time.time()
                    return handler_status
            time.sleep(0.1)
        return None

    def release_handler(self, handler_status: HandlerStatus):
        with self.lock:
            handler_status.is_busy = False
            print(f"released handler {handler_status}")

    def __len__(self):
        return len(self.handlers)
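If generation ever raises, the handler would stay marked busy and the pool would slowly drain. A small context-manager wrapper (a sketch, not in my current code) would guarantee the release:

from contextlib import contextmanager

@contextmanager
def acquire_handler(pool: HandlerPool, timeout=10):
    # Hypothetical helper: checks a handler out of the pool and guarantees
    # it is returned even if the caller raises mid-generation.
    handler_status = pool.get_available_handler(timeout=timeout)
    if handler_status is None:
        raise RuntimeError("no free handler within timeout")
    try:
        yield handler_status
    finally:
        pool.release_handler(handler_status)

With that, generate_streaming could use "with acquire_handler(handler_pool) as handler:" instead of pairing get/release by hand.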
def generate_streaming(prompt, handler_pool: HandlerPool):
    handler = handler_pool.get_available_handler()
    if handler is None:
        gr.Warning("No free handler, please retry later")
        return
    print(f'using handler {handler}')
    try:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ]
        text = hf_tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        prompt = hf_tokenizer([text], return_tensors="np")['input_ids']
        prompt = prompt.flatten().tolist()
        gr.Warning(f"This prefill needs about {len(prompt)/100:.3f}s to finish",
                   duration=len(prompt)/100)
        logits = llm_prefill(handler.handler, prompt, emb_data, emb_dim, vocab_size)
        next_token = int(np.argmax(logits))
        output_tokens = [next_token]
        for cur_pos in range(len(prompt), 1024):
            logits = llm_run(handler.handler, next_token, cur_pos,
                             emb_data, emb_dim, vocab_size)
            next_token = int(np.argmax(logits))
            if next_token == 151645:  # <|im_end|> for Qwen2
                break
            output_tokens += [next_token]
            text = hf_tokenizer.decode(output_tokens)
            yield text
    finally:
        # Release the handler even if generation fails mid-stream.
        handler_pool.release_handler(handler)
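A quick way to sanity-check the generator without the UI, assuming the model and ctypes bindings are already initialized:

pool = HandlerPool(num_handlers=2)
for partial_text in generate_streaming("Hello!", pool):
    print(partial_text)  # each iteration is the full text decoded so far

Gradio treats a generator event function as a stream, so in the app below each yielded string simply replaces the contents of the output textbox.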
if __name__ == "__main__":
    init_model(model_url)
    handler_pool = HandlerPool()
    with gr.Blocks() as demo:
        gr.Markdown("""<center><font size=8>Qwen2 Demo</center>""")
        with gr.Row():
            with gr.Column(scale=3):
                textbox = gr.Textbox(lines=1, label='Input', value="Hello, World!")
            with gr.Column(scale=1):
                submit = gr.Button("🚀 Send", scale=2)
        out = gr.Textbox(label="Output", lines=20)
        fun = partial(generate_streaming, handler_pool=handler_pool)
        submit.click(fn=fun, inputs=textbox, outputs=out)
    demo.launch(server_name='0.0.0.0')
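One knob that might matter for multiple users: Gradio 4.x runs each event with a default concurrency limit of 1, so the extra handlers in the pool would sit idle unless the limit is raised on the event, e.g.:

# Allow as many simultaneous generations as there are handlers (Gradio 4.x).
submit.click(fn=fun, inputs=textbox, outputs=out,
             concurrency_limit=len(handler_pool))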
This runs on my own hardware: the C++ side is bound into Python with ctypes, and it already works with std::thread. Now I want Python threads to manage the handlers so that different users' prompts can be processed concurrently.
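For reference, the binding declarations look roughly like this (a sketch; the library name and signatures are placeholders for my real exports):

import ctypes

lib = ctypes.CDLL("./libllm.so")  # placeholder name for my C++ library

# Placeholder signatures; the real ones match the C++ exports.
lib.init_session.restype = ctypes.c_void_p
lib.llm_run.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_int,
                        ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
lib.llm_run.restype = ctypes.POINTER(ctypes.c_float)

ctypes releases the GIL for the duration of a foreign call, so several Python threads can be inside the C++ code at the same time, each on its own handler from the pool.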
So I need some help with this.