"""Run ``AsyncMathCoder`` over MATH test problems using two Ray actors.

The script:

1. starts Ray and installs a fresh asyncio event loop,
2. loads the first 5000 problems of the ``lighteval/MATH`` test split,
3. builds two ``AsyncAgentRayActor`` instances (each created with
   ``num_gpus=1``) from the same agent config and sends one half of the
   problems to each,
4. prints the wall-clock time and dumps the per-session steps returned by
   ``get_steps`` to ``./tmp_1.json``.
"""
import asyncio
import json
import time

import ray
from datasets import load_dataset

from lagent.distributed.ray_serve import AsyncAgentRayActor
from lagent.llms import INTERNLM2_META
from lagent.llms.lmdeploy_wrapper import AsyncLMDeployPipeline

ray.init()

# Create and install an explicit event loop; it is used further down to drive
# all agent coroutines to completion with ``run_until_complete``.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# LLM backend config, passed to the agent as its ``llm`` entry.
model = dict(
    type=AsyncLMDeployPipeline,
    path='internlm/internlm2_5-7b-chat',
    meta_template=INTERNLM2_META,
    tp=1,
    top_k=1,
    temperature=1.0,
    stop_words=['<|im_end|>', '<|action_end|>'],
    max_new_tokens=1024,
)

print('-' * 80, 'interpreter', '-' * 80)
ds = load_dataset('lighteval/MATH', split='test')
# NOTE(review): ``select(range(5000))`` requires the split to hold at least
# 5000 rows -- confirm against the dataset version in use.
problems = [item['problem'] for item in ds.select(range(5000))]

# Agent config shared by both actors (each actor receives its own shallow
# copy of this dict).
coder = dict(
    type='lagent.agents.stream.AsyncMathCoder',
    llm=model,
    interpreter=dict(type='AsyncIPythonInterpreter', max_kernels=300),
)

# The timer starts before the actors are constructed, so the reported elapsed
# time includes actor creation as well as inference.
tic = time.time()

# Split the workload once so that dispatching and the later ``get_steps``
# retrieval are guaranteed to use the same partition.
half = len(problems) // 2
first_half = problems[:half]
second_half = problems[half:]

actor1 = AsyncAgentRayActor(coder.copy(), num_gpus=1)
actor2 = AsyncAgentRayActor(coder.copy(), num_gpus=1)

# Session ids restart at 0 for each actor; every id is only ever used with
# the actor that received the corresponding query.
corots = [
    actor1(query, session_id=i) for i, query in enumerate(first_half)
]
corots += [
    actor2(query, session_id=i) for i, query in enumerate(second_half)
]
results = loop.run_until_complete(asyncio.gather(*corots))

print('-' * 120)
print(f'time elapsed: {time.time() - tic}')

# Fetch the recorded steps of every session from the actor that served it,
# in the same order the problems were dispatched.
all_step = ray.get([
    actor1.agent_actor.get_steps.remote(i) for i in range(len(first_half))
])
all_step += ray.get([
    actor2.agent_actor.get_steps.remote(i) for i in range(len(second_half))
])

# ``ensure_ascii=False`` writes non-ASCII characters verbatim, so the file is
# opened with an explicit UTF-8 encoding rather than the platform default.
with open('./tmp_1.json', 'w', encoding='utf-8') as f:
    json.dump(all_step, f, ensure_ascii=False, indent=4)