diff --git a/docs/online_serving/README.md b/docs/online_serving/README.md
index bbf88fd1d26..217a211f91f 100644
--- a/docs/online_serving/README.md
+++ b/docs/online_serving/README.md
@@ -231,6 +231,8 @@
 mm_hashes: Optional[list] = None
 # Hash values for multimodal (e.g., image/audio) inputs, used for verification or tracking.
 # Default None indicates no multimodal input or hash validation required.
+collect_metrics: Optional[bool] = False
+# Whether to return metric information, for performance analysis or debugging (default is False, meaning no metrics are returned).
 ```

 ### Differences in Return Fields
diff --git a/docs/zh/online_serving/README.md b/docs/zh/online_serving/README.md
index 59debbdbf4d..2a794b5ecb9 100644
--- a/docs/zh/online_serving/README.md
+++ b/docs/zh/online_serving/README.md
@@ -224,6 +224,8 @@
 logits_processors_args: Optional[Dict] = None
 mm_hashes: Optional[list] = None
 # 多模态(multimodal)输入的哈希值列表,用于验证或跟踪输入内容(如图像、音频等)。默认 None 表示无多模态输入或无需哈希验证。
+collect_metrics: Optional[bool] = False
+# 是否返回生成过程中的指标信息,用于性能分析或调试(默认 False 表示不返回)。
 ```

 ### 返回字段差异
diff --git a/fastdeploy/demo/offline_demo.py b/fastdeploy/demo/offline_demo.py
index 137960b3ed0..741a625faac 100644
--- a/fastdeploy/demo/offline_demo.py
+++ b/fastdeploy/demo/offline_demo.py
@@ -17,11 +17,9 @@
 from fastdeploy.engine.sampling_params import SamplingParams
 from fastdeploy.entrypoints.llm import LLM

-model_name_or_path = "/workspace/ERNIE-4.5-0.3B-Paddle"
-
-# 超参设置
-sampling_params = SamplingParams(temperature=0.1, max_tokens=30, prompt_logprobs=100)
-llm = LLM(model=model_name_or_path, tensor_parallel_size=1, enable_prefix_caching=False)
-output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)
+model_name_or_path = "PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
+sampling_params = SamplingParams(temperature=0.1, max_tokens=30)
+llm = LLM(model=model_name_or_path)
+output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)

 print(output)
diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py
index 70c004aa3ca..097e0ad7586 100644
--- a/fastdeploy/engine/async_llm.py
+++ b/fastdeploy/engine/async_llm.py
@@ -402,7 +402,7 @@ async def add_request(

         try:
             request = Request.from_dict(prompt)
-            request.llm_engine_recv_req_timestamp = time.time()
+            request.metrics.scheduler_recv_req_time = time.time()

             # Check if already preprocessed by AsyncEngineClient
             is_preprocessed = prompt.get("_preprocessed", False)
@@ -419,7 +419,7 @@ async def add_request(
                 request.need_prefill_tokens = prompt_token_ids_len

             if not is_preprocessed:
-                request.preprocess_start_time = arrival_time
+                request.metrics.preprocess_start_time = arrival_time

                 input_ids_len = request.prompt_token_ids_len
                 request.set(
@@ -448,7 +448,7 @@ async def add_request(
                     llm_logger.error(error_msg)
                     raise EngineError(error_msg, error_code=400)

-                request.preprocess_end_time = time.time()
+                request.metrics.preprocess_end_time = time.time()

             # Register output queue first, then add request
             await self.output_processor.register_request(request_id, output_queue)
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 8dd697308c9..8a6c7e593ba 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -362,7 +362,6 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
                 tasks.remove(tmp_task)

         for item in tasks:
-            item.schedule_start_time = time.time()
             trace_print(LoggingEventName.RESOURCE_ALLOCATE_START, item.request_id,
getattr(item, "user", "")) available_batch = np.sum(self.resource_manager.stop_flags) if len(tasks) > available_batch: @@ -400,7 +399,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1): if not is_decode: self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}") for task in tasks: - task.inference_start_time = time.time() + task.metrics.inference_start_time = time.time() trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", "")) @@ -415,7 +414,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1): def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]): """ Decode insert prefilled requests into engine worker queue. - Used in v1_kvcache_scheduler. + Used in v0_kvcache_scheduler. Args: request_outputs: a list of RequestOutput sent by prefill instance """ @@ -437,6 +436,10 @@ def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]): cur_req.prompt_token_ids[0] = req_out.outputs.token_ids[0] cur_req.num_cached_tokens = req_out.num_cached_tokens + req_out.metrics.decode_recv_req_time = cur_req.metrics.decode_recv_req_time + req_out.metrics.decode_preallocat_req_time = cur_req.metrics.decode_preallocat_req_time + cur_req.metrics = req_out.metrics + cur_req.metrics.decode_inference_start_time = time.time() if self.cfg.speculative_config.method in ["mtp"] and self.cfg.scheduler_config.splitwise_role == "decode": cur_req.draft_token_ids = copy.deepcopy(req_out.outputs.draft_token_ids) @@ -644,6 +647,7 @@ def _schedule_request_to_worker(self): batch=num_prefill_batch, ) for task in tasks: + task.metrics.engine_get_req_time = time.time() trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", "")) if len(tasks) == 0: time.sleep(0.001) @@ -706,7 +710,7 @@ def _fetch_request(): batch=num_prefill_batch, ) for task in tasks: - task.schedule_start_time = time.time() + task.metrics.engine_get_req_time = time.time() trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", "")) if self.cfg.scheduler_config.splitwise_role == "decode": @@ -725,6 +729,7 @@ def _fetch_request(): while not self.resource_manager.preallocate_resource_in_p(task): time.sleep(0.005) self.llm_logger.info(f"ask D resource for req_id: {task.request_id}") + task.metrics.ask_decode_resource_start_time = time.time() while True: self.split_connector.send_splitwise_tasks([task], task.idx) status, msg = self.split_connector.check_decode_allocated(task) @@ -732,6 +737,7 @@ def _fetch_request(): self.llm_logger.error(f"{task.request_id} ask D resource failed, try again.") time.sleep(0.05) else: + task.metrics.ask_decode_resource_finish_time = time.time() break else: for task in tasks: @@ -740,30 +746,32 @@ def _fetch_request(): self.llm_logger.info("wait for preallocate_resource_in_p") time.sleep(0.005) self.llm_logger.info(f"ask D resource for req_id: {task.request_id}") + task.metrics.ask_decode_resource_start_time = time.time() self.split_connector.send_splitwise_tasks([task], task.idx) for task in tasks: - if self.cfg.scheduler_config.splitwise_role != "mixed": - # assure fetch block ids from D - status, msg = self.split_connector.check_decode_allocated(task) - if not status: - self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.") - self.scheduler.put_results( - [ - 
RequestOutput( - request_id=task.request_id, - finished=True, - error_code=500, - error_msg=msg, - ) - ] - ) - need_delete_tasks.append(task) - continue + # assure fetch block ids from D + status, msg = self.split_connector.check_decode_allocated(task) + task.metrics.ask_decode_resource_finish_time = time.time() + if not status: + self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.") + self.scheduler.put_results( + [ + RequestOutput( + request_id=task.request_id, + finished=True, + error_code=500, + error_msg=msg, + ) + ] + ) + need_delete_tasks.append(task) + continue for tmp_task in need_delete_tasks: tasks.remove(tmp_task) # release resource in P self.resource_manager.pre_recycle_resource(tmp_task.request_id) + if self.cfg.scheduler_config.splitwise_role == "prefill": # to send cache info to cache messager if tasks: @@ -779,9 +787,11 @@ def _fetch_request(): need_check_req_ids.remove(req_id) else: time.sleep(0.001) + # Fetch requests and add them to the scheduling queue if tasks: for task in tasks: + task.metrics.add_req_to_resource_manager_time = time.time() trace_print( LoggingEventName.RESOURCE_ALLOCATE_START, task.request_id, getattr(task, "user", "") ) @@ -844,6 +854,11 @@ def _fetch_request(): ) self.resource_manager.get_real_bsz() for task in tasks: + if isinstance(task, Request): + if self.cfg.scheduler_config.splitwise_role == "decode": + task.metrics.decode_inference_start_time = time.time() + else: + task.metrics.inference_start_time = time.time() trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", "")) @@ -914,7 +929,7 @@ def _insert_zmq_task_to_scheduler(self): err_msg = None try: request = Request.from_dict(data) - request.llm_engine_recv_req_timestamp = time.time() + request.metrics.scheduler_recv_req_time = time.time() start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER) main_process_metrics.requests_number.inc() self.llm_logger.debug(f"Receive request: {request}") @@ -1083,6 +1098,8 @@ def _fetch_requests(): tasks = item[1] if isinstance(tasks[0], Request): self.llm_logger.debug(f"receive tasks to preallocate resource, {tasks}") + for task in tasks: + task.metrics.decode_recv_req_time = time.time() allocate_resource_requests.extend(tasks) elif isinstance(tasks[0], RequestOutput): self.llm_logger.debug(f"receive prefilled tasks, {tasks}") @@ -1090,6 +1107,7 @@ def _fetch_requests(): tasks = [tasks] for task in tasks: task.finished = False + task.metrics.decode_recv_first_token_time = time.time() prefilled_request_ouputs.extend(tasks) def _process_allocate_resource_requests(): @@ -1099,6 +1117,7 @@ def _process_allocate_resource_requests(): if envs.ENABLE_V1_KVCACHE_SCHEDULER: if self.resource_manager.preallocate_resource_in_d(task): + task.metrics.decode_preallocat_req_time = time.time() self.llm_logger.info(f"Resource available, processing task {task.request_id}") self.split_connector.send_cache_info_to_prefill([task]) processed_indices.append(idx) @@ -1107,6 +1126,7 @@ def _process_allocate_resource_requests(): if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len): self.llm_logger.info(f"Resource available, processing task {task.request_id}") self.insert_tasks([task]) + task.metrics.decode_preallocat_req_time = time.time() processed_indices.append(idx) is_success = True diff --git 
a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index ec3328a6a90..fb0e947e273 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -256,13 +256,13 @@ def add_requests(self, task, sampling_params=None, **kwargs): if sampling_params is not None: task.update(asdict(sampling_params)) request = Request.from_dict(task) - request.llm_engine_recv_req_timestamp = time.time() + request.metrics.scheduler_recv_req_time = time.time() llm_logger.info(f"Receive request {request}") if sampling_params is not None: if sampling_params.temperature is not None and abs(sampling_params.temperature) < 1e-06: sampling_params.temperature = 1e-06 request.sampling_params = sampling_params - request.preprocess_start_time = time.time() + request.metrics.preprocess_start_time = time.time() chat_template_kwargs = kwargs.get("chat_template_kwargs") or {} chat_template_kwargs["chat_template"] = kwargs.get("chat_template") kwargs["chat_template_kwargs"] = chat_template_kwargs @@ -324,7 +324,8 @@ def add_requests(self, task, sampling_params=None, **kwargs): llm_logger.error(err_msg) raise EngineError(err_msg, error_code=400) - request.preprocess_end_time = time.time() + request.metrics.preprocess_end_time = time.time() + request.metrics.scheduler_recv_req_time = time.time() self.engine.scheduler.put_requests([request]) llm_logger.info(f"Cache task with request_id ({request.get('request_id')})") llm_logger.debug(f"cache task: {request}") diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 888364bb672..e07247c31e0 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -67,14 +67,8 @@ def __init__( tools: Optional[list[Dict]], system: Optional[Union[str, list[str]]], eos_token_ids: Optional[list[int]], - arrival_time: float, sampling_params: Optional[SamplingParams] = None, pooling_params: Optional[PoolingParams] = None, - preprocess_start_time: Optional[float] = None, - preprocess_end_time: Optional[float] = None, - schedule_start_time: Optional[float] = None, - inference_start_time: Optional[float] = None, - llm_engine_recv_req_timestamp: Optional[float] = None, multimodal_inputs: Optional[dict] = None, multimodal_data: Optional[dict] = None, disable_chat_template: bool = False, @@ -102,6 +96,7 @@ def __init__( num_computed_tokens: int = 0, # for internal adapter ic_req_data: Optional[dict] = (None,), + metrics: Optional[RequestMetrics] = None, ) -> None: self.request_id = request_id self.prompt = prompt @@ -116,13 +111,6 @@ def __init__( # model specific token ids: end of sentence token ids self.eos_token_ids = eos_token_ids self.num_cached_tokens = 0 - - self.arrival_time = arrival_time - self.preprocess_start_time = preprocess_start_time - self.preprocess_end_time = preprocess_end_time - self.schedule_start_time = schedule_start_time - self.inference_start_time = inference_start_time - self.llm_engine_recv_req_timestamp = llm_engine_recv_req_timestamp or time.time() self.disable_chat_template = disable_chat_template self.disaggregate_info = disaggregate_info @@ -171,22 +159,32 @@ def __init__( self.extend_block_tables = [] # dp self.dp_rank = dp_rank - self.llm_engine_recv_req_timestamp = time.time() + self.scheduler_recv_req_time = time.time() self.ic_req_data = ic_req_data self.async_process_futures = [] self.error_message = None self.error_code = None + if metrics is None: + self.metrics = RequestMetrics() + else: + self.metrics = metrics + @classmethod def from_dict(cls, d: dict): data_processor_logger.debug(f"{d}") 
         sampling_params: SamplingParams = None
         pooling_params: PoolingParams = None
+        metrics: RequestMetrics = None
         if "pooling_params" in d and d["pooling_params"] is not None:
             pooling_params = PoolingParams.from_dict(d["pooling_params"])
         else:
             sampling_params = SamplingParams.from_dict(d)
+        if "metrics" in d and d["metrics"] is not None:
+            metrics = RequestMetrics.from_dict(d["metrics"])
+        else:
+            metrics = RequestMetrics.from_dict(d)

         if (
             isinstance(d.get("multimodal_inputs"), dict)
@@ -215,9 +213,6 @@ def from_dict(cls, d: dict):
             sampling_params=sampling_params,
             pooling_params=pooling_params,
             eos_token_ids=d.get("eos_token_ids"),
-            arrival_time=d.get("arrival_time", time.time()),
-            preprocess_start_time=d.get("preprocess_start_time"),
-            preprocess_end_time=d.get("preprocess_end_time"),
             multimodal_inputs=d.get("multimodal_inputs"),
             multimodal_data=d.get("multimodal_data"),
             disable_chat_template=d.get("disable_chat_template"),
@@ -244,8 +239,7 @@ def from_dict(cls, d: dict):
             audio_end=d.get("audio_end", 0),
             dp_rank=d.get("dp_rank", None),
             ic_req_data=d.get("ic_req_data", None),
-            inference_start_time=d.get("inference_start_time"),
-            llm_engine_recv_req_timestamp=d.get("llm_engine_recv_req_timestamp"),
+            metrics=metrics,
         )

     @property
@@ -290,9 +284,6 @@ def to_dict(self) -> dict:
             "history": self.history,
             "tools": self.tools,
             "eos_token_ids": self.eos_token_ids,
-            "arrival_time": self.arrival_time,
-            "preprocess_start_time": self.preprocess_start_time,
-            "preprocess_end_time": self.preprocess_end_time,
             "multimodal_inputs": multimodal_inputs,
             "multimodal_data": self.multimodal_data,
             "disable_chat_template": self.disable_chat_template,
@@ -326,6 +317,7 @@ def to_dict(self) -> dict:
                 data[param] = getattr(self, param)

         data.update(asdict(self.sampling_params))
+        data.update(asdict(self.metrics))
         return data

     def get(self, key: str, default_value=None):
@@ -430,8 +422,22 @@ class RequestMetrics:

     Attributes:
         arrival_time: The time when the request arrived.
-        inference_start_time: The time when the inference started.
-        first_token_time: The time when the first token was generated.
+        preprocess_start_time: The time when preprocessing started.
+        preprocess_end_time: The time when preprocessing ended.
+        scheduler_recv_req_time: The time when the scheduler received the request.
+        engine_get_req_time: The time when the engine got the request from the scheduler.
+        ask_decode_resource_start_time: The time when the engine started asking for decode resources.
+        ask_decode_resource_finish_time: The time when the engine finished asking for decode resources.
+        inference_start_time: The time when the engine added the request to the running queue in the resource manager.
+        wait_for_sending_cache_time: The time when the engine started waiting to send the cache.
+        send_request_output_to_decode_time: The time when the engine sent the request_output to decode.
+        decode_recv_req_time: The time when the decode instance received the request.
+        decode_preallocat_req_time: The time when the decode instance preallocated resources for the request.
+        decode_recv_first_token_time: The time when the decode instance received the first token.
+        decode_inference_start_time: The time when the decode instance sent the request to the worker.
+        decode_recv_second_token_time: The time when the decode instance received the second token.
+
+        first_token_time: The elapsed time between inference_start_time and engine_recv_first_token_time.
         time_in_queue: The time the request spent in the queue.
         model_forward_time: The time spent in the model forward pass when this request was in the batch.
@@ -442,35 +448,49 @@ class RequestMetrics:
     """

-    arrival_time: float
-    inference_start_time: Optional[float] = None
+    arrival_time: Optional[float] = None  # api server receives request
+    preprocess_start_time: Optional[float] = None  # preprocess start time in api server
+    preprocess_end_time: Optional[float] = None  # preprocess end time in api server
+
+    scheduler_recv_req_time: Optional[float] = None  # scheduler receives request and adds it to the scheduler
+    engine_get_req_time: Optional[float] = None  # engine gets request from scheduler
+    ask_decode_resource_start_time: Optional[float] = None  # start asking decode resource (only valid for prefill)
+    ask_decode_resource_finish_time: Optional[float] = None  # finish asking decode resource (only valid for prefill)
+    add_req_to_resource_manager_time: Optional[float] = None  # engine adds request to resource manager
+    inference_start_time: Optional[float] = None  # requests are added into the engine work queue
+    engine_recv_latest_token_time: Optional[float] = None  # receive the latest token from worker
+    engine_recv_first_token_time: Optional[float] = None  # receive the first token from worker
+    wait_for_sending_cache_time: Optional[float] = None  # wait for sending cache (only valid for prefill)
+    send_request_output_to_decode_time: Optional[float] = (
+        None  # send request_output to decode (only valid for prefill)
+    )
+
+    decode_recv_req_time: Optional[float] = None  # decode receives request from prefill (only valid for decode)
+    decode_preallocat_req_time: Optional[float] = None  # decode preallocates resource for req (only valid for decode)
+    decode_recv_first_token_time: Optional[float] = (
+        None  # decode receives request_output with first token from prefill (only valid for decode)
+    )
+    decode_inference_start_time: Optional[float] = (
+        None  # decode adds request to the engine work queue (only valid for decode)
+    )
+    decode_recv_second_token_time: Optional[float] = (
+        None  # decode receives the second token from worker (only valid for decode)
+    )
+
     first_token_time: Optional[float] = None
     time_in_queue: Optional[float] = None
     preprocess_cost_time: Optional[float] = None
     model_forward_time: Optional[float] = None
     model_execute_time: Optional[float] = None
     request_start_time: Optional[float] = None
+
     llm_engine_recv_req_timestamp: Optional[float] = None
     llm_engine_send_req_to_engine_timestamp: Optional[float] = None
-    llm_engine_recv_token_timestamp: Optional[float] = None
+    llm_engine_recv_latest_token_timestamp: Optional[float] = None

-    def to_dict(self):
-        """
-        Convert the RequestMetrics object to a dictionary.
- """ - return { - "arrival_time": self.arrival_time, - "inference_start_time": self.inference_start_time, - "first_token_time": self.first_token_time, - "time_in_queue": self.time_in_queue, - "preprocess_cost_time": self.preprocess_cost_time, - "model_forward_time": self.model_forward_time, - "model_execute_time": self.model_execute_time, - "request_start_time": self.request_start_time, - "llm_engine_recv_req_timestamp": self.llm_engine_recv_req_timestamp, - "llm_engine_send_req_to_engine_timestamp": self.llm_engine_send_req_to_engine_timestamp, - "llm_engine_recv_token_timestamp": self.llm_engine_recv_token_timestamp, - } + def __post_init__(self): + if self.arrival_time is None: + self.arrival_time = time.time() @classmethod def from_dict(cls, req_dict: dict[str, Any]) -> RequestMetrics: @@ -482,6 +502,46 @@ def from_dict(cls, req_dict: dict[str, Any]) -> RequestMetrics: } ) + def to_dict(self): + """ + Convert the RequestMetrics object to a dictionary. + """ + return {k: v for k, v in asdict(self).items()} + + def record_recv_first_token(self): + cur_time = time.time() + self.record_recv_token(cur_time) + self.engine_recv_first_token_time = cur_time + + def record_recv_token(self, cur_time: float = None): + cur_time = time.time() if cur_time is None else cur_time + self.engine_recv_latest_token_time = cur_time + self.llm_engine_recv_latest_token_timestamp = cur_time + self.model_execute_time = cur_time - self.arrival_time + self.model_forward_time = cur_time - self.inference_start_time + + def record_decode_recv_second_token(self): + cur_time = time.time() + self.record_recv_token(cur_time) + self.decode_recv_second_token_time = cur_time + + def get_inference_start_time(self, is_decode: bool): + if is_decode: + return self.decode_inference_start_time + else: + return self.inference_start_time + + def cal_cost_time(self): + """Calculates various timing metrics based on the recorded times""" + self.first_token_time = self.engine_recv_first_token_time - self.inference_start_time + self.time_in_queue = time.time() - self.preprocess_end_time + self.preprocess_cost_time = self.preprocess_end_time - self.preprocess_start_time + self.request_start_time = self.arrival_time + + # for compatibility with old metrics + self.llm_engine_recv_req_timestamp = self.engine_get_req_time + self.llm_engine_send_req_to_engine_timestamp = self.inference_start_time + class RequestOutput: """The output data of a completion request to the LLM. 
@@ -556,10 +616,12 @@ def add(self, next_output: RequestOutput) -> None: self.outputs.index = next_output.outputs.index self.outputs.token_ids.extend(next_output.outputs.token_ids) - if next_output.metrics.arrival_time is not None and self.metrics.inference_start_time is not None: - self.metrics.model_forward_time = next_output.metrics.arrival_time - self.metrics.inference_start_time - if next_output.metrics.arrival_time is not None and self.metrics.arrival_time is not None: - self.metrics.model_execute_time = next_output.metrics.arrival_time - self.metrics.arrival_time + if next_output.metrics.model_forward_time is not None: + self.metrics.model_forward_time = next_output.metrics.model_forward_time + if next_output.metrics.model_execute_time is not None: + self.metrics.model_execute_time = next_output.metrics.model_execute_time + if next_output.metrics.engine_recv_latest_token_time is not None: + self.metrics.engine_recv_latest_token_time = next_output.metrics.engine_recv_latest_token_time if next_output.outputs.top_logprobs is not None: self.outputs.top_logprobs.logprob_token_ids.extend(next_output.outputs.top_logprobs.logprob_token_ids) self.outputs.top_logprobs.logprobs.extend(next_output.outputs.top_logprobs.logprobs) diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 7c72d9d8891..4a694559047 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -291,7 +291,6 @@ def allocate_resources_for_new_tasks(self, tasks): processed_tasks.append(task) # add current task self.stop_flags[allocated_position] = False # mark the slot as occupied - task.inference_start_time = time.time() task.inference_time_cost = -1.0 task.tokens_all_num = 0 self.tasks_list[allocated_position] = task diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 91ce594e993..96827af736c 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -689,7 +689,6 @@ def _allocate_decode_and_extend(): self.waiting.popleft() self.running.append(request) scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens)) - request.inference_start_time = time.time() token_budget -= num_new_tokens request.num_computed_tokens += num_new_tokens if self.config.cache_config.enable_prefix_caching: @@ -930,7 +929,6 @@ def pre_recycle_resource(self, request_id: str): def add_request_in_p(self, requests: list[Request]): with self.lock: for request in requests: - request.inference_start_time = time.time() self.running.append(request) def preallocate_resource_in_p(self, request: Request): @@ -1037,7 +1035,7 @@ def add_prefilled_request(self, request_output: RequestOutput): """ assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" if request_output.request_id not in self.requests: - self.logger.error(f"Request {request_output.request_id} not found in requests") + llm_logger.error(f"Request {request_output.request_id} not found in requests") return request = self.requests[request_output.request_id] @@ -1050,8 +1048,10 @@ def add_prefilled_request(self, request_output: RequestOutput): ): request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids) request.need_prefill_tokens = len(request.prompt_token_ids) + 1 - request.inference_start_time = time.time() - request.schedule_start_time = time.time() + + request_output.metrics.decode_recv_req_time = 
request.metrics.decode_recv_req_time + request_output.metrics.decode_preallocat_req_time = request.metrics.decode_preallocat_req_time + request.metrics = request_output.metrics self.running.append(request) def _free_blocks(self, request: Request): diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 0bb35a284dc..47487e8e3ee 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -306,6 +306,7 @@ class ChatCompletionStreamResponse(BaseModel): model: str choices: List[ChatCompletionResponseStreamChoice] usage: Optional[UsageInfo] = None + metrics: Optional[Dict] = None class CompletionResponseChoice(BaseModel): @@ -670,6 +671,8 @@ class ChatCompletionRequest(BaseModel): completion_token_ids: Optional[List[int]] = None # doc: end-chat-completion-extra-params + collect_metrics: Optional[bool] = False + def to_dict_for_infer(self, request_id=None): """ Convert the request parameters into a dictionary diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 9bb15f90942..ba861726156 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -286,11 +286,11 @@ async def chat_completion_stream_generator( if res.get("error_code", 200) != 200: raise ValueError("{}".format(res["error_msg"])) - if res["metrics"]["first_token_time"] is not None: + if inference_start_time[idx] == 0: arrival_time = res["metrics"]["first_token_time"] inference_start_time[idx] = res["metrics"]["inference_start_time"] else: - arrival_time = res["metrics"]["arrival_time"] - inference_start_time[idx] + arrival_time = res["metrics"]["engine_recv_latest_token_time"] - inference_start_time[idx] if first_iteration: num_prompt_tokens = len(prompt_token_ids) num_cached_tokens = res.get("num_cached_tokens", 0) @@ -421,6 +421,11 @@ async def chat_completion_stream_generator( if res.get("error_msg") is not None and "Recover" in res["error_msg"]: choice.finish_reason = "recover_stop" + inference_start_time[idx] = 0 + + if request.collect_metrics: + chunk.metrics = res["metrics"] + if request.return_token_ids: if response_processor.enable_multimodal_content(): choice.delta.multimodal_content[0]["completion_token_ids"] = list(output["token_ids"]) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 67ad1ce5409..ff3e104443b 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -210,8 +210,10 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: llm_logger.info( f"Request: {task_id} finished, number of " f"generated tokens: {self.tokens_counter[task_id]}." 
) + is_decode = self.cfg.scheduler_config.splitwise_role == "decode" + inference_start_time = task.metrics.get_inference_start_time(is_decode) llm_logger.info( - f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - task.inference_start_time)}" + f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - inference_start_time)}" ) llm_logger.info(f"{self.resource_manager.info()}") if self.cfg.speculative_config.method: @@ -233,8 +235,8 @@ def _process_batch_output_use_zmq(self, receive_datas): continue task: Request = self.resource_manager.tasks_list[i] - task_id = task.request_id + token_ids = stream_data.tokens # numpy.array if token_ids is not None and token_ids[-1] <= 0: if envs.ENABLE_V1_KVCACHE_SCHEDULER: @@ -244,21 +246,15 @@ def _process_batch_output_use_zmq(self, receive_datas): current_time = time.time() if self.tokens_counter[task_id] == 0: - metrics = RequestMetrics( - arrival_time=task.arrival_time, - inference_start_time=task.inference_start_time, - first_token_time=time.time() - task.inference_start_time, - time_in_queue=task.inference_start_time - task.preprocess_end_time, - preprocess_cost_time=task.preprocess_end_time - task.preprocess_start_time, - request_start_time=task.arrival_time, - ) + task.metrics.record_recv_first_token() + task.metrics.cal_cost_time() + metrics = copy.copy(task.metrics) self._record_first_token_metrics(task, current_time) - else: - metrics = RequestMetrics( - arrival_time=time.time(), - request_start_time=task.arrival_time, - ) + task.metrics.record_recv_token() + if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode": + task.metrics.record_decode_recv_second_token() + metrics = copy.copy(task.metrics) if task.pooling_params is not None: pooler_output = stream_data.pooler_output @@ -457,6 +453,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False """ if is_prefill: start_time = time.time() + result.metrics.wait_for_sending_cache_time = time.time() while True: finished_task_ids = self.engine_worker_queue.get_finished_req() if len(finished_task_ids) > 0: @@ -478,6 +475,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False llm_logger.info( f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}" ) + result.metrics.send_request_output_to_decode_time = time.time() self.split_connector.send_first_token(task.disaggregate_info, [result]) break else: @@ -643,8 +641,10 @@ def _process_batch_output(self): recovery_stop = False task = self.resource_manager.tasks_list[i] - task_id = task.request_id + is_prefill = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "prefill" + is_decode = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "decode" + if self.cfg.speculative_config.method: if accept_num[i] == -3: recovery_stop = True @@ -689,29 +689,16 @@ def _process_batch_output(self): self.total_step += 1 current_time = time.time() if self.tokens_counter[task_id] == 0: - metrics = RequestMetrics( - arrival_time=task.arrival_time, - inference_start_time=task.inference_start_time, - model_execute_time=time.time() - task.inference_start_time, - first_token_time=time.time() - task.inference_start_time, - time_in_queue=task.inference_start_time - task.preprocess_end_time, - preprocess_cost_time=task.preprocess_end_time - task.preprocess_start_time, - request_start_time=task.arrival_time, - 
llm_engine_recv_req_timestamp=task.llm_engine_recv_req_timestamp, - llm_engine_send_req_to_engine_timestamp=task.inference_start_time, - llm_engine_recv_token_timestamp=time.time(), - ) + task.metrics.record_recv_first_token() + task.metrics.cal_cost_time() + metrics = copy.copy(task.metrics) self._record_first_token_metrics(task, current_time) - else: - metrics = RequestMetrics( - arrival_time=time.time(), - request_start_time=task.arrival_time, - model_execute_time=time.time() - task.inference_start_time, - llm_engine_recv_req_timestamp=task.llm_engine_recv_req_timestamp, - llm_engine_send_req_to_engine_timestamp=task.inference_start_time, - llm_engine_recv_token_timestamp=time.time(), - ) + task.metrics.record_recv_token() + if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode": + task.metrics.record_decode_recv_second_token() + metrics = copy.copy(task.metrics) + self.number_of_output_tokens += len(token_ids) self._record_metrics(task, current_time, token_ids) result = RequestOutput( @@ -736,8 +723,6 @@ def _process_batch_output(self): result.num_input_image_tokens = task.multimodal_inputs.get("num_input_image_tokens", 0) result.num_input_video_tokens = task.multimodal_inputs.get("num_input_video_tokens", 0) - is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill" - if is_prefill and len(token_ids) > 1: result.outputs.draft_token_ids = copy.deepcopy(token_ids) @@ -781,8 +766,9 @@ def _process_batch_output(self): f"Request: {task_id} finished, number of " f"generated tokens: {self.tokens_counter[task_id]}, token_id:{token_id},is_prefill:{is_prefill},recovery_stop:{recovery_stop}" ) + inference_start_time = task.metrics.get_inference_start_time(is_decode) llm_logger.info( - f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - task.inference_start_time)}" + f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - inference_start_time)}" ) llm_logger.info(f"{self.resource_manager.info()}") if self.cfg.speculative_config.method: @@ -809,23 +795,24 @@ def _record_metrics(self, task, current_time, token_ids): def _record_first_token_metrics(self, task, current_time): """Record metrics for first token""" - task.first_token_time = current_time + metrics = task.metrics trace_print(LoggingEventName.FIRST_TOKEN_GENERATED, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.DECODE_START, task.request_id, getattr(task, "user", "")) - main_process_metrics.time_to_first_token.observe(current_time - task.arrival_time) - main_process_metrics.request_queue_time.observe(task.inference_start_time - task.preprocess_end_time) - main_process_metrics.request_prefill_time.observe(current_time - task.inference_start_time) + main_process_metrics.time_to_first_token.observe(current_time - metrics.arrival_time) + main_process_metrics.request_queue_time.observe(metrics.inference_start_time - metrics.preprocess_end_time) + main_process_metrics.request_prefill_time.observe(current_time - metrics.inference_start_time) def _record_completion_metrics(self, task, current_time): """Record metrics when request completes""" - if hasattr(task, "first_token_time"): - decode_time = current_time - task.first_token_time + metrics = task.metrics + if metrics.engine_recv_first_token_time: + decode_time = current_time - metrics.engine_recv_first_token_time main_process_metrics.request_decode_time.observe(decode_time) trace_print(LoggingEventName.INFERENCE_END, task.request_id, 
getattr(task, "user", "")) trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", "")) main_process_metrics.num_requests_running.dec(1) main_process_metrics.request_success_total.inc() - main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time) + main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time) main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id]) def _record_speculative_decoding_mertics(self, accept_num): diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py index 94c947ea49f..1b34fc13063 100644 --- a/fastdeploy/scheduler/splitwise_scheduler.py +++ b/fastdeploy/scheduler/splitwise_scheduler.py @@ -302,7 +302,7 @@ def add_req(self, req): add a req to reader, reader will async fetch infer result from redis """ with self.lock: - self.reqs[req.request_id] = {"arrival_time": req.arrival_time} + self.reqs[req.request_id] = {"arrival_time": req.metrics.arrival_time} self.out_buffer[req.request_id] = [] def read(self): @@ -834,7 +834,7 @@ def get_requests( break req = self.reqs_queue.popleft() - if cur_time - req.arrival_time > self.ttl: + if cur_time - req.metrics.arrival_time > self.ttl: logger.error(f"req({req.request_id}) is expired({self.ttl}) when InferScheduler Get Requests") self.node.finish_req(req.request_id) continue