3 changes: 2 additions & 1 deletion tests/v1/core/test_async_scheduler.py
@@ -2,6 +2,7 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 from collections import deque

+import numpy as np
 import pytest

 from vllm.v1.core.sched.output import SchedulerOutput
@@ -21,7 +22,7 @@ def _make_model_runner_output(
     return ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index={req_id: i for i, req_id in enumerate(req_ids)},
-        sampled_token_ids=[[i] for i in range(len(req_ids))],
+        sampled_token_ids=[np.array([i]) for i in range(len(req_ids))],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
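
Note: the change is the same across all four files — `sampled_token_ids` in `ModelRunnerOutput` switches from list[list[int]] to a list of NumPy arrays, one 1-D array per request. A minimal sketch of the updated construction; the import path vllm.v1.outputs is an assumption, and the fields shown here are only the ones this diff exercises (other fields may be required depending on version):

import numpy as np

from vllm.v1.outputs import ModelRunnerOutput  # assumed import path

req_ids = ["req_0", "req_1"]
output = ModelRunnerOutput(
    req_ids=req_ids,
    req_id_to_index={req_id: i for i, req_id in enumerate(req_ids)},
    # One 1-D array of sampled token ids per request.
    sampled_token_ids=[np.array([i]) for i in range(len(req_ids))],
    logprobs=None,
    prompt_logprobs_dict={},
    pooler_output=[],
)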
6 changes: 4 additions & 2 deletions tests/v1/core/test_priority_scheduler_random.py
@@ -3,6 +3,7 @@
 import random
 import uuid

+import numpy as np
 import pytest

 from vllm.config import VllmConfig
@@ -99,8 +100,7 @@ def _mock_execute_model(
         random.randint(*num_output_tokens_range) for _ in range(len(request_ids))
     ]
     sampled_token_ids = [
-        [random.randint(0, 100) for _ in range(num_tokens)]
-        for num_tokens in num_output_tokens
+        np.random.randint(0, 100, size=num_tokens) for num_tokens in num_output_tokens
     ]

     return ModelRunnerOutput(
@@ -196,6 +196,8 @@ def test_priority_scheduling_blast(
     num_blocks: int,
 ):
     random.seed(42)
+    np.random.seed(42)
+
     seen_request_prompt_length = dict[str, int]()
     seen_request_ids = set[str]()
     seen_mm_hashes = set[str]()
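
Note: because `_mock_execute_model` now draws token ids via np.random.randint instead of the stdlib random module, test_priority_scheduling_blast seeds both generators; seeding only `random` would leave the NumPy draws nondeterministic across runs. A small self-contained sketch of the pattern (bounds and counts are illustrative):

import random

import numpy as np

# Seed both RNGs: `random` picks per-request output lengths,
# while NumPy's global RNG draws the token ids themselves.
random.seed(42)
np.random.seed(42)

num_output_tokens = [random.randint(1, 4) for _ in range(3)]
sampled_token_ids = [
    np.random.randint(0, 100, size=n) for n in num_output_tokens
]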
88 changes: 50 additions & 38 deletions tests/v1/core/test_scheduler.py
@@ -3,6 +3,7 @@
 import dataclasses
 from unittest.mock import Mock

+import numpy as np
 import pytest
 import torch

@@ -169,7 +170,7 @@ def test_schedule_partial_requests():
         req_id_to_index=req_to_index,
         # Only the first request has a sampled token id because
         # the rest requests are still being prefilled.
-        sampled_token_ids=[[0], [], []],
+        sampled_token_ids=[np.array([0]), np.array([]), np.array([])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -216,7 +217,7 @@ def test_no_mm_input_chunking():
     model_runner_output = ModelRunnerOutput(
         req_ids=[request.request_id for request in requests],
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[] for _ in range(len(requests))],
+        sampled_token_ids=[np.array([]) for _ in range(len(requests))],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -276,7 +277,7 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
     model_runner_output = ModelRunnerOutput(
         req_ids=[request.request_id for request in requests],
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[] for _ in range(len(requests))],
+        sampled_token_ids=[np.array([]) for _ in range(len(requests))],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -300,7 +301,8 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
     model_runner_output = ModelRunnerOutput(
         req_ids=[request.request_id for request in requests],
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[0], [0]] + [[] for _ in range(len(requests) - 2)],
+        sampled_token_ids=[np.array([0]), np.array([0])]
+        + [np.array([]) for _ in range(len(requests) - 2)],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -347,8 +349,8 @@ def test_stop_via_update_from_output():
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
         sampled_token_ids=[
-            [EOS_TOKEN_ID],
-            [10, 11],
+            np.array([EOS_TOKEN_ID]),
+            np.array([10, 11]),
         ],  # First request hits EOS, second continues
         logprobs=None,
         prompt_logprobs_dict={},
@@ -392,7 +394,10 @@ def test_stop_via_update_from_output():
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[10, 42, 12], [13, 14]],  # First request hits stop token
+        sampled_token_ids=[
+            np.array([10, 42, 12]),
+            np.array([13, 14]),
+        ],  # First request hits stop token
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -436,7 +441,10 @@ def test_stop_via_update_from_output():
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[10, 11, 12], [13]],  # First request exceeds max_tokens
+        sampled_token_ids=[
+            np.array([10, 11, 12]),
+            np.array([13]),
+        ],  # First request exceeds max_tokens
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -475,7 +483,7 @@ def test_stop_via_update_from_output():
     model_output = ModelRunnerOutput(
         req_ids=[requests[0].request_id],
         req_id_to_index={requests[0].request_id: 0},
-        sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
+        sampled_token_ids=[np.array([EOS_TOKEN_ID, 10, 11])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -616,7 +624,7 @@ def test_schedule_concurrent_batches(
     model_runner_output = ModelRunnerOutput(
         req_ids=[requests[0].request_id],
         req_id_to_index={requests[0].request_id: 0},
-        sampled_token_ids=[[0]],
+        sampled_token_ids=[np.array([0])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -633,7 +641,7 @@ def test_schedule_concurrent_batches(
     model_runner_output = ModelRunnerOutput(
         req_ids=[requests[1].request_id],
         req_id_to_index={requests[1].request_id: 0},
-        sampled_token_ids=[[0]],
+        sampled_token_ids=[np.array([0])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -670,7 +678,7 @@ def test_preempt_during_execution():
     model_runner_output0 = ModelRunnerOutput(
         req_ids=[requests[0].request_id],
         req_id_to_index={requests[0].request_id: 0},
-        sampled_token_ids=[[0]],
+        sampled_token_ids=[np.array([0])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -687,7 +695,7 @@ def test_preempt_during_execution():
     model_runner_output1 = ModelRunnerOutput(
         req_ids=[requests[1].request_id],
         req_id_to_index={requests[1].request_id: 0},
-        sampled_token_ids=[[42]],
+        sampled_token_ids=[np.array([42])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -704,14 +712,18 @@
 @pytest.mark.parametrize(
     "spec_tokens,output_tokens,expected",
     [
-        ([[1, 2, 3]], [[1, 2, 3, 4]], (1, 3, 3, [1, 1, 1])),  # perfect match
-        ([[1, 2, 3]], [[1, 5]], (1, 3, 1, [1, 0, 0])),  # early mismatch
-        ([[1, 2], [3]], [[1, 2, 5], [3, 4]], (2, 3, 3, [2, 1])),  # multiple sequences
-        ([[1]], [[1, 2]], (1, 1, 1, [1])),  # single token sequence
-        ([[]], [[5]], (0, 0, 0, [0])),  # empty sequence
+        ([[1, 2, 3]], [np.array([1, 2, 3, 4])], (1, 3, 3, [1, 1, 1])),  # perfect match
+        ([[1, 2, 3]], [np.array([1, 5])], (1, 3, 1, [1, 0, 0])),  # early mismatch
+        (
+            [[1, 2], [3]],
+            [np.array([1, 2, 5]), np.array([3, 4])],
+            (2, 3, 3, [2, 1]),
+        ),  # multiple sequences
+        ([[1]], [np.array([1, 2])], (1, 1, 1, [1])),  # single token sequence
+        ([[]], [np.array([5])], (0, 0, 0, [0])),  # empty sequence
         (
             [[1, 2, 3], [4, 5, 6]],
-            [[1, 2, 7], [4, 8]],
+            [np.array([1, 2, 7]), np.array([4, 8])],
             (2, 6, 3, [2, 1, 0]),
         ),  # multiple mismatches
     ],
@@ -745,7 +757,7 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
     model_runner_output = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[0] for _ in range(len(requests))],
+        sampled_token_ids=[np.array([0]) for _ in range(len(requests))],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -972,7 +984,7 @@ def test_kv_connector_basic(is_async: bool):
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[1000]] * len(req_ids),
+        sampled_token_ids=[np.array([1000])] * len(req_ids),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1025,7 +1037,7 @@ def test_kv_connector_basic(is_async: bool):
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[1000]] * len(req_ids),
+        sampled_token_ids=[np.array([1000])] * len(req_ids),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1088,7 +1100,7 @@ def test_external_prefix_cache_metrics():
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=[r.request_id for r in requests],
         req_id_to_index={r.request_id: i for i, r in enumerate(requests)},
-        sampled_token_ids=[[1000]] * NUM_REQUESTS,
+        sampled_token_ids=[np.array([1000])] * NUM_REQUESTS,
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1154,7 +1166,7 @@ def test_kv_connector_unable_to_allocate(use_ec_connector, ec_role):
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[1000]] * len(req_ids),
+        sampled_token_ids=[np.array([1000])] * len(req_ids),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1239,7 +1251,7 @@ def test_kv_connector_handles_preemption(use_ec_connector, ec_role):
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[1000]] * len(req_ids),
+        sampled_token_ids=[np.array([1000])] * len(req_ids),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1332,7 +1344,7 @@ def make_output(scheduler: Scheduler):
     return ModelRunnerOutput(
         req_ids=[req.request_id for req in scheduler.running],
         req_id_to_index={req.request_id: i for i, req in enumerate(scheduler.running)},
-        sampled_token_ids=[[1000]] * len(scheduler.running),
+        sampled_token_ids=[np.array([1000])] * len(scheduler.running),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1749,7 +1761,7 @@ def test_priority_scheduling_preemption():
         req_id_to_index={
             req.request_id: i for i, req in enumerate(low_priority_requests)
         },
-        sampled_token_ids=[[100] for _ in low_priority_requests],
+        sampled_token_ids=[np.array([100]) for _ in low_priority_requests],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -1818,7 +1830,7 @@ def test_priority_scheduling_no_preemption_when_space_available():
         req_id_to_index={
             req.request_id: i for i, req in enumerate(low_priority_requests)
         },
-        sampled_token_ids=[[100] for _ in low_priority_requests],
+        sampled_token_ids=[np.array([100]) for _ in low_priority_requests],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -2064,7 +2076,7 @@ def test_priority_scheduling_heap_property():
     model_output = ModelRunnerOutput(
         req_ids=[req.req_id],
         req_id_to_index={req.req_id: 0},
-        sampled_token_ids=[[100]],
+        sampled_token_ids=[np.array([100])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -2150,7 +2162,7 @@ def test_priority_scheduling_preemption_and_resumption_when_out_of_kv(
     model_output = ModelRunnerOutput(
         req_ids=[request_low.request_id],
         req_id_to_index={request_low.request_id: 0},
-        sampled_token_ids=[[100]],
+        sampled_token_ids=[np.array([100])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -2181,7 +2193,7 @@
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[100] for _ in requests],
+        sampled_token_ids=[np.array([100]) for _ in requests],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -2207,7 +2219,7 @@
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[], [100]],
+        sampled_token_ids=[np.array([]), np.array([100])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -2624,7 +2636,7 @@ def test_ec_connector_with_partial_cache_hit_multi_round(use_kv_connector):
     model_output = ModelRunnerOutput(
         req_ids=[request1.request_id],
         req_id_to_index={request1.request_id: 0},
-        sampled_token_ids=[[100]],
+        sampled_token_ids=[np.array([100])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -2830,7 +2842,7 @@ def test_ec_connector_unable_to_allocate(use_kv_connector):
     MODEL_RUNNER_OUTPUT = ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_to_index,
-        sampled_token_ids=[[1000]] * len(req_ids),
+        sampled_token_ids=[np.array([1000])] * len(req_ids),
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[],
@@ -2943,7 +2955,7 @@ def test_priority_scheduling_ec_connector_preemption_and_resumption(
     model_output = ModelRunnerOutput(
         req_ids=[request_low.request_id],
         req_id_to_index={request_low.request_id: 0},
-        sampled_token_ids=[[100]],
+        sampled_token_ids=[np.array([100])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -2994,7 +3006,7 @@
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[100] for _ in requests],
+        sampled_token_ids=[np.array([100]) for _ in requests],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -3029,7 +3041,7 @@
     model_output = ModelRunnerOutput(
         req_ids=[req.request_id for req in requests],
         req_id_to_index={req.request_id: i for i, req in enumerate(requests)},
-        sampled_token_ids=[[100], [100, 200]],
+        sampled_token_ids=[np.array([100]), np.array([100, 200])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
@@ -3215,7 +3227,7 @@ def test_ec_connector_allocate_encoder_tokens_with_external_load(use_kv_connector):
     model_output = ModelRunnerOutput(
         req_ids=[request1.request_id, request2.request_id],
         req_id_to_index={request1.request_id: 0, request2.request_id: 1},
-        sampled_token_ids=[[100], [121]],
+        sampled_token_ids=[np.array([100]), np.array([121])],
         # spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
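
Note: in the spec-decoding parametrization above, the draft tokens (spec_tokens) stay plain Python lists while the sampled outputs become arrays, and the expected tuples still count per-position acceptances. A hypothetical helper mirroring the parametrized cases (the name num_accepted and its signature are illustrative, not the scheduler's actual API):

import numpy as np

def num_accepted(spec: list[int], sampled: np.ndarray) -> int:
    """Count draft tokens matched before the first mismatch."""
    accepted = 0
    for draft, out in zip(spec, sampled.tolist()):
        if draft != out:
            break
        accepted += 1
    return accepted

assert num_accepted([1, 2, 3], np.array([1, 2, 3, 4])) == 3  # perfect match
assert num_accepted([1, 2, 3], np.array([1, 5])) == 1  # early mismatch
assert num_accepted([], np.array([5])) == 0  # empty sequence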
7 changes: 4 additions & 3 deletions tests/v1/kv_connector/unit/test_nixl_connector.py
@@ -11,6 +11,7 @@
 from collections import defaultdict
 from unittest.mock import patch

+import numpy as np
 import pytest
 import ray
 import torch
@@ -823,7 +824,7 @@ def test_kv_connector_stats_aggregation():
         output = ModelRunnerOutput(
             req_ids=[f"req_{i}"],
             req_id_to_index={f"req_{i}": 0},
-            sampled_token_ids=[[123]],  # dummy token
+            sampled_token_ids=[np.array([123])],  # dummy token
             logprobs=None,
             prompt_logprobs_dict={},
             pooler_output=[None],
@@ -904,7 +905,7 @@ def make_multi_stats(nixl_count: int, foo_count: int) -> MultiKVConnectorStats:
         output = ModelRunnerOutput(
             req_ids=[f"req_{i}"],
             req_id_to_index={f"req_{i}": 0},
-            sampled_token_ids=[[123]],
+            sampled_token_ids=[np.array([123])],
             logprobs=None,
             prompt_logprobs_dict={},
             pooler_output=[None],
@@ -962,7 +963,7 @@ def test_scheduler_kv_connector_stats_aggregation():
     model_output = ModelRunnerOutput(
         req_ids=["req_0"],
         req_id_to_index={"req_0": 0},
-        sampled_token_ids=[[123]],
+        sampled_token_ids=[np.array([123])],
         logprobs=None,
         prompt_logprobs_dict={},
         pooler_output=[None],
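
Note: call sites that still build plain Python lists can adapt with np.asarray; one caveat worth knowing is that an empty literal like np.array([]) defaults to dtype float64, so passing an explicit integer dtype keeps the per-request arrays uniform. A minimal sketch (plain NumPy, not part of this PR):

import numpy as np

legacy = [[123], [10, 11], []]
# np.asarray avoids a copy when the input is already an ndarray;
# the explicit dtype keeps empty arrays integer-typed as well.
sampled_token_ids = [np.asarray(ids, dtype=np.int64) for ids in legacy]
assert [a.tolist() for a in sampled_token_ids] == legacy
assert all(a.dtype == np.int64 for a in sampled_token_ids)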