Reading the vLLM Code alongside 猛猿's Blog, Part 5: Offline and Online Inference
VI. Going back to llm.py: the LLM class and its generate and chat functions. These are covered under the asynchronous vs. synchronous discussion below.
- Asynchronous vs. synchronous corresponds to online vs. offline inference.
For offline, start from benchmarks\benchmark_throughput.py.
Enter through its main function: the backend is vllm, and async_engine==False is the default.
(1) Go into the run_vllm function, which constructs an LLM; that takes us to vllm\entrypoints\llm.py.
The LLM in turn constructs an LLMEngine, defined in vllm\v1\engine\llm_engine.py.
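For orientation, this LLM class is the same public offline-inference entrypoint that user code calls directly; a minimal usage sketch (the model name is just an example):

    from vllm import LLM, SamplingParams

    # Offline (synchronous) path: LLM wraps LLMEngine, which wraps engine_core.
    llm = LLM(model="facebook/opt-125m")                    # any HF model id works here
    params = SamplingParams(temperature=0.8, max_tokens=32)

    # generate() validates and adds each prompt as a request, then runs the engine loop.
    outputs = llm.generate(["Hello, my name is", "The capital of France is"], params)
    for out in outputs:
        print(out.outputs[0].text)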
The LLMEngine then constructs the engine_core:
    self.engine_core = EngineCoreClient.make_client(
        multiprocess_mode=multiprocess_mode,
        asyncio_mode=False,
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=False,  # FIXME: implement
    )
Here, offline picks SyncMPClient and online picks AsyncMPClient.
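Roughly, the dispatch inside make_client looks like the following (a simplified sketch of the branching, not the verbatim vLLM code):

    # Simplified sketch of EngineCoreClient.make_client's dispatch: an in-process
    # client without multiprocessing, SyncMPClient for the offline/synchronous path,
    # AsyncMPClient for the online/asyncio path.
    def make_client_sketch(multiprocess_mode: bool, asyncio_mode: bool) -> str:
        if not multiprocess_mode:
            return "InprocClient"      # engine core runs in the same process
        if asyncio_mode:
            return "AsyncMPClient"     # online: AsyncLLM awaits get_output_async()
        return "SyncMPClient"          # offline: LLMEngine blocks on get_output()

    assert make_client_sketch(multiprocess_mode=True, asyncio_mode=False) == "SyncMPClient"
    assert make_client_sketch(multiprocess_mode=True, asyncio_mode=True) == "AsyncMPClient"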
(2) Back to the generate function in llm.py.
First look at _validate_and_add_request,
which calls self.llm_engine.add_request.
That function contains:
    # Make a new RequestState and queue.
    self.output_processor.add_request(request, None, 0)
    # Add the request to EngineCore.
    self.engine_core.add_request(request)
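To make that split concrete, here is a toy sketch (hypothetical classes, not vLLM code): the output processor keeps per-request state on the client side for assembling results, while the engine-core client ships the request over to the core process:

    # Toy sketch (hypothetical names) of the two sides of add_request.
    from dataclasses import dataclass
    from queue import Queue

    @dataclass
    class ToyRequestState:                 # stand-in for OutputProcessor's per-request state
        request_id: str
        generated_text: str = ""

    class ToyOutputProcessor:              # client-side bookkeeping for assembling outputs
        def __init__(self):
            self.request_states = {}

        def add_request(self, request_id):
            self.request_states[request_id] = ToyRequestState(request_id)

    class ToyEngineCoreClient:             # hands the request to the engine-core process
        def __init__(self):
            self.input_queue = Queue()     # stand-in for the ZMQ input socket

        def add_request(self, request):
            self.input_queue.put(("add", request))

    output_processor = ToyOutputProcessor()
    engine_core = ToyEngineCoreClient()
    output_processor.add_request("req-0")                        # 1) local RequestState
    engine_core.add_request({"id": "req-0", "prompt": "Hello"})  # 2) send to EngineCore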
After the requests are added, continue through llm.py's generate function to the _run_engine function, which calls

    step_outputs = self.llm_engine.step()

That brings us to vllm\v1\engine\core_client.py:
- offline goes through SyncMPClient,
- online goes through AsyncMPClient.
The common parent of these clients, MPClient, already starts the model inference loop at init time by launching run_engine_core in a background process:
    # Start EngineCore in background process.
    self.resources.proc_handle = BackgroundProcHandle(
        input_path=input_path,
        output_path=self.output_path,
        process_name="EngineCore",
        target_fn=EngineCoreProc.run_engine_core,
        process_kwargs={
            "vllm_config": vllm_config,
            "executor_class": executor_class,
            "log_stats": log_stats,
        })
The target function, EngineCoreProc.run_engine_core, lives in vllm\v1\engine\core.py; it ends up calling
    engine_core.run_busy_loop()

    def run_busy_loop(self):
        """Core busy loop of the EngineCore."""
        step_fn = (self.step
                   if self.batch_queue is None else self.step_with_batch_queue)

        # Loop until process is sent a SIGINT or SIGTERM
        while True:
            # 1) Poll the input queue until there is work to do.
            while not self.scheduler.has_requests():
                logger.debug("EngineCore busy loop waiting.")
                req = self.input_queue.get()
                self._handle_client_request(*req)

            # 2) Handle any new client requests.
            while not self.input_queue.empty():
                req = self.input_queue.get_nowait()
                self._handle_client_request(*req)

            # 3) Step the engine core.
            outputs = step_fn()

            # 4) Put EngineCoreOutputs into the output queue.
            if outputs is not None:
                self.output_queue.put_nowait(outputs)
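To see how MPClient and this busy loop fit together end to end, here is a self-contained toy version of the pattern, using plain multiprocessing queues in place of the ZMQ sockets vLLM actually uses, and a fake model step:

    # Toy version of the MPClient / EngineCoreProc split (not vLLM code): a background
    # process runs a busy loop over an input and an output queue; the client only
    # enqueues requests and reads back outputs.
    import multiprocessing as mp

    def toy_run_engine_core(input_queue, output_queue):
        while True:
            req = input_queue.get()                  # block until there is work to do
            if req is None:                          # None is our toy shutdown signal
                break
            request_id, prompt = req
            output = f"{prompt} ... generated"       # fake "schedule + execute_model" step
            output_queue.put((request_id, output))   # put outputs into the output queue

    class ToySyncClient:
        """Stand-in for SyncMPClient: starts the core process at init time."""

        def __init__(self):
            self.input_queue = mp.Queue()
            self.output_queue = mp.Queue()
            self.proc = mp.Process(target=toy_run_engine_core,
                                   args=(self.input_queue, self.output_queue))
            self.proc.start()                        # like BackgroundProcHandle in MPClient.__init__

        def add_request(self, request_id, prompt):
            self.input_queue.put((request_id, prompt))

        def get_output(self):                        # blocking, like the offline path
            return self.output_queue.get()

        def shutdown(self):
            self.input_queue.put(None)
            self.proc.join()

    if __name__ == "__main__":
        client = ToySyncClient()
        client.add_request("req-0", "Hello")
        print(client.get_output())                   # ('req-0', 'Hello ... generated')
        client.shutdown()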
In run_busy_loop above, step_fn is either step or step_with_batch_queue:
    def step(self) -> EngineCoreOutputs:
        """Schedule, execute, and make output."""

        # Check for any requests remaining in the scheduler - unfinished,
        # or finished and not yet removed from the batch.
        if not self.scheduler.has_requests():
            return EngineCoreOutputs(
                outputs=[],
                scheduler_stats=self.scheduler.make_stats(),
            )
        scheduler_output = self.scheduler.schedule()
        output = self.model_executor.execute_model(scheduler_output)
        engine_core_outputs = self.scheduler.update_from_output(
            scheduler_output, output)  # type: ignore

        return engine_core_outputs
    def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
        """Schedule and execute batches with the batch queue.
        Note that if nothing to output in this step, None is returned.

        The execution flow is as follows:
        1. Try to schedule a new batch if there are unscheduled requests
        and the job queue is not full. If a new batch is scheduled, directly
        return an empty engine core output. In other words, we won't check
        and return model outputs before the batch queue is full.
        2. If there is no new scheduled batch, meaning that the batch queue
        is full or no other requests can be scheduled, we block until the first
        batch in the job queue is finished.
        3. Update the scheduler from the output.
        """
        assert self.batch_queue is not None

        engine_core_outputs = None
        scheduler_output = None
        # If there are unscheduled requests and the job queue
        # is not full, schedule a new batch. Note that this is not blocking.
        if (self.scheduler.get_num_unscheduled_requests() > 0
                and not self.batch_queue.full()):
            scheduler_output = self.scheduler.schedule()
            if scheduler_output.total_num_scheduled_tokens > 0:
                future = self.model_executor.execute_model(scheduler_output)
                self.batch_queue.put_nowait(
                    (future, scheduler_output))  # type: ignore

        scheduled_batch = (scheduler_output is not None
                           and scheduler_output.total_num_scheduled_tokens > 0)

        # If no more requests can be scheduled and the job queue is not empty,
        # block until the first batch in the job queue is finished.
        if not scheduled_batch and not self.batch_queue.empty():
            future, scheduler_output = self.batch_queue.get_nowait()

            # Blocking until the first result is available.
            model_output = future.result()

            self.batch_queue.task_done()
            engine_core_outputs = self.scheduler.update_from_output(
                scheduler_output, model_output)

        return engine_core_outputs
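The point of the batch queue is pipelining: while one batch is still executing, the scheduler can already build the next one, and results are only consumed when nothing new can be scheduled. A minimal sketch of that pattern (a thread-pool future standing in for model_executor.execute_model):

    # Sketch of the step_with_batch_queue idea: keep a bounded queue of in-flight
    # (future, batch) pairs so scheduling batch N+1 overlaps with executing batch N.
    import queue
    import time
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=1)        # stand-in for model_executor
    batch_queue = queue.Queue(maxsize=2)                # bounded, like vLLM's batch queue

    def execute_model(batch):                           # fake, slow "model step"
        time.sleep(0.01)
        return [f"token for {req}" for req in batch]

    pending = [["req-0", "req-1"], ["req-2"], ["req-3"]]  # "unscheduled requests"
    finished = []

    while pending or not batch_queue.empty():
        if pending and not batch_queue.full():
            # Non-blocking: schedule a new batch and submit it without waiting.
            batch = pending.pop(0)
            future = executor.submit(execute_model, batch)
            batch_queue.put_nowait((future, batch))
            continue
        # Nothing new to schedule (or queue full): block on the oldest in-flight batch.
        future, batch = batch_queue.get_nowait()
        finished.append((batch, future.result()))

    print(finished)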
This lands us in the scheduler (schedule / update_from_output), which was already covered in the earlier parts.
To summarize: llm.py defines LLM -> LLMEngine -> engine_core.
In the generate function, requests are first added, then the engine is run:
add_request: llm_engine.add_request --> engine_core.add_request
run_engine:  llm_engine.step() --> EngineCoreProc.run_engine_core (already started at init) --> engine_core.run_busy_loop() --> engine_core.step() --> scheduler.schedule() / scheduler.update_from_output
For online, look at vllm\entrypoints\openai\api_server.py.
From the main function we get to the run_server function.
build_async_engine_client is implemented by build_async_engine_client_from_engine_args:
    async_llm = AsyncLLM.from_vllm_config(
        vllm_config=vllm_config,
        usage_context=usage_context,
        disable_log_requests=engine_args.disable_log_requests,
        disable_log_stats=engine_args.disable_log_stats)

    yield async_llm
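For reference, this async engine backs the OpenAI-compatible HTTP endpoints; once the server is running (for example via vllm serve), a request like the one below ends up in AsyncLLM.generate. A minimal client-side sketch using the openai Python package (base_url and model name are example values):

    # Query a running vLLM OpenAI-compatible server; the server-side handler feeds
    # the request into AsyncLLM.generate.
    from openai import OpenAI

    client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
    resp = client.completions.create(
        model="facebook/opt-125m",      # should match the model the server was started with
        prompt="The capital of France is",
        max_tokens=16,
    )
    print(resp.choices[0].text)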
AsyncLLM (declared as AsyncLLM(EngineClient)) is defined in vllm\v1\engine\async_llm.py:
    self.engine_core = EngineCoreClient.make_client(
        multiprocess_mode=True,
        asyncio_mode=True,
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=self.log_stats,
    )
What follows mirrors the offline path, just asynchronous.
In AsyncLLM's generate function:
    if self.output_handler is None:
        self.output_handler = asyncio.create_task(
            self._run_output_handler())

    q = await self.add_request(
        request_id,
        prompt,
        sampling_params,
        lora_request=lora_request,
        trace_headers=trace_headers,
        prompt_adapter_request=prompt_adapter_request,
        priority=priority,
    )
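The shape here is a per-request asyncio queue: add_request registers the request and hands back a queue, the single background output handler keeps filling those queues, and generate just awaits items until its request finishes. A toy sketch of that pattern (hypothetical names, not the actual AsyncLLM code):

    # Toy sketch of AsyncLLM's per-request output queues: one background task fans
    # engine outputs out to the queue of the request they belong to.
    import asyncio

    request_queues = {}

    async def add_request(request_id, prompt):
        q = asyncio.Queue()
        request_queues[request_id] = q
        return q

    async def output_handler():
        # Stand-in for _run_output_handler: pretend the engine streams 3 tokens per request.
        for step in range(3):
            await asyncio.sleep(0.01)                 # "await engine_core.get_output_async()"
            for request_id, q in request_queues.items():
                finished = step == 2
                q.put_nowait((f"token-{step}", finished))

    async def generate(request_id, prompt):
        q = await add_request(request_id, prompt)     # like AsyncLLM.generate's add_request
        tokens = []
        while True:
            token, finished = await q.get()           # stream outputs as they arrive
            tokens.append(token)
            if finished:
                return tokens

    async def main():
        handler = asyncio.create_task(output_handler())  # AsyncLLM creates this lazily in generate
        print(await generate("req-0", "Hello"))
        await handler

    asyncio.run(main())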
Then look at the _run_output_handler function:
    # 1) Pull EngineCoreOutputs from the EngineCore.
    outputs = await self.engine_core.get_output_async()

    # 2) Process EngineCoreOutputs.
    processed_outputs = self.output_processor.process_outputs(
        outputs_slice, outputs.timestamp, iteration_stats)
There is no real difference from the offline path; it is simply async.
