# lagent/examples/run_ray_async_agent_lmdeploy.py
import asyncio
import json
import time
import ray
from datasets import load_dataset
from lagent.distributed.ray_serve import AsyncAgentRayActor
from lagent.llms import INTERNLM2_META
from lagent.llms.lmdeploy_wrapper import AsyncLMDeployPipeline
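# start a local Ray runtime to host the agent actors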
ray.init()
# set up the event loop used to drive the async agent calls
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# config for the async LMDeploy pipeline serving InternLM2.5-7B-Chat
model = dict(
    type=AsyncLMDeployPipeline,
    path='internlm/internlm2_5-7b-chat',
    meta_template=INTERNLM2_META,
    tp=1,
    top_k=1,
    temperature=1.0,
    stop_words=['<|im_end|>', '<|action_end|>'],
    max_new_tokens=1024,
)
# ----------------------- interpreter -----------------------
print('-' * 80, 'interpreter', '-' * 80)
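# load the MATH test split and use the first 5000 problems as queries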
ds = load_dataset('lighteval/MATH', split='test')
problems = [item['problem'] for item in ds.select(range(5000))]
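# agent config: an async math-coder agent that pairs the LLM with an
# IPython code interpreter (max_kernels=300)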
coder = dict(
    type='lagent.agents.stream.AsyncMathCoder',
    llm=model,
    interpreter=dict(type='AsyncIPythonInterpreter', max_kernels=300),
)
tic = time.time()
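# launch two agent actors, each reserving one GPU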
actor1 = AsyncAgentRayActor(coder.copy(), num_gpus=1)
actor2 = AsyncAgentRayActor(coder.copy(), num_gpus=1)
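# split the problems in half and issue one coroutine per query,
# each with its own session id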
coros = [
    actor1(query, session_id=i)
    for i, query in enumerate(problems[:len(problems) // 2])
]
coros += [
    actor2(query, session_id=i)
    for i, query in enumerate(problems[len(problems) // 2:])
]
results = loop.run_until_complete(asyncio.gather(*coros))
print('-' * 120)
print(f'time elapsed: {time.time() - tic}')
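# collect the recorded agent steps for every session from both actors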
all_step = ray.get([
    actor1.agent_actor.get_steps.remote(i) for i in range(len(problems) // 2)
])
all_step += ray.get([
    actor2.agent_actor.get_steps.remote(i)
    for i in range(len(problems[len(problems) // 2:]))
])
with open('./tmp_1.json', 'w') as f:
    json.dump(all_step, f, ensure_ascii=False, indent=4)