11 | 11 | from contextlib import ExitStack, contextmanager |
12 | 12 | from inspect import isclass, signature |
13 | 13 | from logging import DEBUG |
14 | | -from typing import Any, TypeVar, cast |
| 14 | +from typing import Any, TypeVar |
15 | 15 |
16 | 16 | import msgspec |
17 | 17 | import zmq |
@@ -180,11 +180,13 @@ def __init__( |
180 | 180 | logger.info("Batch queue is enabled with size %d", self.batch_queue_size) |
181 | 181 | self.batch_queue = deque(maxlen=self.batch_queue_size) |
182 | 182 |
| 183 | + self.ec_producer = ( |
| 184 | + vllm_config.ec_transfer_config is not None |
| 185 | + and vllm_config.ec_transfer_config.is_ec_producer |
| 186 | + ) |
| 187 | + |
183 | 188 | self.request_block_hasher: Callable[[Request], list[BlockHash]] | None = None |
184 | | - if ( |
185 | | - self.vllm_config.cache_config.enable_prefix_caching |
186 | | - or kv_connector is not None |
187 | | - ): |
| 189 | + if vllm_config.cache_config.enable_prefix_caching or kv_connector is not None: |
188 | 190 | caching_hash_fn = get_hash_fn_by_name( |
189 | 191 | vllm_config.cache_config.prefix_caching_hash_algo |
190 | 192 | ) |
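For readers skimming the hunk above: the new `ec_producer` flag just collapses an optional nested attribute check into a single boolean computed once in `__init__`. A minimal, self-contained sketch of the same guard, using hypothetical stand-in config classes (the real objects live on `vllm_config`):

```python
# Sketch only: ECTransferConfig / VllmConfig here are stand-ins whose attribute
# names mirror the diff; they are not the real vLLM config classes.
from dataclasses import dataclass
from typing import Optional


@dataclass
class ECTransferConfig:
    is_ec_producer: bool = False


@dataclass
class VllmConfig:
    ec_transfer_config: Optional[ECTransferConfig] = None


def is_producer(cfg: VllmConfig) -> bool:
    # True only when a transfer config is present *and* it marks this
    # engine as a producer, matching the two-part check in the diff.
    return cfg.ec_transfer_config is not None and cfg.ec_transfer_config.is_ec_producer


assert not is_producer(VllmConfig())
assert not is_producer(VllmConfig(ECTransferConfig()))
assert is_producer(VllmConfig(ECTransferConfig(is_ec_producer=True)))
```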
@@ -244,7 +246,7 @@ def _initialize_kv_caches( |
244 | 246 |
245 | 247 | elapsed = time.time() - start |
246 | 248 | logger.info_once( |
247 | | - ("init engine (profile, create kv cache, warmup model) took %.2f seconds"), |
| 249 | + "init engine (profile, create kv cache, warmup model) took %.2f seconds", |
248 | 250 | elapsed, |
249 | 251 | scope="local", |
250 | 252 | ) |
@@ -310,6 +312,16 @@ def log_error_detail(self, scheduler_output: SchedulerOutput): |
310 | 312 | ) |
311 | 313 | raise err |
312 | 314 |
| 315 | + def _log_err_callback(self, scheduler_output: SchedulerOutput): |
| 316 | + """Log error details of a future that's not expected to return a result.""" |
| 317 | + |
| 318 | + def callback(f, sched_output=scheduler_output): |
| 319 | + with self.log_error_detail(sched_output): |
| 320 | + result = f.result() |
| 321 | + assert result is None |
| 322 | + |
| 323 | + return callback |
| 324 | + |
313 | 325 | def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]: |
314 | 326 | """Schedule, execute, and make output. |
315 | 327 |
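The new `_log_err_callback` helper returns a closure suitable for `Future.add_done_callback`, so executor failures still surface even though nobody consumes the future's (None) result. A hedged, standard-library-only sketch of the same pattern, with `EngineCore`, `SchedulerOutput`, and the real `log_error_detail` replaced by plain stand-ins:

```python
# Stand-in demo of the done-callback pattern; names here are illustrative only.
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager


@contextmanager
def log_error_detail(tag: str):
    try:
        yield
    except Exception:
        print(f"model execution failed for {tag}")  # the real method logs scheduler state
        raise


def make_err_callback(tag: str):
    # Binding `tag` as a default argument freezes it at creation time,
    # the same trick as `sched_output=scheduler_output` in the diff.
    def callback(f: Future, tag: str = tag) -> None:
        with log_error_detail(tag):
            result = f.result()    # re-raises any exception from the worker
            assert result is None  # execute_model is not expected to return data

    return callback


with ThreadPoolExecutor() as pool:
    fut = pool.submit(lambda: None)  # stands in for execute_model(..., non_block=True)
    fut.add_done_callback(make_err_callback("step-0"))
```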
@@ -370,34 +382,28 @@ def step_with_batch_queue( |
370 | 382 | deferred_scheduler_output = None |
371 | 383 | if self.scheduler.has_requests(): |
372 | 384 | scheduler_output = self.scheduler.schedule() |
373 | | - exec_future = self.model_executor.execute_model( |
374 | | - scheduler_output, non_block=True |
375 | | - ) |
376 | | - model_executed = scheduler_output.total_num_scheduled_tokens > 0 |
377 | | - |
378 | | - if scheduler_output.pending_structured_output_tokens: |
379 | | - # We need to defer sampling until we have processed the model output |
380 | | - # from the prior step. |
381 | | - deferred_scheduler_output = scheduler_output |
382 | | - # Block-wait for execute to return (continues running async on the GPU). |
383 | | - with self.log_error_detail(scheduler_output): |
384 | | - exec_result = exec_future.result() |
385 | | - assert exec_result is None |
386 | | - else: |
387 | | - # We aren't waiting for any tokens, get any grammar output immediately. |
388 | | - grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output) |
389 | | - # Block-wait for execute to return (continues running async on the GPU). |
390 | | - with self.log_error_detail(scheduler_output): |
391 | | - exec_result = exec_future.result() |
392 | | - |
393 | | - if exec_result is None: |
394 | | - # Call sample tokens. |
| 385 | + future = self.model_executor.execute_model(scheduler_output, non_block=True) |
| 386 | + if not self.ec_producer: |
| 387 | + model_executed = scheduler_output.total_num_scheduled_tokens > 0 |
| 388 | + |
| 389 | + if model_executed: |
| 390 | + future.add_done_callback(self._log_err_callback(scheduler_output)) |
| 391 | + |
| 392 | + if not scheduler_output.pending_structured_output_tokens: |
| 393 | + # We aren't waiting for any tokens, get any grammar output |
| 394 | + # and sample immediately. |
| 395 | + grammar_output = self.scheduler.get_grammar_bitmask( |
| 396 | + scheduler_output |
| 397 | + ) |
395 | 398 | future = self.model_executor.sample_tokens( |
396 | 399 | grammar_output, non_block=True |
397 | 400 | ) |
398 | 401 | else: |
399 | | - # No sampling required (e.g. all requests finished). |
400 | | - future = cast(Future[ModelRunnerOutput], exec_future) |
| 402 | + # We need to defer sampling until we have processed the model output |
| 403 | + # from the prior step. |
| 404 | + deferred_scheduler_output = scheduler_output |
| 405 | + |
| 406 | + if not deferred_scheduler_output: |
401 | 407 | # Add this step's future to the queue. |
402 | 408 | batch_queue.appendleft((future, scheduler_output)) |
403 | 409 | if ( |
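The restructured `step_with_batch_queue` launches `execute_model` without blocking, attaches the error callback only when tokens were actually scheduled, and then either samples immediately or defers when structured-output tokens are still pending; non-deferred steps are queued as `(future, scheduler_output)` pairs. The underlying pipelining is a bounded-deque pattern; below is a self-contained toy of that pattern (queue size and the fake forward pass are made up for illustration, not taken from the PR):

```python
# Toy of the batch-queue pipelining idea: new in-flight steps go in on the
# left, and the oldest step is popped from the right and awaited only once
# the queue is full.
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
import time

BATCH_QUEUE_SIZE = 2  # assumed value; the real size comes from the config
batch_queue: deque = deque(maxlen=BATCH_QUEUE_SIZE)


def fake_forward(step: str) -> str:
    time.sleep(0.01)  # stands in for model execution + sampling
    return f"output of {step}"


with ThreadPoolExecutor(max_workers=2) as pool:
    for i in range(4):
        tag = f"step-{i}"
        # Launch the next step without blocking on the previous one...
        batch_queue.appendleft((pool.submit(fake_forward, tag), tag))
        if len(batch_queue) == batch_queue.maxlen:
            # ...and only block on the oldest in-flight step when the queue is full.
            future, done_tag = batch_queue.pop()
            print(done_tag, "->", future.result())
    while batch_queue:
        future, done_tag = batch_queue.pop()
        print(done_tag, "->", future.result())
```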