From 16b9399b6674cb0a617cd9aa851c246543976ee2 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 25 Nov 2025 15:17:16 +0800 Subject: [PATCH 01/26] Your commit message here --- fastdeploy/config.py | 5 +- fastdeploy/engine/args_utils.py | 3 + fastdeploy/engine/pooling_params.py | 2 +- fastdeploy/engine/request.py | 12 +- fastdeploy/entrypoints/openai/protocol.py | 10 -- .../entrypoints/openai/serving_engine.py | 1 - .../model_executor/layers/pool/metadata.py | 16 +-- fastdeploy/model_executor/layers/pooler.py | 18 ++- .../models/ernie4_5_vl/ernie4_5_vl_moe.py | 8 +- .../model_executor/models/ernie_vl_rm.py | 36 +++--- .../model_executor/models/model_base.py | 2 +- .../model_executor/pre_and_post_process.py | 106 ++++-------------- fastdeploy/worker/gpu_model_runner.py | 10 +- 13 files changed, 94 insertions(+), 135 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 91d2bae64a5..4b4c6254c75 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -292,7 +292,7 @@ def override_name_from_config(self): self.tensor_parallel_size = self.infer_model_mp_num del self.infer_model_mp_num - if hasattr(self, "num_hidden_layers"): + if hasattr(self, "num_hidden_layers") and self.runner != "pooling": if hasattr(self, "remove_tail_layer"): if self.remove_tail_layer is True: self.num_hidden_layers -= 1 @@ -1530,6 +1530,9 @@ def __init__( self.max_long_partial_prefills = max_long_partial_prefills self.long_prefill_token_threshold = long_prefill_token_threshold + if self.model_config.runner == "pooling": + self.scheduler_config.max_num_batched_tokens = self.model_config.max_model_len + if envs.FD_FOR_TORCH_MODEL_FORMAT: self.model_config.model_format = "torch" diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 05781352d77..3c62802f31a 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -533,6 +533,9 @@ def __post_init__(self): self.enable_prefix_caching = False self.max_encoder_cache = 0 + if self.runner == "pooling" and self.enable_prefix_caching: + self.enable_prefix_caching = False + @staticmethod def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: """ diff --git a/fastdeploy/engine/pooling_params.py b/fastdeploy/engine/pooling_params.py index 93192b6ecc5..7f831412982 100644 --- a/fastdeploy/engine/pooling_params.py +++ b/fastdeploy/engine/pooling_params.py @@ -164,7 +164,7 @@ def _set_default_parameters(self, model_config: Optional["ModelConfig"]): self.softmax = True elif self.task == "reward": if self.normalize is None: - self.normalize = True + self.normalize = False else: raise ValueError(f"Unknown pooling task: {self.task}") diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index d46ec958244..4acf2a50417 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -24,7 +24,6 @@ import numpy as np from typing_extensions import TypeVar -from fastdeploy import envs from fastdeploy.engine.pooling_params import PoolingParams from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.entrypoints.openai.protocol import ToolCall @@ -145,7 +144,10 @@ def __init__( self.multimodal_data = multimodal_data self.multimodal_img_boundaries = None - self.enable_thinking = enable_thinking + if pooling_params is not None: + self.enable_thinking = False + else: + self.enable_thinking = True self.reasoning_max_tokens = reasoning_max_tokens self.trace_carrier = trace_carrier @@ -190,6 +192,10 @@ def from_dict(cls, d: 
dict): pooling_params = PoolingParams.from_dict(d["pooling_params"]) else: sampling_params = SamplingParams.from_dict(d) + + enable_thinking = d.get("enable_thinking", None) + if pooling_params is not None: + enable_thinking = False return cls( request_id=d["request_id"], prompt=d.get("prompt"), @@ -216,7 +222,7 @@ def from_dict(cls, d: dict): guided_grammar=d.get("guided_grammar", None), structural_tag=d.get("structural_tag", None), guided_json_object=d.get("guided_json_object", None), - enable_thinking=d.get("enable_thinking", None), + enable_thinking=enable_thinking, reasoning_max_tokens=d.get("reasoning_max_tokens", None), trace_carrier=d.get("trace_carrier", {}), chat_template=d.get("chat_template", None), diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 4a1e4ef647f..18377772713 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -889,16 +889,6 @@ class EmbeddingChatRequest(BaseModel): user: Optional[str] = None truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None - # --8<-- [start:chat-embedding-extra-params] - add_generation_prompt: bool = Field( - default=False, - description=( - "If true, the generation prompt will be added to the chat template. " - "This is a parameter used by chat template in tokenizer config of the " - "model." - ), - ) - add_special_tokens: bool = Field( default=False, description=( diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index bd22df56671..bc8090d97ef 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -256,7 +256,6 @@ def _process_chat_template_kwargs(self, request_dict): chat_template_kwargs.update( { "chat_template": request_dict.get("chat_template"), - "add_generation_prompt": request_dict.get("add_generation_prompt"), "add_stop_sequences": request_dict.get("add_stop_sequences"), } ) diff --git a/fastdeploy/model_executor/layers/pool/metadata.py b/fastdeploy/model_executor/layers/pool/metadata.py index 699800a0f82..c6c6811d7ba 100644 --- a/fastdeploy/model_executor/layers/pool/metadata.py +++ b/fastdeploy/model_executor/layers/pool/metadata.py @@ -15,12 +15,14 @@ """ from dataclasses import dataclass -from typing import Optional +from typing import Optional, Union import paddle from fastdeploy.engine.pooling_params import PoolingParams +Device = Union[paddle.CPUPlace, paddle.CUDAPlace, paddle.XPUPlace] + @dataclass class PoolingCursor: @@ -60,21 +62,21 @@ def __getitem__(self, indices: slice): pooling_cursor=None if self.pooling_cursor is None else self.pooling_cursor[indices], ) - def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: str): + def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: Device): self.pooling_cursor = build_pooling_cursor(num_scheduled_tokens, self.prompt_lens, device) -def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: str): +def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: Device): assert len(prompt_lens) == len(num_scheduled_tokens) n_seq = len(num_scheduled_tokens) index = list(range(n_seq)) - num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens) - cumsum = paddle.zeros([n_seq + 1], dtype="int64") + num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, dtype="int64", place=paddle.CPUPlace()) + cumsum = paddle.zeros([n_seq + 1], 
dtype="int64", device=paddle.CPUPlace()) paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:]) - if device == "gpu": - cumsum_device = cumsum.cuda() + if isinstance(device, paddle.CUDAPlace): + cumsum_device = paddle.assign(cumsum).cuda(device.get_device_id()) else: cumsum_device = cumsum return PoolingCursor( diff --git a/fastdeploy/model_executor/layers/pooler.py b/fastdeploy/model_executor/layers/pooler.py index 0266987a8fa..22e9baa898e 100644 --- a/fastdeploy/model_executor/layers/pooler.py +++ b/fastdeploy/model_executor/layers/pooler.py @@ -78,7 +78,6 @@ def get_pooling_params(pooling_metadata: PoolingMetadata) -> list[PoolingParams] def get_tasks(pooling_metadata: PoolingMetadata) -> list[PoolingTask]: pooling_params = get_pooling_params(pooling_metadata) - tasks: list[PoolingTask] = [task for pooling_param in pooling_params if (task := pooling_param.task) is not None] assert len(pooling_params) == len(tasks) @@ -108,7 +107,7 @@ class Pooler(nn.Layer, ABC): @staticmethod def for_encode(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] = None): if pooler_config.pooling_type == "STEP": - return StepPooler() + return StepPooler(model_config) resolved_config = ResolvedPoolingConfig(task="encode", pooling_type=PoolingType.ALL) return SimplePooler.from_config(resolved_config, model_config) @@ -274,6 +273,7 @@ def _proj(x: paddle.Tensor) -> paddle.Tensor: pooled_data = [vecs if d is None else vecs[..., :d] for vecs, d in zip(pooled_data, dimensions_list)] # for normalize flags = [p.normalize for p in pooling_params] + if len(set(flags)) == 1: if flags[0]: pooled_data = self.activation(pooled_data) @@ -293,7 +293,6 @@ def __init__(self, model_config: Optional["ModelConfig"] = None) -> None: def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], pooling_metadata: PoolingMetadata): pooling_params = get_pooling_params(pooling_metadata) - # for softmax flags = [p.softmax for p in pooling_params] if len(set(flags)) == 1: if flags[0]: @@ -301,6 +300,7 @@ def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], poolin else: pooled_data = [self.activation(vecs) if f else vecs for vecs, f in zip(pooled_data, flags)] + print(f"==RewardPoolerHead end==>pooled_data:{pooled_data}, ===>{pooling_metadata}") return pooled_data @@ -366,8 +366,8 @@ def forward_all( ) -> Union[list[paddle.Tensor], paddle.Tensor]: assert not pooling_cursor.is_partial_prefill(), "partial prefill not supported with ALL pooling" - hidden_states_lst = list(hidden_states.split(pooling_cursor.num_scheduled_tokens_cpu.tolist())) + return [hidden_states_lst[i] for i in pooling_cursor.index] @@ -416,11 +416,12 @@ def forward_all( class StepPooler(Pooler): def __init__( self, + model_config: ModelConfig, ) -> None: super().__init__() self.pooling = AllPool() - self.head = RewardPoolerHead() + self.head = RewardPoolerHead(model_config) def extract_states( self, @@ -455,14 +456,11 @@ def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate: def forward( self, - hidden_states: Union[paddle.Tensor, list[paddle.Tensor]], + hidden_states: paddle.Tensor | list[paddle.Tensor], pooling_metadata: PoolingMetadata, ) -> PoolerOutput: pooled_data = self.extract_states(hidden_states, pooling_metadata) - pooling_params = get_pooling_params(pooling_metadata) - assert len(pooled_data) == len(pooling_params) - - pooled_data = [self.head(d, p) for d, p in zip(pooled_data, pooling_params)] + pooled_data = self.head(pooled_data, pooling_metadata) return pooled_data diff --git 
a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 026d8c7d736..19238d30f9c 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -347,12 +347,17 @@ def __init__( prefix=f"{prefix}.mlp", ) + norm_dtype = None + if fd_config.model_config.architectures[0] == "Ernie4_5_VLMoeForProcessRewardModel": + norm_dtype = "float32" + self.input_layernorm = RMSNorm( fd_config, hidden_size=fd_config.model_config.hidden_size, eps=fd_config.model_config.rms_norm_eps, prefix=f"{prefix}.input_layernorm", layer_id=layer_id, + dtype=norm_dtype, ) self.post_attention_layernorm = RMSNorm( @@ -361,6 +366,7 @@ def __init__( eps=fd_config.model_config.rms_norm_eps, prefix=f"{prefix}.post_attention_layernorm", layer_id=layer_id, + dtype=norm_dtype, ) def load_state_dict(self, state_dict): @@ -539,7 +545,6 @@ def forward( residual, vl_moe_meta, ) - out = self.norm(hidden_states, residual, forward_meta=forward_meta)[0] return out @@ -651,6 +656,7 @@ def load_weights(self, weights_iterator) -> None: ("qkv_proj", "v_proj", None, "v"), ("up_gate_proj", "gate_proj", None, "gate"), ("up_gate_proj", "up_proj", None, "up"), + ("norm.weight", "ernie.norm.weight", None, None), ] text_expert_params_mapping = [] diff --git a/fastdeploy/model_executor/models/ernie_vl_rm.py b/fastdeploy/model_executor/models/ernie_vl_rm.py index cfa29c84512..848502735f6 100644 --- a/fastdeploy/model_executor/models/ernie_vl_rm.py +++ b/fastdeploy/model_executor/models/ernie_vl_rm.py @@ -57,13 +57,13 @@ def __init__(self, fd_config: FDConfig): ) self.ernie = Ernie4_5_VLModel(fd_config=fd_config) self.head_dtype = paddle.bfloat16 + self.fd_config = fd_config # Persistent buffers for CUDA graphs. 
- if fd_config.graph_opt_config.use_cudagraph: - self._decoder_input_embeddings = paddle.zeros( - [fd_config.graph_opt_config.max_capture_size, fd_config.model_config.hidden_size], - dtype=fd_config.model_config.dtype, - ) + self._input_embeddings = paddle.zeros( + [fd_config.model_config.max_model_len, fd_config.model_config.hidden_size], + dtype=fd_config.model_config.dtype, + ) self.rm_head = nn.Sequential( ( @@ -114,28 +114,30 @@ def forward( image_token_num=vl_moe_meta.image_token_num.item(), ) - if forward_meta.step_use_cudagraph: - self._decoder_input_embeddings.copy_(input_embeddings, False) - input_embeddings = self._decoder_input_embeddings + self._input_embeddings.copy_(input_embeddings, False) hidden_states = self.ernie( - input_embeddings=input_embeddings, + input_embeddings=self._input_embeddings, ids_remove_padding=ids_remove_padding, forward_meta=forward_meta, vl_moe_meta=vl_moe_meta, ) + + if isinstance(hidden_states, tuple): + hidden_states = hidden_states[0] + hidden_states = hidden_states.to(self.head_dtype) logits = self.rm_head(hidden_states) - return logits + return logits.cast("float32") @ModelRegistry.register_model_class( architecture="Ernie4_5_VLMoeForProcessRewardModel", module_name="ernie_vl_rm", - category=[ModelCategory.REWARD], - primary_use=ModelCategory.REWARD, + category=ModelCategory.REWARD | ModelCategory.MULTIMODAL, + primary_use=ModelCategory.REWARD | ModelCategory.MULTIMODAL, ) -@default_pooling_type("ALL") +@default_pooling_type("LAST") class Ernie4_5_VLMoeForProcessRewardModel(Ernie4_5_VLMoeRewardBaseModel): def __init__(self, fd_config: FDConfig): @@ -147,7 +149,12 @@ def __init__(self, fd_config: FDConfig): pooler_config = fd_config.model_config.pooler_config assert pooler_config is not None - self.pooler = DispatchPooler({"encode": Pooler.for_encode(pooler_config)}) + self.pooler = DispatchPooler( + { + "encode": Pooler.for_encode(pooler_config, fd_config.model_config), + "embed": Pooler.for_embed(pooler_config, fd_config.model_config), + }, + ) self.process_weights_before_loading_fn = process_weights_before_loading(skip_prefixes=["lm_head"]) @@ -159,4 +166,5 @@ def name(self): @paddle.no_grad() def load_weights(self, weights_iterator): # Filter out lm_head weights of Ernie4_5_VLMoeForConditionalGeneration + Ernie4_5_VLMoeForConditionalGeneration.load_weights(self, weights_iterator) diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index b81606bae15..2103c3a0c6d 100644 --- a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -58,7 +58,7 @@ def from_model_cls( is_text_generation=ModelCategory.TEXT_GENERATION in category, is_multimodal=ModelCategory.MULTIMODAL in category, is_reasoning=ModelCategory.REASONING in category, - is_pooling=ModelCategory.EMBEDDING in category, + is_pooling=(ModelCategory.EMBEDDING in category) or (ModelCategory.REWARD in category), default_pooling_type=get_default_pooling_type(model_cls), module_path=module_path, ) diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index d2c82e2afaa..337161869a7 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -27,8 +27,6 @@ if current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import ( get_padding_offset, - limit_thinking_content_length_v1, - limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, 
step_paddle, @@ -52,12 +50,8 @@ elif current_platform.is_maca(): from fastdeploy.model_executor.ops.gpu import ( get_padding_offset, - limit_thinking_content_length_v1, - limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, - speculate_limit_thinking_content_length_v1, - speculate_limit_thinking_content_length_v2, step_paddle, update_inputs, update_inputs_v1, @@ -79,7 +73,6 @@ speculate_step_paddle, speculate_step_system_cache, speculate_update, - speculate_set_stop_value_multi_seqs, step_paddle, step_system_cache, update_inputs, @@ -94,7 +87,7 @@ from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData -from fastdeploy.worker.output import LogprobsTensors, ModelOutputData, SamplerOutput +from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput, SamplerOutput DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" @@ -105,8 +98,6 @@ def limit_thinking_content_length( max_think_lens: paddle.Tensor, step_idx: paddle.Tensor, limit_think_status: paddle.Tensor, - stop_flags: paddle.Tensor, - eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -117,8 +108,6 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, - stop_flags, - eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -129,7 +118,6 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, - stop_flags, think_end_id, line_break_id, ) @@ -145,8 +133,6 @@ def speculate_limit_thinking_content_length( limit_think_status: paddle.Tensor, accept_num: paddle.Tensor, seq_lens_decoder: paddle.Tensor, - stop_flags: paddle.Tensor, - eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -159,8 +145,6 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, - stop_flags, - eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -173,7 +157,6 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, - stop_flags, think_end_id, line_break_id, ) @@ -204,27 +187,10 @@ def pre_process( cu_seqlens_q: cu_seqlens_k: """ - token_num = paddle.sum(seq_lens_this_time) - - specific_platform = current_platform.is_cuda() or current_platform.is_maca() or current_platform.is_iluvatar() - if specific_platform and not speculative_decoding: - # Note(ZKK): This case's code is very simple! 
- ids_remove_padding, batch_id_per_token, cu_seqlens_q, cu_seqlens_k = get_padding_offset( - input_ids, token_num, seq_lens_this_time - ) - - return ( - ids_remove_padding, - batch_id_per_token, - cu_seqlens_q, - cu_seqlens_k, - None, - None, - ) - # Remove padding max_len = input_ids.shape[1] cum_offsets_now = paddle.cumsum(max_len - seq_lens_this_time, dtype="int32") + token_num = paddle.sum(seq_lens_this_time) output_padding_offset = None output_cum_offsets = None if speculative_decoding: @@ -262,7 +228,7 @@ def pre_process( batch_id_per_token, cu_seqlens_q, cu_seqlens_k, - ) = get_padding_offset(input_ids, cum_offsets_now, token_num, seq_lens_this_time) + ) = get_padding_offset(input_ids, token_num, seq_lens_this_time) return ( ids_remove_padding, batch_id_per_token, @@ -273,12 +239,7 @@ def pre_process( ) -def _build_stream_transfer_data( - output_tokens: paddle.Tensor, - pooler_outputs: List[PoolingSequenceGroupOutput] = None, - logprobs: Optional[LogprobsTensors] = None, - prompt_logprobs_list: Optional[LogprobsTensors] = None, -): +def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[PoolingSequenceGroupOutput] = None): """Split output_tokens and output""" stream_transfer_datas = [] @@ -291,11 +252,6 @@ def _build_stream_transfer_data( stream_transfer_data = StreamTransferData( decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid ) - if logprobs: - logprobs = logprobs.slice_rows(bid, bid + 1) - stream_transfer_data.logprobs = logprobs - if prompt_logprobs_list: - stream_transfer_data.prompt_logprobs = prompt_logprobs_list[bid] stream_transfer_datas.append(stream_transfer_data) elif pooler_outputs is not None: for bid, pooler_output in enumerate(pooler_outputs): @@ -321,7 +277,7 @@ def post_process_normal( async_output_queue: queue.Queue = None, think_end_id: int = -1, line_break_id: int = -1, -): +) -> ModelRunnerOutput: """Post-processing steps after completing a single token generation.""" if think_end_id > 0: limit_thinking_content_length( @@ -330,8 +286,6 @@ def post_process_normal( max_think_lens=share_inputs["max_think_lens"], step_idx=share_inputs["step_idx"], limit_think_status=share_inputs["limit_think_status"], - stop_flags=share_inputs["stop_flags"], - eos_token_ids=share_inputs["eos_token_id"], think_end_id=think_end_id, line_break_id=line_break_id, ) @@ -420,31 +374,27 @@ def post_process_normal( # 3. Transmit the model's output and stop generation signal via message queue. # In the future, we will abandon this approach. 
if not skip_save_output: - if envs.FD_USE_GET_SAVE_OUTPUT_V1: - if save_each_rank or model_output.mp_rank == 0: - output = _build_stream_transfer_data( - sampler_output.sampled_token_ids, - logprobs=sampler_output.logprobs_tensors, - prompt_logprobs_list=model_output.prompt_logprobs_list, - ) - async_output_queue.put(output) - else: - if sampler_output.logprobs_tensors is None: + if sampler_output.logprobs_tensors is None: + if envs.FD_USE_GET_SAVE_OUTPUT_V1: + if save_each_rank or model_output.mp_rank == 0: + output = _build_stream_transfer_data(sampler_output.sampled_token_ids) + async_output_queue.put(output) + else: save_output( sampler_output.sampled_token_ids, model_output.not_need_stop, model_output.mp_rank, save_each_rank, ) - else: - save_output_topk( - sampler_output.sampled_token_ids, - sampler_output.logprobs_tensors.logprob_token_ids, - sampler_output.logprobs_tensors.logprobs, - sampler_output.logprobs_tensors.selected_token_ranks, - model_output.not_need_stop, - model_output.mp_rank, - ) + else: + save_output_topk( + sampler_output.sampled_token_ids, + sampler_output.logprobs_tensors.logprob_token_ids, + sampler_output.logprobs_tensors.logprobs, + sampler_output.logprobs_tensors.selected_token_ranks, + model_output.not_need_stop, + model_output.mp_rank, + ) def post_process_specualate( @@ -468,17 +418,7 @@ def post_process_specualate( think_end_id=think_end_id, line_break_id=line_break_id, ) - speculate_set_stop_value_multi_seqs( - model_output.accept_tokens, - model_output.accept_num, - model_output.pre_ids, - model_output.step_idx, - model_output.stop_flags, - model_output.seq_lens_this_time, - model_output.stop_token_ids, - model_output.stop_seqs_len, - model_output.eos_token_id, - ) + speculate_update( model_output.seq_lens_encoder, model_output.seq_lens_decoder, @@ -491,7 +431,6 @@ def post_process_specualate( model_output.seq_lens_this_time, model_output.is_block_step, model_output.stop_nums, - model_output.mask_rollback, ) if not skip_save_output: @@ -858,9 +797,7 @@ def rebuild_padding( seq_lens_decoder, seq_lens_encoder, output_padding_offset, - first_token_out, max_input_length, - enable_logprob, ) else: raise RuntimeError("Not supported platform") @@ -900,6 +837,7 @@ def post_process_pooling( paddle.ones_like(model_output.stop_flags, dtype="bool"), model_output.stop_flags, ) + update_inputs_v1( model_output.stop_flags, model_output.not_need_stop, diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index a63876d4afb..462da2f6e36 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -488,7 +488,11 @@ def _apply_mm_inputs(self, request: Request, multi_vision_inputs: dict, rope_3d_ rope_3d_position_ids["position_ids_offset"].append( position_ids.shape[0] + rope_3d_position_ids["position_ids_offset"][-1] ) - rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048)) + + if self.is_pooling_model: + rope_3d_position_ids["max_tokens_lst"].append(0) + else: + rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048)) def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): """ @@ -520,6 +524,7 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = if hasattr(request, "pooling_params") and request.pooling_params is not None: batch_pooling_params.append(request.pooling_params) + request.pooling_params.task = "embed" if request.task_type.value == RequestType.PREFILL.value: # prefill task 
prefill_start_index = request.prefill_start_index @@ -2337,7 +2342,6 @@ class at the server level, which is too granular for ModelRunner. def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Optional[ModelRunnerOutput]: num_scheduled_tokens = int(self.share_inputs["seq_lens_this_time"][:num_running_requests].sum()) - hidden_states = hidden_states[:num_scheduled_tokens] prompt_lens = self.share_inputs["prompt_lens"][:num_running_requests] @@ -2355,8 +2359,10 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti pooling_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=device_str) raw_pooler_output = self.model.pooler(hidden_states=hidden_states, pooling_metadata=pooling_metadata) + seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests] pooler_output: list[Optional[paddle.Tensor]] = [] + for raw_output, seq_len, prompt_len in zip(raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens): output = raw_output.data if int(seq_len) == int(prompt_len) else None pooler_output.append(output) From d8a0f526f985b7f3d67d2b07c698c9a63dfcac0c Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 25 Nov 2025 17:52:09 +0800 Subject: [PATCH 02/26] add test --- tests/pooling/test_Ernie4_5_reward_serving.py | 257 ++++++++++++++++++ tests/pooling/test_Qwen3-Embedding_serving.py | 2 +- 2 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 tests/pooling/test_Ernie4_5_reward_serving.py diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py new file mode 100644 index 00000000000..9181291b275 --- /dev/null +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -0,0 +1,257 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os +import signal +import socket +import subprocess +import sys +import time + +import pytest +import requests + +# Read ports from environment variables +FD_API_PORT = int(os.getenv("FD_API_PORT", 8189)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8134)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8234)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8334)) + +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] + + +def is_port_open(host: str, port: int, timeout=1.0): + """Check if a TCP port is open.""" + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """Kill processes listening on the given port.""" + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except subprocess.CalledProcessError: + pass + + +def clean_ports(): + """Clean all ports in PORTS_TO_CLEAN.""" + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + time.sleep(2) + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_reward_server(): + """ + Start reward model API server for testing. + """ + print("Pre-test port cleanup...") + clean_ports() + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "RM_v1008_5") + else: + model_path = "./RM_v1008_5" + + if not os.path.exists(model_path): + raise FileNotFoundError(f"Model path not found: {model_path}") + + log_path = "reward_server.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "2", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--max-num-seqs", + "256", + "--graph-optimization-config", + '{"use_cudagraph":false}', + "--runner", + "pooling", + "--convert", + "embed", + ] + + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + # Wait for server to start (up to 480 seconds) + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"reward API server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + print("reward API server failed to start. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"reward API server did not start on port {FD_API_PORT}") + + yield + + print("\n===== Post-test reward server cleanup... 
=====") + try: + os.killpg(process.pid, signal.SIGTERM) + print(f"reward API server (pid={process.pid}) terminated") + except Exception as e: + print(f"Failed to terminate reward API server: {e}") + + +@pytest.fixture(scope="session") +def reward_api_url(): + """Returns the API endpoint URL for reward.""" + return f"http://0.0.0.0:{FD_API_PORT}/v1/reward" + + +@pytest.fixture +def headers(): + """Returns common HTTP request headers.""" + return {"Content-Type": "application/json"} + + +# ========================== +# Test Cases +# ========================== + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for reward model consistency testing. + Reward models evaluate user-assistant conversation pairs. + """ + return { + "model": "default", + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, + { + "role": "assistant", + "content": [ + {"type": "text", "text": "北京天安门位于中国北京市中心,天安门广场北端,故宫博物院的南门。"} + ], + }, + ], + "user": "test-user-123", + } + + +def save_score_baseline(score: float, baseline_file: str): + """ + Save reward score to baseline file. + """ + baseline_data = {"score": score} + with open(baseline_file, "w", encoding="utf-8") as f: + json.dump(baseline_data, f, indent=2) + print(f"Baseline saved to: {baseline_file}") + + +def check_score_against_baseline(current_score: float, baseline_file: str, threshold: float = 0.01): + """ + Check reward score against baseline file. + """ + try: + with open(baseline_file, "r", encoding="utf-8") as f: + baseline_data = json.load(f) + baseline_score = baseline_data["score"] + except FileNotFoundError: + print(f"Baseline file not found: {baseline_file}. Saving current as baseline.") + save_score_baseline(current_score, baseline_file) + return + + diff = abs(current_score - baseline_score) + print(f"Score Difference: {diff:.6f} (Current: {current_score}, Baseline: {baseline_score})") + + if diff >= threshold: + temp_file = f"{baseline_file}.current" + save_score_baseline(current_score, temp_file) + raise AssertionError( + f"Score differs from baseline by too much (diff={diff:.6f} >= {threshold}):\n" + f"Current score saved to: {temp_file}" + ) + + +def test_reward_model(reward_api_url, headers): + """Test reward model scoring using the chat-style payload.""" + + payload = { + "model": "default", + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, + {"role": "assistant", "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}]}, + ], + "user": "user-123", + } + + print(f"\n=== Sending request to {reward_api_url} ===") + + # 发送HTTP请求 + response = requests.post(reward_api_url, headers=headers, json=payload, timeout=30) + + assert response.status_code == 200, f"API request failed with status {response.status_code}: {response.text}" + + result = response.json() + print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}") + + assert "data" in result, f"Response missing 'data' field. Got: {result}" + assert len(result["data"]) > 0, "Response 'data' is empty" + + first_item = result["data"][0] + assert "score" in first_item, f"Response data item missing 'score' field. 
Got: {first_item}" + + score_list = first_item["score"] + assert isinstance(score_list, list), f"Expected 'score' to be a list, got {type(score_list)}" + assert len(score_list) > 0, "Score list is empty" + + score = float(score_list[0]) + + print(f"✓ Reward Score: {score}") + + base_path = os.getenv("MODEL_PATH", "") + baseline_filename = "reward_score_baseline.json" + + if base_path: + baseline_file = os.path.join(base_path, baseline_filename) + else: + baseline_file = baseline_filename + + check_score_against_baseline(score, baseline_file, threshold=0.0001) diff --git a/tests/pooling/test_Qwen3-Embedding_serving.py b/tests/pooling/test_Qwen3-Embedding_serving.py index 69e93759386..910f41671d5 100644 --- a/tests/pooling/test_Qwen3-Embedding_serving.py +++ b/tests/pooling/test_Qwen3-Embedding_serving.py @@ -79,7 +79,7 @@ def setup_and_run_embedding_server(): model_path = "./Qwen3-Embedding-0.6B" if not os.path.exists(model_path): - pytest.skip(f"Model path not found: {model_path}") + raise FileNotFoundError(f"Model path not found: {model_path}") log_path = "embedding_server.log" cmd = [ From 63df44bc7d4db60d204aae6427c8753872f9aec3 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 25 Nov 2025 17:55:56 +0800 Subject: [PATCH 03/26] update develop --- fastdeploy/model_executor/models/ernie_vl_rm.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/fastdeploy/model_executor/models/ernie_vl_rm.py b/fastdeploy/model_executor/models/ernie_vl_rm.py index 848502735f6..988baceb575 100644 --- a/fastdeploy/model_executor/models/ernie_vl_rm.py +++ b/fastdeploy/model_executor/models/ernie_vl_rm.py @@ -60,10 +60,11 @@ def __init__(self, fd_config: FDConfig): self.fd_config = fd_config # Persistent buffers for CUDA graphs. 
- self._input_embeddings = paddle.zeros( - [fd_config.model_config.max_model_len, fd_config.model_config.hidden_size], - dtype=fd_config.model_config.dtype, - ) + if fd_config.graph_opt_config.use_cudagraph: + self._decoder_input_embeddings = paddle.zeros( + [fd_config.graph_opt_config.max_capture_size, fd_config.model_config.hidden_size], + dtype=fd_config.model_config.dtype, + ) self.rm_head = nn.Sequential( ( @@ -114,10 +115,12 @@ def forward( image_token_num=vl_moe_meta.image_token_num.item(), ) - self._input_embeddings.copy_(input_embeddings, False) + if forward_meta.step_use_cudagraph: + self._decoder_input_embeddings.copy_(input_embeddings, False) + input_embeddings = self._decoder_input_embeddings hidden_states = self.ernie( - input_embeddings=self._input_embeddings, + input_embeddings=input_embeddings, ids_remove_padding=ids_remove_padding, forward_meta=forward_meta, vl_moe_meta=vl_moe_meta, From 7cca89f2bb35869fd51ed1f7f78851cd9d022f83 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 25 Nov 2025 19:23:27 +0800 Subject: [PATCH 04/26] support reward --- fastdeploy/model_executor/layers/pooler.py | 13 ++- fastdeploy/model_executor/models/adapters.py | 1 + .../model_executor/models/ernie_vl_rm.py | 1 + .../model_executor/pre_and_post_process.py | 106 ++++++++++++++---- fastdeploy/worker/gpu_model_runner.py | 1 - tests/pooling/test_Ernie4_5_reward_serving.py | 4 +- 6 files changed, 97 insertions(+), 29 deletions(-) diff --git a/fastdeploy/model_executor/layers/pooler.py b/fastdeploy/model_executor/layers/pooler.py index 22e9baa898e..78f9a0eab39 100644 --- a/fastdeploy/model_executor/layers/pooler.py +++ b/fastdeploy/model_executor/layers/pooler.py @@ -120,6 +120,14 @@ def for_embed(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] ) return SimplePooler.from_config(resolved_config, model_config) + @staticmethod + def for_reward(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] = None): + resolved_config = ResolvedPoolingConfig.from_config( + task="reward", + pooler_config=pooler_config, + ) + return SimplePooler.from_config(resolved_config, model_config) + @staticmethod def for_classify( pooler_config: PoolerConfig, @@ -300,7 +308,6 @@ def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], poolin else: pooled_data = [self.activation(vecs) if f else vecs for vecs, f in zip(pooled_data, flags)] - print(f"==RewardPoolerHead end==>pooled_data:{pooled_data}, ===>{pooling_metadata}") return pooled_data @@ -345,7 +352,7 @@ def forward( class LastPool(PoolingMethod): def get_supported_tasks(self) -> Set[PoolingTask]: - return {"encode", "embed", "classify", "score"} + return {"encode", "embed", "classify", "score", "reward"} def forward_all( self, @@ -482,7 +489,7 @@ def from_config( pooling = PoolingMethod.from_pooling_type(pooler_config.pooling_type) if pooler_config.task == "embed": head = EmbeddingPoolerHead(model_config) - elif pooler_config.task == "encode": + elif pooler_config.task == "encode" or pooler_config.task == "reward": head = RewardPoolerHead(model_config) else: raise NotImplementedError(f"Unknown task: {pooler_config.task}") diff --git a/fastdeploy/model_executor/models/adapters.py b/fastdeploy/model_executor/models/adapters.py index 7dcfd2c0cde..306bbcd5169 100644 --- a/fastdeploy/model_executor/models/adapters.py +++ b/fastdeploy/model_executor/models/adapters.py @@ -166,6 +166,7 @@ def _init_pooler(self, fd_config, prefix: str = ""): { "encode": Pooler.for_encode(pooler_config, 
fd_config.model_config), "embed": Pooler.for_embed(pooler_config, fd_config.model_config), + "reward": Pooler.for_reward(pooler_config, fd_config.model_config), }, ) diff --git a/fastdeploy/model_executor/models/ernie_vl_rm.py b/fastdeploy/model_executor/models/ernie_vl_rm.py index 988baceb575..7d801fbdf04 100644 --- a/fastdeploy/model_executor/models/ernie_vl_rm.py +++ b/fastdeploy/model_executor/models/ernie_vl_rm.py @@ -156,6 +156,7 @@ def __init__(self, fd_config: FDConfig): { "encode": Pooler.for_encode(pooler_config, fd_config.model_config), "embed": Pooler.for_embed(pooler_config, fd_config.model_config), + "reward": Pooler.for_reward(pooler_config, fd_config.model_config), }, ) diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index 337161869a7..d2c82e2afaa 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -27,6 +27,8 @@ if current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import ( get_padding_offset, + limit_thinking_content_length_v1, + limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, step_paddle, @@ -50,8 +52,12 @@ elif current_platform.is_maca(): from fastdeploy.model_executor.ops.gpu import ( get_padding_offset, + limit_thinking_content_length_v1, + limit_thinking_content_length_v2, save_output, set_stop_value_multi_ends, + speculate_limit_thinking_content_length_v1, + speculate_limit_thinking_content_length_v2, step_paddle, update_inputs, update_inputs_v1, @@ -73,6 +79,7 @@ speculate_step_paddle, speculate_step_system_cache, speculate_update, + speculate_set_stop_value_multi_seqs, step_paddle, step_system_cache, update_inputs, @@ -87,7 +94,7 @@ from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData -from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput, SamplerOutput +from fastdeploy.worker.output import LogprobsTensors, ModelOutputData, SamplerOutput DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" @@ -98,6 +105,8 @@ def limit_thinking_content_length( max_think_lens: paddle.Tensor, step_idx: paddle.Tensor, limit_think_status: paddle.Tensor, + stop_flags: paddle.Tensor, + eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -108,6 +117,8 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, + stop_flags, + eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -118,6 +129,7 @@ def limit_thinking_content_length( max_think_lens, step_idx, limit_think_status, + stop_flags, think_end_id, line_break_id, ) @@ -133,6 +145,8 @@ def speculate_limit_thinking_content_length( limit_think_status: paddle.Tensor, accept_num: paddle.Tensor, seq_lens_decoder: paddle.Tensor, + stop_flags: paddle.Tensor, + eos_token_ids: paddle.Tensor, think_end_id: int, line_break_id: int = None, ): @@ -145,6 +159,8 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, + stop_flags, + eos_token_ids, # 处理由于模型效果问题导致思考过程中输出eos token的问题 think_end_id, ) elif limit_strategy == "\n\n\n": @@ -157,6 +173,7 @@ def speculate_limit_thinking_content_length( limit_think_status, accept_num, seq_lens_decoder, + stop_flags, think_end_id, line_break_id, ) @@ -187,10 +204,27 @@ def pre_process( cu_seqlens_q: cu_seqlens_k: """ + token_num = paddle.sum(seq_lens_this_time) + + 
specific_platform = current_platform.is_cuda() or current_platform.is_maca() or current_platform.is_iluvatar() + if specific_platform and not speculative_decoding: + # Note(ZKK): This case's code is very simple! + ids_remove_padding, batch_id_per_token, cu_seqlens_q, cu_seqlens_k = get_padding_offset( + input_ids, token_num, seq_lens_this_time + ) + + return ( + ids_remove_padding, + batch_id_per_token, + cu_seqlens_q, + cu_seqlens_k, + None, + None, + ) + # Remove padding max_len = input_ids.shape[1] cum_offsets_now = paddle.cumsum(max_len - seq_lens_this_time, dtype="int32") - token_num = paddle.sum(seq_lens_this_time) output_padding_offset = None output_cum_offsets = None if speculative_decoding: @@ -228,7 +262,7 @@ def pre_process( batch_id_per_token, cu_seqlens_q, cu_seqlens_k, - ) = get_padding_offset(input_ids, token_num, seq_lens_this_time) + ) = get_padding_offset(input_ids, cum_offsets_now, token_num, seq_lens_this_time) return ( ids_remove_padding, batch_id_per_token, @@ -239,7 +273,12 @@ def pre_process( ) -def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[PoolingSequenceGroupOutput] = None): +def _build_stream_transfer_data( + output_tokens: paddle.Tensor, + pooler_outputs: List[PoolingSequenceGroupOutput] = None, + logprobs: Optional[LogprobsTensors] = None, + prompt_logprobs_list: Optional[LogprobsTensors] = None, +): """Split output_tokens and output""" stream_transfer_datas = [] @@ -252,6 +291,11 @@ def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[ stream_transfer_data = StreamTransferData( decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid ) + if logprobs: + logprobs = logprobs.slice_rows(bid, bid + 1) + stream_transfer_data.logprobs = logprobs + if prompt_logprobs_list: + stream_transfer_data.prompt_logprobs = prompt_logprobs_list[bid] stream_transfer_datas.append(stream_transfer_data) elif pooler_outputs is not None: for bid, pooler_output in enumerate(pooler_outputs): @@ -277,7 +321,7 @@ def post_process_normal( async_output_queue: queue.Queue = None, think_end_id: int = -1, line_break_id: int = -1, -) -> ModelRunnerOutput: +): """Post-processing steps after completing a single token generation.""" if think_end_id > 0: limit_thinking_content_length( @@ -286,6 +330,8 @@ def post_process_normal( max_think_lens=share_inputs["max_think_lens"], step_idx=share_inputs["step_idx"], limit_think_status=share_inputs["limit_think_status"], + stop_flags=share_inputs["stop_flags"], + eos_token_ids=share_inputs["eos_token_id"], think_end_id=think_end_id, line_break_id=line_break_id, ) @@ -374,27 +420,31 @@ def post_process_normal( # 3. Transmit the model's output and stop generation signal via message queue. # In the future, we will abandon this approach. 
if not skip_save_output: - if sampler_output.logprobs_tensors is None: - if envs.FD_USE_GET_SAVE_OUTPUT_V1: - if save_each_rank or model_output.mp_rank == 0: - output = _build_stream_transfer_data(sampler_output.sampled_token_ids) - async_output_queue.put(output) - else: + if envs.FD_USE_GET_SAVE_OUTPUT_V1: + if save_each_rank or model_output.mp_rank == 0: + output = _build_stream_transfer_data( + sampler_output.sampled_token_ids, + logprobs=sampler_output.logprobs_tensors, + prompt_logprobs_list=model_output.prompt_logprobs_list, + ) + async_output_queue.put(output) + else: + if sampler_output.logprobs_tensors is None: save_output( sampler_output.sampled_token_ids, model_output.not_need_stop, model_output.mp_rank, save_each_rank, ) - else: - save_output_topk( - sampler_output.sampled_token_ids, - sampler_output.logprobs_tensors.logprob_token_ids, - sampler_output.logprobs_tensors.logprobs, - sampler_output.logprobs_tensors.selected_token_ranks, - model_output.not_need_stop, - model_output.mp_rank, - ) + else: + save_output_topk( + sampler_output.sampled_token_ids, + sampler_output.logprobs_tensors.logprob_token_ids, + sampler_output.logprobs_tensors.logprobs, + sampler_output.logprobs_tensors.selected_token_ranks, + model_output.not_need_stop, + model_output.mp_rank, + ) def post_process_specualate( @@ -418,7 +468,17 @@ def post_process_specualate( think_end_id=think_end_id, line_break_id=line_break_id, ) - + speculate_set_stop_value_multi_seqs( + model_output.accept_tokens, + model_output.accept_num, + model_output.pre_ids, + model_output.step_idx, + model_output.stop_flags, + model_output.seq_lens_this_time, + model_output.stop_token_ids, + model_output.stop_seqs_len, + model_output.eos_token_id, + ) speculate_update( model_output.seq_lens_encoder, model_output.seq_lens_decoder, @@ -431,6 +491,7 @@ def post_process_specualate( model_output.seq_lens_this_time, model_output.is_block_step, model_output.stop_nums, + model_output.mask_rollback, ) if not skip_save_output: @@ -797,7 +858,9 @@ def rebuild_padding( seq_lens_decoder, seq_lens_encoder, output_padding_offset, + first_token_out, max_input_length, + enable_logprob, ) else: raise RuntimeError("Not supported platform") @@ -837,7 +900,6 @@ def post_process_pooling( paddle.ones_like(model_output.stop_flags, dtype="bool"), model_output.stop_flags, ) - update_inputs_v1( model_output.stop_flags, model_output.not_need_stop, diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 462da2f6e36..e410c72af57 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -524,7 +524,6 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = if hasattr(request, "pooling_params") and request.pooling_params is not None: batch_pooling_params.append(request.pooling_params) - request.pooling_params.task = "embed" if request.task_type.value == RequestType.PREFILL.value: # prefill task prefill_start_index = request.prefill_start_index diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index 9181291b275..7e5b6749509 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -166,9 +166,7 @@ def consistent_payload(): {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, { "role": "assistant", - "content": [ - {"type": "text", "text": "北京天安门位于中国北京市中心,天安门广场北端,故宫博物院的南门。"} - ], + "content": [{"type": "text", "text": 
"北京天安门在中国北京故宫的前面。"}], }, ], "user": "test-user-123", From af3b93b80145ea3f05fd8674e41bc33f093e65f7 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 25 Nov 2025 21:29:47 +0800 Subject: [PATCH 05/26] support enable_chunk_prefill --- fastdeploy/config.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 2485cf7bc59..8256028be1a 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1564,9 +1564,6 @@ def __init__( self.max_long_partial_prefills = max_long_partial_prefills self.long_prefill_token_threshold = long_prefill_token_threshold - if self.model_config.runner == "pooling": - self.scheduler_config.max_num_batched_tokens = self.model_config.max_model_len - if envs.FD_FOR_TORCH_MODEL_FORMAT: self.model_config.model_format = "torch" From fa45a916b249dd91f869c0c53be85cf9db70b94c Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Wed, 26 Nov 2025 22:03:58 +0800 Subject: [PATCH 06/26] support bingfa --- fastdeploy/model_executor/pre_and_post_process.py | 3 +++ fastdeploy/worker/gpu_model_runner.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index d2c82e2afaa..94ba5106a44 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -299,6 +299,8 @@ def _build_stream_transfer_data( stream_transfer_datas.append(stream_transfer_data) elif pooler_outputs is not None: for bid, pooler_output in enumerate(pooler_outputs): + if pooler_output is None: + continue if pooler_output.dtype == paddle.bfloat16: pooler_output = pooler_output.astype("float32") @@ -919,5 +921,6 @@ def post_process_pooling( if not skip_save_output: if save_each_rank or model_output.mp_rank == 0: + print("pooler_output.outputs", pooler_output.outputs) output = _build_stream_transfer_data(output_tokens=None, pooler_outputs=pooler_output.outputs) async_output_queue.put(output) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index e410c72af57..52697644b30 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -2362,8 +2362,8 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests] pooler_output: list[Optional[paddle.Tensor]] = [] - for raw_output, seq_len, prompt_len in zip(raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens): - output = raw_output.data if int(seq_len) == int(prompt_len) else None + for seq_len, prompt_len in zip(seq_lens_cpu, pooling_metadata.prompt_lens): + output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None pooler_output.append(output) pooler_output = PoolerOutput( From 8868d8eeecc4b24184dbb88ed0944775716a8cc8 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 12:04:20 +0800 Subject: [PATCH 07/26] support convert is reward --- fastdeploy/config.py | 10 +++++----- .../model_executor/model_loader/default_loader_v1.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index a791012218b..aad15487b6d 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -37,21 +37,21 @@ logger = get_logger("config", "config.log") -TaskOption = Literal["auto", "generate", "embedding", "embed"] +TaskOption = Literal["auto", "generate", 
"embedding", "embed", "reward"] RunnerType = Literal["generate", "pooling"] RunnerOption = Literal["auto", "generate", "pooling"] -ConvertOption = Literal["auto", "none", "embed"] +ConvertOption = Literal["auto", "none", "embed", "reward"] -ConvertType = Literal["none", "embed"] +ConvertType = Literal["none", "embed", "reward"] -_ResolvedTask = Literal["generate", "encode", "embed"] +_ResolvedTask = Literal["generate", "encode", "embed", "reward"] _RUNNER_CONVERTS: dict[RunnerType, list[ConvertType]] = { "generate": [], - "pooling": ["embed"], + "pooling": ["embed", "reward"], } # Some model suffixes are based on auto classes from Transformers: diff --git a/fastdeploy/model_executor/model_loader/default_loader_v1.py b/fastdeploy/model_executor/model_loader/default_loader_v1.py index 8fb0ebf3881..98fe9940c77 100644 --- a/fastdeploy/model_executor/model_loader/default_loader_v1.py +++ b/fastdeploy/model_executor/model_loader/default_loader_v1.py @@ -83,7 +83,7 @@ def load_model(self, fd_config: FDConfig) -> nn.Layer: convert_type = fd_config.model_config.convert_type if convert_type == "none": pass - elif convert_type == "embed": + elif convert_type == "embed" or convert_type == "reward": model_cls = as_embedding_model(model_cls) else: assert_never(convert_type) From 3a289c90c4572707eddac6f3f3ec0a50a4223cf8 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 12:13:30 +0800 Subject: [PATCH 08/26] update test --- tests/pooling/test_Ernie4_5_reward_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index 7e5b6749509..295269f8ed3 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -102,7 +102,7 @@ def setup_and_run_reward_server(): "--runner", "pooling", "--convert", - "embed", + "reward", ] with open(log_path, "w") as logfile: From 921e04de1f40c39a1afa01f69126244019a21af6 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 12:14:36 +0800 Subject: [PATCH 09/26] delete print --- fastdeploy/model_executor/pre_and_post_process.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index 25ac25894e8..4a4132597f0 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -923,6 +923,5 @@ def post_process_pooling( if not skip_save_output: if save_each_rank or model_output.mp_rank == 0: - print("pooler_output.outputs", pooler_output.outputs) output = _build_stream_transfer_data(output_tokens=None, pooler_outputs=pooler_output.outputs) async_output_queue.put(output) From 0a07749d4416060961b7cf7ed1cf715fc5e50c19 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 14:08:47 +0800 Subject: [PATCH 10/26] fix enable_thinking --- fastdeploy/engine/request.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 028081c83a2..3028d9e2e30 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -149,7 +149,7 @@ def __init__( if pooling_params is not None: self.enable_thinking = False else: - self.enable_thinking = True + self.enable_thinking = enable_thinking self.reasoning_max_tokens = reasoning_max_tokens self.trace_carrier = trace_carrier @@ -196,6 +196,7 @@ def from_dict(cls, d: 
dict): sampling_params = SamplingParams.from_dict(d) enable_thinking = d.get("enable_thinking", None) + if pooling_params is not None: enable_thinking = False if ( @@ -213,7 +214,6 @@ def from_dict(cls, d: dict): data_processor_logger.error( f"Convert mm_positions to ImagePosition error: {e}, {str(traceback.format_exc())}" ) - return cls( request_id=d["request_id"], prompt=d.get("prompt"), From dd6cb235dd098a99cdadb1bd12243004b85f0880 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 20:53:31 +0800 Subject: [PATCH 11/26] add document --- docs/features/pooling_models.md | 163 ++++++++++++++++++ docs/zh/features/pooling_models.md | 157 +++++++++++++++++ .../models/ernie4_5_vl/ernie4_5_vl_moe.py | 2 - 3 files changed, 320 insertions(+), 2 deletions(-) create mode 100644 docs/features/pooling_models.md create mode 100644 docs/zh/features/pooling_models.md diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md new file mode 100644 index 00000000000..8c35d466eab --- /dev/null +++ b/docs/features/pooling_models.md @@ -0,0 +1,163 @@ +[简体中文](../zh/features//pooling_models.md) + +# Pooling Models + +FastDeploy also supports pooling models, such as embedding and reward models. + +In FastDeploy, pooling models implement the [FdModelForPooling][fastdeploy.model_executor.models.interfaces_base] interface. +These models use a [Pooler][fastdeploy.model_executor.layers.pooler.Pooler] to extract the final hidden states of the input +before returning them. + +## Configuration + +### Model Runner + +Run a model in pooling mode via the option `--runner pooling`. + +!!! tip + There is no need to set this option in the vast majority of cases as Fastdeploy can automatically + detect the appropriate model runner via `--runner auto`. + +### Model Conversion + +FastDeploy can adapt models for various pooling tasks via the option `--convert `. + +If `--runner pooling` has been set (manually or automatically) but the model does not implement the +[FdModelForPooling][astdeploy.model_executor.models.interfaces_base.FdModelForPooling] interface, +vLLM will attempt to automatically convert the model according to the architecture names +shown in the table below. + +| Architecture | `--convert` | Supported pooling tasks | +|-------------------------------------------------|-------------|---------------------------------------| +| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | +| `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | + +!!! tip + You can explicitly set `--convert ` to specify how to convert the model. + +### Pooler Configuration + +#### Predefined models + +If the [Pooler][fastdeploy.model_executor.layers.pooler.Pooler] defined by the model accepts `pooler_config`, +you can override some of its attributes via the `--pooler-config` option. + +#### Converted models + +If the model has been converted via `--convert` (see above), +the pooler assigned to each task has the following attributes by default: + +| Task | Pooling Type | Normalization | Softmax | +|------------|--------------|---------------|---------| +| `reward` | `ALL` | ❌ | ❌ | +| `embed` | `LAST` | ✅︎ | ❌ | + +## Offline Inference + +FastDeploy's OpenAI-compatible server provides API endpoints and custom reward interfaces. 
+ +[Embeddings API], supports text and multi-modal inputs + +[Reward API], scores specific content + +### Embedding Model: +```python +model_path=Qwen/Qwen3-Embedding-0.6B + +python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ + --max-num-seqs 256 --max-model-len 32768 \ + --port 9412 --engine-worker-queue-port 7142 \ + --metrics-port 7211 --tensor-parallel-size 1 \ + --gpu-memory-utilization 0.9 \ + --load_choices "default_v1" \ + --runner pooling + --no-enable-prefix-caching \ +``` + +Request Methods: +A. EmbeddingCompletionRequest Example (Standard Text Input) + +```bash +curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ + -H 'Content-Type: application/json' \ + -d '{ + "model": "text-embedding-chat-model", + "input": [ + "This is a sentence for pooling embedding.", + "Another input text." + ], + "user": "test_client" + }' +``` + +B.EmbeddingChatRequest Example (Message Sequence Input) + +```bash +curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ + -H 'Content-Type: application/json' \ + -d '{ + "model": "text-embedding-chat-model", + "messages": [ + {"role": "user", "content": "Generate embedding for user query."} + ] + }' +``` + +### Reward Model: +```python +model_path=RM_v1008 +python -m fastdeploy.entrypoints.openai.api_server \ + --model ${model_path} \ + --max-num-seqs 256 \ + --max-model-len 8192 \ + --port 13351 \ + --engine-worker-queue-port 7562 \ + --metrics-port 7531 \ + --tensor-parallel-size 8 \ + --gpu-memory-utilization 0.9 \ + --graph-optimization-config '{"use_cudagraph":false}' \ + --load-choices "default_v1" \ + --runner pooling \ + --convert reward \ + --no-enable-prefix-caching +``` +Request Method: + +ChatRewardRequest +```bash +curl --location 'http://xxxx/v1/chat/reward' \ +--header 'Content-Type: application/json' \ +--data '{ + "model": "", + "messages": [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://xxx/a.png" + } + } + ] + }, + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "图里有几个人" + } + ] + } + ], + "user": "user-123", + "chat_template": null, + "chat_template_kwargs": { + "custom_var": "value" + }, + "mm_processor_kwargs": { + "image_size": 224 + } +}' +``` diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md new file mode 100644 index 00000000000..f3726ad81af --- /dev/null +++ b/docs/zh/features/pooling_models.md @@ -0,0 +1,157 @@ +[English](../../features/pooling_models.md) + +# Pooling Models + +FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模型。 + +在FastDeploy中,池化模型通过[FdModelForPooling][fastdeploy.model_executor.models.interfaces_base.FdModelForPooling]接口。这些模型使用一个[Pooler][fastdeploy.model_executor.layers.pooler.Pooler]来提取输入的最终隐藏状态并返回。 + +## Configuration + +### Model Runner + +通过`--runner pooling`选项以池化模型运行模型。 + +!!! 提示 + 在绝大多数情况下无需手动设置该选项,因此Fastdeploy可以通过--runner auto(默认值)自动检测合适的runner。 + +### Model Conversion + +如果模型未实现FdModelForPooling接口但你希望以池化模式运行,FastDeploy可通过`--convert `自动转换模型。 + +当设置了`--runner pooling`(手动或自动)但模型不符合接口时,FastDeploy会根据模型架构名称自动转换: + +| Architecture | `--convert` | 支持的池化类型 | +|-------------------------------------------------|-------------|---------------------------------------| +| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | +| `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | + +!!! 
提示 + 你可以显示设置`--convert `来制定模型转换方式。 + +### Pooler Configuration + +#### Predefined models + +如果模型定义的[Pooler][fastdeploy.model_executor.layers.pooler.Pooler]接受pooler_config,你可以通过--pooler_config覆盖部分属性。 + +#### Converted models + +如果模型通过--convert转换,各任务默认的池化配置如下: + +| Task | Pooling Type | Normalization | Softmax | +|------------|--------------|---------------|---------| +| `reward` | `LAST` | ❌ | ❌ | +| `embed` | `LAST` | ✅︎ | ❌ | + +加载[Sentence Transformers]模型时,其modules.json配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 + +## Online Serving + +FastDeploy的OpenAI兼容服务器提供了API的端点和自定义的reward接口 + +- [Embeddings API],支持文本和多模态输入 +- [Reward API],给指定的内容打分 + +### Embedding模型: +```python +model_path=Qwen/Qwen3-Embedding-0.6B + +python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ + --max-num-seqs 256 --max-model-len 32768 \ + --port 9412 --engine-worker-queue-port 7142 \ + --metrics-port 7211 --tensor-parallel-size 1 \ + --gpu-memory-utilization 0.9 \ + --load_choices "default_v1" \ + --runner pooling + --no-enable-prefix-caching \ +``` + +请求方式: +A.EmbeddingCompletionRequest 示例(标准文本输入) + +```bash +curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ + -H 'Content-Type: application/json' \ + -d '{ + "model": "text-embedding-chat-model", + "input": [ + "This is a sentence for pooling embedding.", + "Another input text." + ], + "user": "test_client" + }' +``` + +B.EmbeddingChatRequest 示例(消息序列输入) + +```bash +curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ + -H 'Content-Type: application/json' \ + -d '{ + "model": "text-embedding-chat-model", + "messages": [ + {"role": "user", "content": "Generate embedding for user query."} + ] + }' +``` + +### Reward模型: +```python +model_path=RM_v1008 +python -m fastdeploy.entrypoints.openai.api_server \ + --model ${model_path} \ + --max-num-seqs 256 \ + --max-model-len 8192 \ + --port 13351 \ + --engine-worker-queue-port 7562 \ + --metrics-port 7531 \ + --tensor-parallel-size 8 \ + --gpu-memory-utilization 0.9 \ + --graph-optimization-config '{"use_cudagraph":false}' \ + --load-choices "default_v1" \ + --runner pooling \ + --convert reward \ + --no-enable-prefix-caching +``` + +请求方式: +ChatRewardRequest + +```bash +curl --location 'http://xxxx/v1/chat/reward' \ +--header 'Content-Type: application/json' \ +--data '{ + "model": "", + "messages": [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://xxx/a.png" + } + } + ] + }, + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "图里有几个人" + } + ] + } + ], + "user": "user-123", + "chat_template": null, + "chat_template_kwargs": { + "custom_var": "value" + }, + "mm_processor_kwargs": { + "image_size": 224 + } +}' +``` diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index a39c1aec7e5..022a3cc9216 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -548,7 +548,6 @@ def forward( ) out = self.norm(hidden_states, residual, forward_meta=forward_meta)[0] - return out @@ -658,7 +657,6 @@ def load_weights(self, weights_iterator) -> None: ("qkv_proj", "v_proj", None, "v"), ("up_gate_proj", "gate_proj", None, "gate"), ("up_gate_proj", "up_proj", None, "up"), - ("norm.weight", "ernie.norm.weight", None, None), ] text_expert_params_mapping = [] From 3df2899c5d1f7527ba4eaf719428a6c0f5453cb3 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 
2025 21:08:06 +0800 Subject: [PATCH 12/26] fix place --- fastdeploy/model_executor/layers/pool/metadata.py | 4 ++-- tests/pooling/test_Ernie4_5_reward_serving.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/pool/metadata.py b/fastdeploy/model_executor/layers/pool/metadata.py index c6c6811d7ba..951c3caeae2 100644 --- a/fastdeploy/model_executor/layers/pool/metadata.py +++ b/fastdeploy/model_executor/layers/pool/metadata.py @@ -71,8 +71,8 @@ def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Te n_seq = len(num_scheduled_tokens) index = list(range(n_seq)) - num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, dtype="int64", place=paddle.CPUPlace()) - cumsum = paddle.zeros([n_seq + 1], dtype="int64", device=paddle.CPUPlace()) + num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, dtype="int64") + cumsum = paddle.zeros([n_seq + 1], dtype="int64") paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:]) if isinstance(device, paddle.CUDAPlace): diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index 295269f8ed3..eff59eafef4 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -222,7 +222,6 @@ def test_reward_model(reward_api_url, headers): print(f"\n=== Sending request to {reward_api_url} ===") - # 发送HTTP请求 response = requests.post(reward_api_url, headers=headers, json=payload, timeout=30) assert response.status_code == 200, f"API request failed with status {response.status_code}: {response.text}" From 8023a669bbefc8bb18ee97efcb049d1afffc6f47 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 21:15:11 +0800 Subject: [PATCH 13/26] fix test --- tests/pooling/test_Ernie4_5_reward_serving.py | 44 ++++--------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index eff59eafef4..2acc5feb6c1 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -15,48 +15,20 @@ import json import os import signal -import socket import subprocess import sys import time import pytest import requests - -# Read ports from environment variables -FD_API_PORT = int(os.getenv("FD_API_PORT", 8189)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8134)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8234)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8334)) - -PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] - - -def is_port_open(host: str, port: int, timeout=1.0): - """Check if a TCP port is open.""" - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """Kill processes listening on the given port.""" - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except subprocess.CalledProcessError: - pass - - -def clean_ports(): - """Clean all ports in PORTS_TO_CLEAN.""" - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - time.sleep(2) +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + 
is_port_open, +) @pytest.fixture(scope="session", autouse=True) From d0c415184b4f9f778ec84fef955754d52a539865 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Thu, 27 Nov 2025 22:23:09 +0800 Subject: [PATCH 14/26] fix --- docs/features/pooling_models.md | 18 ++++++++---------- docs/zh/features/pooling_models.md | 21 ++++++++++----------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md index 8c35d466eab..1a2614c049a 100644 --- a/docs/features/pooling_models.md +++ b/docs/features/pooling_models.md @@ -4,8 +4,8 @@ FastDeploy also supports pooling models, such as embedding and reward models. -In FastDeploy, pooling models implement the [FdModelForPooling][fastdeploy.model_executor.models.interfaces_base] interface. -These models use a [Pooler][fastdeploy.model_executor.layers.pooler.Pooler] to extract the final hidden states of the input +In FastDeploy, pooling models implement the `FdModelForPooling` interface. +These models use a `Pooler` to extract the final hidden states of the input before returning them. ## Configuration @@ -14,7 +14,7 @@ before returning them. Run a model in pooling mode via the option `--runner pooling`. -!!! tip +!!! tip
There is no need to set this option in the vast majority of cases as Fastdeploy can automatically detect the appropriate model runner via `--runner auto`. @@ -23,7 +23,7 @@ Run a model in pooling mode via the option `--runner pooling`. FastDeploy can adapt models for various pooling tasks via the option `--convert `. If `--runner pooling` has been set (manually or automatically) but the model does not implement the -[FdModelForPooling][astdeploy.model_executor.models.interfaces_base.FdModelForPooling] interface, +`FdModelForPooling` interface, vLLM will attempt to automatically convert the model according to the architecture names shown in the table below. @@ -32,14 +32,14 @@ shown in the table below. | `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | | `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | -!!! tip +!!! tip
You can explicitly set `--convert ` to specify how to convert the model. ### Pooler Configuration #### Predefined models -If the [Pooler][fastdeploy.model_executor.layers.pooler.Pooler] defined by the model accepts `pooler_config`, +If the `Pooler` defined by the model accepts `pooler_config`, you can override some of its attributes via the `--pooler-config` option. #### Converted models @@ -90,7 +90,7 @@ curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ }' ``` -B.EmbeddingChatRequest Example (Message Sequence Input) +B. EmbeddingChatRequest Example (Message Sequence Input) ```bash curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ @@ -121,9 +121,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ --convert reward \ --no-enable-prefix-caching ``` -Request Method: - -ChatRewardRequest +Request Method: ChatRewardRequest ```bash curl --location 'http://xxxx/v1/chat/reward' \ --header 'Content-Type: application/json' \ diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md index f3726ad81af..f111512b034 100644 --- a/docs/zh/features/pooling_models.md +++ b/docs/zh/features/pooling_models.md @@ -4,7 +4,7 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模型。 -在FastDeploy中,池化模型通过[FdModelForPooling][fastdeploy.model_executor.models.interfaces_base.FdModelForPooling]接口。这些模型使用一个[Pooler][fastdeploy.model_executor.layers.pooler.Pooler]来提取输入的最终隐藏状态并返回。 +在FastDeploy中,池化模型通过`FdModelForPooling`接口。这些模型使用一个`Pooler`来提取输入的最终隐藏状态并返回。 ## Configuration @@ -12,7 +12,7 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模 通过`--runner pooling`选项以池化模型运行模型。 -!!! 提示 +!!! 提示
在绝大多数情况下无需手动设置该选项,因此Fastdeploy可以通过--runner auto(默认值)自动检测合适的runner。 ### Model Conversion @@ -26,7 +26,7 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模 | `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | | `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | -!!! 提示 +!!! 提示
你可以显示设置`--convert `来制定模型转换方式。 ### Pooler Configuration @@ -44,14 +44,14 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模 | `reward` | `LAST` | ❌ | ❌ | | `embed` | `LAST` | ✅︎ | ❌ | -加载[Sentence Transformers]模型时,其modules.json配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 +加载`Sentence Transformers`模型时,其modules.json配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 ## Online Serving FastDeploy的OpenAI兼容服务器提供了API的端点和自定义的reward接口 -- [Embeddings API],支持文本和多模态输入 -- [Reward API],给指定的内容打分 +- `Embeddings API`,支持文本和多模态输入 +- `Reward API`,给指定的内容打分 ### Embedding模型: ```python @@ -67,8 +67,8 @@ python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ --no-enable-prefix-caching \ ``` -请求方式: -A.EmbeddingCompletionRequest 示例(标准文本输入) +请求方式:
+A. EmbeddingCompletionRequest 示例(标准文本输入) ```bash curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ @@ -83,7 +83,7 @@ curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ }' ``` -B.EmbeddingChatRequest 示例(消息序列输入) +B. EmbeddingChatRequest 示例(消息序列输入) ```bash curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ @@ -115,8 +115,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ --no-enable-prefix-caching ``` -请求方式: -ChatRewardRequest +请求方式: ChatRewardRequest ```bash curl --location 'http://xxxx/v1/chat/reward' \ From ab0b6aa5d78c43ad273cf4eef25a3a206b78ffb9 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Fri, 28 Nov 2025 11:45:47 +0800 Subject: [PATCH 15/26] support enable_prefix_caching --- fastdeploy/engine/args_utils.py | 3 --- fastdeploy/worker/gpu_model_runner.py | 12 ++++++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 459a0690d55..3f7952fc5aa 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -541,9 +541,6 @@ def __post_init__(self): self.enable_prefix_caching = False self.max_encoder_cache = 0 - if self.runner == "pooling" and self.enable_prefix_caching: - self.enable_prefix_caching = False - @staticmethod def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: """ diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index d2fbc8aa566..fd6b53615ed 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -2403,8 +2403,16 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti pooler_output: list[Optional[paddle.Tensor]] = [] for seq_len, prompt_len in zip(seq_lens_cpu, pooling_metadata.prompt_lens): - output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None - pooler_output.append(output) + if not self.cache_config.enable_prefix_caching: + output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None + pooler_output.append(output) + else: + seq_lens_decoder = self.share_inputs["seq_lens_decoder"][:num_running_requests] + if int(seq_lens_decoder) + int(seq_len) == int(prompt_len): + output = raw_pooler_output[0].data + else: + output = None + pooler_output.append(output) pooler_output = PoolerOutput( outputs=pooler_output, From b777746310f313d5b40e6e047ff03f623f29b7cb Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Fri, 28 Nov 2025 12:33:15 +0800 Subject: [PATCH 16/26] add no-enable_prefix-caching test --- tests/pooling/test_Ernie4_5_reward_serving.py | 117 +++++++----------- 1 file changed, 48 insertions(+), 69 deletions(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index 2acc5feb6c1..bd4a7abd84e 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -30,13 +30,14 @@ is_port_open, ) +# ========================== +# Shared Helper Functions +# ========================== + + +def _start_server_process(enable_caching: bool, log_filename: str): -@pytest.fixture(scope="session", autouse=True) -def setup_and_run_reward_server(): - """ - Start reward model API server for testing. 
- """ - print("Pre-test port cleanup...") + print(f"\n[Server Setup] Cleaning ports before starting (Caching={'ON' if enable_caching else 'OFF'})...") clean_ports() base_path = os.getenv("MODEL_PATH") @@ -48,7 +49,6 @@ def setup_and_run_reward_server(): if not os.path.exists(model_path): raise FileNotFoundError(f"Model path not found: {model_path}") - log_path = "reward_server.log" cmd = [ sys.executable, "-m", @@ -77,7 +77,14 @@ def setup_and_run_reward_server(): "reward", ] - with open(log_path, "w") as logfile: + if enable_caching: + cmd.append("--enable-prefix-caching") + else: + cmd.append("--no-enable-prefix-caching") + + print(f"[Server Setup] Command: {' '.join(cmd)}") + + with open(log_filename, "w") as logfile: process = subprocess.Popen( cmd, stdout=logfile, @@ -85,70 +92,50 @@ def setup_and_run_reward_server(): start_new_session=True, ) - # Wait for server to start (up to 480 seconds) + # Wait for server to start for _ in range(300): if is_port_open("127.0.0.1", FD_API_PORT): - print(f"reward API server is up on port {FD_API_PORT}") + print(f"[Server Setup] Server is up on port {FD_API_PORT}") break time.sleep(1) else: - print("reward API server failed to start. Cleaning up...") + print("[Server Setup] Server failed to start. Cleaning up...") try: os.killpg(process.pid, signal.SIGTERM) - except Exception as e: - print(f"Failed to kill process group: {e}") - raise RuntimeError(f"reward API server did not start on port {FD_API_PORT}") + except Exception: + pass + if os.path.exists(log_filename): + with open(log_filename, "r") as f: + print(f"Server Log Tail ({log_filename}):\n{f.read()[-500:]}") + raise RuntimeError(f"Server did not start on port {FD_API_PORT}") - yield - - print("\n===== Post-test reward server cleanup... =====") - try: - os.killpg(process.pid, signal.SIGTERM) - print(f"reward API server (pid={process.pid}) terminated") - except Exception as e: - print(f"Failed to terminate reward API server: {e}") + return process -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def reward_api_url(): """Returns the API endpoint URL for reward.""" return f"http://0.0.0.0:{FD_API_PORT}/v1/reward" -@pytest.fixture +@pytest.fixture(scope="function") def headers(): """Returns common HTTP request headers.""" return {"Content-Type": "application/json"} -# ========================== -# Test Cases -# ========================== +@pytest.fixture(scope="function") +def server_default_caching(): + _start_server_process(enable_caching=True, log_filename="reward_server_caching_on.log") -@pytest.fixture -def consistent_payload(): - """ - Returns a fixed payload for reward model consistency testing. - Reward models evaluate user-assistant conversation pairs. - """ - return { - "model": "default", - "messages": [ - {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]}, - { - "role": "assistant", - "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}], - }, - ], - "user": "test-user-123", - } +@pytest.fixture(scope="function") +def server_no_caching(): + _start_server_process(enable_caching=False, log_filename="reward_server_caching_off.log") def save_score_baseline(score: float, baseline_file: str): - """ - Save reward score to baseline file. 
- """ + """Save reward score to baseline file.""" baseline_data = {"score": score} with open(baseline_file, "w", encoding="utf-8") as f: json.dump(baseline_data, f, indent=2) @@ -156,9 +143,7 @@ def save_score_baseline(score: float, baseline_file: str): def check_score_against_baseline(current_score: float, baseline_file: str, threshold: float = 0.01): - """ - Check reward score against baseline file. - """ + """Check reward score against baseline file.""" try: with open(baseline_file, "r", encoding="utf-8") as f: baseline_data = json.load(f) @@ -180,9 +165,7 @@ def check_score_against_baseline(current_score: float, baseline_file: str, thres ) -def test_reward_model(reward_api_url, headers): - """Test reward model scoring using the chat-style payload.""" - +def _run_test_logic(reward_api_url, headers, baseline_filename): payload = { "model": "default", "messages": [ @@ -193,34 +176,30 @@ def test_reward_model(reward_api_url, headers): } print(f"\n=== Sending request to {reward_api_url} ===") - response = requests.post(reward_api_url, headers=headers, json=payload, timeout=30) - assert response.status_code == 200, f"API request failed with status {response.status_code}: {response.text}" result = response.json() print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}") - assert "data" in result, f"Response missing 'data' field. Got: {result}" - assert len(result["data"]) > 0, "Response 'data' is empty" - - first_item = result["data"][0] - assert "score" in first_item, f"Response data item missing 'score' field. Got: {first_item}" - - score_list = first_item["score"] - assert isinstance(score_list, list), f"Expected 'score' to be a list, got {type(score_list)}" - assert len(score_list) > 0, "Score list is empty" - - score = float(score_list[0]) - + assert "data" in result and len(result["data"]) > 0 + score = float(result["data"][0]["score"][0]) print(f"✓ Reward Score: {score}") base_path = os.getenv("MODEL_PATH", "") - baseline_filename = "reward_score_baseline.json" - if base_path: baseline_file = os.path.join(base_path, baseline_filename) else: baseline_file = baseline_filename check_score_against_baseline(score, baseline_file, threshold=0.0001) + + +def test_reward_model_with_caching(server_default_caching, reward_api_url, headers): + print("\n>>> Running Test: WITH Prefix Caching") + _run_test_logic(reward_api_url, headers, baseline_filename="reward_score_baseline.json") + + +def test_reward_model_without_caching(server_no_caching, reward_api_url, headers): + print("\n>>> Running Test: WITHOUT Prefix Caching") + _run_test_logic(reward_api_url, headers, baseline_filename="reward_score_baseline_no_caching.json") From c83a9e020d26bac05d9c945fa6ce7890a9b7e57a Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Fri, 28 Nov 2025 12:34:45 +0800 Subject: [PATCH 17/26] fix --- tests/pooling/test_Ernie4_5_reward_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index bd4a7abd84e..a7e3a427ef3 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -192,7 +192,7 @@ def _run_test_logic(reward_api_url, headers, baseline_filename): else: baseline_file = baseline_filename - check_score_against_baseline(score, baseline_file, threshold=0.0001) + check_score_against_baseline(score, baseline_file, threshold=0.01) def test_reward_model_with_caching(server_default_caching, reward_api_url, headers): From 
0751d214d023fc36911428fc1f67c8ba12ba574c Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Fri, 28 Nov 2025 23:07:22 +0800 Subject: [PATCH 18/26] support enable_prefix_caching --- docs/features/pooling_models.md | 10 ++++------ docs/zh/features/pooling_models.md | 10 ++++------ fastdeploy/config.py | 10 +++++----- .../model_loader/default_loader_v1.py | 2 +- fastdeploy/model_executor/models/ernie_vl_rm.py | 2 +- fastdeploy/worker/gpu_model_runner.py | 16 +++++++++++++--- 6 files changed, 28 insertions(+), 22 deletions(-) diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md index 1a2614c049a..4bb6c5551a4 100644 --- a/docs/features/pooling_models.md +++ b/docs/features/pooling_models.md @@ -2,7 +2,7 @@ # Pooling Models -FastDeploy also supports pooling models, such as embedding and reward models. +FastDeploy also supports pooling models, such as embedding models. In FastDeploy, pooling models implement the `FdModelForPooling` interface. These models use a `Pooler` to extract the final hidden states of the input @@ -29,8 +29,7 @@ shown in the table below. | Architecture | `--convert` | Supported pooling tasks | |-------------------------------------------------|-------------|---------------------------------------| -| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | -| `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | +| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` `*ForProcessRewardModel` | `embed` | `embed` | !!! tip
You can explicitly set `--convert ` to specify how to convert the model. @@ -49,7 +48,6 @@ the pooler assigned to each task has the following attributes by default: | Task | Pooling Type | Normalization | Softmax | |------------|--------------|---------------|---------| -| `reward` | `ALL` | ❌ | ❌ | | `embed` | `LAST` | ✅︎ | ❌ | ## Offline Inference @@ -103,7 +101,7 @@ curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ }' ``` -### Reward Model: +### Pooling Model and reward score ```python model_path=RM_v1008 python -m fastdeploy.entrypoints.openai.api_server \ @@ -118,7 +116,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ --graph-optimization-config '{"use_cudagraph":false}' \ --load-choices "default_v1" \ --runner pooling \ - --convert reward \ + --convert embed \ --no-enable-prefix-caching ``` Request Method: ChatRewardRequest diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md index f111512b034..e08f3db37f6 100644 --- a/docs/zh/features/pooling_models.md +++ b/docs/zh/features/pooling_models.md @@ -2,7 +2,7 @@ # Pooling Models -FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模型。 +FastDeploy也支持pooling模型,例如嵌入(embedding)模型。 在FastDeploy中,池化模型通过`FdModelForPooling`接口。这些模型使用一个`Pooler`来提取输入的最终隐藏状态并返回。 @@ -23,8 +23,7 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模 | Architecture | `--convert` | 支持的池化类型 | |-------------------------------------------------|-------------|---------------------------------------| -| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` | `embed` | `embed` | -| `*ForRewardModeling`, `*RewardModel` | `reward` | `reward` | +| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` `**ForProcessRewardModel` | `embed` | `embed` | !!! 提示
你可以显示设置`--convert `来制定模型转换方式。 @@ -41,7 +40,6 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)和奖励(reward)模 | Task | Pooling Type | Normalization | Softmax | |------------|--------------|---------------|---------| -| `reward` | `LAST` | ❌ | ❌ | | `embed` | `LAST` | ✅︎ | ❌ | 加载`Sentence Transformers`模型时,其modules.json配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 @@ -96,7 +94,7 @@ curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \ }' ``` -### Reward模型: +### Pooling模型和打分机制 ```python model_path=RM_v1008 python -m fastdeploy.entrypoints.openai.api_server \ @@ -111,7 +109,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ --graph-optimization-config '{"use_cudagraph":false}' \ --load-choices "default_v1" \ --runner pooling \ - --convert reward \ + --convert embed \ --no-enable-prefix-caching ``` diff --git a/fastdeploy/config.py b/fastdeploy/config.py index aaef02d31fd..a11a4d9b043 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -37,21 +37,21 @@ logger = get_logger("config", "config.log") -TaskOption = Literal["auto", "generate", "embedding", "embed", "reward"] +TaskOption = Literal["auto", "generate", "embedding", "embed"] RunnerType = Literal["generate", "pooling"] RunnerOption = Literal["auto", "generate", "pooling"] -ConvertOption = Literal["auto", "none", "embed", "reward"] +ConvertOption = Literal["auto", "none", "embed"] -ConvertType = Literal["none", "embed", "reward"] +ConvertType = Literal["none", "embed"] -_ResolvedTask = Literal["generate", "encode", "embed", "reward"] +_ResolvedTask = Literal["generate", "encode", "embed"] _RUNNER_CONVERTS: dict[RunnerType, list[ConvertType]] = { "generate": [], - "pooling": ["embed", "reward"], + "pooling": ["embed"], } # Some model suffixes are based on auto classes from Transformers: diff --git a/fastdeploy/model_executor/model_loader/default_loader_v1.py b/fastdeploy/model_executor/model_loader/default_loader_v1.py index 98fe9940c77..8fb0ebf3881 100644 --- a/fastdeploy/model_executor/model_loader/default_loader_v1.py +++ b/fastdeploy/model_executor/model_loader/default_loader_v1.py @@ -83,7 +83,7 @@ def load_model(self, fd_config: FDConfig) -> nn.Layer: convert_type = fd_config.model_config.convert_type if convert_type == "none": pass - elif convert_type == "embed" or convert_type == "reward": + elif convert_type == "embed": model_cls = as_embedding_model(model_cls) else: assert_never(convert_type) diff --git a/fastdeploy/model_executor/models/ernie_vl_rm.py b/fastdeploy/model_executor/models/ernie_vl_rm.py index 7d801fbdf04..c7d255f944e 100644 --- a/fastdeploy/model_executor/models/ernie_vl_rm.py +++ b/fastdeploy/model_executor/models/ernie_vl_rm.py @@ -112,7 +112,7 @@ def forward( input_embeddings = self.get_input_embeddings( ids_remove_padding=ids_remove_padding, image_features=image_features, - image_token_num=vl_moe_meta.image_token_num.item(), + image_token_num=vl_moe_meta.num_image_patch_id.item(), ) if forward_meta.step_use_cudagraph: diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index fd6b53615ed..d244ab17f3a 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -2157,6 +2157,7 @@ class at the server level, which is too granular for ModelRunner. # 3. 
Execute model if self.enable_mm: + print("self.share_inputs[image_features]", self.share_inputs["image_features"]) model_output = self.model( self.share_inputs["ids_remove_padding"], self.share_inputs["image_features"], @@ -2386,6 +2387,8 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti prompt_lens = self.share_inputs["prompt_lens"][:num_running_requests] prompt_token_ids = self.share_inputs["prompt_ids"] + print("prompt_lens", prompt_lens) + print("prompt_token_ids", prompt_token_ids) pooling_metadata = PoolingMetadata( prompt_lens=prompt_lens, prompt_token_ids=prompt_token_ids, @@ -2397,18 +2400,25 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti device_str = "gpu" if hidden_states.place.is_gpu_place() else "cpu" pooling_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=device_str) + print("hidden_states", hidden_states) raw_pooler_output = self.model.pooler(hidden_states=hidden_states, pooling_metadata=pooling_metadata) + print("raw_pooler_output", raw_pooler_output) seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests] pooler_output: list[Optional[paddle.Tensor]] = [] - for seq_len, prompt_len in zip(seq_lens_cpu, pooling_metadata.prompt_lens): + seq_lens_decoder_batch = self.share_inputs["seq_lens_decoder"][:num_running_requests] + + for i, (seq_len, prompt_len) in enumerate(zip(seq_lens_cpu, pooling_metadata.prompt_lens)): if not self.cache_config.enable_prefix_caching: output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None pooler_output.append(output) else: - seq_lens_decoder = self.share_inputs["seq_lens_decoder"][:num_running_requests] - if int(seq_lens_decoder) + int(seq_len) == int(prompt_len): + current_seq_len_decoder = seq_lens_decoder_batch[i] + print("current_seq_len_decoder", current_seq_len_decoder) + print("seq_len", seq_len) + print("prompt_len", prompt_len) + if int(current_seq_len_decoder) + int(seq_len) == int(prompt_len): output = raw_pooler_output[0].data else: output = None From 0b692d0c267233049d816c10bef7813abbaa2eef Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Sun, 30 Nov 2025 05:02:48 +0000 Subject: [PATCH 19/26] delete print --- fastdeploy/worker/gpu_model_runner.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index d244ab17f3a..bb1b2f3d857 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -2157,7 +2157,6 @@ class at the server level, which is too granular for ModelRunner. # 3. 
Execute model if self.enable_mm: - print("self.share_inputs[image_features]", self.share_inputs["image_features"]) model_output = self.model( self.share_inputs["ids_remove_padding"], self.share_inputs["image_features"], @@ -2387,8 +2386,6 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti prompt_lens = self.share_inputs["prompt_lens"][:num_running_requests] prompt_token_ids = self.share_inputs["prompt_ids"] - print("prompt_lens", prompt_lens) - print("prompt_token_ids", prompt_token_ids) pooling_metadata = PoolingMetadata( prompt_lens=prompt_lens, prompt_token_ids=prompt_token_ids, @@ -2400,10 +2397,8 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti device_str = "gpu" if hidden_states.place.is_gpu_place() else "cpu" pooling_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=device_str) - print("hidden_states", hidden_states) raw_pooler_output = self.model.pooler(hidden_states=hidden_states, pooling_metadata=pooling_metadata) - print("raw_pooler_output", raw_pooler_output) seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests] pooler_output: list[Optional[paddle.Tensor]] = [] @@ -2415,9 +2410,6 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti pooler_output.append(output) else: current_seq_len_decoder = seq_lens_decoder_batch[i] - print("current_seq_len_decoder", current_seq_len_decoder) - print("seq_len", seq_len) - print("prompt_len", prompt_len) if int(current_seq_len_decoder) + int(seq_len) == int(prompt_len): output = raw_pooler_output[0].data else: From 6f1b431e710178c0dc31c8400a4d7c8926deb7ed Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Sun, 30 Nov 2025 05:13:23 +0000 Subject: [PATCH 20/26] fix document --- docs/features/pooling_models.md | 7 ++----- docs/zh/features/pooling_models.md | 7 +++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md index 4bb6c5551a4..4cfba9479f1 100644 --- a/docs/features/pooling_models.md +++ b/docs/features/pooling_models.md @@ -67,9 +67,8 @@ python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ --port 9412 --engine-worker-queue-port 7142 \ --metrics-port 7211 --tensor-parallel-size 1 \ --gpu-memory-utilization 0.9 \ - --load_choices "default_v1" \ + --graph-optimization-config '{"use_cudagraph":false}' \ --runner pooling - --no-enable-prefix-caching \ ``` Request Methods: @@ -114,10 +113,8 @@ python -m fastdeploy.entrypoints.openai.api_server \ --tensor-parallel-size 8 \ --gpu-memory-utilization 0.9 \ --graph-optimization-config '{"use_cudagraph":false}' \ - --load-choices "default_v1" \ --runner pooling \ - --convert embed \ - --no-enable-prefix-caching + --convert embed ``` Request Method: ChatRewardRequest ```bash diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md index e08f3db37f6..6cda60f3a94 100644 --- a/docs/zh/features/pooling_models.md +++ b/docs/zh/features/pooling_models.md @@ -60,9 +60,9 @@ python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ --port 9412 --engine-worker-queue-port 7142 \ --metrics-port 7211 --tensor-parallel-size 1 \ --gpu-memory-utilization 0.9 \ - --load_choices "default_v1" \ - --runner pooling - --no-enable-prefix-caching \ + --graph-optimization-config '{"use_cudagraph":false}' \ + --runner pooling \ + ``` 请求方式:
@@ -110,7 +110,6 @@ python -m fastdeploy.entrypoints.openai.api_server \ --load-choices "default_v1" \ --runner pooling \ --convert embed \ - --no-enable-prefix-caching ``` 请求方式: ChatRewardRequest From 4d7f4fbfc08eaae38d408366ac9d81987751be94 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Sun, 30 Nov 2025 12:12:15 +0000 Subject: [PATCH 21/26] fix --- docs/features/pooling_models.md | 2 -- docs/zh/features/pooling_models.md | 3 --- 2 files changed, 5 deletions(-) diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md index 4cfba9479f1..6eec945abf4 100644 --- a/docs/features/pooling_models.md +++ b/docs/features/pooling_models.md @@ -67,7 +67,6 @@ python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ --port 9412 --engine-worker-queue-port 7142 \ --metrics-port 7211 --tensor-parallel-size 1 \ --gpu-memory-utilization 0.9 \ - --graph-optimization-config '{"use_cudagraph":false}' \ --runner pooling ``` @@ -112,7 +111,6 @@ python -m fastdeploy.entrypoints.openai.api_server \ --metrics-port 7531 \ --tensor-parallel-size 8 \ --gpu-memory-utilization 0.9 \ - --graph-optimization-config '{"use_cudagraph":false}' \ --runner pooling \ --convert embed ``` diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md index 6cda60f3a94..ee2333ca1cc 100644 --- a/docs/zh/features/pooling_models.md +++ b/docs/zh/features/pooling_models.md @@ -60,7 +60,6 @@ python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \ --port 9412 --engine-worker-queue-port 7142 \ --metrics-port 7211 --tensor-parallel-size 1 \ --gpu-memory-utilization 0.9 \ - --graph-optimization-config '{"use_cudagraph":false}' \ --runner pooling \ ``` @@ -106,8 +105,6 @@ python -m fastdeploy.entrypoints.openai.api_server \ --metrics-port 7531 \ --tensor-parallel-size 8 \ --gpu-memory-utilization 0.9 \ - --graph-optimization-config '{"use_cudagraph":false}' \ - --load-choices "default_v1" \ --runner pooling \ --convert embed \ ``` From 817ede45fb0aba0589349ef76a40521a3861ecb6 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Mon, 1 Dec 2025 03:15:27 +0000 Subject: [PATCH 22/26] fix test --- tests/pooling/test_Ernie4_5_reward_serving.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index a7e3a427ef3..f0bae9fd80a 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -69,12 +69,10 @@ def _start_server_process(enable_caching: bool, log_filename: str): "8192", "--max-num-seqs", "256", - "--graph-optimization-config", - '{"use_cudagraph":false}', "--runner", "pooling", "--convert", - "reward", + "embed", ] if enable_caching: From 6f59f5b106f338c60b33031a70b7fc5a30252d18 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Mon, 1 Dec 2025 07:58:45 +0000 Subject: [PATCH 23/26] fix document and delete chinese --- docs/features/pooling_models.md | 23 +++++++++++++++++++++- docs/zh/features/pooling_models.md | 22 +++++++++++++++++++-- fastdeploy/entrypoints/openai/protocol.py | 24 +++++++++++------------ 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/docs/features/pooling_models.md b/docs/features/pooling_models.md index 6eec945abf4..921f38c3ae2 100644 --- a/docs/features/pooling_models.md +++ b/docs/features/pooling_models.md @@ -50,7 +50,28 @@ the pooler assigned to each task has the following attributes by default: 
|------------|--------------|---------------|---------| | `embed` | `LAST` | ✅︎ | ❌ | -## Offline Inference +When loading [Sentence Transformers](https://huggingface.co/sentence-transformers) models, +its Sentence Transformers configuration file (`modules.json`) takes priority over the model's defaults and It can also be specified during model network construction through @default_pooling_type("LAST"). + +##### Pooling Type + +1.LastPool(PoolingType.LAST) + +Purpose:Extracts the hidden state of the last token in each sequence + +2.AllPool(PoolingType.ALL) + +Purpose:Returns the hidden states of all tokens in each sequence + +3.CLSPool(PoolingType.CLS) + +Purpose:Returns the hidden state of the first token in each sequence (CLS token) + +4.MeanPool(PoolingType.MEAN) + +Purpose:Computes the average of all token hidden states in each sequence + +## Online Serving FastDeploy's OpenAI-compatible server provides API endpoints and custom reward interfaces. diff --git a/docs/zh/features/pooling_models.md b/docs/zh/features/pooling_models.md index ee2333ca1cc..e2f4ce7b2b7 100644 --- a/docs/zh/features/pooling_models.md +++ b/docs/zh/features/pooling_models.md @@ -32,7 +32,7 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)模型。 #### Predefined models -如果模型定义的[Pooler][fastdeploy.model_executor.layers.pooler.Pooler]接受pooler_config,你可以通过--pooler_config覆盖部分属性。 +如果模型定义的`Pooler`接受pooler_config,你可以通过--pooler_config覆盖部分属性。 #### Converted models @@ -42,7 +42,25 @@ FastDeploy也支持pooling模型,例如嵌入(embedding)模型。 |------------|--------------|---------------|---------| | `embed` | `LAST` | ✅︎ | ❌ | -加载`Sentence Transformers`模型时,其modules.json配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 +加载[Sentence Transformers](https://huggingface.co/sentence-transformers)模型时,其`modules.json`配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。 + +#### Pooling Type + +1.LastPool(PoolingType.LAST) + +作用:提取每个序列的最后一个token的隐藏状态 + +2.AllPool(PoolingType.ALL) + +作用:返回每个序列的所有token的隐藏状态 + +3.CLSPool(PoolingType.CLS) + +作用:返回每个序列的第一个token(CLS token)的隐藏状态 + +4.MeanPool(PoolingType.MEAN) + +作用:计算每个序列所有token隐藏状态的平均值 ## Online Serving diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 69bcac00f47..d61b76093dd 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -969,9 +969,9 @@ class EmbeddingResponse(BaseModel): class ChatRewardRequest(BaseModel): - model: Optional[str] = None # 指定模型,例如 "default" 或支持 embedding 的 chat 模型 - messages: Union[List[Any], List[int]] # 聊天消息列表(必选) - user: Optional[str] = None # 调用方标识符 + model: Optional[str] = None + messages: Union[List[Any], List[int]] + user: Optional[str] = None dimensions: Optional[int] = None truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None @@ -1040,15 +1040,15 @@ def to_pooling_params(self): class ChatRewardData(BaseModel): - index: Optional[int] = None # 数据索引(可选) - object: str = "reward" # 固定为 "reward" - score: List[float] # reward 分数(浮点数列表) + index: Optional[int] = None + object: str = "reward" + score: List[float] class ChatRewardResponse(BaseModel): - id: str # 响应 ID,例如 chat-reward- - object: str = "object" # 固定为 "object" - created: int # 创建时间(Unix 时间戳) - model: str # 使用的模型名 - data: List[ChatRewardData] # reward 结果列表 - usage: Optional[UsageInfo] = None # Token 使用情况 + id: str + object: str = "object" + created: int + model: str + data: List[ChatRewardData] + usage: Optional[UsageInfo] = None From 243e6c1a39cfae07c25fb44907e6e40789142943 Mon Sep 17 00:00:00 2001 From: lizexu 
<2694294196@qq.com> Date: Mon, 1 Dec 2025 12:13:14 +0000 Subject: [PATCH 24/26] udpate --- fastdeploy/worker/gpu_model_runner.py | 4 ++-- tests/pooling/test_Ernie4_5_reward_serving.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index bb1b2f3d857..9e264de1674 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -2406,12 +2406,12 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti for i, (seq_len, prompt_len) in enumerate(zip(seq_lens_cpu, pooling_metadata.prompt_lens)): if not self.cache_config.enable_prefix_caching: - output = raw_pooler_output[0].data if int(seq_len) == int(prompt_len) else None + output = raw_pooler_output[i].data if int(seq_len) == int(prompt_len) else None pooler_output.append(output) else: current_seq_len_decoder = seq_lens_decoder_batch[i] if int(current_seq_len_decoder) + int(seq_len) == int(prompt_len): - output = raw_pooler_output[0].data + output = raw_pooler_output[i].data else: output = None pooler_output.append(output) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index f0bae9fd80a..cc7a70839c2 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -113,7 +113,7 @@ def _start_server_process(enable_caching: bool, log_filename: str): @pytest.fixture(scope="function") def reward_api_url(): """Returns the API endpoint URL for reward.""" - return f"http://0.0.0.0:{FD_API_PORT}/v1/reward" + return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/reward" @pytest.fixture(scope="function") From 87d4d456c8c840ae9f12f2e313a11cf872d1563b Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Mon, 1 Dec 2025 12:18:48 +0000 Subject: [PATCH 25/26] enable_thinking --- fastdeploy/engine/request.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 3028d9e2e30..8f9f3f704bf 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -146,10 +146,7 @@ def __init__( self.multimodal_data = multimodal_data self.multimodal_img_boundaries = None - if pooling_params is not None: - self.enable_thinking = False - else: - self.enable_thinking = enable_thinking + self.enable_thinking = enable_thinking self.reasoning_max_tokens = reasoning_max_tokens self.trace_carrier = trace_carrier @@ -195,10 +192,6 @@ def from_dict(cls, d: dict): else: sampling_params = SamplingParams.from_dict(d) - enable_thinking = d.get("enable_thinking", None) - - if pooling_params is not None: - enable_thinking = False if ( isinstance(d.get("multimodal_inputs"), dict) and isinstance(d["multimodal_inputs"].get("mm_positions"), list) @@ -240,7 +233,7 @@ def from_dict(cls, d: dict): guided_grammar=d.get("guided_grammar", None), structural_tag=d.get("structural_tag", None), guided_json_object=d.get("guided_json_object", None), - enable_thinking=enable_thinking, + enable_thinking=d.get("enable_thinking", None), reasoning_max_tokens=d.get("reasoning_max_tokens", None), trace_carrier=d.get("trace_carrier", {}), chat_template=d.get("chat_template", None), From 4a8bf034aa17c0594b6b0280868da4e37a498021 Mon Sep 17 00:00:00 2001 From: lizexu <2694294196@qq.com> Date: Tue, 2 Dec 2025 03:37:56 +0000 Subject: [PATCH 26/26] fix test --- tests/pooling/test_Ernie4_5_reward_serving.py | 3 ++- 1 file changed, 2 insertions(+), 1 
deletion(-) diff --git a/tests/pooling/test_Ernie4_5_reward_serving.py b/tests/pooling/test_Ernie4_5_reward_serving.py index cc7a70839c2..32a5d4f8036 100644 --- a/tests/pooling/test_Ernie4_5_reward_serving.py +++ b/tests/pooling/test_Ernie4_5_reward_serving.py @@ -113,7 +113,7 @@ def _start_server_process(enable_caching: bool, log_filename: str): @pytest.fixture(scope="function") def reward_api_url(): """Returns the API endpoint URL for reward.""" - return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/reward" + return f"http://0.0.0.0:{FD_API_PORT}/v1/reward" @pytest.fixture(scope="function") @@ -171,6 +171,7 @@ def _run_test_logic(reward_api_url, headers, baseline_filename): {"role": "assistant", "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}]}, ], "user": "user-123", + "enable_thinking": False, } print(f"\n=== Sending request to {reward_api_url} ===")
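For reference, the request/response shape exercised by `_run_test_logic` above can be reproduced outside pytest with a short standalone script. This is a sketch only: the host and port (the test suite's default `FD_API_PORT` of 8189 is assumed here) and the served model depend on how the server from this patch series was launched, while the payload and the parsing of `data[0]["score"][0]` mirror the test exactly.

```python
# Standalone sketch of the reward request issued by _run_test_logic above.
# Host/port are assumptions; payload and response parsing mirror the test.
import json

import requests

REWARD_URL = "http://127.0.0.1:8189/v1/reward"  # assumed FD_API_PORT default

payload = {
    "model": "default",
    "messages": [
        {"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]},
        {"role": "assistant", "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}]},
    ],
    "user": "user-123",
    "enable_thinking": False,
}

response = requests.post(
    REWARD_URL, headers={"Content-Type": "application/json"}, json=payload, timeout=30
)
response.raise_for_status()
result = response.json()

# The test asserts a non-empty "data" list whose first item carries a "score" list.
score = float(result["data"][0]["score"][0])
print(json.dumps(result, indent=2, ensure_ascii=False))
print("reward score:", score)
```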