@@ -180,11 +180,13 @@ def __init__(
             logger.info("Batch queue is enabled with size %d", self.batch_queue_size)
             self.batch_queue = deque(maxlen=self.batch_queue_size)

+        self.ec_producer = (
+            vllm_config.ec_transfer_config is not None
+            and vllm_config.ec_transfer_config.is_ec_producer
+        )
+
         self.request_block_hasher: Callable[[Request], list[BlockHash]] | None = None
-        if (
-            self.vllm_config.cache_config.enable_prefix_caching
-            or kv_connector is not None
-        ):
+        if vllm_config.cache_config.enable_prefix_caching or kv_connector is not None:
             caching_hash_fn = get_hash_fn_by_name(
                 vllm_config.cache_config.prefix_caching_hash_algo
             )
@@ -244,7 +246,7 @@ def _initialize_kv_caches(

         elapsed = time.time() - start
         logger.info_once(
-            ("init engine (profile, create kv cache, warmup model) took %.2f seconds"),
+            "init engine (profile, create kv cache, warmup model) took %.2f seconds",
             elapsed,
             scope="local",
         )
@@ -310,6 +312,16 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
             )
             raise err

+    def _log_err_callback(self, scheduler_output: SchedulerOutput):
+        """Log error details of a future that's not expected to return a result."""
+
+        def callback(f, sched_output=scheduler_output):
+            with self.log_error_detail(sched_output):
+                result = f.result()
+                assert result is None
+
+        return callback
+
     def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
         """Schedule, execute, and make output.

@@ -373,31 +385,30 @@ def step_with_batch_queue(
             exec_future = self.model_executor.execute_model(
                 scheduler_output, non_block=True
             )
-            model_executed = scheduler_output.total_num_scheduled_tokens > 0
-
-            if scheduler_output.pending_structured_output_tokens:
-                # We need to defer sampling until we have processed the model output
-                # from the prior step.
-                deferred_scheduler_output = scheduler_output
-                # Block-wait for execute to return (continues running async on the GPU).
-                with self.log_error_detail(scheduler_output):
-                    exec_result = exec_future.result()
-                    assert exec_result is None
+            if not self.ec_producer:
+                model_executed = scheduler_output.total_num_scheduled_tokens > 0
+
+            if not model_executed:
+                # No sampling required (no requests scheduled).
+                future = cast(Future[ModelRunnerOutput], exec_future)
             else:
-                # We aren't waiting for any tokens, get any grammar output immediately.
-                grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
-                # Block-wait for execute to return (continues running async on the GPU).
-                with self.log_error_detail(scheduler_output):
-                    exec_result = exec_future.result()
-
-                if exec_result is None:
-                    # Call sample tokens.
+                exec_future.add_done_callback(self._log_err_callback(scheduler_output))
+
+                if not scheduler_output.pending_structured_output_tokens:
+                    # We aren't waiting for any tokens, get any grammar output
+                    # and sample immediately.
+                    grammar_output = self.scheduler.get_grammar_bitmask(
+                        scheduler_output
+                    )
                     future = self.model_executor.sample_tokens(
                         grammar_output, non_block=True
                     )
                 else:
-                    # No sampling required (e.g. all requests finished).
-                    future = cast(Future[ModelRunnerOutput], exec_future)
+                    # We need to defer sampling until we have processed the model output
+                    # from the prior step.
+                    deferred_scheduler_output = scheduler_output
+
+            if not deferred_scheduler_output:
                 # Add this step's future to the queue.
                 batch_queue.appendleft((future, scheduler_output))
                 if (
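
For readers unfamiliar with the non-blocking error handling introduced above: instead of block-waiting on exec_future.result() under log_error_detail, the new code attaches a done-callback that re-raises (and logs) any executor exception once the future completes. The sketch below is a minimal, self-contained illustration of that pattern, not vLLM code; make_err_callback, the ThreadPoolExecutor, and the logging context manager are stand-ins for the engine's executor and _log_err_callback.

# Illustrative sketch only: done-callback error logging on a Future,
# with a plain ThreadPoolExecutor standing in for the model executor.
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def log_error_detail(context: str):
    # Stand-in for EngineCore.log_error_detail: log the failure, then re-raise.
    try:
        yield
    except Exception:
        logger.exception("execute_model failed (%s)", context)
        raise


def make_err_callback(context: str):
    # Returns a callback for a future that is expected to complete with None.
    def callback(f: Future, ctx=context):
        with log_error_detail(ctx):
            result = f.result()  # re-raises any exception from the worker
            assert result is None

    return callback


executor = ThreadPoolExecutor(max_workers=1)
# Pretend this is execute_model(..., non_block=True): the caller does not
# block on the future; errors surface through the attached callback instead.
exec_future = executor.submit(lambda: None)
exec_future.add_done_callback(make_err_callback("step-0"))
executor.shutdown(wait=True)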