2 changes: 2 additions & 0 deletions docs/online_serving/README.md
@@ -231,6 +231,8 @@ mm_hashes: Optional[list] = None
# Hash values for multimodal (e.g., image/audio) inputs, used for verification or tracking.
# Default None indicates no multimodal input or hash validation required.

collect_metrics: Optional[bool] = False
# Whether to return metrics collected during generation, for performance analysis or debugging. Default False means no metrics are returned.
```
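
For example, the flag can be sent alongside an ordinary completion request. The snippet below is a minimal sketch, not part of the shipped examples: it assumes an OpenAI-compatible FastDeploy server is already running locally, and the URL, port, and model name are placeholders for your own deployment.

```python
# Minimal sketch: ask the server to return per-stage metrics with the result.
# The endpoint, port, and model name below are assumptions; adjust them to
# match your deployment.
import requests

response = requests.post(
    "http://localhost:8188/v1/chat/completions",
    json={
        "model": "default",
        "messages": [{"role": "user", "content": "who are you?"}],
        "collect_metrics": True,  # request metric information in the response
    },
    timeout=60,
)
print(response.json())
```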

### Differences in Return Fields
2 changes: 2 additions & 0 deletions docs/zh/online_serving/README.md
@@ -224,6 +224,8 @@ logits_processors_args: Optional[Dict] = None
mm_hashes: Optional[list] = None
# Hash values for multimodal (e.g., image/audio) inputs, used to verify or track the input content. Default None indicates no multimodal input or no hash validation required.

collect_metrics: Optional[bool] = False
# Whether to return metrics collected during generation, for performance analysis or debugging. Default False means no metrics are returned.
```

### Differences in Return Fields
10 changes: 4 additions & 6 deletions fastdeploy/demo/offline_demo.py
@@ -17,11 +17,9 @@
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.llm import LLM

model_name_or_path = "/workspace/ERNIE-4.5-0.3B-Paddle"

# Sampling hyperparameters
sampling_params = SamplingParams(temperature=0.1, max_tokens=30, prompt_logprobs=100)
llm = LLM(model=model_name_or_path, tensor_parallel_size=1, enable_prefix_caching=False)
output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)
model_name_or_path = "PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
sampling_params = SamplingParams(temperature=0.1, max_tokens=30)
llm = LLM(model=model_name_or_path)
output = llm.generate(prompts="who are you?", use_tqdm=True, sampling_params=sampling_params)

print(output)
6 changes: 3 additions & 3 deletions fastdeploy/engine/async_llm.py
@@ -402,7 +402,7 @@ async def add_request(

try:
request = Request.from_dict(prompt)
request.llm_engine_recv_req_timestamp = time.time()
request.metrics.scheduler_recv_req_time = time.time()

# Check if already preprocessed by AsyncEngineClient
is_preprocessed = prompt.get("_preprocessed", False)
@@ -419,7 +419,7 @@ async def add_request(
request.need_prefill_tokens = prompt_token_ids_len

if not is_preprocessed:
request.preprocess_start_time = arrival_time
request.metrics.preprocess_start_time = arrival_time
input_ids_len = request.prompt_token_ids_len

request.set(
@@ -448,7 +448,7 @@ async def add_request(
llm_logger.error(error_msg)
raise EngineError(error_msg, error_code=400)

request.preprocess_end_time = time.time()
request.metrics.preprocess_end_time = time.time()

# Register output queue first, then add request
await self.output_processor.register_request(request_id, output_queue)
64 changes: 42 additions & 22 deletions fastdeploy/engine/common_engine.py
@@ -362,7 +362,6 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
tasks.remove(tmp_task)

for item in tasks:
item.schedule_start_time = time.time()
trace_print(LoggingEventName.RESOURCE_ALLOCATE_START, item.request_id, getattr(item, "user", ""))
available_batch = np.sum(self.resource_manager.stop_flags)
if len(tasks) > available_batch:
@@ -400,7 +399,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
if not is_decode:
self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
for task in tasks:
task.inference_start_time = time.time()
task.metrics.inference_start_time = time.time()
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
@@ -415,7 +414,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]):
"""
Decode insert prefilled requests into engine worker queue.
Used in v1_kvcache_scheduler.
Used in v0_kvcache_scheduler.
Args:
request_outputs: a list of RequestOutput sent by prefill instance
"""
@@ -437,6 +436,10 @@ def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]):

cur_req.prompt_token_ids[0] = req_out.outputs.token_ids[0]
cur_req.num_cached_tokens = req_out.num_cached_tokens
req_out.metrics.decode_recv_req_time = cur_req.metrics.decode_recv_req_time
req_out.metrics.decode_preallocat_req_time = cur_req.metrics.decode_preallocat_req_time
cur_req.metrics = req_out.metrics
cur_req.metrics.decode_inference_start_time = time.time()
if self.cfg.speculative_config.method in ["mtp"] and self.cfg.scheduler_config.splitwise_role == "decode":
cur_req.draft_token_ids = copy.deepcopy(req_out.outputs.draft_token_ids)

@@ -644,6 +647,7 @@ def _schedule_request_to_worker(self):
batch=num_prefill_batch,
)
for task in tasks:
task.metrics.engine_get_req_time = time.time()
trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))
if len(tasks) == 0:
time.sleep(0.001)
@@ -706,7 +710,7 @@ def _fetch_request():
batch=num_prefill_batch,
)
for task in tasks:
task.schedule_start_time = time.time()
task.metrics.engine_get_req_time = time.time()
trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", ""))

if self.cfg.scheduler_config.splitwise_role == "decode":
@@ -725,13 +729,15 @@ def _fetch_request():
while not self.resource_manager.preallocate_resource_in_p(task):
time.sleep(0.005)
self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
task.metrics.ask_decode_resource_start_time = time.time()
while True:
self.split_connector.send_splitwise_tasks([task], task.idx)
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} ask D resource failed, try again.")
time.sleep(0.05)
else:
task.metrics.ask_decode_resource_finish_time = time.time()
break
else:
for task in tasks:
@@ -740,30 +746,32 @@ def _fetch_request():
self.llm_logger.info("wait for preallocate_resource_in_p")
time.sleep(0.005)
self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
task.metrics.ask_decode_resource_start_time = time.time()
self.split_connector.send_splitwise_tasks([task], task.idx)

for task in tasks:
if self.cfg.scheduler_config.splitwise_role != "mixed":
# assure fetch block ids from D
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
# assure fetch block ids from D
status, msg = self.split_connector.check_decode_allocated(task)
task.metrics.ask_decode_resource_finish_time = time.time()
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
tasks.remove(tmp_task)
# release resource in P
self.resource_manager.pre_recycle_resource(tmp_task.request_id)

if self.cfg.scheduler_config.splitwise_role == "prefill":
# to send cache info to cache messager
if tasks:
@@ -779,9 +787,11 @@ def _fetch_request():
need_check_req_ids.remove(req_id)
else:
time.sleep(0.001)

# Fetch requests and add them to the scheduling queue
if tasks:
for task in tasks:
task.metrics.add_req_to_resource_manager_time = time.time()
trace_print(
LoggingEventName.RESOURCE_ALLOCATE_START, task.request_id, getattr(task, "user", "")
)
@@ -844,6 +854,11 @@ def _fetch_request():
)
self.resource_manager.get_real_bsz()
for task in tasks:
if isinstance(task, Request):
if self.cfg.scheduler_config.splitwise_role == "decode":
task.metrics.decode_inference_start_time = time.time()
else:
task.metrics.inference_start_time = time.time()
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
@@ -914,7 +929,7 @@ def _insert_zmq_task_to_scheduler(self):
err_msg = None
try:
request = Request.from_dict(data)
request.llm_engine_recv_req_timestamp = time.time()
request.metrics.scheduler_recv_req_time = time.time()
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
main_process_metrics.requests_number.inc()
self.llm_logger.debug(f"Receive request: {request}")
@@ -1083,13 +1098,16 @@ def _fetch_requests():
tasks = item[1]
if isinstance(tasks[0], Request):
self.llm_logger.debug(f"receive tasks to preallocate resource, {tasks}")
for task in tasks:
task.metrics.decode_recv_req_time = time.time()
allocate_resource_requests.extend(tasks)
elif isinstance(tasks[0], RequestOutput):
self.llm_logger.debug(f"receive prefilled tasks, {tasks}")
if not isinstance(tasks, list):
tasks = [tasks]
for task in tasks:
task.finished = False
task.metrics.decode_recv_first_token_time = time.time()
prefilled_request_ouputs.extend(tasks)

def _process_allocate_resource_requests():
Expand All @@ -1099,6 +1117,7 @@ def _process_allocate_resource_requests():

if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.resource_manager.preallocate_resource_in_d(task):
task.metrics.decode_preallocat_req_time = time.time()
self.llm_logger.info(f"Resource available, processing task {task.request_id}")
self.split_connector.send_cache_info_to_prefill([task])
processed_indices.append(idx)
Expand All @@ -1107,6 +1126,7 @@ def _process_allocate_resource_requests():
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
self.llm_logger.info(f"Resource available, processing task {task.request_id}")
self.insert_tasks([task])
task.metrics.decode_preallocat_req_time = time.time()
processed_indices.append(idx)
is_success = True

7 changes: 4 additions & 3 deletions fastdeploy/engine/engine.py
@@ -256,13 +256,13 @@ def add_requests(self, task, sampling_params=None, **kwargs):
if sampling_params is not None:
task.update(asdict(sampling_params))
request = Request.from_dict(task)
request.llm_engine_recv_req_timestamp = time.time()
request.metrics.scheduler_recv_req_time = time.time()
llm_logger.info(f"Receive request {request}")
if sampling_params is not None:
if sampling_params.temperature is not None and abs(sampling_params.temperature) < 1e-06:
sampling_params.temperature = 1e-06
request.sampling_params = sampling_params
request.preprocess_start_time = time.time()
request.metrics.preprocess_start_time = time.time()
chat_template_kwargs = kwargs.get("chat_template_kwargs") or {}
chat_template_kwargs["chat_template"] = kwargs.get("chat_template")
kwargs["chat_template_kwargs"] = chat_template_kwargs
@@ -324,7 +324,8 @@ def add_requests(self, task, sampling_params=None, **kwargs):
llm_logger.error(err_msg)
raise EngineError(err_msg, error_code=400)

request.preprocess_end_time = time.time()
request.metrics.preprocess_end_time = time.time()
request.metrics.scheduler_recv_req_time = time.time()
self.engine.scheduler.put_requests([request])
llm_logger.info(f"Cache task with request_id ({request.get('request_id')})")
llm_logger.debug(f"cache task: {request}")