@@ -332,6 +332,12 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
332332 eventCh := make (chan kvcoord.RangeFeedMessage )
333333
334334 var rangefeedOpts []kvcoord.RangeFeedOption
335+ // We can unconditionally enable bulk-delivery from the server at least as far
336+ // as to this client; if an onValues is configured we can also bulk-process
337+ // values, but even if it isn't we know how to unwrap a bulk delivery and pass
338+ // each event to the caller's individual event handlers.
339+ rangefeedOpts = append (rangefeedOpts , kvcoord .WithBulkDelivery ())
340+
335341 if f .scanConfig .overSystemTable {
336342 rangefeedOpts = append (rangefeedOpts , kvcoord .WithSystemTablePriority ())
337343 }
@@ -427,54 +433,98 @@ func (f *RangeFeed) processEvents(
427433 for {
428434 select {
429435 case ev := <- eventCh :
430- switch {
431- case ev .Val != nil :
432- f .onValue (ctx , ev .Val )
433- case ev .Checkpoint != nil :
434- ts := ev .Checkpoint .ResolvedTS
435- if f .frontierQuantize != 0 {
436- ts .Logical = 0
437- ts .WallTime -= ts .WallTime % int64 (f .frontierQuantize )
438- }
439- advanced , err := frontier .Forward (ev .Checkpoint .Span , ts )
440- if err != nil {
441- return err
442- }
443- if f .onCheckpoint != nil {
444- f .onCheckpoint (ctx , ev .Checkpoint )
445- }
446- if advanced && f .onFrontierAdvance != nil {
447- f .onFrontierAdvance (ctx , frontier .Frontier ())
448- }
449- if f .frontierVisitor != nil {
450- f .frontierVisitor (ctx , advanced , frontier )
451- }
452- case ev .SST != nil :
453- if f .onSSTable == nil {
454- return errors .AssertionFailedf (
455- "received unexpected rangefeed SST event with no OnSSTable handler" )
456- }
457- f .onSSTable (ctx , ev .SST , ev .RegisteredSpan )
458- case ev .DeleteRange != nil :
459- if f .onDeleteRange == nil {
460- if f .knobs != nil && f .knobs .IgnoreOnDeleteRangeError {
461- continue
462- }
463- return errors .AssertionFailedf (
464- "received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s" , ev )
465- }
466- f .onDeleteRange (ctx , ev .DeleteRange )
467- case ev .Metadata != nil :
468- if f .onMetadata == nil {
469- return errors .AssertionFailedf ("received unexpected metadata event with no OnMetadata handler" )
470- }
471- f .onMetadata (ctx , ev .Metadata )
472- case ev .Error != nil :
473- // Intentionally do nothing, we'll get an error returned from the
474- // call to RangeFeed.
436+ if err := f .processEvent (ctx , frontier , ev .RangeFeedEvent , ev .RegisteredSpan ); err != nil {
437+ return err
475438 }
476439 case <- ctx .Done ():
477440 return ctx .Err ()
478441 }
479442 }
480443}
444+
445+ func (f * RangeFeed ) processEvent (
446+ ctx context.Context , frontier span.Frontier , ev * kvpb.RangeFeedEvent , registeredSpan roachpb.Span ,
447+ ) error {
448+ switch {
449+ case ev .Val != nil :
450+ f .onValue (ctx , ev .Val )
451+ case ev .Checkpoint != nil :
452+ ts := ev .Checkpoint .ResolvedTS
453+ if f .frontierQuantize != 0 {
454+ ts .Logical = 0
455+ ts .WallTime -= ts .WallTime % int64 (f .frontierQuantize )
456+ }
457+ advanced , err := frontier .Forward (ev .Checkpoint .Span , ts )
458+ if err != nil {
459+ return err
460+ }
461+ if f .onCheckpoint != nil {
462+ f .onCheckpoint (ctx , ev .Checkpoint )
463+ }
464+ if advanced && f .onFrontierAdvance != nil {
465+ f .onFrontierAdvance (ctx , frontier .Frontier ())
466+ }
467+ if f .frontierVisitor != nil {
468+ f .frontierVisitor (ctx , advanced , frontier )
469+ }
470+ case ev .SST != nil :
471+ if f .onSSTable == nil {
472+ return errors .AssertionFailedf (
473+ "received unexpected rangefeed SST event with no OnSSTable handler" )
474+ }
475+ f .onSSTable (ctx , ev .SST , registeredSpan )
476+ case ev .DeleteRange != nil :
477+ if f .onDeleteRange == nil {
478+ if f .knobs != nil && f .knobs .IgnoreOnDeleteRangeError {
479+ return nil
480+ }
481+ return errors .AssertionFailedf (
482+ "received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s" , ev )
483+ }
484+ f .onDeleteRange (ctx , ev .DeleteRange )
485+ case ev .Metadata != nil :
486+ if f .onMetadata == nil {
487+ return errors .AssertionFailedf ("received unexpected metadata event with no OnMetadata handler" )
488+ }
489+ f .onMetadata (ctx , ev .Metadata )
490+ case ev .Error != nil :
491+ // Intentionally do nothing, we'll get an error returned from the
492+ // call to RangeFeed.
493+ case ev .BulkEvents != nil :
494+ if f .onValues != nil {
495+ // We can optimistically assume the bulk event consists of all value
496+ // events, and allocate a buffer for them to be passed to onValues. In the
497+ // rare case we hit a non-value event (it would have to be a range key as
498+ // only a catch-up scan currently produces bulk events), we will throw out
499+ // this buffer and any events we might have copied to it so far and just
500+ // fallback to to processing each event, but this should be so uncommon it
501+ // is not worth worrying about the potential wasted work.
502+ allValues := true
503+ buf := make ([]kv.KeyValue , len (ev .BulkEvents .Events ))
504+ for i := range ev .BulkEvents .Events {
505+ if ev .BulkEvents .Events [i ].Val != nil {
506+ buf [i ] = kv.KeyValue {
507+ Key : ev .BulkEvents .Events [i ].Val .Key ,
508+ Value : & ev .BulkEvents .Events [i ].Val .Value ,
509+ }
510+ } else {
511+ allValues = false
512+ break
513+ }
514+ }
515+ if allValues {
516+ f .onValues (ctx , buf )
517+ return nil
518+ }
519+ }
520+ // Either the bulk event contains non-value events or a onValues handler is
521+ // not configured, so process each event individually.
522+ for _ , e := range ev .BulkEvents .Events {
523+ if err := f .processEvent (ctx , frontier , e , registeredSpan ); err != nil {
524+ return err
525+ }
526+ }
527+
528+ }
529+ return nil
530+ }
0 commit comments