diff --git a/custom_ops/gpu_ops/rebuild_padding.cu b/custom_ops/gpu_ops/rebuild_padding.cu
index 369242d197b..1ca2767b766 100644
--- a/custom_ops/gpu_ops/rebuild_padding.cu
+++ b/custom_ops/gpu_ops/rebuild_padding.cu
@@ -92,8 +92,8 @@ __global__ void RebuildAppendPaddingKernel(T *output_data,
 
 template <paddle::DataType D>
 std::vector<paddle::Tensor> rebuild_padding(
-    const paddle::Tensor &tmp_out,       // [token_num, dim_embed]
-    const paddle::Tensor &cu_seqlens_q,  // [bsz+1, 1]
+    const paddle::Tensor &tmp_out,
+    const paddle::Tensor &cu_seqlens_q,
     const paddle::Tensor &seq_len_this_time,
     const paddle::Tensor &seq_lens_decoder,
     const paddle::Tensor &seq_lens_encoder,
@@ -101,16 +101,19 @@ std::vector<paddle::Tensor> rebuild_padding(
     const paddle::optional<paddle::Tensor> &first_token_out,
     int max_input_length,
     bool enable_logprob) {
+
   typedef PDTraits<D> traits_;
   typedef typename traits_::DataType DataType_;
   typedef typename traits_::data_t data_t;
 
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
-  auto dev_ctx = static_cast<const phi::CustomContext *>(paddle::experimental::DeviceContextPool::Instance().Get(tmp_out.place()));
+  auto dev_ctx = static_cast<const phi::CustomContext *>(
+      paddle::experimental::DeviceContextPool::Instance().Get(tmp_out.place()));
   auto cu_stream = dev_ctx->stream();
 #else
   auto cu_stream = tmp_out.stream();
 #endif
+
   std::vector<int64_t> tmp_out_shape = tmp_out.shape();
   const int token_num = tmp_out_shape[0];
   const int dim_embed = tmp_out_shape[1];
@@ -128,20 +131,20 @@ std::vector<paddle::Tensor> rebuild_padding(
       }
     }
     out = paddle::full({token_num - need_delete_token_num, dim_embed},
-                       0,
-                       D,
-                       tmp_out.place());
+                       0, D, tmp_out.place());
   } else {
-    out =
-        paddle::full({bsz, dim_embed}, 0, tmp_out.dtype(), tmp_out.place());
+    out = paddle::full({bsz, dim_embed}, 0, tmp_out.dtype(), tmp_out.place());
   }
 
   constexpr int PackSize = VEC_16B / sizeof(DataType_);
-  int elem_nums = out.numel();
-  int pack_num = elem_nums / PackSize;
+  const int elem_nums = out.numel();
   const int blocksize = 128;
-  const int grid_size = (pack_num + blocksize - 1) / blocksize;
+
   if (output_padding_offset) {
+    // Speculative decoding branch
+    int pack_num = (elem_nums + PackSize - 1) / PackSize;
+    int grid_size = std::max(1, (pack_num + blocksize - 1) / blocksize);
+
     RebuildAppendPaddingKernel<DataType_, PackSize>
         <<<grid_size, blocksize, 0, cu_stream>>>(
             reinterpret_cast<DataType_ *>(out.data<data_t>()),
@@ -161,22 +164,42 @@ std::vector<paddle::Tensor> rebuild_padding(
             bsz,
             enable_logprob);
   } else {
-    RebuildPaddingKernel<DataType_, PackSize>
-        <<<grid_size, blocksize, 0, cu_stream>>>(
-            reinterpret_cast<DataType_ *>(out.data<data_t>()),
-            reinterpret_cast<const DataType_ *>(
-                const_cast<data_t *>(tmp_out.data<data_t>())),
-            cu_seqlens_q.data<int>(),
-            seq_len_this_time.data<int>(),
-            seq_lens_decoder.data<int>(),
-            seq_lens_encoder.data<int>(),
-            max_input_length,
-            dim_embed,
-            elem_nums);
+    const int actual_pack_size = (dim_embed < PackSize) ?
1 : PackSize; + const int pack_num = (elem_nums + actual_pack_size - 1) / actual_pack_size; + const int grid_size = std::max(1, (pack_num + blocksize - 1) / blocksize); + + if (actual_pack_size == 1) { + RebuildPaddingKernel + <<>>( + reinterpret_cast(out.data()), + reinterpret_cast( + const_cast(tmp_out.data())), + cu_seqlens_q.data(), + seq_len_this_time.data(), + seq_lens_decoder.data(), + seq_lens_encoder.data(), + max_input_length, + dim_embed, + elem_nums); + } else { + RebuildPaddingKernel + <<>>( + reinterpret_cast(out.data()), + reinterpret_cast( + const_cast(tmp_out.data())), + cu_seqlens_q.data(), + seq_len_this_time.data(), + seq_lens_decoder.data(), + seq_lens_encoder.data(), + max_input_length, + dim_embed, + elem_nums); + } } return {out}; } + paddle::Tensor RebuildPaddingFunc( const paddle::Tensor &tmp_out, // [token_num, dim_embed] const paddle::Tensor &cu_seqlens_q, // [bsz+1, 1] diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 8d9aa9ab08f..1e1be5f7d71 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -292,7 +292,7 @@ def override_name_from_config(self): self.tensor_parallel_size = self.infer_model_mp_num del self.infer_model_mp_num - if hasattr(self, "num_hidden_layers"): + if hasattr(self, "num_hidden_layers") and self.runner != "pooling": if hasattr(self, "remove_tail_layer"): if self.remove_tail_layer is True: self.num_hidden_layers -= 1 @@ -1564,18 +1564,20 @@ def __init__( self.max_long_partial_prefills = max_long_partial_prefills self.long_prefill_token_threshold = long_prefill_token_threshold + if envs.FD_FOR_TORCH_MODEL_FORMAT: self.model_config.model_format = "torch" # TODO if not envs.FD_ENABLE_MAX_PREFILL: - self.max_prefill_batch = int(os.getenv("MAX_PREFILL_NUM", "3")) + self.max_prefill_batch = int(os.getenv("MAX_PREFILL_NUM", "10")) if current_platform.is_xpu(): self.max_prefill_batch = 1 if self.model_config is not None and self.model_config.enable_mm and not envs.ENABLE_V1_KVCACHE_SCHEDULER: self.max_prefill_batch = 1 # TODO:当前多模prefill阶段只支持并行度为1,待优化 else: self.max_prefill_batch = self.scheduler_config.max_num_seqs + # print("self.max_prefill_batch",self.max_prefill_batch) num_ranks = self.parallel_config.tensor_parallel_size * self.parallel_config.data_parallel_size self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 @@ -1627,6 +1629,7 @@ def postprocess(self): self.scheduler_config.max_num_batched_tokens = 2048 else: self.scheduler_config.max_num_batched_tokens = self.model_config.max_model_len + # print("self.scheduler_config.max_num_bathed_tokens",self.scheduler_config.max_num_batched_tokens) if self.long_prefill_token_threshold == 0: self.long_prefill_token_threshold = int(self.model_config.max_model_len * 0.04) diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 77def0feb4d..4af9af2f37c 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -543,6 +543,9 @@ def __post_init__(self): self.enable_prefix_caching = False self.max_encoder_cache = 0 + if self.runner == "pooling" and self.enable_prefix_caching: + self.enable_prefix_caching = False + @staticmethod def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: """ diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 136f1950828..8128ae6aeae 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -693,7 +693,7 @@ def _fetch_request(): max_num_batched_tokens = 
self.cfg.scheduler_config.max_num_batched_tokens else: max_num_batched_tokens = self.cfg.model_config.max_model_len - + tasks = self.scheduler.get_requests( available_blocks=self.cfg.cache_config.max_block_num_per_seq, block_size=self.cfg.cache_config.block_size, @@ -701,6 +701,7 @@ def _fetch_request(): max_num_batched_tokens=max_num_batched_tokens, batch=num_prefill_batch, ) + for task in tasks: trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", "")) diff --git a/fastdeploy/engine/pooling_params.py b/fastdeploy/engine/pooling_params.py index 93192b6ecc5..7f831412982 100644 --- a/fastdeploy/engine/pooling_params.py +++ b/fastdeploy/engine/pooling_params.py @@ -164,7 +164,7 @@ def _set_default_parameters(self, model_config: Optional["ModelConfig"]): self.softmax = True elif self.task == "reward": if self.normalize is None: - self.normalize = True + self.normalize = False else: raise ValueError(f"Unknown pooling task: {self.task}") diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index d46ec958244..4acf2a50417 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -24,7 +24,6 @@ import numpy as np from typing_extensions import TypeVar -from fastdeploy import envs from fastdeploy.engine.pooling_params import PoolingParams from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.entrypoints.openai.protocol import ToolCall @@ -145,7 +144,10 @@ def __init__( self.multimodal_data = multimodal_data self.multimodal_img_boundaries = None - self.enable_thinking = enable_thinking + if pooling_params is not None: + self.enable_thinking = False + else: + self.enable_thinking = True self.reasoning_max_tokens = reasoning_max_tokens self.trace_carrier = trace_carrier @@ -190,6 +192,10 @@ def from_dict(cls, d: dict): pooling_params = PoolingParams.from_dict(d["pooling_params"]) else: sampling_params = SamplingParams.from_dict(d) + + enable_thinking = d.get("enable_thinking", None) + if pooling_params is not None: + enable_thinking = False return cls( request_id=d["request_id"], prompt=d.get("prompt"), @@ -216,7 +222,7 @@ def from_dict(cls, d: dict): guided_grammar=d.get("guided_grammar", None), structural_tag=d.get("structural_tag", None), guided_json_object=d.get("guided_json_object", None), - enable_thinking=d.get("enable_thinking", None), + enable_thinking=enable_thinking, reasoning_max_tokens=d.get("reasoning_max_tokens", None), trace_carrier=d.get("trace_carrier", {}), chat_template=d.get("chat_template", None), diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index fbe30d24bcf..03d008eabc6 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -649,7 +649,7 @@ def _allocate_decode_and_extend(): request = self.waiting[0] if (self._is_mm_request(request) and self.exist_mm_prefill(scheduled_reqs)) or ( - paddle.is_compiled_with_xpu() and self.exist_prefill(scheduled_reqs) + paddle.is_compiled_with_xpu() and self.exist_prefill(scheduled_reqs) ): break if request.status == RequestStatus.WAITING: diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 4a1e4ef647f..18377772713 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -889,16 +889,6 @@ class EmbeddingChatRequest(BaseModel): user: Optional[str] = None truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None - # 
--8<-- [start:chat-embedding-extra-params] - add_generation_prompt: bool = Field( - default=False, - description=( - "If true, the generation prompt will be added to the chat template. " - "This is a parameter used by chat template in tokenizer config of the " - "model." - ), - ) - add_special_tokens: bool = Field( default=False, description=( diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index bd22df56671..bc8090d97ef 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -256,7 +256,6 @@ def _process_chat_template_kwargs(self, request_dict): chat_template_kwargs.update( { "chat_template": request_dict.get("chat_template"), - "add_generation_prompt": request_dict.get("add_generation_prompt"), "add_stop_sequences": request_dict.get("add_stop_sequences"), } ) diff --git a/fastdeploy/model_executor/layers/pool/metadata.py b/fastdeploy/model_executor/layers/pool/metadata.py index 699800a0f82..c6c6811d7ba 100644 --- a/fastdeploy/model_executor/layers/pool/metadata.py +++ b/fastdeploy/model_executor/layers/pool/metadata.py @@ -15,12 +15,14 @@ """ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Union import paddle from fastdeploy.engine.pooling_params import PoolingParams +Device = Union[paddle.CPUPlace, paddle.CUDAPlace, paddle.XPUPlace] + @dataclass class PoolingCursor: @@ -60,21 +62,21 @@ def __getitem__(self, indices: slice): pooling_cursor=None if self.pooling_cursor is None else self.pooling_cursor[indices], ) - def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: str): + def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: Device): self.pooling_cursor = build_pooling_cursor(num_scheduled_tokens, self.prompt_lens, device) -def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: str): +def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: Device): assert len(prompt_lens) == len(num_scheduled_tokens) n_seq = len(num_scheduled_tokens) index = list(range(n_seq)) - num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens) - cumsum = paddle.zeros([n_seq + 1], dtype="int64") + num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, dtype="int64", place=paddle.CPUPlace()) + cumsum = paddle.zeros([n_seq + 1], dtype="int64", device=paddle.CPUPlace()) paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:]) - if device == "gpu": - cumsum_device = cumsum.cuda() + if isinstance(device, paddle.CUDAPlace): + cumsum_device = paddle.assign(cumsum).cuda(device.get_device_id()) else: cumsum_device = cumsum return PoolingCursor( diff --git a/fastdeploy/model_executor/layers/pooler.py b/fastdeploy/model_executor/layers/pooler.py index 0266987a8fa..cd45ad88fa3 100644 --- a/fastdeploy/model_executor/layers/pooler.py +++ b/fastdeploy/model_executor/layers/pooler.py @@ -78,7 +78,6 @@ def get_pooling_params(pooling_metadata: PoolingMetadata) -> list[PoolingParams] def get_tasks(pooling_metadata: PoolingMetadata) -> list[PoolingTask]: pooling_params = get_pooling_params(pooling_metadata) - tasks: list[PoolingTask] = [task for pooling_param in pooling_params if (task := pooling_param.task) is not None] assert len(pooling_params) == len(tasks) @@ -108,7 +107,7 @@ class Pooler(nn.Layer, ABC): @staticmethod def for_encode(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] = None): if 
pooler_config.pooling_type == "STEP": - return StepPooler() + return StepPooler(model_config) resolved_config = ResolvedPoolingConfig(task="encode", pooling_type=PoolingType.ALL) return SimplePooler.from_config(resolved_config, model_config) @@ -274,6 +273,7 @@ def _proj(x: paddle.Tensor) -> paddle.Tensor: pooled_data = [vecs if d is None else vecs[..., :d] for vecs, d in zip(pooled_data, dimensions_list)] # for normalize flags = [p.normalize for p in pooling_params] + if len(set(flags)) == 1: if flags[0]: pooled_data = self.activation(pooled_data) @@ -293,7 +293,6 @@ def __init__(self, model_config: Optional["ModelConfig"] = None) -> None: def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], pooling_metadata: PoolingMetadata): pooling_params = get_pooling_params(pooling_metadata) - # for softmax flags = [p.softmax for p in pooling_params] if len(set(flags)) == 1: if flags[0]: @@ -301,6 +300,7 @@ def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], poolin else: pooled_data = [self.activation(vecs) if f else vecs for vecs, f in zip(pooled_data, flags)] + print(f"==RewardPoolerHead end==>pooled_data:{pooled_data}, ===>{pooling_metadata}") return pooled_data @@ -352,7 +352,11 @@ def forward_all( hidden_states: paddle.Tensor, pooling_cursor: PoolingCursor, ) -> Union[list[paddle.Tensor], paddle.Tensor]: - return hidden_states[pooling_cursor.last_token_indices_gpu] + return hidden_states + # print("hidden_states",hidden_states) + # print("pooling_cursor.last_token_indices_gpu",pooling_cursor.last_token_indices_gpu) + # print("hidden_states[pooling]",hidden_states[pooling_cursor.last_token_indices_gpu]) + # return hidden_states[pooling_cursor.last_token_indices_gpu] class AllPool(PoolingMethod): @@ -366,8 +370,8 @@ def forward_all( ) -> Union[list[paddle.Tensor], paddle.Tensor]: assert not pooling_cursor.is_partial_prefill(), "partial prefill not supported with ALL pooling" - hidden_states_lst = list(hidden_states.split(pooling_cursor.num_scheduled_tokens_cpu.tolist())) + return [hidden_states_lst[i] for i in pooling_cursor.index] @@ -416,11 +420,12 @@ def forward_all( class StepPooler(Pooler): def __init__( self, + model_config: ModelConfig, ) -> None: super().__init__() self.pooling = AllPool() - self.head = RewardPoolerHead() + self.head = RewardPoolerHead(model_config) def extract_states( self, @@ -455,14 +460,11 @@ def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate: def forward( self, - hidden_states: Union[paddle.Tensor, list[paddle.Tensor]], + hidden_states: paddle.Tensor | list[paddle.Tensor], pooling_metadata: PoolingMetadata, ) -> PoolerOutput: pooled_data = self.extract_states(hidden_states, pooling_metadata) - pooling_params = get_pooling_params(pooling_metadata) - assert len(pooled_data) == len(pooling_params) - - pooled_data = [self.head(d, p) for d, p in zip(pooled_data, pooling_params)] + pooled_data = self.head(pooled_data, pooling_metadata) return pooled_data diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 2e357579bcb..5e3351ca326 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -348,12 +348,17 @@ def __init__( prefix=f"{prefix}.mlp", ) + norm_dtype = None + if fd_config.model_config.architectures[0] == "Ernie4_5_VLMoeForProcessRewardModel": + norm_dtype = "float32" + self.input_layernorm = RMSNorm( fd_config, 
hidden_size=fd_config.model_config.hidden_size, eps=fd_config.model_config.rms_norm_eps, prefix=f"{prefix}.input_layernorm", layer_id=layer_id, + dtype=norm_dtype, ) self.post_attention_layernorm = RMSNorm( @@ -362,6 +367,7 @@ def __init__( eps=fd_config.model_config.rms_norm_eps, prefix=f"{prefix}.post_attention_layernorm", layer_id=layer_id, + dtype=norm_dtype, ) def load_state_dict(self, state_dict): @@ -540,7 +546,6 @@ def forward( residual, vl_moe_meta, ) - out = self.norm(hidden_states, residual, forward_meta=forward_meta)[0] return out @@ -652,6 +657,7 @@ def load_weights(self, weights_iterator) -> None: ("qkv_proj", "v_proj", None, "v"), ("up_gate_proj", "gate_proj", None, "gate"), ("up_gate_proj", "up_proj", None, "up"), + ("norm.weight", "ernie.norm.weight", None, None), ] text_expert_params_mapping = [] diff --git a/fastdeploy/model_executor/models/ernie_vl_rm.py b/fastdeploy/model_executor/models/ernie_vl_rm.py index cfa29c84512..8491ddd7a8d 100644 --- a/fastdeploy/model_executor/models/ernie_vl_rm.py +++ b/fastdeploy/model_executor/models/ernie_vl_rm.py @@ -57,6 +57,7 @@ def __init__(self, fd_config: FDConfig): ) self.ernie = Ernie4_5_VLModel(fd_config=fd_config) self.head_dtype = paddle.bfloat16 + self.fd_config = fd_config # Persistent buffers for CUDA graphs. if fd_config.graph_opt_config.use_cudagraph: @@ -107,6 +108,7 @@ def forward( image_features: Optional[paddle.Tensor], forward_meta: ForwardMeta, ): + vl_moe_meta = self.ernie.prepare_vl_moe_meta(ids_remove_padding=ids_remove_padding) input_embeddings = self.get_input_embeddings( ids_remove_padding=ids_remove_padding, @@ -114,6 +116,7 @@ def forward( image_token_num=vl_moe_meta.image_token_num.item(), ) + if forward_meta.step_use_cudagraph: self._decoder_input_embeddings.copy_(input_embeddings, False) input_embeddings = self._decoder_input_embeddings @@ -124,18 +127,22 @@ def forward( forward_meta=forward_meta, vl_moe_meta=vl_moe_meta, ) + + if isinstance(hidden_states, tuple): + hidden_states = hidden_states[0] + hidden_states = hidden_states.to(self.head_dtype) logits = self.rm_head(hidden_states) - return logits + return logits.cast("float32") @ModelRegistry.register_model_class( architecture="Ernie4_5_VLMoeForProcessRewardModel", module_name="ernie_vl_rm", - category=[ModelCategory.REWARD], - primary_use=ModelCategory.REWARD, + category=ModelCategory.REWARD | ModelCategory.MULTIMODAL, + primary_use=ModelCategory.REWARD | ModelCategory.MULTIMODAL, ) -@default_pooling_type("ALL") +@default_pooling_type("LAST") class Ernie4_5_VLMoeForProcessRewardModel(Ernie4_5_VLMoeRewardBaseModel): def __init__(self, fd_config: FDConfig): @@ -147,7 +154,12 @@ def __init__(self, fd_config: FDConfig): pooler_config = fd_config.model_config.pooler_config assert pooler_config is not None - self.pooler = DispatchPooler({"encode": Pooler.for_encode(pooler_config)}) + self.pooler = DispatchPooler( + { + "encode": Pooler.for_encode(pooler_config, fd_config.model_config), + "embed": Pooler.for_embed(pooler_config, fd_config.model_config), + }, + ) self.process_weights_before_loading_fn = process_weights_before_loading(skip_prefixes=["lm_head"]) @@ -159,4 +171,5 @@ def name(self): @paddle.no_grad() def load_weights(self, weights_iterator): # Filter out lm_head weights of Ernie4_5_VLMoeForConditionalGeneration + Ernie4_5_VLMoeForConditionalGeneration.load_weights(self, weights_iterator) diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index b81606bae15..2103c3a0c6d 100644 --- 
a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -58,7 +58,7 @@ def from_model_cls( is_text_generation=ModelCategory.TEXT_GENERATION in category, is_multimodal=ModelCategory.MULTIMODAL in category, is_reasoning=ModelCategory.REASONING in category, - is_pooling=ModelCategory.EMBEDDING in category, + is_pooling=(ModelCategory.EMBEDDING in category) or (ModelCategory.REWARD in category), default_pooling_type=get_default_pooling_type(model_cls), module_path=module_path, ) diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index d2c82e2afaa..ac2436fd253 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -27,8 +27,6 @@ if current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import ( get_padding_offset, - limit_thinking_content_length_v1, - limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, step_paddle, @@ -52,12 +50,8 @@ elif current_platform.is_maca(): from fastdeploy.model_executor.ops.gpu import ( get_padding_offset, - limit_thinking_content_length_v1, - limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, - speculate_limit_thinking_content_length_v1, - speculate_limit_thinking_content_length_v2, step_paddle, update_inputs, update_inputs_v1, @@ -79,7 +73,6 @@ speculate_step_paddle, speculate_step_system_cache, speculate_update, - speculate_set_stop_value_multi_seqs, step_paddle, step_system_cache, update_inputs, @@ -94,7 +87,7 @@ from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData -from fastdeploy.worker.output import LogprobsTensors, ModelOutputData, SamplerOutput +from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput, SamplerOutput DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" @@ -105,8 +98,6 @@ def limit_thinking_content_length( max_think_lens: paddle.Tensor, step_idx: paddle.Tensor, limit_think_status: paddle.Tensor, - stop_flags: paddle.Tensor, - eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -117,8 +108,6 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, - stop_flags, - eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -129,7 +118,6 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, - stop_flags, think_end_id, line_break_id, ) @@ -145,8 +133,6 @@ def speculate_limit_thinking_content_length( limit_think_status: paddle.Tensor, accept_num: paddle.Tensor, seq_lens_decoder: paddle.Tensor, - stop_flags: paddle.Tensor, - eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -159,8 +145,6 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, - stop_flags, - eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -173,7 +157,6 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, - stop_flags, think_end_id, line_break_id, ) @@ -204,27 +187,10 @@ def pre_process( cu_seqlens_q: cu_seqlens_k: """ - token_num = paddle.sum(seq_lens_this_time) - - specific_platform = current_platform.is_cuda() or current_platform.is_maca() or current_platform.is_iluvatar() - if specific_platform and not speculative_decoding: - 
# Note(ZKK): This case's code is very simple! - ids_remove_padding, batch_id_per_token, cu_seqlens_q, cu_seqlens_k = get_padding_offset( - input_ids, token_num, seq_lens_this_time - ) - - return ( - ids_remove_padding, - batch_id_per_token, - cu_seqlens_q, - cu_seqlens_k, - None, - None, - ) - # Remove padding max_len = input_ids.shape[1] cum_offsets_now = paddle.cumsum(max_len - seq_lens_this_time, dtype="int32") + token_num = paddle.sum(seq_lens_this_time) output_padding_offset = None output_cum_offsets = None if speculative_decoding: @@ -262,7 +228,7 @@ def pre_process( batch_id_per_token, cu_seqlens_q, cu_seqlens_k, - ) = get_padding_offset(input_ids, cum_offsets_now, token_num, seq_lens_this_time) + ) = get_padding_offset(input_ids, token_num, seq_lens_this_time) return ( ids_remove_padding, batch_id_per_token, @@ -273,12 +239,7 @@ def pre_process( ) -def _build_stream_transfer_data( - output_tokens: paddle.Tensor, - pooler_outputs: List[PoolingSequenceGroupOutput] = None, - logprobs: Optional[LogprobsTensors] = None, - prompt_logprobs_list: Optional[LogprobsTensors] = None, -): +def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[PoolingSequenceGroupOutput] = None): """Split output_tokens and output""" stream_transfer_datas = [] @@ -291,14 +252,11 @@ def _build_stream_transfer_data( stream_transfer_data = StreamTransferData( decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid ) - if logprobs: - logprobs = logprobs.slice_rows(bid, bid + 1) - stream_transfer_data.logprobs = logprobs - if prompt_logprobs_list: - stream_transfer_data.prompt_logprobs = prompt_logprobs_list[bid] stream_transfer_datas.append(stream_transfer_data) elif pooler_outputs is not None: for bid, pooler_output in enumerate(pooler_outputs): + if pooler_output is None: + continue if pooler_output.dtype == paddle.bfloat16: pooler_output = pooler_output.astype("float32") @@ -321,7 +279,7 @@ def post_process_normal( async_output_queue: queue.Queue = None, think_end_id: int = -1, line_break_id: int = -1, -): +) -> ModelRunnerOutput: """Post-processing steps after completing a single token generation.""" if think_end_id > 0: limit_thinking_content_length( @@ -330,8 +288,6 @@ def post_process_normal( max_think_lens=share_inputs["max_think_lens"], step_idx=share_inputs["step_idx"], limit_think_status=share_inputs["limit_think_status"], - stop_flags=share_inputs["stop_flags"], - eos_token_ids=share_inputs["eos_token_id"], think_end_id=think_end_id, line_break_id=line_break_id, ) @@ -420,31 +376,27 @@ def post_process_normal( # 3. Transmit the model's output and stop generation signal via message queue. # In the future, we will abandon this approach. 
if not skip_save_output: - if envs.FD_USE_GET_SAVE_OUTPUT_V1: - if save_each_rank or model_output.mp_rank == 0: - output = _build_stream_transfer_data( - sampler_output.sampled_token_ids, - logprobs=sampler_output.logprobs_tensors, - prompt_logprobs_list=model_output.prompt_logprobs_list, - ) - async_output_queue.put(output) - else: - if sampler_output.logprobs_tensors is None: + if sampler_output.logprobs_tensors is None: + if envs.FD_USE_GET_SAVE_OUTPUT_V1: + if save_each_rank or model_output.mp_rank == 0: + output = _build_stream_transfer_data(sampler_output.sampled_token_ids) + async_output_queue.put(output) + else: save_output( sampler_output.sampled_token_ids, model_output.not_need_stop, model_output.mp_rank, save_each_rank, ) - else: - save_output_topk( - sampler_output.sampled_token_ids, - sampler_output.logprobs_tensors.logprob_token_ids, - sampler_output.logprobs_tensors.logprobs, - sampler_output.logprobs_tensors.selected_token_ranks, - model_output.not_need_stop, - model_output.mp_rank, - ) + else: + save_output_topk( + sampler_output.sampled_token_ids, + sampler_output.logprobs_tensors.logprob_token_ids, + sampler_output.logprobs_tensors.logprobs, + sampler_output.logprobs_tensors.selected_token_ranks, + model_output.not_need_stop, + model_output.mp_rank, + ) def post_process_specualate( @@ -468,17 +420,7 @@ def post_process_specualate( think_end_id=think_end_id, line_break_id=line_break_id, ) - speculate_set_stop_value_multi_seqs( - model_output.accept_tokens, - model_output.accept_num, - model_output.pre_ids, - model_output.step_idx, - model_output.stop_flags, - model_output.seq_lens_this_time, - model_output.stop_token_ids, - model_output.stop_seqs_len, - model_output.eos_token_id, - ) + speculate_update( model_output.seq_lens_encoder, model_output.seq_lens_decoder, @@ -491,7 +433,6 @@ def post_process_specualate( model_output.seq_lens_this_time, model_output.is_block_step, model_output.stop_nums, - model_output.mask_rollback, ) if not skip_save_output: @@ -858,9 +799,7 @@ def rebuild_padding( seq_lens_decoder, seq_lens_encoder, output_padding_offset, - first_token_out, max_input_length, - enable_logprob, ) else: raise RuntimeError("Not supported platform") @@ -900,6 +839,7 @@ def post_process_pooling( paddle.ones_like(model_output.stop_flags, dtype="bool"), model_output.stop_flags, ) + update_inputs_v1( model_output.stop_flags, model_output.not_need_stop, diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index 9bfe90a6e29..94d82ff6fa7 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -255,6 +255,7 @@ def get_requests( ) return [] + # scheduler_logger.info(f"available_blocks:{available_blocks}, max_num_batched_tokens:{max_num_batched_tokens} ,batch:{batch}") with self.requests_not_empty: batch_ids = self.requests_not_empty.wait_for( lambda: self.ids[self.ids_read_cursor : self.ids_read_cursor + batch], @@ -265,11 +266,13 @@ def get_requests( required_total_blocks = 0 current_prefill_tokens = 0 long_partial_requests, short_partial_requests = 0, 0 + scheduler_logger.info(f"batch_ids:{batch_ids}") for request_id in batch_ids: request = self.requests[request_id] required_input_blocks = self.calc_required_blocks(request.prompt_tokens_ids_len, block_size) current_prefill_tokens += request.prompt_tokens_ids_len required_total_blocks += required_input_blocks + reserved_output_blocks + # 
scheduler_logger.info(f"required_total_blocks:{required_total_blocks},available_blocks:{available_blocks}") if required_total_blocks > available_blocks: break @@ -278,14 +281,20 @@ def get_requests( if request.prompt_tokens_ids_len > self.long_prefill_token_threshold: # 长请求 long_partial_requests += 1 + # scheduler_logger.info(f"long_partial_request:{long_partial_requests}") + # scheduler_logger.info(f"self.max_long_partial_prefills:{self.max_long_partial_prefills}") if long_partial_requests > self.max_long_partial_prefills: break else: short_partial_requests += 1 + # scheduler_logger.info(f"====>prompt_tokens_ids_len:{request.prompt_tokens_ids_len}, short_partial_request:{short_partial_requests}, long_partial_request:{long_partial_requests}, self.max_num_partial_prefills:{self.max_num_partial_prefills}") + if short_partial_requests + long_partial_requests > self.max_num_partial_prefills: break else: + # scheduler_logger.info(f"current_prefill_tokens:{current_prefill_tokens}, max_num_batched_tokens:{max_num_batched_tokens}, len(requests):{len(requests)}") + if current_prefill_tokens > max_num_batched_tokens and len(requests) > 0: break requests.append(request.raw) @@ -308,24 +317,31 @@ def put_results(self, results: List[RequestOutput]): results: List of RequestOutput objects containing results """ scheduler_logger.debug(f"put results: {results}") + # 将ResultOutput对象转换为ScheduledResponse对象 responses: List[ScheduledResponse] = [ScheduledResponse(result) for result in results] + # 过滤出已完成请求的ID列表 finished_responses = [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") - with self.mutex: + with self.mutex: # 获取互斥锁,确保线程安全 + # 将当前步骤的所有响应原始数据添加到批次响应列表中 self.batch_responses_per_step.append([response.raw for response in responses]) - for response in responses: + for response in responses: # 遍历每个响应结果 + # 检查请求ID是否存在于当前活跃任务中 if response.request_id not in self.requests: scheduler_logger.warning(f"Scheduler has received a expired response: {[response.request_id]}") - continue + continue # 如果请求已过期,忽略该响应 + # 如果请求ID尚未存在于响应字典中,创建新的响应列表 if response.request_id not in self.responses: self.responses[response.request_id] = [response] continue scheduler_logger.debug(f"append response {response.raw}") + # 将响应追加到对应请求ID的响应列表中 self.responses[response.request_id].append(response) + # 通知所有等待线程已有新响应到达 self.responses_not_empty.notify_all() def get_results(self) -> Dict[str, List[RequestOutput]]: diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index a63876d4afb..933e0f794d7 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -488,7 +488,11 @@ def _apply_mm_inputs(self, request: Request, multi_vision_inputs: dict, rope_3d_ rope_3d_position_ids["position_ids_offset"].append( position_ids.shape[0] + rope_3d_position_ids["position_ids_offset"][-1] ) - rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048)) + + if self.is_pooling_model: + rope_3d_position_ids["max_tokens_lst"].append(0) + else: + rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048)) def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): """ @@ -496,6 +500,8 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = req_dict: A list of Request dict num_running_requests: batch_size """ + + # NOTE(luotingdan): Lazy initialize kv cache 
if "caches" not in self.share_inputs: self.initialize_kv_cache() @@ -513,13 +519,17 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = "position_ids_offset": [0], "max_tokens_lst": [], } - + # logger.info(f"insert_tasks_v1 called with {req_len} requests") + # logger.info(f"seq_lens_this_time_buffer before: {self.seq_lens_this_time_buffer}") + # logger.info(f"req_dicts:{req_dicts}") for i in range(req_len): request = req_dicts[i] idx = request.idx + # logger.info(f"request:{request}") if hasattr(request, "pooling_params") and request.pooling_params is not None: batch_pooling_params.append(request.pooling_params) + request.pooling_params.task = "embed" if request.task_type.value == RequestType.PREFILL.value: # prefill task prefill_start_index = request.prefill_start_index @@ -591,7 +601,6 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = has_decode_task = True continue else: # preempted task - logger.info(f"Handle preempted request {request} at idx {idx}") self.share_inputs["block_tables"][idx : idx + 1, :] = -1 self.share_inputs["stop_flags"][idx : idx + 1] = True self.seq_lens_this_time_buffer[idx : idx + 1] = 0 @@ -2127,10 +2136,21 @@ class at the server level, which is too granular for ModelRunner. model_output = model_output[: self.real_token_num] prompt_logprobs_list = self._get_prompt_logprobs_list(model_output) - + if self.is_pooling_model: - hidden_states = model_output + hidden_states = rebuild_padding( + model_output, + self.share_inputs["cu_seqlens_q"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["seq_lens_decoder"], + self.share_inputs["seq_lens_encoder"], + (self.share_inputs["output_padding_offset"] if self.speculative_decoding else None), + self.model_config.max_model_len, + ) + # print("rebuild_padding后的hidden_states",hidden_states) + # hidden_states = model_output pooler_output = self._pool(hidden_states, num_running_requests) + # print("gpu_model的pooler_output",pooler_output) model_output_data = ModelOutputData( next_tokens=self.share_inputs["next_tokens"], @@ -2338,7 +2358,10 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti num_scheduled_tokens = int(self.share_inputs["seq_lens_this_time"][:num_running_requests].sum()) + # print("num_scheduled_tokens",num_scheduled_tokens) + # print("hidden_states前面",hidden_states) hidden_states = hidden_states[:num_scheduled_tokens] + # print("hidden_states取前面",hidden_states) prompt_lens = self.share_inputs["prompt_lens"][:num_running_requests] prompt_token_ids = self.share_inputs["prompt_ids"] @@ -2351,14 +2374,24 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti num_scheduled_tokens_list = [ int(self.share_inputs["seq_lens_this_time"][i]) for i in range(num_running_requests) ] + device_str = "gpu" if hidden_states.place.is_gpu_place() else "cpu" pooling_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=device_str) raw_pooler_output = self.model.pooler(hidden_states=hidden_states, pooling_metadata=pooling_metadata) + seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests] pooler_output: list[Optional[paddle.Tensor]] = [] + + # print("seq_lens_cpu",seq_lens_cpu) + # print("raw_pooler_output",raw_pooler_output) + for raw_output, seq_len, prompt_len in zip(raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens): + # for seq_len, prompt_len in zip(seq_lens_cpu, pooling_metadata.prompt_lens): + # print("prompt_len",prompt_len) + # 
print("seq_len",seq_len) output = raw_output.data if int(seq_len) == int(prompt_len) else None + # output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None pooler_output.append(output) pooler_output = PoolerOutput( diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py new file mode 100644 index 00000000000..9181291b275 --- /dev/null +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -0,0 +1,257 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import signal +import socket +import subprocess +import sys +import time + +import pytest +import requests + +# Read ports from environment variables +FD_API_PORT = int(os.getenv("FD_API_PORT", 8189)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8134)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8234)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8334)) + +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] + + +def is_port_open(host: str, port: int, timeout=1.0): + """Check if a TCP port is open.""" + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """Kill processes listening on the given port.""" + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except subprocess.CalledProcessError: + pass + + +def clean_ports(): + """Clean all ports in PORTS_TO_CLEAN.""" + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + time.sleep(2) + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_reward_server(): + """ + Start reward model API server for testing. 
+ """ + print("Pre-test port cleanup...") + clean_ports() + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "RM_v1008_5") + else: + model_path = "./RM_v1008_5" + + if not os.path.exists(model_path): + raise FileNotFoundError(f"Model path not found: {model_path}") + + log_path = "reward_server.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "2", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--max-num-seqs", + "256", + "--graph-optimization-config", + '{"use_cudagraph":false}', + "--runner", + "pooling", + "--convert", + "embed", + ] + + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + # Wait for server to start (up to 480 seconds) + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"reward API server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + print("reward API server failed to start. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"reward API server did not start on port {FD_API_PORT}") + + yield + + print("\n===== Post-test reward server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print(f"reward API server (pid={process.pid}) terminated") + except Exception as e: + print(f"Failed to terminate reward API server: {e}") + + +@pytest.fixture(scope="session") +def reward_api_url(): + """Returns the API endpoint URL for reward.""" + return f"http://0.0.0.0:{FD_API_PORT}/v1/reward" + + +@pytest.fixture +def headers(): + """Returns common HTTP request headers.""" + return {"Content-Type": "application/json"} + + +# ========================== +# Test Cases +# ========================== + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for reward model consistency testing. + Reward models evaluate user-assistant conversation pairs. + """ + return { + "model": "default", + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, + { + "role": "assistant", + "content": [ + {"type": "text", "text": "北京天安门位于中国北京市中心,天安门广场北端,故宫博物院的南门。"} + ], + }, + ], + "user": "test-user-123", + } + + +def save_score_baseline(score: float, baseline_file: str): + """ + Save reward score to baseline file. + """ + baseline_data = {"score": score} + with open(baseline_file, "w", encoding="utf-8") as f: + json.dump(baseline_data, f, indent=2) + print(f"Baseline saved to: {baseline_file}") + + +def check_score_against_baseline(current_score: float, baseline_file: str, threshold: float = 0.01): + """ + Check reward score against baseline file. + """ + try: + with open(baseline_file, "r", encoding="utf-8") as f: + baseline_data = json.load(f) + baseline_score = baseline_data["score"] + except FileNotFoundError: + print(f"Baseline file not found: {baseline_file}. 
Saving current as baseline.") + save_score_baseline(current_score, baseline_file) + return + + diff = abs(current_score - baseline_score) + print(f"Score Difference: {diff:.6f} (Current: {current_score}, Baseline: {baseline_score})") + + if diff >= threshold: + temp_file = f"{baseline_file}.current" + save_score_baseline(current_score, temp_file) + raise AssertionError( + f"Score differs from baseline by too much (diff={diff:.6f} >= {threshold}):\n" + f"Current score saved to: {temp_file}" + ) + + +def test_reward_model(reward_api_url, headers): + """Test reward model scoring using the chat-style payload.""" + + payload = { + "model": "default", + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, + {"role": "assistant", "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}]}, + ], + "user": "user-123", + } + + print(f"\n=== Sending request to {reward_api_url} ===") + + # 发送HTTP请求 + response = requests.post(reward_api_url, headers=headers, json=payload, timeout=30) + + assert response.status_code == 200, f"API request failed with status {response.status_code}: {response.text}" + + result = response.json() + print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}") + + assert "data" in result, f"Response missing 'data' field. Got: {result}" + assert len(result["data"]) > 0, "Response 'data' is empty" + + first_item = result["data"][0] + assert "score" in first_item, f"Response data item missing 'score' field. Got: {first_item}" + + score_list = first_item["score"] + assert isinstance(score_list, list), f"Expected 'score' to be a list, got {type(score_list)}" + assert len(score_list) > 0, "Score list is empty" + + score = float(score_list[0]) + + print(f"✓ Reward Score: {score}") + + base_path = os.getenv("MODEL_PATH", "") + baseline_filename = "reward_score_baseline.json" + + if base_path: + baseline_file = os.path.join(base_path, baseline_filename) + else: + baseline_file = baseline_filename + + check_score_against_baseline(score, baseline_file, threshold=0.0001) diff --git a/tests/pooling/test_Qwen3-Embedding_serving.py b/tests/pooling/test_Qwen3-Embedding_serving.py index 69e93759386..910f41671d5 100644 --- a/tests/pooling/test_Qwen3-Embedding_serving.py +++ b/tests/pooling/test_Qwen3-Embedding_serving.py @@ -79,7 +79,7 @@ def setup_and_run_embedding_server(): model_path = "./Qwen3-Embedding-0.6B" if not os.path.exists(model_path): - pytest.skip(f"Model path not found: {model_path}") + raise FileNotFoundError(f"Model path not found: {model_path}") log_path = "embedding_server.log" cmd = [
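
Note (not part of the patch): the rebuild_padding.cu change above boils down to one host-side rule — fall back to a pack size of 1 when dim_embed is smaller than the 16-byte vector width, and clamp grid_size to at least one block so tiny outputs (e.g. a reward head with dim_embed == 1) still launch a valid kernel. The following is a minimal, self-contained C++ sketch of that selection logic; the helper name pick_launch_config and the standalone main() are illustrative only and do not exist in the codebase.

#include <algorithm>
#include <cstdio>

// Illustrative only: mirrors the pack-size / grid-size selection applied in
// rebuild_padding.cu, where PackSize = VEC_16B / sizeof(DataType_).
struct LaunchConfig {
  int pack_size;  // elements handled per vectorized load
  int grid_size;  // number of thread blocks to launch
};

LaunchConfig pick_launch_config(int elem_nums, int dim_embed,
                                int bytes_per_elem, int blocksize = 128) {
  const int full_pack = 16 / bytes_per_elem;                     // VEC_16B / sizeof(T)
  const int pack_size = (dim_embed < full_pack) ? 1 : full_pack; // scalar fallback
  const int pack_num = (elem_nums + pack_size - 1) / pack_size;  // ceil division
  const int grid_size = std::max(1, (pack_num + blocksize - 1) / blocksize);
  return {pack_size, grid_size};
}

int main() {
  // Example: bfloat16 elements (2 bytes) and dim_embed = 1, as in a scalar
  // reward head; the old code would have computed pack_num = 0 and grid_size = 0.
  LaunchConfig cfg = pick_launch_config(/*elem_nums=*/4, /*dim_embed=*/1,
                                        /*bytes_per_elem=*/2);
  std::printf("pack_size=%d grid_size=%d\n", cfg.pack_size, cfg.grid_size);
  return 0;
}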