6. Going back to llm.py: the LLM class and its generate and chat functions. This is covered under the async vs. sync discussion below.

1. Async and sync, corresponding to online and offline

For the offline path, start from benchmarks\benchmark_throughput.py.

In its main function, the backend is vllm and async_engine==False is the default.
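At the user level, the offline path is just the LLM API. A minimal sketch of what the benchmark effectively does (the model name is only a placeholder):

# Minimal offline sketch of the path traced below (model name is a placeholder).
from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m")                    # builds LLMEngine -> engine_core
params = SamplingParams(temperature=0.8, max_tokens=32)
outputs = llm.generate(["Hello, my name is"], params)   # add_request + _run_engine loop
for out in outputs:
    print(out.outputs[0].text)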

(1) Entering the run_vllm function: it constructs an LLM, defined in vllm\entrypoints\llm.py.

LLM constructs an LLMEngine, defined in vllm\v1\engine\llm_engine.py.

LLMEngine constructs engine_core:

self.engine_core = EngineCoreClient.make_client(
    multiprocess_mode=multiprocess_mode,
    asyncio_mode=False,
    vllm_config=vllm_config,
    executor_class=executor_class,
    log_stats=False,  # FIXME: implement
)

Here the offline path gets a SyncMPClient and the online path gets an AsyncMPClient.
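The choice happens inside EngineCoreClient.make_client. Paraphrased (not the verbatim source), the selection logic is roughly:

# Paraphrased sketch of EngineCoreClient.make_client's selection (not verbatim).
if multiprocess_mode and asyncio_mode:
    client = AsyncMPClient(vllm_config, executor_class, log_stats)   # online / AsyncLLM
elif multiprocess_mode:
    client = SyncMPClient(vllm_config, executor_class, log_stats)    # offline / LLMEngine
else:
    client = InprocClient(vllm_config, executor_class, log_stats)    # run EngineCore in-process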

(2) Back to the generate function in llm.py.

First look at _validate_and_add_request, which calls self.llm_engine.add_request.

That function contains:

# Make a new RequestState and queue.
self.output_processor.add_request(request, None, 0)
# Add the request to EngineCore.
self.engine_core.add_request(request)

After that, back in llm.py's generate function, continue to the _run_engine function:

step_outputs = self.llm_engine.step()
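In context, _run_engine is essentially a loop around that step() call. A paraphrased sketch (tqdm handling and other details omitted, so not verbatim):

# Paraphrased sketch of LLM._run_engine: keep stepping the engine
# until every queued request has finished (tqdm and details omitted).
def _run_engine(self):
    outputs = []
    while self.llm_engine.has_unfinished_requests():
        step_outputs = self.llm_engine.step()
        for output in step_outputs:
            if output.finished:
                outputs.append(output)
    # Requests can finish out of order; sort back into submission order.
    return sorted(outputs, key=lambda x: int(x.request_id))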

That step() call leads into vllm\v1\engine\core_client.py:

offline: the step function of SyncMPClient

online: the step function of AsyncMPClient

The parent class behind these clients, MPClient, already kicks off model inference during __init__ by launching run_engine_core in a background process:

# Start EngineCore in background process.
self.resources.proc_handle = BackgroundProcHandle(
    input_path=input_path,
    output_path=self.output_path,
    process_name="EngineCore",
    target_fn=EngineCoreProc.run_engine_core,
    process_kwargs={
        "vllm_config": vllm_config,
        "executor_class": executor_class,
        "log_stats": log_stats,
    })
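Conceptually this is a plain producer/consumer setup: the client pushes requests into an input queue, the background process runs the engine loop, and results come back on an output queue. A toy sketch of the same pattern (not vLLM code):

# Toy sketch (not vLLM code) of the background-process pattern used by
# MPClient/EngineCoreProc: requests in via one queue, results out via another.
import multiprocessing as mp

def engine_loop(input_q, output_q):
    while True:
        req = input_q.get()              # block until there is work
        if req is None:                  # shutdown sentinel
            break
        output_q.put(f"processed {req}")

if __name__ == "__main__":
    in_q, out_q = mp.Queue(), mp.Queue()
    proc = mp.Process(target=engine_loop, args=(in_q, out_q), daemon=True)
    proc.start()
    in_q.put("request-0")
    print(out_q.get())                   # -> "processed request-0"
    in_q.put(None)
    proc.join()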

The target function EngineCoreProc.run_engine_core lives in vllm\v1\engine\core.py and ends up calling:

engine_core.run_busy_loop()

def run_busy_loop(self):
    """Core busy loop of the EngineCore."""

    step_fn = (self.step
               if self.batch_queue is None else self.step_with_batch_queue)

    # Loop until process is sent a SIGINT or SIGTERM
    while True:
        # 1) Poll the input queue until there is work to do.
        while not self.scheduler.has_requests():
            logger.debug("EngineCore busy loop waiting.")
            req = self.input_queue.get()
            self._handle_client_request(*req)

        # 2) Handle any new client requests.
        while not self.input_queue.empty():
            req = self.input_queue.get_nowait()
            self._handle_client_request(*req)

        # 3) Step the engine core.
        outputs = step_fn()

        # 4) Put EngineCoreOutputs into the output queue.
        if outputs is not None:
            self.output_queue.put_nowait(outputs)

Here step_fn is either step or step_with_batch_queue:

def step(self) -> EngineCoreOutputs:
    """Schedule, execute, and make output."""

    # Check for any requests remaining in the scheduler - unfinished,
    # or finished and not yet removed from the batch.
    if not self.scheduler.has_requests():
        return EngineCoreOutputs(
            outputs=[],
            scheduler_stats=self.scheduler.make_stats(),
        )
    scheduler_output = self.scheduler.schedule()
    output = self.model_executor.execute_model(scheduler_output)
    engine_core_outputs = self.scheduler.update_from_output(
        scheduler_output, output)  # type: ignore

    return engine_core_outputs

def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
    """Schedule and execute batches with the batch queue.
    Note that if nothing to output in this step, None is returned.

    The execution flow is as follows:
    1. Try to schedule a new batch if there are unscheduled requests
    and the job queue is not full. If a new batch is scheduled, directly
    return an empty engine core output. In other words, we won't check
    and return model outputs before the batch queue is full.
    2. If there is no new scheduled batch, meaning that the batch queue
    is full or no other requests can be scheduled, we block until the first
    batch in the job queue is finished.
    3. Update the scheduler from the output.
    """
    assert self.batch_queue is not None

    engine_core_outputs = None
    scheduler_output = None
    # If there are unscheduled requests and the job queue
    # is not full, schedule a new batch. Note that this is not blocking.
    if (self.scheduler.get_num_unscheduled_requests() > 0
            and not self.batch_queue.full()):
        scheduler_output = self.scheduler.schedule()
        if scheduler_output.total_num_scheduled_tokens > 0:
            future = self.model_executor.execute_model(scheduler_output)
            self.batch_queue.put_nowait(
                (future, scheduler_output))  # type: ignore

    scheduled_batch = (scheduler_output is not None
                       and scheduler_output.total_num_scheduled_tokens > 0)

    # If no more requests can be scheduled and the job queue is not empty,
    # block until the first batch in the job queue is finished.
    if not scheduled_batch and not self.batch_queue.empty():
        future, scheduler_output = self.batch_queue.get_nowait()
        # Blocking until the first result is available.
        model_output = future.result()
        self.batch_queue.task_done()
        engine_core_outputs = self.scheduler.update_from_output(
            scheduler_output, model_output)

    return engine_core_outputs

Both paths land back at the scheduler step, which has already been covered above.
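One thing worth calling out in step_with_batch_queue is the pipelining: keep scheduling new batches while earlier ones are still executing on the workers, and only block on the oldest future when nothing new can be scheduled. A toy sketch of that idea (not vLLM code):

# Toy sketch (not vLLM code) of future-based pipelining: submit work without
# blocking while the queue has room, and only wait on the oldest result when
# nothing new can be submitted.
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

def run_batch(batch_id):
    time.sleep(0.1)                      # stand-in for model execution
    return f"batch {batch_id} done"

executor = ThreadPoolExecutor(max_workers=1)
batch_queue = Queue(maxsize=2)

for batch_id in range(4):
    if not batch_queue.full():
        batch_queue.put(executor.submit(run_batch, batch_id))   # schedule, don't block
    else:
        print(batch_queue.get().result())                       # block on the oldest batch
        batch_queue.put(executor.submit(run_batch, batch_id))

while not batch_queue.empty():
    print(batch_queue.get().result())
executor.shutdown()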

To summarize: llm.py defines LLM -> LLMEngine -> engine_core.

Inside the generate function, we first add_request and then run the engine:

add_request: llm_engine.add_request --> engine_core.add_request

run_engine: llm_engine.step() --> EngineCoreProc.run_engine_core --> engine_core.run_busy_loop() --> engine_core.step() --> Scheduler.step()

For the online path, look at vllm\entrypoints\openai\api_server.py.

Start from the main function and follow it into run_server.
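From the client's point of view, run_server just exposes an OpenAI-compatible HTTP API. A sketch of a request against a running server (assumes the default port 8000; the model name is a placeholder):

# Sketch: query a running api_server over the OpenAI-compatible completions API.
# Assumes the server was started separately and listens on localhost:8000.
import requests

resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={"model": "<model>", "prompt": "Hello, my name is", "max_tokens": 32},
)
print(resp.json()["choices"][0]["text"])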

Within run_server, build_async_engine_client is implemented by build_async_engine_client_from_engine_args:

async_llm = AsyncLLM.from_vllm_config(
    vllm_config=vllm_config,
    usage_context=usage_context,
    disable_log_requests=engine_args.disable_log_requests,
    disable_log_stats=engine_args.disable_log_stats)
yield async_llm

AsyncLLM, i.e. class AsyncLLM(EngineClient), is defined in vllm\v1\engine\async_llm.py; its __init__ likewise creates the engine core client:

self.engine_core = EngineCoreClient.make_client(
    multiprocess_mode=True,
    asyncio_mode=True,
    vllm_config=vllm_config,
    executor_class=executor_class,
    log_stats=self.log_stats,
)

The rest mirrors the offline path, only async: asyncio_mode=True here, so make_client returns an AsyncMPClient.

In AsyncLLM's generate function:

if self.output_handler is None:
    self.output_handler = asyncio.create_task(
        self._run_output_handler())

q = await self.add_request(
    request_id,
    prompt,
    sampling_params,
    lora_request=lora_request,
    trace_headers=trace_headers,
    prompt_adapter_request=prompt_adapter_request,
    priority=priority,
)

Now look at the _run_output_handler function:

# 1) Pull EngineCoreOutputs from the EngineCore.
outputs = await self.engine_core.get_output_async()

# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
    outputs_slice, outputs.timestamp, iteration_stats)
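The design is a single background task fanning engine outputs out to the per-request queues (the q returned by add_request above). A toy sketch of that pattern (not vLLM code):

# Toy sketch (not vLLM code) of the output-handler pattern: one background
# task pulls engine outputs and fans them out to per-request queues.
import asyncio

async def output_handler(engine_outputs, per_request):
    while True:
        request_id, output = await engine_outputs.get()
        per_request[request_id].put_nowait(output)

async def main():
    engine_outputs = asyncio.Queue()
    per_request = {"req-0": asyncio.Queue()}
    handler = asyncio.create_task(output_handler(engine_outputs, per_request))
    await engine_outputs.put(("req-0", "hello"))   # pretend the engine produced output
    print(await per_request["req-0"].get())        # the request's consumer sees it
    handler.cancel()

asyncio.run(main())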

Not much different from the offline path above; it is just the async version.
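For completeness, AsyncLLM.generate is consumed as an async generator of RequestOutput. A hedged sketch (async_llm and params are assumed to already exist; argument names may differ between vLLM versions):

# Hedged sketch: consuming AsyncLLM.generate directly (the OpenAI server does
# the equivalent inside its request handlers). `async_llm` and `params` are
# assumed to already exist; exact argument names may vary across versions.
async def consume(async_llm, params):
    async for output in async_llm.generate(
            "Hello, my name is", params, request_id="req-0"):
        if output.finished:
            print(output.outputs[0].text)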
