@@ -322,6 +322,8 @@ impl SourceIndexingContext {
322322 ack_fn : Option < AckFn > ,
323323 pool : PgPool ,
324324 ) {
325+ use ContentHashBasedCollapsingBaseline :: ProcessedSourceFingerprint ;
326+
325327 // Store operation name for tracking cleanup
326328 let operation_name = {
327329 let plan_result = self . flow . get_execution_plan ( ) . await ;
@@ -392,22 +394,21 @@ impl SourceIndexingContext {
392394 // Fast path optimization: may collapse the row based on source version fingerprint.
393395 // Still need to update the tracking table as the processed ordinal advanced.
394396 if let Some ( prev_content_version_fp) =
395- & prev_version_state. content_version_fp
396- && mode == UpdateMode :: Normal
397- && row_indexer
398- . try_collapse (
399- & version,
400- content_version_fp. as_slice ( ) ,
401- & prev_version_state. source_version ,
402- ContentHashBasedCollapsingBaseline :: ProcessedSourceFingerprint (
403- prev_content_version_fp,
404- ) ,
405- )
406- . await ?
407- . is_some ( )
408- {
409- return Ok ( ( ) ) ;
410- }
397+ & prev_version_state. content_version_fp
398+ && mode == UpdateMode :: Normal
399+ {
400+ let collapse_result = row_indexer
401+ . try_collapse (
402+ & version,
403+ content_version_fp. as_slice ( ) ,
404+ & prev_version_state. source_version ,
405+ ProcessedSourceFingerprint ( prev_content_version_fp) ,
406+ )
407+ . await ?;
408+ if collapse_result. is_some ( ) {
409+ return Ok ( ( ) ) ;
410+ }
411+ }
411412 }
412413 _ => { }
413414 }
@@ -422,29 +423,26 @@ impl SourceIndexingContext {
422423 if let Some ( ref op_stats) = operation_in_process_stats_for_async {
423424 op_stats. start_processing ( & operation_name_for_async, 1 ) ;
424425 }
426+ let row_input = row_input
427+ . key_aux_info
428+ . as_ref ( )
429+ . ok_or_else ( || anyhow ! ( "`key_aux_info` must be provided" ) ) ?;
430+ let read_options = interface:: SourceExecutorReadOptions {
431+ include_value : true ,
432+ include_ordinal : true ,
433+ include_content_version_fp : true ,
434+ } ;
425435 let data = import_op
426- . executor
427- . get_value (
428- row_key,
429- row_input. key_aux_info . as_ref ( ) . ok_or_else ( || {
430- anyhow:: anyhow!(
431- "`key_aux_info` must be provided when there's no `source_data`"
432- )
433- } ) ?,
434- & interface:: SourceExecutorReadOptions {
435- include_value : true ,
436- include_ordinal : true ,
437- include_content_version_fp : true ,
438- } ,
439- )
440- . await ?;
436+ . executor
437+ . get_value ( row_key, row_input, & read_options)
438+ . await ?;
441439 if let Some ( ref op_stats) = operation_in_process_stats_for_async {
442440 op_stats. finish_processing ( & operation_name_for_async, 1 ) ;
443441 }
444442 (
445- data. ordinal . ok_or_else ( || {
446- anyhow :: anyhow! ( " ordinal is not available" )
447- } ) ? ,
443+ data. ordinal
444+ . or ( source_data . ordinal )
445+ . unwrap_or ( interface :: Ordinal :: unavailable ( ) ) ,
448446 data. content_version_fp ,
449447 data. value
450448 . ok_or_else ( || anyhow:: anyhow!( "value is not available" ) ) ?,
0 commit comments