From 549b61f72e1ae185a8a3ebdb30158151f15a3884 Mon Sep 17 00:00:00 2001 From: harrisonyhq Date: Wed, 5 Nov 2025 00:30:07 -0800 Subject: [PATCH 1/2] [Patch] Separate patch into different file by feature --- ...llm-adapt.patch => vllm-adapt-aggre.patch} | 653 ++---------------- .../vllm/patch/0.9.2/vllm-adapt-pc.patch | 122 ++++ .../vllm/patch/0.9.2/vllm-adapt-sparse.patch | 474 +++++++++++++ 3 files changed, 641 insertions(+), 608 deletions(-) rename ucm/integration/vllm/patch/0.9.2/{vllm-adapt.patch => vllm-adapt-aggre.patch} (54%) create mode 100644 ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch create mode 100644 ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch diff --git a/ucm/integration/vllm/patch/0.9.2/vllm-adapt.patch b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-aggre.patch similarity index 54% rename from ucm/integration/vllm/patch/0.9.2/vllm-adapt.patch rename to ucm/integration/vllm/patch/0.9.2/vllm-adapt-aggre.patch index 29039af6..5f8df381 100644 --- a/ucm/integration/vllm/patch/0.9.2/vllm-adapt.patch +++ b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-aggre.patch @@ -1,138 +1,36 @@ -From 67bb33e6d97dc5f55013ecfb4fb419f51e8b3c36 Mon Sep 17 00:00:00 2001 -From: wenxinwang -Date: Tue, 4 Nov 2025 17:41:40 +0800 -Subject: [PATCH] adapt to deepseek patch +From 6e2c814bb3b3a74ca56149b44d6a0b2017b91136 Mon Sep 17 00:00:00 2001 +From: harrisonyhq +Date: Tue, 4 Nov 2025 23:32:10 -0800 +Subject: [PATCH 2/3] [Patch1] Patch for load failure and aggregate --- - vllm/attention/layer.py | 49 +++- - .../kv_transfer/kv_connector/utils.py | 113 +++++++++ + .../kv_transfer/kv_connector/utils.py | 113 +++++++++++ .../kv_transfer/kv_connector/v1/base.py | 9 + .../kv_connector/v1/multi_connector.py | 6 + - .../v1/shared_storage_connector.py | 7 +- - vllm/v1/attention/backends/mla/common.py | 10 +- vllm/v1/core/block_pool.py | 2 +- - vllm/v1/core/kv_cache_manager.py | 7 +- - vllm/v1/core/sched/output.py | 5 + - vllm/v1/core/sched/scheduler.py | 217 ++++++++++++++++-- + vllm/v1/core/sched/output.py | 2 + + vllm/v1/core/sched/scheduler.py | 184 ++++++++++++++++-- vllm/v1/core/single_type_kv_cache_manager.py | 3 + vllm/v1/executor/multiproc_executor.py | 30 ++- - vllm/v1/outputs.py | 7 +- - vllm/v1/request.py | 2 +- - vllm/v1/worker/block_table.py | 13 ++ + vllm/v1/outputs.py | 6 +- vllm/v1/worker/gpu_input_batch.py | 14 ++ - vllm/v1/worker/gpu_model_runner.py | 104 +++++++-- - vllm/v1/worker/gpu_worker.py | 25 +- - 18 files changed, 571 insertions(+), 52 deletions(-) + vllm/v1/worker/gpu_model_runner.py | 28 ++- + vllm/v1/worker/gpu_worker.py | 23 ++- + 12 files changed, 397 insertions(+), 23 deletions(-) -diff --git a/vllm/attention/layer.py b/vllm/attention/layer.py -index f0ad68b16..728ab99fd 100644 ---- a/vllm/attention/layer.py -+++ b/vllm/attention/layer.py -@@ -2,7 +2,6 @@ - # SPDX-FileCopyrightText: Copyright contributors to the vLLM project - """Attention layer.""" - from typing import Any, Dict, List, Optional -- - import torch - import torch.nn as nn - import torch.nn.functional as F -@@ -22,6 +21,7 @@ from vllm.model_executor.layers.quantization.kv_cache import BaseKVCacheMethod - from vllm.platforms import _Backend, current_platform - from vllm.utils import direct_register_custom_op - from vllm.v1.attention.backends.utils import validate_kv_sharing_target -+from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse - - - class Attention(nn.Module): -@@ -409,9 +409,10 @@ def unified_attention( - attn_metadata = attn_metadata[layer_name] - self = forward_context.no_compile_layers[layer_name] - kv_cache = self.kv_cache[forward_context.virtual_engine] -+ maybe_execute_sparse_attention_begin(query, key, value, layer_name, forward_context) - output = self.impl.forward(self, query, key, value, kv_cache, - attn_metadata) -- -+ maybe_execute_sparse_attention_finished(query, key, value, output, layer_name, forward_context) - maybe_save_kv_layer_to_connector(layer_name, kv_cache) - return output - -@@ -449,6 +450,8 @@ def unified_attention_with_output( - attn_metadata = attn_metadata[layer_name] - self = forward_context.no_compile_layers[layer_name] - kv_cache = self.kv_cache[forward_context.virtual_engine] -+ if not self.use_mla: -+ maybe_execute_sparse_attention_begin(query, key, value, layer_name, forward_context) - self.impl.forward(self, - query, - key, -@@ -457,7 +460,8 @@ def unified_attention_with_output( - attn_metadata, - output=output, - output_scale=output_scale) -- -+ if not self.use_mla: -+ maybe_execute_sparse_attention_finished(query, key, value, output, layer_name, forward_context) - maybe_save_kv_layer_to_connector(layer_name, kv_cache) - - -@@ -479,3 +483,42 @@ direct_register_custom_op( - fake_impl=unified_attention_with_output_fake, - dispatch_key=current_platform.dispatch_key, - ) -+ -+def maybe_execute_sparse_attention_begin( -+ query: torch.Tensor, -+ key: torch.Tensor, -+ value: torch.Tensor, -+ layer_name: str, -+ forward_context: ForwardContext, -+ phase: Optional[str] = None, -+): -+ if not has_ucm_sparse(): -+ return -+ -+ ucm_sparse = get_ucm_sparse() -+ -+ attn_metadata = forward_context.attn_metadata -+ if attn_metadata is None: -+ return -+ -+ ucm_sparse.attention_begin(query, key, value, layer_name, forward_context, phase) -+ -+def maybe_execute_sparse_attention_finished( -+ query: torch.Tensor, -+ key: torch.Tensor, -+ value: torch.Tensor, -+ attn_output: torch.Tensor, -+ layer_name: str, -+ forward_context: ForwardContext, -+ phase: Optional[str] = None, -+): -+ if not has_ucm_sparse(): -+ return -+ -+ ucm_sparse = get_ucm_sparse() -+ -+ attn_metadata = forward_context.attn_metadata -+ if attn_metadata is None: -+ return -+ -+ ucm_sparse.attention_finished(query, key, value, attn_output, layer_name, forward_context, phase) diff --git a/vllm/distributed/kv_transfer/kv_connector/utils.py b/vllm/distributed/kv_transfer/kv_connector/utils.py -index 5cbc8ca31..8556a979e 100644 +index 5cbc8ca31..b63bf5965 100644 --- a/vllm/distributed/kv_transfer/kv_connector/utils.py +++ b/vllm/distributed/kv_transfer/kv_connector/utils.py -@@ -3,12 +3,18 @@ - """ - KV cache helper for store. +@@ -5,10 +5,16 @@ KV cache helper for store. """ + import torch + +from collections import defaultdict +from collections.abc import Sequence +from concurrent.futures import CancelledError, Future +from typing import Optional, cast + - import torch - import vllm.envs as envs from vllm import _custom_ops as ops from vllm.config import VllmConfig, get_current_vllm_config @@ -273,10 +171,10 @@ index f80b5eba2..39d8fa389 100644 # Scheduler-side methods # ============================== diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py -index be3c23399..c4fedb3a7 100644 +index 5f92d69bd..4e1f45e7a 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py -@@ -129,6 +129,12 @@ class MultiConnector(KVConnectorBase_V1): +@@ -134,6 +134,12 @@ class MultiConnector(KVConnectorBase_V1): return finished_sending or None, finished_recving or None @@ -289,84 +187,6 @@ index be3c23399..c4fedb3a7 100644 # ============================== # Scheduler-side methods # ============================== -diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py -index 3c574d065..223106def 100644 ---- a/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py -+++ b/vllm/distributed/kv_transfer/kv_connector/v1/shared_storage_connector.py -@@ -2,7 +2,7 @@ - # SPDX-FileCopyrightText: Copyright contributors to the vLLM project - import hashlib - import os --from dataclasses import dataclass -+from dataclasses import dataclass, field - from typing import TYPE_CHECKING - - import safetensors -@@ -53,10 +53,7 @@ class ReqMeta: - - @dataclass - class SharedStorageConnectorMetadata(KVConnectorMetadata): -- requests: list[ReqMeta] -- -- def __init__(self): -- self.requests = [] -+ requests: list[ReqMeta] = field(default_factory=list) - - def add_request( - self, -diff --git a/vllm/v1/attention/backends/mla/common.py b/vllm/v1/attention/backends/mla/common.py -index f2aaf59a4..b56f62b39 100644 ---- a/vllm/v1/attention/backends/mla/common.py -+++ b/vllm/v1/attention/backends/mla/common.py -@@ -200,6 +200,7 @@ from vllm.attention.backends.abstract import (AttentionBackend, AttentionLayer, - MLAAttentionImpl) - from vllm.attention.backends.utils import get_mla_dims - from vllm.attention.ops.merge_attn_states import merge_attn_states -+from vllm.forward_context import ForwardContext, get_forward_context - from vllm.attention.utils.fa_utils import get_flash_attn_version - from vllm.logger import init_logger - from vllm.model_executor.layers.linear import (ColumnParallelLinear, -@@ -211,6 +212,7 @@ from vllm.v1.attention.backends.utils import (AttentionMetadataBuilder, - CommonAttentionMetadata) - from vllm.v1.kv_cache_interface import AttentionSpec - from vllm.v1.worker.block_table import BlockTable -+from vllm.attention.layer import (maybe_execute_sparse_attention_begin, maybe_execute_sparse_attention_finished) - - try: - from vllm.vllm_flash_attn import flash_attn_varlen_func -@@ -908,7 +910,7 @@ class MLACommonImpl(MLAAttentionImpl[M], Generic[M]): - output: Optional[torch.Tensor] = None, - output_scale: Optional[torch.Tensor] = None, - ) -> torch.Tensor: -- -+ forward_context: ForwardContext = get_forward_context() - assert output is not None, "Output tensor must be provided." - - if output_scale is not None: -@@ -957,10 +959,11 @@ class MLACommonImpl(MLAAttentionImpl[M], Generic[M]): - ) - - if has_prefill: -+ maybe_execute_sparse_attention_begin(prefill_q, prefill_k_c_normed, prefill_k_pe, layer.layer_name, forward_context, "prefill") - output[num_decode_tokens:] = self._forward_prefill( - prefill_q, prefill_k_c_normed, prefill_k_pe, kv_cache, - attn_metadata) -- -+ maybe_execute_sparse_attention_finished(prefill_q, prefill_k_c_normed, prefill_k_pe, output[num_decode_tokens:], layer.layer_name, forward_context, "prefill") - if has_decode: - assert attn_metadata.decode is not None - decode_q_nope, decode_q_pe = decode_q.split( -@@ -971,8 +974,9 @@ class MLACommonImpl(MLAAttentionImpl[M], Generic[M]): - decode_ql_nope = torch.bmm(decode_q_nope, self.W_UK_T) - # Convert from (N, B, L) to (B, N, L) - decode_ql_nope = decode_ql_nope.transpose(0, 1) -- -+ maybe_execute_sparse_attention_begin(torch.cat([decode_ql_nope, decode_q_pe],dim=-1), decode_ql_nope, decode_q_pe, layer.layer_name, forward_context, "decode") - output[:num_decode_tokens] = self._forward_decode( - decode_ql_nope, decode_q_pe, kv_cache, attn_metadata) -+ maybe_execute_sparse_attention_finished(torch.cat([decode_ql_nope, decode_q_pe],dim=-1), decode_ql_nope, decode_q_pe, output[:num_decode_tokens], layer.layer_name, forward_context, "decode") - - return output_padded diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index d21f94727..1800665c7 100644 --- a/vllm/v1/core/block_pool.py @@ -380,47 +200,8 @@ index d21f94727..1800665c7 100644 return new_full_blocks = blocks[num_cached_blocks:num_full_blocks] assert len(block_hashes) >= num_cached_blocks -diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py -index 6937455e7..bf9aec864 100644 ---- a/vllm/v1/core/kv_cache_manager.py -+++ b/vllm/v1/core/kv_cache_manager.py -@@ -3,7 +3,7 @@ - - from collections import defaultdict - from dataclasses import dataclass --from typing import Optional -+from typing import Optional, Union - - from vllm.distributed.kv_events import KVCacheEvent - from vllm.logger import init_logger -@@ -14,6 +14,8 @@ from vllm.v1.core.kv_cache_utils import (BlockHash, KVCacheBlock, - from vllm.v1.kv_cache_interface import KVCacheConfig - from vllm.v1.metrics.stats import PrefixCacheStats - from vllm.v1.request import Request, RequestStatus -+from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse -+from ucm.sparse.base import INVALID_SLOT - - logger = init_logger(__name__) - -@@ -193,6 +195,7 @@ class KVCacheManager: - num_draft_tokens: int = 0, - num_lookahead_tokens: int = 0, - delay_cache_blocks: bool = False, -+ num_slots_sparsed: Union[None, int] = None - ) -> Optional[KVCacheBlocks]: - """Add slots for a request with new tokens to append. - -@@ -231,6 +234,8 @@ class KVCacheManager: - """ - if num_new_tokens == 0: - raise ValueError("num_new_tokens must be greater than 0") -+ if num_slots_sparsed != INVALID_SLOT: -+ return get_ucm_sparse().allocate_slots(self, request, num_slots_sparsed) - - if new_computed_blocks is not None: - new_computed_block_list = new_computed_blocks.blocks diff --git a/vllm/v1/core/sched/output.py b/vllm/v1/core/sched/output.py -index d34f39327..fff0eeb42 100644 +index d34f39327..c94e421c0 100644 --- a/vllm/v1/core/sched/output.py +++ b/vllm/v1/core/sched/output.py @@ -93,6 +93,7 @@ class CachedRequestData: @@ -439,46 +220,11 @@ index d34f39327..fff0eeb42 100644 ) -@@ -155,3 +157,6 @@ class SchedulerOutput: - - # KV Cache Connector metadata. - kv_connector_metadata: Optional[KVConnectorMetadata] = None -+ -+ # modified slots by sparse algorithm -+ req_sparsed_slots: dict[str, int] = None diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py -index fe552db74..aa172e943 100644 +index cd80f92a1..2d4fd4d59 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py -@@ -34,6 +34,8 @@ from vllm.v1.outputs import ModelRunnerOutput - from vllm.v1.request import Request, RequestStatus - from vllm.v1.spec_decode.metrics import SpecDecodingStats - from vllm.v1.structured_output import StructuredOutputManager -+from ucm.sparse.state import ensure_ucm_sparse_initialized, get_ucm_sparse, has_ucm_sparse -+from ucm.sparse.base import UcmSparseBase, UcmSparseRole, INVALID_SLOT - - logger = init_logger(__name__) - -@@ -79,12 +81,18 @@ class Scheduler(SchedulerInterface): - # will have a corresponding KVConnector with Role=WORKER. - # KV Connector pushes/pull of remote KVs for P/D and offloading. - self.connector = None -+ self.ucm_sparse = None - if self.vllm_config.kv_transfer_config is not None: - assert len(self.kv_cache_config.kv_cache_groups) == 1, ( - "Multiple KV cache groups are not currently supported " - "with KV connectors") - self.connector = KVConnectorFactory.create_connector_v1( - config=self.vllm_config, role=KVConnectorRole.SCHEDULER) -+ # Initialize UCM Sparse if available -+ if "ucm_sparse_config" in vllm_config.kv_transfer_config.kv_connector_extra_config: -+ ensure_ucm_sparse_initialized(vllm_config, role=UcmSparseRole.SCHEDULER) -+ self.ucm_sparse = get_ucm_sparse() -+ logger.info("UCM Sparse initialized successfully: {}".format(self.ucm_sparse)) - - self.kv_event_publisher = EventPublisherFactory.create( - self.kv_events_config, -@@ -118,6 +126,7 @@ class Scheduler(SchedulerInterface): +@@ -119,6 +119,7 @@ class Scheduler(SchedulerInterface): # KV Connector: requests in process of async KV loading or recving self.finished_recving_kv_req_ids: set[str] = set() @@ -486,58 +232,7 @@ index fe552db74..aa172e943 100644 # Encoder-related. # Calculate encoder cache size if applicable -@@ -201,8 +210,13 @@ class Scheduler(SchedulerInterface): - - # First, schedule the RUNNING requests. - req_index = 0 -+ req_sparsed_slots: dict[str, int] = {} - while req_index < len(self.running) and token_budget > 0: - request = self.running[req_index] -+ num_slots_sparsed = INVALID_SLOT -+ if self.ucm_sparse: -+ num_slots_sparsed = self.ucm_sparse.estimate_num_slots_sparsed(request) -+ req_sparsed_slots.update({request.request_id: num_slots_sparsed}) - - num_new_tokens = (request.num_tokens_with_spec - - request.num_computed_tokens) -@@ -250,7 +264,8 @@ class Scheduler(SchedulerInterface): - request, - num_new_tokens, - num_draft_tokens=num_draft_tokens, -- num_lookahead_tokens=self.num_lookahead_tokens) -+ num_lookahead_tokens=self.num_lookahead_tokens, -+ num_slots_sparsed=num_slots_sparsed) - if new_blocks is None: - # The request cannot be scheduled. - # Preempt the lowest-priority request. -@@ -337,6 +352,10 @@ class Scheduler(SchedulerInterface): - break - - request = self.waiting.peek_request() -+ num_slots_sparsed = INVALID_SLOT -+ if self.ucm_sparse: -+ num_slots_sparsed = self.ucm_sparse.estimate_num_slots_sparsed(request) -+ req_sparsed_slots.update({request.request_id: num_slots_sparsed}) - - # KVTransfer: skip request if still waiting for remote kvs. - if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: -@@ -446,6 +465,7 @@ class Scheduler(SchedulerInterface): - new_computed_blocks, - num_lookahead_tokens=self.num_lookahead_tokens, - delay_cache_blocks=load_kv_async, -+ num_slots_sparsed=num_slots_sparsed - ) - if new_blocks is None: - # The request cannot be scheduled. -@@ -559,6 +579,7 @@ class Scheduler(SchedulerInterface): - scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, - scheduled_encoder_inputs=scheduled_encoder_inputs, - num_common_prefix_blocks=num_common_prefix_blocks, -+ req_sparsed_slots=req_sparsed_slots, - # finished_req_ids is an existing state in the scheduler, - # instead of being newly scheduled in this step. - # It contains the request IDs that are finished in between -@@ -620,6 +641,7 @@ class Scheduler(SchedulerInterface): +@@ -621,6 +622,7 @@ class Scheduler(SchedulerInterface): new_token_ids: list[list[int]] = [] new_block_ids: list[tuple[list[int], ...]] = [] num_computed_tokens: list[int] = [] @@ -545,7 +240,7 @@ index fe552db74..aa172e943 100644 for req in itertools.chain(running_reqs, resumed_reqs): req_id = req.request_id -@@ -637,6 +659,7 @@ class Scheduler(SchedulerInterface): +@@ -638,6 +640,7 @@ class Scheduler(SchedulerInterface): new_token_ids.append(token_ids) new_block_ids.append(req_to_new_block_ids[req_id]) num_computed_tokens.append(req.num_computed_tokens) @@ -553,7 +248,7 @@ index fe552db74..aa172e943 100644 # Because resumed_reqs is usually empty, it is more efficient to do # in-place appending so that we don't need to allocate a new list. resumed_from_preemption = [False] * len(running_reqs) -@@ -648,6 +671,7 @@ class Scheduler(SchedulerInterface): +@@ -649,6 +652,7 @@ class Scheduler(SchedulerInterface): new_token_ids=new_token_ids, new_block_ids=new_block_ids, num_computed_tokens=num_computed_tokens, @@ -561,7 +256,7 @@ index fe552db74..aa172e943 100644 ) def _try_schedule_encoder_inputs( -@@ -745,16 +769,29 @@ class Scheduler(SchedulerInterface): +@@ -746,16 +750,29 @@ class Scheduler(SchedulerInterface): num_scheduled_tokens = scheduler_output.num_scheduled_tokens pooler_outputs = model_runner_output.pooler_output num_nans_in_logits = model_runner_output.num_nans_in_logits @@ -591,54 +286,7 @@ index fe552db74..aa172e943 100644 num_tokens_scheduled = num_scheduled_tokens.get(req_id, 0) if num_tokens_scheduled == 0: # The request was not scheduled in this step. -@@ -792,6 +829,12 @@ class Scheduler(SchedulerInterface): - new_token_ids = generated_token_ids - kv_transfer_params = None - -+ if model_runner_output.finished_dumping is not None: -+ request.succeed_dumped_blocks.extend(model_runner_output.finished_dumping.get(req_id, [])) -+ is_prefill = request.num_output_tokens == 0 -+ if is_prefill: -+ self.connector.connector.commit(model_runner_output.finished_dumping.get(req_id, []), True) -+ - # Append generated tokens and check for stop. Note that if - # a request is still being prefilled, we expect the model runner - # to return empty token ids for the request. -@@ -842,7 +885,6 @@ class Scheduler(SchedulerInterface): - spec_token_ids[req_index]) - else: - request.spec_token_ids = spec_token_ids[req_index] -- - # Get prompt logprobs for this request. - prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id) - if new_token_ids or pooler_output is not None \ -@@ -869,6 +911,7 @@ class Scheduler(SchedulerInterface): - - if not stopped: - new_running.append(request) -+ - self.running = new_running - - # KV Connector: update state for finished KV Transfers. -@@ -927,6 +970,8 @@ class Scheduler(SchedulerInterface): - def add_request(self, request: Request) -> None: - self.waiting.add_request(request) - self.requests[request.request_id] = request -+ if self.ucm_sparse: -+ self.ucm_sparse.request_begin(request.request_id, request.prompt_token_ids) - if self.log_stats: - request.record_event(EngineCoreEventType.QUEUED) - -@@ -976,6 +1021,8 @@ class Scheduler(SchedulerInterface): - - def _free_request(self, request: Request) -> Optional[dict[str, Any]]: - assert request.is_finished() -+ if self.ucm_sparse: -+ self.ucm_sparse.request_finished_in_scheduler(request.request_id) - - delay_free_blocks, kv_xfer_params = self._connector_finished(request) - self.encoder_cache_manager.free(request) -@@ -1078,18 +1125,31 @@ class Scheduler(SchedulerInterface): +@@ -1089,18 +1106,31 @@ class Scheduler(SchedulerInterface): if request.request_id not in self.finished_recving_kv_req_ids: return False @@ -682,11 +330,12 @@ index fe552db74..aa172e943 100644 # Return that we are ready. self.finished_recving_kv_req_ids.remove(request.request_id) -@@ -1113,3 +1173,132 @@ class Scheduler(SchedulerInterface): +@@ -1124,3 +1154,133 @@ class Scheduler(SchedulerInterface): for req_id in (model_runner_output.finished_sending or ()): logger.debug("Finished sending KV transfer for request %s", req_id) self._free_blocks(self.requests[req_id]) + ++ + def _update_requests_with_invalid_blocks( + self, requests: Iterable[Request], + invalid_block_ids: set[int]) -> tuple[set[str], int]: @@ -892,7 +541,7 @@ index b06b7cc80..61cd7110f 100644 def collective_rpc(self, method: Union[str, Callable], diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py -index f78623f57..16af8dbce 100644 +index c8388baed..16af8dbce 100644 --- a/vllm/v1/outputs.py +++ b/vllm/v1/outputs.py @@ -1,7 +1,7 @@ @@ -904,62 +553,17 @@ index f78623f57..16af8dbce 100644 from typing import NamedTuple, Optional import torch -@@ -107,6 +107,11 @@ class ModelRunnerOutput: - # [req_ids] - finished_sending: Optional[set[str]] = None +@@ -109,6 +109,10 @@ class ModelRunnerOutput: finished_recving: Optional[set[str]] = None -+ finished_dumping: Optional[dict[str, list[str]]] = None -+ + finished_dumping: Optional[dict[str, list[str]]] = None + + # IDs of externally computed KV blocks that failed to load. + # Requests referencing these blocks should be rescheduled to recompute them. + invalid_block_ids: set[int] = field(default_factory=set) - ++ # req_id -> num_nans_in_logits num_nans_in_logits: Optional[dict[str, int]] = None -diff --git a/vllm/v1/request.py b/vllm/v1/request.py -index 9b96f4599..825b77bba 100644 ---- a/vllm/v1/request.py -+++ b/vllm/v1/request.py -@@ -102,7 +102,7 @@ class Request: - # State - # The number of tokens with prefix cache hits. - self.num_cached_tokens = -1 -- -+ self.succeed_dumped_blocks: list[str] = [] - # The number of NaNs in logits. A value greater than 0 - # indicates that the output is corrupted - self.num_nans_in_logits = 0 -diff --git a/vllm/v1/worker/block_table.py b/vllm/v1/worker/block_table.py -index 8f4e8d64c..f45e39f5c 100644 ---- a/vllm/v1/worker/block_table.py -+++ b/vllm/v1/worker/block_table.py -@@ -61,6 +61,15 @@ class BlockTable: - self.num_blocks_per_row[row_idx] += num_blocks - self.block_table_np[row_idx, start:start + num_blocks] = block_ids -+ def reset_row( -+ self, -+ row_idx: int, -+ ) -> None: -+ self.num_blocks_per_row[row_idx] = 0 -+ self.block_table[row_idx].fill_(0) -+ self.block_table_cpu[row_idx].fill_(0) -+ self.block_table_np[row_idx].fill(0) -+ - def add_row(self, block_ids: list[int], row_idx: int) -> None: - self.num_blocks_per_row[row_idx] = 0 - self.append_row(block_ids, row_idx) -@@ -117,6 +126,10 @@ class MultiGroupBlockTable: - for i, block_table in enumerate(self.block_tables): - block_table.append_row(block_ids[i], row_idx) - -+ def reset_row(self, row_idx: int) -> None: -+ for i, block_table in enumerate(self.block_tables): -+ block_table.reset_row(row_idx) -+ - def add_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None: - for i, block_table in enumerate(self.block_tables): - block_table.add_row(block_ids[i], row_idx) diff --git a/vllm/v1/worker/gpu_input_batch.py b/vllm/v1/worker/gpu_input_batch.py index 1a79d72be..8819d7629 100644 --- a/vllm/v1/worker/gpu_input_batch.py @@ -1009,43 +613,18 @@ index 1a79d72be..8819d7629 100644 self.num_tokens_no_spec[empty_index] = self.num_tokens_no_spec[ last_req_index] diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py -index 5a26e88db..e1c9252a4 100644 +index 53ee8cfcd..c3df1d5d2 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py -@@ -72,6 +72,9 @@ from ..sample.logits_processor import LogitsProcessorManager - from .utils import (gather_mm_placeholders, initialize_kv_cache_for_kv_sharing, - sanity_check_mm_encoder_outputs, scatter_mm_placeholders) - -+from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse -+from ucm.sparse.base import UcmSparseMetadata, INVALID_SLOT -+ - if TYPE_CHECKING: - import xgrammar as xgr - import xgrammar.kernels.apply_token_bitmask_inplace_torch_compile as xgr_torch_compile # noqa: E501 -@@ -365,6 +368,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): - """ - # Remove finished requests from the cached states. - for req_id in scheduler_output.finished_req_ids: -+ self.ucm_sparse_request_finished_in_worker(req_id) - self.requests.pop(req_id, None) - self.encoder_cache.pop(req_id, None) - # Remove the finished requests from the persistent batch. -@@ -468,11 +472,14 @@ class GPUModelRunner(LoRAModelRunnerMixin): - # Update the states of the running/resumed requests. - is_last_rank = get_pp_group().is_last_rank - req_data = scheduler_output.scheduled_cached_reqs -+ req_sparsed_slots = scheduler_output.req_sparsed_slots - for i, req_id in enumerate(req_data.req_ids): - req_state = self.requests[req_id] +@@ -473,6 +473,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): num_computed_tokens = req_data.num_computed_tokens[i] new_block_ids = req_data.new_block_ids[i] resumed_from_preemption = req_data.resumed_from_preemption[i] + num_output_tokens = req_data.num_output_tokens[i] -+ is_sparsed_request = req_sparsed_slots[req_id] != INVALID_SLOT # Update the cached states. req_state.num_computed_tokens = num_computed_tokens -@@ -492,17 +499,32 @@ class GPUModelRunner(LoRAModelRunnerMixin): +@@ -492,6 +493,21 @@ class GPUModelRunner(LoRAModelRunnerMixin): elif num_new_tokens > 0: req_state.output_token_ids.extend( new_token_ids[-num_new_tokens:]) @@ -1066,123 +645,24 @@ index 5a26e88db..e1c9252a4 100644 + end_idx:old_end_idx] = False # Update the block IDs. -- if not resumed_from_preemption: -- # Append the new blocks to the existing block IDs. -- for block_ids, new_ids in zip(req_state.block_ids, -- new_block_ids): -- block_ids.extend(new_ids) -- else: -+ if resumed_from_preemption or is_sparsed_request: - # The request is resumed from preemption. - # Replace the existing block IDs with the new ones. - req_state.block_ids = new_block_ids -+ else: -+ # Append the new blocks to the existing block IDs. -+ for block_ids, new_ids in zip(req_state.block_ids, -+ new_block_ids): -+ block_ids.extend(new_ids) - - req_index = self.input_batch.req_id_to_index.get(req_id) - if req_index is None: -@@ -515,6 +537,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): - # Update the persistent batch. - self.input_batch.num_computed_tokens_cpu[req_index] = ( - num_computed_tokens) -+ if is_sparsed_request: -+ self.input_batch.block_table.reset_row(req_index) - self.input_batch.block_table.append_row(new_block_ids, req_index) - - # For the last rank, we don't need to update the token_ids_cpu -@@ -623,6 +647,19 @@ class GPUModelRunner(LoRAModelRunnerMixin): - if self.uses_mrope: - self._calc_mrope_positions(scheduler_output) - -+ self.seq_lens_np[:num_reqs] = ( -+ self.input_batch.num_computed_tokens_cpu[:num_reqs] + -+ num_scheduled_tokens) -+ -+ # TODO: improve performance, no `positions_np.copy()` -+ sparsed_positions = positions_np.copy() -+ req_sparsed_slots = scheduler_output.req_sparsed_slots -+ for req_id in self.input_batch.req_id_to_index: -+ is_sparsed_request = req_sparsed_slots[req_id] != INVALID_SLOT -+ req_index = self.input_batch.req_id_to_index[req_id] -+ offset = 0 if req_index == 0 else cu_num_tokens[req_index - 1] # TODO: support MTP -+ if is_sparsed_request: -+ sparsed_positions[offset] = req_sparsed_slots[req_id] - 1 - # Get token indices. - # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] - # -> [0, 1, M, M + 1, M + 2, M + 3, M + 4, 2 * M, 2 * M + 1, 2 * M + 2] -@@ -652,11 +689,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): - # block_size. - block_table_indices = ( - req_indices * block_table.max_num_blocks_per_req + -- positions_np // block_size) -+ sparsed_positions // block_size) - block_table_cpu = block_table.get_cpu_tensor() - block_numbers = block_table_cpu.flatten( - )[block_table_indices].numpy() -- block_offsets = positions_np % block_size -+ block_offsets = sparsed_positions % block_size - np.add( - block_numbers * block_size, - block_offsets, -@@ -666,9 +703,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): - self.query_start_loc_np[0] = 0 - self.query_start_loc_np[1:num_reqs + 1] = cu_num_tokens - -- self.seq_lens_np[:num_reqs] = ( -- self.input_batch.num_computed_tokens_cpu[:num_reqs] + -- num_scheduled_tokens) -+ for req_id in self.input_batch.req_id_to_index: -+ req_index = self.input_batch.req_id_to_index[req_id] -+ is_sparsed_request = scheduler_output.req_sparsed_slots[req_id] != INVALID_SLOT -+ if is_sparsed_request: -+ self.seq_lens_np[req_index] = scheduler_output.req_sparsed_slots[req_id] - - # Copy the tensors to the GPU. - self.input_ids[:total_num_scheduled_tokens].copy_( -@@ -680,6 +719,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): - non_blocking=True) - else: - # Common case (1D positions) -+ self.positions_cpu[:total_num_scheduled_tokens] = torch.from_numpy( -+ positions_np[:total_num_scheduled_tokens]) - self.positions[:total_num_scheduled_tokens].copy_( - self.positions_cpu[:total_num_scheduled_tokens], - non_blocking=True) -@@ -1370,6 +1411,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): - skip_cuda_graphs=skip_cuda_graphs, - ): - self.maybe_setup_kv_connector(scheduler_output) -+ self.maybe_execute_ucm_sparse_begin(scheduler_output, attn_metadata) - - model_output = self.model( - input_ids=input_ids, -@@ -1378,9 +1420,12 @@ class GPUModelRunner(LoRAModelRunnerMixin): - inputs_embeds=inputs_embeds, - ) - -- self.maybe_wait_for_kv_save() -+ finished_dumping = self.maybe_wait_for_kv_save() -+ self.maybe_execute_ucm_sparse_finished() -+ + if not resumed_from_preemption: +@@ -1381,6 +1397,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): + finished_dumping = self.maybe_wait_for_kv_save() finished_sending, finished_recving = ( self.get_finished_kv_transfers(scheduler_output)) + invalid_block_ids = self.get_block_ids_with_load_errors() if self.use_aux_hidden_state_outputs: hidden_states, aux_hidden_states = model_output -@@ -1563,6 +1608,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): - finished_sending=finished_sending, +@@ -1564,6 +1581,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): finished_recving=finished_recving, + finished_dumping=finished_dumping, num_nans_in_logits=num_nans_in_logits, -+ finished_dumping=finished_dumping, + invalid_block_ids = invalid_block_ids ) def propose_draft_token_ids( -@@ -1693,13 +1740,16 @@ class GPUModelRunner(LoRAModelRunnerMixin): +@@ -1694,13 +1712,16 @@ class GPUModelRunner(LoRAModelRunnerMixin): self.maybe_setup_kv_connector(scheduler_output) finished_sending, finished_recving = ( self.get_finished_kv_transfers(scheduler_output)) @@ -1200,38 +680,7 @@ index 5a26e88db..e1c9252a4 100644 return output @staticmethod -@@ -1719,9 +1769,28 @@ class GPUModelRunner(LoRAModelRunnerMixin): - kv_connector.start_load_kv(get_forward_context()) - - @staticmethod -- def maybe_wait_for_kv_save() -> None: -+ def maybe_wait_for_kv_save() -> Optional[dict[str, list[str]]]: - if has_kv_transfer_group(): -- get_kv_transfer_group().wait_for_save() -+ return get_kv_transfer_group().wait_for_save() -+ -+ def maybe_execute_ucm_sparse_begin(self, scheduler_output: "SchedulerOutput", attn_metadata: CommonAttentionMetadata): -+ if not has_ucm_sparse(): -+ return -+ ucm_sparse = get_ucm_sparse() -+ ucm_sparse.build_sparse_meta(scheduler_output, self.requests, self.input_batch, attn_metadata) -+ ucm_sparse.execute_begin(scheduler_output) -+ -+ def maybe_execute_ucm_sparse_finished(self): -+ if not has_ucm_sparse(): -+ return -+ ucm_sparse = get_ucm_sparse() -+ ucm_sparse.execute_finished() -+ -+ def ucm_sparse_request_finished_in_worker(self, request_id: str | int): -+ if not has_ucm_sparse(): -+ return -+ ucm_sparse = get_ucm_sparse() -+ ucm_sparse.request_finished_in_worker(request_id) - - @staticmethod - def get_finished_kv_transfers( -@@ -1732,6 +1801,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): +@@ -1733,6 +1754,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): scheduler_output.finished_req_ids) return None, None @@ -1244,7 +693,7 @@ index 5a26e88db..e1c9252a4 100644 self, sampled_token_ids: list[list[int]], diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py -index 9e7e44d06..d9666d102 100644 +index 9e7e44d06..1b816b25b 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -1,6 +1,7 @@ @@ -1265,7 +714,7 @@ index 9e7e44d06..d9666d102 100644 from vllm.distributed.parallel_state import get_pp_group, get_tp_group from vllm.logger import init_logger from vllm.lora.request import LoRARequest -@@ -24,10 +26,11 @@ from vllm.platforms import current_platform +@@ -24,7 +26,7 @@ from vllm.platforms import current_platform from vllm.sequence import IntermediateTensors from vllm.utils import GiB_bytes, MemorySnapshot, memory_profiling from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec @@ -1274,11 +723,7 @@ index 9e7e44d06..d9666d102 100644 from vllm.v1.utils import report_usage_stats from vllm.v1.worker.gpu_model_runner import GPUModelRunner from vllm.v1.worker.worker_base import WorkerBase -+from ucm.sparse.state import ensure_ucm_sparse_initialized - - logger = init_logger(__name__) - -@@ -313,9 +316,22 @@ class Worker(WorkerBase): +@@ -313,9 +315,22 @@ class Worker(WorkerBase): assert isinstance(output, IntermediateTensors) get_pp_group().send_tensor_dict(output.tensors, all_gather_group=get_tp_group()) @@ -1303,14 +748,6 @@ index 9e7e44d06..d9666d102 100644 def profile(self, is_start: bool = True): if self.profiler is None: -@@ -386,6 +402,7 @@ def init_worker_distributed_environment( - parallel_config.pipeline_parallel_size) - - ensure_kv_transfer_initialized(vllm_config) -+ ensure_ucm_sparse_initialized(vllm_config) - - - def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype): -- 2.34.1 diff --git a/ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch new file mode 100644 index 00000000..bf0b7e19 --- /dev/null +++ b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch @@ -0,0 +1,122 @@ +From 26fdd2026cc3d1ed7da894907ae244a155a16566 Mon Sep 17 00:00:00 2001 +From: harrisonyhq +Date: Tue, 4 Nov 2025 19:36:36 -0800 +Subject: [PATCH 1/3] [Patch0] UCM PC adapt patch + +--- + .../kv_transfer/kv_connector/v1/multi_connector.py | 7 ++++++- + vllm/v1/core/sched/scheduler.py | 11 +++++++++++ + vllm/v1/outputs.py | 1 + + vllm/v1/request.py | 2 ++ + vllm/v1/worker/gpu_model_runner.py | 7 ++++--- + 5 files changed, 24 insertions(+), 4 deletions(-) + +diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py +index be3c23399..5f92d69bd 100644 +--- a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py ++++ b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py +@@ -99,8 +99,13 @@ class MultiConnector(KVConnectorBase_V1): + c.save_kv_layer(layer_name, kv_layer, attn_metadata, **kwargs) + + def wait_for_save(self): ++ success_dumped_blocks = None + for c in self._connectors: +- c.wait_for_save() ++ uc_dump_blocks = c.wait_for_save() ++ if uc_dump_blocks: ++ success_dumped_blocks = uc_dump_blocks ++ ++ return success_dumped_blocks if success_dumped_blocks else None + + def get_finished( + self, finished_req_ids: set[str] +diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py +index fe552db74..cd80f92a1 100644 +--- a/vllm/v1/core/sched/scheduler.py ++++ b/vllm/v1/core/sched/scheduler.py +@@ -34,6 +34,7 @@ from vllm.v1.outputs import ModelRunnerOutput + from vllm.v1.request import Request, RequestStatus + from vllm.v1.spec_decode.metrics import SpecDecodingStats + from vllm.v1.structured_output import StructuredOutputManager ++from vllm.distributed.kv_transfer.kv_connector.v1.multi_connector import MultiConnector + + logger = init_logger(__name__) + +@@ -791,6 +792,16 @@ class Scheduler(SchedulerInterface): + new_logprobs = None + new_token_ids = generated_token_ids + kv_transfer_params = None ++ if model_runner_output.finished_dumping is not None: ++ request.succeed_dumped_blocks.extend(model_runner_output.finished_dumping.get(req_id, [])) ++ is_prefill = request.num_output_tokens == 0 ++ if is_prefill: ++ if isinstance(self.connector, MultiConnector): ++ for c in self.connector._connectors: ++ if hasattr(c, 'connector') and hasattr(c.connector, 'commit'): ++ c.connector.commit(model_runner_output.finished_dumping.get(req_id, []), True) ++ else: ++ self.connector.connector.commit(model_runner_output.finished_dumping.get(req_id, []), True) + + # Append generated tokens and check for stop. Note that if + # a request is still being prefilled, we expect the model runner +diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py +index f78623f57..c8388baed 100644 +--- a/vllm/v1/outputs.py ++++ b/vllm/v1/outputs.py +@@ -107,6 +107,7 @@ class ModelRunnerOutput: + # [req_ids] + finished_sending: Optional[set[str]] = None + finished_recving: Optional[set[str]] = None ++ finished_dumping: Optional[dict[str, list[str]]] = None + + # req_id -> num_nans_in_logits + num_nans_in_logits: Optional[dict[str, int]] = None +diff --git a/vllm/v1/request.py b/vllm/v1/request.py +index 9b96f4599..e70d1695b 100644 +--- a/vllm/v1/request.py ++++ b/vllm/v1/request.py +@@ -103,6 +103,8 @@ class Request: + # The number of tokens with prefix cache hits. + self.num_cached_tokens = -1 + ++ self.succeed_dumped_blocks: list[str] = [] ++ + # The number of NaNs in logits. A value greater than 0 + # indicates that the output is corrupted + self.num_nans_in_logits = 0 +diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py +index 5a26e88db..53ee8cfcd 100644 +--- a/vllm/v1/worker/gpu_model_runner.py ++++ b/vllm/v1/worker/gpu_model_runner.py +@@ -1378,7 +1378,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): + inputs_embeds=inputs_embeds, + ) + +- self.maybe_wait_for_kv_save() ++ finished_dumping = self.maybe_wait_for_kv_save() + finished_sending, finished_recving = ( + self.get_finished_kv_transfers(scheduler_output)) + +@@ -1562,6 +1562,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): + pooler_output=[], + finished_sending=finished_sending, + finished_recving=finished_recving, ++ finished_dumping=finished_dumping, + num_nans_in_logits=num_nans_in_logits, + ) + +@@ -1719,9 +1720,9 @@ class GPUModelRunner(LoRAModelRunnerMixin): + kv_connector.start_load_kv(get_forward_context()) + + @staticmethod +- def maybe_wait_for_kv_save() -> None: ++ def maybe_wait_for_kv_save(): + if has_kv_transfer_group(): +- get_kv_transfer_group().wait_for_save() ++ return get_kv_transfer_group().wait_for_save() + + @staticmethod + def get_finished_kv_transfers( +-- +2.34.1 + diff --git a/ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch new file mode 100644 index 00000000..5f97d632 --- /dev/null +++ b/ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch @@ -0,0 +1,474 @@ +From 8c02671e05ed23d7a0c9dc112f8474b26d579f99 Mon Sep 17 00:00:00 2001 +From: harrisonyhq +Date: Wed, 5 Nov 2025 00:22:36 -0800 +Subject: [PATCH 3/3] [Patch2] UCM patch for sparsed attention + +--- + vllm/attention/layer.py | 43 ++++++++++++++++++ + vllm/v1/core/kv_cache_manager.py | 7 ++- + vllm/v1/core/sched/output.py | 3 ++ + vllm/v1/core/sched/scheduler.py | 26 ++++++++++- + vllm/v1/worker/block_table.py | 13 ++++++ + vllm/v1/worker/gpu_model_runner.py | 70 +++++++++++++++++++++++++----- + vllm/v1/worker/gpu_worker.py | 2 + + 7 files changed, 151 insertions(+), 13 deletions(-) + +diff --git a/vllm/attention/layer.py b/vllm/attention/layer.py +index f0ad68b16..d55f3d689 100644 +--- a/vllm/attention/layer.py ++++ b/vllm/attention/layer.py +@@ -22,6 +22,7 @@ from vllm.model_executor.layers.quantization.kv_cache import BaseKVCacheMethod + from vllm.platforms import _Backend, current_platform + from vllm.utils import direct_register_custom_op + from vllm.v1.attention.backends.utils import validate_kv_sharing_target ++from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse + + + class Attention(nn.Module): +@@ -409,9 +410,11 @@ def unified_attention( + attn_metadata = attn_metadata[layer_name] + self = forward_context.no_compile_layers[layer_name] + kv_cache = self.kv_cache[forward_context.virtual_engine] ++ maybe_execute_sparse_attention_begin(query, key, value, layer_name, forward_context) + output = self.impl.forward(self, query, key, value, kv_cache, + attn_metadata) + ++ maybe_execute_sparse_attention_finished(query, key, value, output, layer_name, forward_context) + maybe_save_kv_layer_to_connector(layer_name, kv_cache) + return output + +@@ -449,6 +452,7 @@ def unified_attention_with_output( + attn_metadata = attn_metadata[layer_name] + self = forward_context.no_compile_layers[layer_name] + kv_cache = self.kv_cache[forward_context.virtual_engine] ++ maybe_execute_sparse_attention_begin(query, key, value, layer_name, forward_context) + self.impl.forward(self, + query, + key, +@@ -458,6 +462,7 @@ def unified_attention_with_output( + output=output, + output_scale=output_scale) + ++ maybe_execute_sparse_attention_finished(query, key, value, output, layer_name, forward_context) + maybe_save_kv_layer_to_connector(layer_name, kv_cache) + + +@@ -479,3 +484,41 @@ direct_register_custom_op( + fake_impl=unified_attention_with_output_fake, + dispatch_key=current_platform.dispatch_key, + ) ++ ++ ++def maybe_execute_sparse_attention_begin( ++ query: torch.Tensor, ++ key: torch.Tensor, ++ value: torch.Tensor, ++ layer_name: str, ++ forward_context: ForwardContext, ++): ++ if not has_ucm_sparse(): ++ return ++ ++ ucm_sparse = get_ucm_sparse() ++ ++ attn_metadata = forward_context.attn_metadata ++ if attn_metadata is None: ++ return ++ ++ ucm_sparse.attention_begin(query, key, value, layer_name, forward_context) ++ ++def maybe_execute_sparse_attention_finished( ++ query: torch.Tensor, ++ key: torch.Tensor, ++ value: torch.Tensor, ++ attn_output: torch.Tensor, ++ layer_name: str, ++ forward_context: ForwardContext, ++): ++ if not has_ucm_sparse(): ++ return ++ ++ ucm_sparse = get_ucm_sparse() ++ ++ attn_metadata = forward_context.attn_metadata ++ if attn_metadata is None: ++ return ++ ++ ucm_sparse.attention_finished(query, key, value, attn_output, layer_name, forward_context) +\ No newline at end of file +diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py +index 6937455e7..bf9aec864 100644 +--- a/vllm/v1/core/kv_cache_manager.py ++++ b/vllm/v1/core/kv_cache_manager.py +@@ -3,7 +3,7 @@ + + from collections import defaultdict + from dataclasses import dataclass +-from typing import Optional ++from typing import Optional, Union + + from vllm.distributed.kv_events import KVCacheEvent + from vllm.logger import init_logger +@@ -14,6 +14,8 @@ from vllm.v1.core.kv_cache_utils import (BlockHash, KVCacheBlock, + from vllm.v1.kv_cache_interface import KVCacheConfig + from vllm.v1.metrics.stats import PrefixCacheStats + from vllm.v1.request import Request, RequestStatus ++from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse ++from ucm.sparse.base import INVALID_SLOT + + logger = init_logger(__name__) + +@@ -193,6 +195,7 @@ class KVCacheManager: + num_draft_tokens: int = 0, + num_lookahead_tokens: int = 0, + delay_cache_blocks: bool = False, ++ num_slots_sparsed: Union[None, int] = None + ) -> Optional[KVCacheBlocks]: + """Add slots for a request with new tokens to append. + +@@ -231,6 +234,8 @@ class KVCacheManager: + """ + if num_new_tokens == 0: + raise ValueError("num_new_tokens must be greater than 0") ++ if num_slots_sparsed != INVALID_SLOT: ++ return get_ucm_sparse().allocate_slots(self, request, num_slots_sparsed) + + if new_computed_blocks is not None: + new_computed_block_list = new_computed_blocks.blocks +diff --git a/vllm/v1/core/sched/output.py b/vllm/v1/core/sched/output.py +index c94e421c0..f6f170e10 100644 +--- a/vllm/v1/core/sched/output.py ++++ b/vllm/v1/core/sched/output.py +@@ -157,3 +157,6 @@ class SchedulerOutput: + + # KV Cache Connector metadata. + kv_connector_metadata: Optional[KVConnectorMetadata] = None ++ ++ # modified slots by sparse algorithm ++ req_sparsed_slots: dict[str, int] = None +\ No newline at end of file +diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py +index 2d4fd4d59..8268c1409 100644 +--- a/vllm/v1/core/sched/scheduler.py ++++ b/vllm/v1/core/sched/scheduler.py +@@ -35,6 +35,8 @@ from vllm.v1.request import Request, RequestStatus + from vllm.v1.spec_decode.metrics import SpecDecodingStats + from vllm.v1.structured_output import StructuredOutputManager + from vllm.distributed.kv_transfer.kv_connector.v1.multi_connector import MultiConnector ++from ucm.sparse.state import ensure_ucm_sparse_initialized, get_ucm_sparse, has_ucm_sparse ++from ucm.sparse.base import UcmSparseBase, UcmSparseRole, INVALID_SLOT + + logger = init_logger(__name__) + +@@ -80,12 +82,18 @@ class Scheduler(SchedulerInterface): + # will have a corresponding KVConnector with Role=WORKER. + # KV Connector pushes/pull of remote KVs for P/D and offloading. + self.connector = None ++ self.ucm_sparse = None + if self.vllm_config.kv_transfer_config is not None: + assert len(self.kv_cache_config.kv_cache_groups) == 1, ( + "Multiple KV cache groups are not currently supported " + "with KV connectors") + self.connector = KVConnectorFactory.create_connector_v1( + config=self.vllm_config, role=KVConnectorRole.SCHEDULER) ++ # Initialize UCM Sparse if available ++ if "ucm_sparse_config" in vllm_config.kv_transfer_config.kv_connector_extra_config: ++ ensure_ucm_sparse_initialized(vllm_config, role=UcmSparseRole.SCHEDULER) ++ self.ucm_sparse = get_ucm_sparse() ++ logger.info("UCM Sparse initialized successfully: {}".format(self.ucm_sparse)) + + self.kv_event_publisher = EventPublisherFactory.create( + self.kv_events_config, +@@ -203,8 +211,13 @@ class Scheduler(SchedulerInterface): + + # First, schedule the RUNNING requests. + req_index = 0 ++ req_sparsed_slots: dict[str, int] = {} + while req_index < len(self.running) and token_budget > 0: + request = self.running[req_index] ++ num_slots_sparsed = INVALID_SLOT ++ if self.ucm_sparse: ++ num_slots_sparsed = self.ucm_sparse.estimate_num_slots_sparsed(request) ++ req_sparsed_slots.update({request.request_id: num_slots_sparsed}) + + num_new_tokens = (request.num_tokens_with_spec - + request.num_computed_tokens) +@@ -252,7 +265,8 @@ class Scheduler(SchedulerInterface): + request, + num_new_tokens, + num_draft_tokens=num_draft_tokens, +- num_lookahead_tokens=self.num_lookahead_tokens) ++ num_lookahead_tokens=self.num_lookahead_tokens, ++ num_slots_sparsed=num_slots_sparsed) + if new_blocks is None: + # The request cannot be scheduled. + # Preempt the lowest-priority request. +@@ -339,6 +353,10 @@ class Scheduler(SchedulerInterface): + break + + request = self.waiting.peek_request() ++ num_slots_sparsed = INVALID_SLOT ++ if self.ucm_sparse: ++ num_slots_sparsed = self.ucm_sparse.estimate_num_slots_sparsed(request) ++ req_sparsed_slots.update({request.request_id: num_slots_sparsed}) + + # KVTransfer: skip request if still waiting for remote kvs. + if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: +@@ -448,6 +466,7 @@ class Scheduler(SchedulerInterface): + new_computed_blocks, + num_lookahead_tokens=self.num_lookahead_tokens, + delay_cache_blocks=load_kv_async, ++ num_slots_sparsed=num_slots_sparsed + ) + if new_blocks is None: + # The request cannot be scheduled. +@@ -561,6 +580,7 @@ class Scheduler(SchedulerInterface): + scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, + scheduled_encoder_inputs=scheduled_encoder_inputs, + num_common_prefix_blocks=num_common_prefix_blocks, ++ req_sparsed_slots=req_sparsed_slots, + # finished_req_ids is an existing state in the scheduler, + # instead of being newly scheduled in this step. + # It contains the request IDs that are finished in between +@@ -955,6 +975,8 @@ class Scheduler(SchedulerInterface): + def add_request(self, request: Request) -> None: + self.waiting.add_request(request) + self.requests[request.request_id] = request ++ if self.ucm_sparse: ++ self.ucm_sparse.request_begin(request.request_id, request.prompt_token_ids) + if self.log_stats: + request.record_event(EngineCoreEventType.QUEUED) + +@@ -1004,6 +1026,8 @@ class Scheduler(SchedulerInterface): + + def _free_request(self, request: Request) -> Optional[dict[str, Any]]: + assert request.is_finished() ++ if self.ucm_sparse: ++ self.ucm_sparse.request_finished_in_scheduler(request.request_id) + + delay_free_blocks, kv_xfer_params = self._connector_finished(request) + self.encoder_cache_manager.free(request) +diff --git a/vllm/v1/worker/block_table.py b/vllm/v1/worker/block_table.py +index 8f4e8d64c..f45e39f5c 100644 +--- a/vllm/v1/worker/block_table.py ++++ b/vllm/v1/worker/block_table.py +@@ -61,6 +61,15 @@ class BlockTable: + self.num_blocks_per_row[row_idx] += num_blocks + self.block_table_np[row_idx, start:start + num_blocks] = block_ids + ++ def reset_row( ++ self, ++ row_idx: int, ++ ) -> None: ++ self.num_blocks_per_row[row_idx] = 0 ++ self.block_table[row_idx].fill_(0) ++ self.block_table_cpu[row_idx].fill_(0) ++ self.block_table_np[row_idx].fill(0) ++ + def add_row(self, block_ids: list[int], row_idx: int) -> None: + self.num_blocks_per_row[row_idx] = 0 + self.append_row(block_ids, row_idx) +@@ -117,6 +126,10 @@ class MultiGroupBlockTable: + for i, block_table in enumerate(self.block_tables): + block_table.append_row(block_ids[i], row_idx) + ++ def reset_row(self, row_idx: int) -> None: ++ for i, block_table in enumerate(self.block_tables): ++ block_table.reset_row(row_idx) ++ + def add_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None: + for i, block_table in enumerate(self.block_tables): + block_table.add_row(block_ids[i], row_idx) +diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py +index c3df1d5d2..6341efc70 100644 +--- a/vllm/v1/worker/gpu_model_runner.py ++++ b/vllm/v1/worker/gpu_model_runner.py +@@ -72,6 +72,9 @@ from ..sample.logits_processor import LogitsProcessorManager + from .utils import (gather_mm_placeholders, initialize_kv_cache_for_kv_sharing, + sanity_check_mm_encoder_outputs, scatter_mm_placeholders) + ++from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse ++from ucm.sparse.base import UcmSparseMetadata, INVALID_SLOT ++ + if TYPE_CHECKING: + import xgrammar as xgr + import xgrammar.kernels.apply_token_bitmask_inplace_torch_compile as xgr_torch_compile # noqa: E501 +@@ -365,6 +368,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): + """ + # Remove finished requests from the cached states. + for req_id in scheduler_output.finished_req_ids: ++ self.ucm_sparse_request_finished_in_worker(req_id) + self.requests.pop(req_id, None) + self.encoder_cache.pop(req_id, None) + # Remove the finished requests from the persistent batch. +@@ -468,12 +472,14 @@ class GPUModelRunner(LoRAModelRunnerMixin): + # Update the states of the running/resumed requests. + is_last_rank = get_pp_group().is_last_rank + req_data = scheduler_output.scheduled_cached_reqs ++ req_sparsed_slots = scheduler_output.req_sparsed_slots + for i, req_id in enumerate(req_data.req_ids): + req_state = self.requests[req_id] + num_computed_tokens = req_data.num_computed_tokens[i] + new_block_ids = req_data.new_block_ids[i] + resumed_from_preemption = req_data.resumed_from_preemption[i] + num_output_tokens = req_data.num_output_tokens[i] ++ is_sparsed_request = req_sparsed_slots[req_id] != INVALID_SLOT + + # Update the cached states. + req_state.num_computed_tokens = num_computed_tokens +@@ -510,15 +516,15 @@ class GPUModelRunner(LoRAModelRunnerMixin): + end_idx:old_end_idx] = False + + # Update the block IDs. +- if not resumed_from_preemption: +- # Append the new blocks to the existing block IDs. +- for block_ids, new_ids in zip(req_state.block_ids, +- new_block_ids): +- block_ids.extend(new_ids) +- else: ++ if resumed_from_preemption or is_sparsed_request: + # The request is resumed from preemption. + # Replace the existing block IDs with the new ones. + req_state.block_ids = new_block_ids ++ else: ++ # Append the new blocks to the existing block IDs. ++ for block_ids, new_ids in zip(req_state.block_ids, ++ new_block_ids): ++ block_ids.extend(new_ids) + + req_index = self.input_batch.req_id_to_index.get(req_id) + if req_index is None: +@@ -531,6 +537,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): + # Update the persistent batch. + self.input_batch.num_computed_tokens_cpu[req_index] = ( + num_computed_tokens) ++ if is_sparsed_request: ++ self.input_batch.block_table.reset_row(req_index) + self.input_batch.block_table.append_row(new_block_ids, req_index) + + # For the last rank, we don't need to update the token_ids_cpu +@@ -639,6 +647,20 @@ class GPUModelRunner(LoRAModelRunnerMixin): + if self.uses_mrope: + self._calc_mrope_positions(scheduler_output) + ++ self.seq_lens_np[:num_reqs] = ( ++ self.input_batch.num_computed_tokens_cpu[:num_reqs] + ++ num_scheduled_tokens) ++ ++ # TODO: improve performance, no `positions_np.copy()` ++ sparsed_positions = positions_np.copy() ++ req_sparsed_slots = scheduler_output.req_sparsed_slots ++ for req_id in self.input_batch.req_id_to_index: ++ is_sparsed_request = req_sparsed_slots[req_id] != INVALID_SLOT ++ req_index = self.input_batch.req_id_to_index[req_id] ++ offset = 0 if req_index == 0 else cu_num_tokens[req_index - 1] # TODO: support MTP ++ if is_sparsed_request: ++ sparsed_positions[offset] = req_sparsed_slots[req_id] - 1 ++ + # Get token indices. + # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] + # -> [0, 1, M, M + 1, M + 2, M + 3, M + 4, 2 * M, 2 * M + 1, 2 * M + 2] +@@ -668,11 +690,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): + # block_size. + block_table_indices = ( + req_indices * block_table.max_num_blocks_per_req + +- positions_np // block_size) ++ sparsed_positions // block_size) + block_table_cpu = block_table.get_cpu_tensor() + block_numbers = block_table_cpu.flatten( + )[block_table_indices].numpy() +- block_offsets = positions_np % block_size ++ block_offsets = sparsed_positions % block_size + np.add( + block_numbers * block_size, + block_offsets, +@@ -682,9 +704,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): + self.query_start_loc_np[0] = 0 + self.query_start_loc_np[1:num_reqs + 1] = cu_num_tokens + +- self.seq_lens_np[:num_reqs] = ( +- self.input_batch.num_computed_tokens_cpu[:num_reqs] + +- num_scheduled_tokens) ++ for req_id in self.input_batch.req_id_to_index: ++ req_index = self.input_batch.req_id_to_index[req_id] ++ is_sparsed_request = scheduler_output.req_sparsed_slots[req_id] != INVALID_SLOT ++ if is_sparsed_request: ++ self.seq_lens_np[req_index] = scheduler_output.req_sparsed_slots[req_id] + + # Copy the tensors to the GPU. + self.input_ids[:total_num_scheduled_tokens].copy_( +@@ -696,6 +720,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): + non_blocking=True) + else: + # Common case (1D positions) ++ self.positions_cpu[:total_num_scheduled_tokens] = torch.from_numpy( ++ positions_np[:total_num_scheduled_tokens]) + self.positions[:total_num_scheduled_tokens].copy_( + self.positions_cpu[:total_num_scheduled_tokens], + non_blocking=True) +@@ -1386,6 +1412,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): + skip_cuda_graphs=skip_cuda_graphs, + ): + self.maybe_setup_kv_connector(scheduler_output) ++ self.maybe_execute_ucm_sparse_begin(scheduler_output, attn_metadata) + + model_output = self.model( + input_ids=input_ids, +@@ -1395,6 +1422,8 @@ class GPUModelRunner(LoRAModelRunnerMixin): + ) + + finished_dumping = self.maybe_wait_for_kv_save() ++ self.maybe_execute_ucm_sparse_finished() ++ + finished_sending, finished_recving = ( + self.get_finished_kv_transfers(scheduler_output)) + invalid_block_ids = self.get_block_ids_with_load_errors() +@@ -1745,6 +1774,25 @@ class GPUModelRunner(LoRAModelRunnerMixin): + if has_kv_transfer_group(): + return get_kv_transfer_group().wait_for_save() + ++ def maybe_execute_ucm_sparse_begin(self, scheduler_output: "SchedulerOutput", attn_metadata: CommonAttentionMetadata): ++ if not has_ucm_sparse(): ++ return ++ ucm_sparse = get_ucm_sparse() ++ ucm_sparse.build_sparse_meta(scheduler_output, self.requests, self.input_batch, attn_metadata) ++ ucm_sparse.execute_begin(scheduler_output) ++ ++ def maybe_execute_ucm_sparse_finished(self): ++ if not has_ucm_sparse(): ++ return ++ ucm_sparse = get_ucm_sparse() ++ ucm_sparse.execute_finished() ++ ++ def ucm_sparse_request_finished_in_worker(self, request_id: str | int): ++ if not has_ucm_sparse(): ++ return ++ ucm_sparse = get_ucm_sparse() ++ ucm_sparse.request_finished_in_worker(request_id) ++ + @staticmethod + def get_finished_kv_transfers( + scheduler_output: "SchedulerOutput", +diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py +index 1b816b25b..d9666d102 100644 +--- a/vllm/v1/worker/gpu_worker.py ++++ b/vllm/v1/worker/gpu_worker.py +@@ -30,6 +30,7 @@ from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput + from vllm.v1.utils import report_usage_stats + from vllm.v1.worker.gpu_model_runner import GPUModelRunner + from vllm.v1.worker.worker_base import WorkerBase ++from ucm.sparse.state import ensure_ucm_sparse_initialized + + logger = init_logger(__name__) + +@@ -401,6 +402,7 @@ def init_worker_distributed_environment( + parallel_config.pipeline_parallel_size) + + ensure_kv_transfer_initialized(vllm_config) ++ ensure_ucm_sparse_initialized(vllm_config) + + + def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype): +-- +2.34.1 + From 3c4c5f6eb91434e59ebbabc65a3e2f7efa920310 Mon Sep 17 00:00:00 2001 From: harrisonyhq Date: Wed, 5 Nov 2025 00:41:45 -0800 Subject: [PATCH 2/2] [Docs] Adapt dockerfile and installation --- docker/Dockerfile | 4 +++- docs/source/getting-started/installation_gpu.md | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 622d8fc1..f4557594 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,6 +15,8 @@ RUN export PLATFORM="cuda" && \ # Apply patch for vLLM RUN cd $(pip show vllm | grep Location | awk '{print $2}') \ - && git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt.patch + && git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch \ + && git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-aggre.patch \ + && git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch ENTRYPOINT ["/bin/bash"] \ No newline at end of file diff --git a/docs/source/getting-started/installation_gpu.md b/docs/source/getting-started/installation_gpu.md index 2ab51d70..f17652d3 100644 --- a/docs/source/getting-started/installation_gpu.md +++ b/docs/source/getting-started/installation_gpu.md @@ -48,7 +48,8 @@ After installation, please apply patch to ensure uc_connector can be used: ```bash cd $(pip show vllm | grep Location | awk '{print $2}') -git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt.patch +git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-pc.patch +git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-aggre.patch git apply /vllm-workspace/unified-cache-management/ucm/integration/vllm/patch/0.9.2/vllm-adapt-sparse.patch ```