124 changes: 100 additions & 24 deletions fastdeploy/engine/common_engine.py
@@ -304,16 +304,6 @@ def insert_tasks(self, tasks, current_id=-1, allocated=False):
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
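The removed block above drops the immediate 500 report from `insert_tasks`; the same reporting pattern now lives in the fetch path further down. Since the pattern recurs throughout this PR — wrap the failure in a `RequestOutput` with `finished=True` and `error_code=500`, then push it through `scheduler.put_results` — here is a minimal sketch of a consolidating helper (the name `report_request_error` is hypothetical, not part of the PR):

```python
from fastdeploy.engine.request import RequestOutput


def report_request_error(scheduler, request_id: str, error_msg: str, error_code: int = 500):
    """Push a terminal error result for a request back through the scheduler.

    Hypothetical helper illustrating the pattern this PR inlines at each
    failure site; not part of the actual change.
    """
    scheduler.put_results(
        [
            RequestOutput(
                request_id=request_id,
                finished=True,
                error_code=error_code,
                error_msg=error_msg,
            )
        ]
    )
```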
@@ -603,36 +593,77 @@ def _fetch_request():
task.schedule_start_time = time.time()

self.llm_logger.debug(f"get tasks from scheduler: {tasks}")

if self.cfg.scheduler_config.splitwise_role != "mixed":
need_delete_tasks = []
if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
for task in tasks:
if self.resource_manager.has_existed_request(task.request_id):
self.llm_logger.error(
f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id."
)
need_delete_tasks.append(task)
continue
# ensure block ids can be allocated in P
while not self.resource_manager.preallocate_resource_in_p(task):
time.sleep(0.005)
self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
while True:
self.split_connector.send_splitwise_tasks([task], task.idx)
is_successful = self.split_connector.send_splitwise_tasks([task], task.idx)
if not is_successful: # Send request for block ids to D failed
self.llm_logger.error(f"{task.request_id} send request for block ids to D failed.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg="send request for block ids to D failed",
)
]
)
need_delete_tasks.append(task)
break
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} ask D resource failed, try again.")
self.llm_logger.error(
f"{task.request_id} ask D resource failed, due to: {msg}, try again."
)
time.sleep(0.05)
else:
break
else:
for task in tasks:
if self.resource_manager.has_existed_request(task.request_id):
self.llm_logger.error(
f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id."
)
need_delete_tasks.append(task)
continue
# ensure block ids can be allocated in P
while not self.resource_manager.preallocate_resource_in_p(task):
time.sleep(0.005)
self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
self.split_connector.send_splitwise_tasks([task], task.idx)

for task in tasks:
if self.cfg.scheduler_config.splitwise_role != "mixed":
# ensure block ids have been fetched from D
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
is_successful = self.split_connector.send_splitwise_tasks([task], task.idx)
if not is_successful: # Send request for block ids to D failed
self.llm_logger.error(f"{task.request_id} send request for block ids to D failed.")
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
need_delete_tasks.append(task)
continue
# ensure block ids have been fetched from D
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
if msg != "Add task repeated": # if request repeated in D, do not need report.
self.scheduler.put_results(
[
RequestOutput(
@@ -643,8 +674,8 @@ def _fetch_request():
)
]
)
need_delete_tasks.append(task)
continue
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
tasks.remove(tmp_task)
# release resource in P
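Summarizing the new prefill-side flow in `_fetch_request`: duplicate request ids are dropped, blocks are preallocated in P, the task is sent to D (with a fail-fast 500 report when the send itself fails), and D's allocation is confirmed before the task proceeds. A condensed, hypothetical sketch of that control flow (the function name and the `report_error` callback are not part of the PR; retry timing matches the offline perf-test branch):

```python
import time


def dispatch_to_decode(task, resource_manager, split_connector, report_error):
    """Condensed sketch of the P-side dispatch flow added in _fetch_request.

    Hypothetical function; `report_error` stands in for the inline
    scheduler.put_results call. Returns False when the task must be dropped.
    """
    if resource_manager.has_existed_request(task.request_id):
        return False  # duplicate request_id: drop the task

    # Block until block ids can be preallocated on the prefill instance.
    while not resource_manager.preallocate_resource_in_p(task):
        time.sleep(0.005)

    while True:
        if not split_connector.send_splitwise_tasks([task], task.idx):
            # The send itself failed: report a 500 and drop the task.
            report_error(task.request_id, "send request for block ids to D failed")
            return False
        status, msg = split_connector.check_decode_allocated(task)
        if status:
            return True  # D confirmed its block allocation
        time.sleep(0.05)  # D could not allocate yet: retry the send
```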
@@ -936,8 +967,35 @@ def receiver_loop():
for task in tasks:
can_allocate_resource = False
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.resource_manager.has_existed_request(task.request_id):
self.llm_logger.error(
f"request_id: {task.request_id} has been added to scheduler, can not add it again."
)
task.error_msg = "Add task repeated"
task.error_code = 501
new_waiting.append(task)
continue
if self.resource_manager.preallocate_resource_in_d(task):
self.split_connector.send_cache_infos([task], -1)
is_successful = self.split_connector.send_cache_infos([task], -1)
if is_successful is False:
cur_task = self.resource_manager.requests[task.request_id]
self.resource_manager.prerelease_resource(cur_task)
if cur_task.request_id in self.token_processor.tokens_counter:
del self.token_processor.tokens_counter[cur_task.request_id]
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg="failed to send block ids back to prefill instance",
)
]
)
self.llm_logger.error(
f"request {task.request_id} failed to send block_ids back to Prefill instance."
)
continue
can_allocate_resource = True
else:
if self.resource_manager.is_resource_sufficient(
@@ -952,7 +1010,25 @@ def receiver_loop():

if new_waiting:
if not self.enable_decode_cache_task:
self.split_connector.send_cache_infos(new_waiting, -1)
for task in new_waiting:
is_successful = self.split_connector.send_cache_infos([task], -1)
if (
is_successful is False
): # the "not enough block ids, D not allocated yet" message failed to reach P due to a communication error; just report
if (
task.error_code != 501
): # repeated requests do not need to be reported again
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg="failed to send not enough blocks msg to prefill instance",
)
]
)

else:
self.waiting_requests.extend(new_waiting)
self.llm_logger.info(
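On the decode side, `receiver_loop` now rolls back a successful preallocation when sending cache info back to P fails: it releases the blocks, clears the token counter, and reports a 500. A minimal sketch of that accept-with-rollback path, under the interfaces shown above (the function name and `report_error` callback are hypothetical):

```python
def try_accept_on_decode(task, resource_manager, split_connector, token_processor, report_error):
    """Sketch of the D-side accept path with rollback, mirroring receiver_loop.

    Hypothetical function; returns True only when resources are held in D
    and the cache info reached the prefill instance.
    """
    if resource_manager.has_existed_request(task.request_id):
        # Duplicate: tag it so later reporting can be skipped (code 501).
        task.error_msg = "Add task repeated"
        task.error_code = 501
        return False

    if not resource_manager.preallocate_resource_in_d(task):
        return False  # not enough blocks; caller queues the task as waiting

    if split_connector.send_cache_infos([task], -1) is False:
        # P never learned about the allocation: undo it and report a 500.
        cur_task = resource_manager.requests[task.request_id]
        resource_manager.prerelease_resource(cur_task)
        token_processor.tokens_counter.pop(cur_task.request_id, None)
        report_error(task.request_id, "failed to send block ids back to prefill instance")
        return False
    return True
```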
41 changes: 40 additions & 1 deletion fastdeploy/engine/sched/resource_manager_v1.py
@@ -30,7 +30,7 @@
from fastdeploy.engine.request import Request, RequestOutput, RequestStatus, RequestType
from fastdeploy.engine.resource_manager import ResourceManager
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.utils import llm_logger
from fastdeploy.utils import envs, llm_logger


@dataclass
@@ -94,6 +94,9 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
main_process_metrics.set_value("max_batch_size", max_num_seqs)

self.using_extend_tables_req_id = set()
if self.config.scheduler_config.splitwise_role == "decode":
self.preallocated_requests_timestamp = {}
threading.Thread(target=self._monitor_recycle_block_ids_in_D, daemon=True).start()

def allocated_slots(self, request: Request):
return len(request.block_tables) * self.config.cache_config.block_size
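For context, `allocated_slots` converts a request's block table into token capacity: the number of blocks held times `block_size`. A tiny worked example with illustrative values:

```python
# Worked example of the slot arithmetic (illustrative values):
block_size = 64           # tokens per KV-cache block (config.cache_config.block_size)
block_tables = [0, 7, 9]  # block ids currently held by one request
allocated = len(block_tables) * block_size
print(allocated)  # 192 token slots before another block must be allocated
```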
@@ -607,6 +610,26 @@ def add_request_in_p(self, requests: list[Request]):
request.inference_start_time = time.time()
self.running.append(request)

def _monitor_recycle_block_ids_in_D(self):
while True:
try:
with self.lock:
need_recycle_request_ids = []
for request_id, timestamp in self.preallocated_requests_timestamp.items():
if time.time() - timestamp >= envs.FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT:
need_recycle_request_ids.append(request_id)
for request_id in need_recycle_request_ids:
llm_logger.error(
f"Recycle block ids for request {request_id} forcefully, due to get first token from P timeout."
)
del self.preallocated_requests_timestamp[request_id]
request = self.requests[request_id]
self.prerelease_resource(request)
time.sleep(10)
except Exception as e:
llm_logger.error(f"Monitor recycle block ids in D error: {e}, {str(traceback.format_exc())}")
time.sleep(10)

def preallocate_resource_in_p(self, request: Request):
"""
In P/D disaggregated deployment, preallocate resource for P.
@@ -689,17 +712,33 @@ def preallocate_resource_in_d(self, request: Request):
self.stop_flags[request.idx] = False
self.requests[request.request_id] = request
self.req_dict[request.request_id] = allocated_position
self.preallocated_requests_timestamp[request.request_id] = time.time()
return True
return False

def has_existed_request(self, request_id):
"""
Whether a request with the given request_id has been added to the scheduler.
"""
if request_id in self.requests:
return True
return False

def insert_task_for_decoding(self, request_output_in_p: RequestOutput):
"""
In P/D disaggregated deployment, D should continue to decode after receiving the first token and cache from P.
"""
assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method"
with self.lock:
if request_output_in_p.request_id not in self.requests:
llm_logger.error(
f"request {request_output_in_p.request_id} with first token from P not found in preallocated resource, please check whether recycled due to timeout."
)
return
request = self.requests[request_output_in_p.request_id]
request.output_token_ids.append(request_output_in_p.outputs.token_ids[0])
if request.request_id in self.preallocated_requests_timestamp:
del self.preallocated_requests_timestamp[request.request_id]
request.num_cached_tokens = request_output_in_p.num_cached_tokens
if (
self.config.speculative_config.method in ["mtp"]
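The decode-side changes above form a small lifecycle: `preallocate_resource_in_d` stamps the request with a timestamp, `insert_task_for_decoding` clears the stamp once P's first token arrives, and the `_monitor_recycle_block_ids_in_D` daemon reclaims blocks for requests whose first token never arrives within `FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT` seconds. A standalone, simplified sketch of that timestamp-expiry pattern (class and method names are illustrative, not the PR's code):

```python
import threading
import time


class ExpiringPrealloc:
    """Simplified model of the timestamp-based recycling added to the D instance."""

    def __init__(self, timeout_s: float):
        self.timeout_s = timeout_s
        self.timestamps = {}  # request_id -> preallocation time
        self.lock = threading.Lock()
        threading.Thread(target=self._monitor, daemon=True).start()

    def preallocate(self, request_id: str):
        with self.lock:
            self.timestamps[request_id] = time.time()

    def first_token_arrived(self, request_id: str):
        with self.lock:
            self.timestamps.pop(request_id, None)  # stop the expiry clock

    def _monitor(self):
        while True:
            with self.lock:
                expired = [
                    rid
                    for rid, ts in self.timestamps.items()
                    if time.time() - ts >= self.timeout_s
                ]
                for rid in expired:
                    del self.timestamps[rid]
                    # the real code calls self.prerelease_resource(request) here
            time.sleep(10)
```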
6 changes: 6 additions & 0 deletions fastdeploy/envs.py
@@ -128,6 +128,12 @@
"FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"),
# Enable offline perf test mode for PD disaggregation
"FD_OFFLINE_PERF_TEST_FOR_PD": lambda: int(os.getenv("FD_OFFLINE_PERF_TEST_FOR_PD", "0")),
# Timeout for D response in PD disaggregation
"FD_GET_RESPONSE_FROM_D_TIMEOUT": lambda: int(os.getenv("FD_GET_RESPONSE_FROM_D_TIMEOUT", "5")),
# Timeout for first token from P in PD disaggregation
"FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT": lambda: int(os.getenv("FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT", "300")),
# Timeout for token processor health check
"FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")),
}
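Each new entry is a lambda over `os.getenv`, so the value is read from the environment at attribute lookup. A small usage sketch (values are illustrative; assumes the conventional pattern where `fastdeploy.envs` resolves attributes through this table):

```python
import os

# Values must be set in the environment; each table entry above is a lambda
# over os.getenv, so it is evaluated when the attribute is accessed.
os.environ["FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT"] = "600"  # seconds, illustrative
os.environ["FD_GET_RESPONSE_FROM_D_TIMEOUT"] = "10"

from fastdeploy import envs

timeout_s = envs.FD_GET_FIRST_TOKEN_FROM_P_TIMEOUT  # -> 600 (int)
```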

