@@ -23,6 +23,7 @@ import (
2323 "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2424 "github.com/cockroachdb/cockroach/pkg/util/hlc"
2525 "github.com/cockroachdb/cockroach/pkg/util/log"
26+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
2627 "github.com/cockroachdb/cockroach/pkg/util/tracing"
2728 "github.com/cockroachdb/errors"
2829 pbtypes "github.com/gogo/protobuf/types"
@@ -71,6 +72,12 @@ type inspectProcessor struct {
7172 logger inspectLogger
7273 concurrency int
7374 clock * hlc.Clock
75+ mu struct {
76+ // Guards calls to output.Push because DistSQLReceiver.Push is not
77+ // concurrency safe and progress metadata can be emitted from multiple
78+ // worker goroutines.
79+ syncutil.Mutex
80+ }
7481}
7582
7683var _ execinfra.Processor = (* inspectProcessor )(nil )
@@ -310,7 +317,7 @@ func (p *inspectProcessor) sendInspectProgress(
310317 },
311318 }
312319
313- output . Push ( nil , meta )
320+ p . pushProgressMeta ( output , meta )
314321 return nil
315322}
316323
@@ -350,10 +357,21 @@ func (p *inspectProcessor) sendSpanCompletionProgress(
350357 },
351358 }
352359
353- output . Push ( nil , meta )
360+ p . pushProgressMeta ( output , meta )
354361 return nil
355362}
356363
364+ // pushProgressMeta serializes metadata pushes so only one goroutine interacts
365+ // with the DistSQL receiver at a time (DistSQLReceiver.Push is not concurrency
366+ // safe).
367+ func (p * inspectProcessor ) pushProgressMeta (
368+ output execinfra.RowReceiver , meta * execinfrapb.ProducerMetadata ,
369+ ) {
370+ p .mu .Lock ()
371+ defer p .mu .Unlock ()
372+ output .Push (nil , meta )
373+ }
374+
357375// newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
358376// It parses the spec to generate a set of inspectCheck factories, sets up the span source,
359377// and wires in logging and concurrency controls.
0 commit comments