@@ -361,6 +361,43 @@ async fn evaluate_child_op_scope(
361361 } )
362362}
363363
364+ async fn evaluate_with_timeout_and_warning < F , T > (
365+ eval_future : F ,
366+ timeout_duration : Duration ,
367+ warn_duration : Duration ,
368+ op_kind : String ,
369+ op_name : String ,
370+ ) -> Result < T >
371+ where
372+ F : std:: future:: Future < Output = Result < T > > ,
373+ {
374+ let mut eval_future = Box :: pin ( eval_future) ;
375+ let mut warned = false ;
376+ let timeout_future = tokio:: time:: sleep ( timeout_duration) ;
377+ tokio:: pin!( timeout_future) ;
378+
379+ loop {
380+ tokio:: select! {
381+ res = & mut eval_future => {
382+ return res;
383+ }
384+ _ = & mut timeout_future => {
385+ return Err ( anyhow!(
386+ "Function '{}' ({}) timed out after {} seconds" ,
387+ op_kind, op_name, timeout_duration. as_secs( )
388+ ) ) ;
389+ }
390+ _ = tokio:: time:: sleep( warn_duration) , if !warned => {
391+ warn!(
392+ "Function '{}' ({}) is taking longer than {}s" ,
393+ op_kind, op_name, WARNING_THRESHOLD
394+ ) ;
395+ warned = true ;
396+ }
397+ }
398+ }
399+ }
400+
364401async fn evaluate_op_scope (
365402 op_scope : & AnalyzedOpScope ,
366403 scoped_entries : RefList < ' _ , & ScopeEntry < ' _ > > ,
@@ -407,70 +444,28 @@ async fn evaluate_op_scope(
407444 let eval_future = evaluate_with_cell ( output_value_cell. as_ref ( ) , move || {
408445 op. executor . evaluate ( input_values)
409446 } ) ;
410- let mut eval_future = Box :: pin ( eval_future) ;
411- let mut warned = false ;
412- let timeout_future = tokio:: time:: sleep ( timeout_duration) ;
413- tokio:: pin!( timeout_future) ;
414-
415- let res = loop {
416- tokio:: select! {
417- res = & mut eval_future => {
418- break Ok ( res?) ;
419- }
420- _ = & mut timeout_future => {
421- break Err ( anyhow!(
422- "Function '{}' ({}) timed out after {} seconds" ,
423- op. op_kind, op. name, timeout_duration. as_secs( )
424- ) ) ;
425- }
426- _ = tokio:: time:: sleep( warn_duration) , if !warned => {
427- eprintln!(
428- "WARNING: Function '{}' ({}) is taking longer than 30s" ,
429- op_kind_for_warning, op_name_for_warning
430- ) ;
431- warn!(
432- "Function '{}' ({}) is taking longer than {}s" ,
433- op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
434- ) ;
435- warned = true ;
436- }
437- }
438- } ;
439-
440- res. and_then ( |v| head_scope. define_field ( & op. output , & v) )
447+ let v = evaluate_with_timeout_and_warning (
448+ eval_future,
449+ timeout_duration,
450+ warn_duration,
451+ op_kind_for_warning,
452+ op_name_for_warning,
453+ )
454+ . await ?;
455+
456+ head_scope. define_field ( & op. output , & v)
441457 } else {
442458 let eval_future = op. executor . evaluate ( input_values) ;
443- let mut eval_future = Box :: pin ( eval_future) ;
444- let mut warned = false ;
445- let timeout_future = tokio:: time:: sleep ( timeout_duration) ;
446- tokio:: pin!( timeout_future) ;
447-
448- let res = loop {
449- tokio:: select! {
450- res = & mut eval_future => {
451- break Ok ( res?) ;
452- }
453- _ = & mut timeout_future => {
454- break Err ( anyhow!(
455- "Function '{}' ({}) timed out after {} seconds" ,
456- op. op_kind, op. name, timeout_duration. as_secs( )
457- ) ) ;
458- }
459- _ = tokio:: time:: sleep( warn_duration) , if !warned => {
460- eprintln!(
461- "WARNING: Function '{}' ({}) is taking longer than 30s" ,
462- op_kind_for_warning, op_name_for_warning
463- ) ;
464- warn!(
465- "Function '{}' ({}) is taking longer than {}s" ,
466- op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
467- ) ;
468- warned = true ;
469- }
470- }
471- } ;
472-
473- res. and_then ( |v| head_scope. define_field ( & op. output , & v) )
459+ let v = evaluate_with_timeout_and_warning (
460+ eval_future,
461+ timeout_duration,
462+ warn_duration,
463+ op_kind_for_warning,
464+ op_name_for_warning,
465+ )
466+ . await ?;
467+
468+ head_scope. define_field ( & op. output , & v)
474469 } ;
475470
476471 if let Some ( ref op_stats) = operation_in_process_stats {
@@ -579,43 +574,6 @@ async fn evaluate_op_scope(
579574 let collector_entry = scoped_entries
580575 . headn ( op. collector_ref . scope_up_level as usize )
581576 . ok_or_else ( || anyhow:: anyhow!( "Collector level out of bound" ) ) ?;
582-
583- // Assemble input values
584- let input_values: Vec < value:: Value > =
585- assemble_input_values ( & op. input . fields , scoped_entries)
586- . collect :: < Result < Vec < _ > > > ( ) ?;
587-
588- // Create field_values vector for all fields in the merged schema
589- let mut field_values = op
590- . field_index_mapping
591- . iter ( )
592- . map ( |idx| {
593- idx. map_or ( value:: Value :: Null , |input_idx| {
594- input_values[ input_idx] . clone ( )
595- } )
596- } )
597- . collect :: < Vec < _ > > ( ) ;
598-
599- // Handle auto_uuid_field (assumed to be at position 0 for efficiency)
600- if op. has_auto_uuid_field {
601- if let Some ( uuid_idx) = op. collector_schema . auto_uuid_field_idx {
602- let uuid = memory. next_uuid (
603- op. fingerprinter
604- . clone ( )
605- . with (
606- & field_values
607- . iter ( )
608- . enumerate ( )
609- . filter ( |( i, _) | * i != uuid_idx)
610- . map ( |( _, v) | v)
611- . collect :: < Vec < _ > > ( ) ,
612- ) ?
613- . into_fingerprint ( ) ,
614- ) ?;
615- field_values[ uuid_idx] = value:: Value :: Basic ( value:: BasicValue :: Uuid ( uuid) ) ;
616- }
617- }
618-
619577 {
620578 let mut collected_records = collector_entry. collected_values
621579 [ op. collector_ref . local . collector_idx as usize ]
0 commit comments