@@ -7,19 +7,43 @@ package inspect
77
88import (
99 "context"
10+ "runtime"
1011
12+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+ "github.com/cockroachdb/cockroach/pkg/roachpb"
14+ "github.com/cockroachdb/cockroach/pkg/settings"
1115 "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
1216 "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1317 "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
1418 "github.com/cockroachdb/cockroach/pkg/sql/types"
19+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
1520 "github.com/cockroachdb/cockroach/pkg/util/log"
1621 "github.com/cockroachdb/cockroach/pkg/util/tracing"
22+ "github.com/cockroachdb/errors"
1723)
1824
25+ var (
26+ processorConcurrencyOverride = settings .RegisterIntSetting (
27+ settings .ApplicationLevel ,
28+ "sql.inspect.processor_concurrency" ,
29+ "sets the number of concurrent processors for INSPECT jobs. " +
30+ "0 uses the default based on GOMAXPROCS. Values above GOMAXPROCS are capped." ,
31+ 0 ,
32+ settings .NonNegativeInt ,
33+ )
34+ )
35+
36+ type inspectCheckFactory func () inspectCheck
37+
1938type inspectProcessor struct {
20- processorID int32
21- flowCtx * execinfra.FlowCtx
22- spec execinfrapb.InspectSpec
39+ processorID int32
40+ flowCtx * execinfra.FlowCtx
41+ spec execinfrapb.InspectSpec
42+ cfg * execinfra.ServerConfig
43+ checkFactories []inspectCheckFactory
44+ spanSrc spanSource
45+ logger inspectLogger
46+ concurrency int
2347}
2448
2549var _ execinfra.Processor = (* inspectProcessor )(nil )
@@ -59,21 +83,153 @@ func (p *inspectProcessor) Run(ctx context.Context, output execinfra.RowReceiver
5983 output .ProducerDone ()
6084}
6185
86+ // runInspect starts a set of worker goroutines to process spans concurrently.
87+ // Each span is read from a buffered channel and passed to processSpan.
88+ // The function blocks until all spans are processed or an error occurs.
6289func (p * inspectProcessor ) runInspect (ctx context.Context , output execinfra.RowReceiver ) error {
63- log .Infof (ctx , "INSPECT processor started processorID=%d" , p .processorID )
90+ log .Infof (ctx , "INSPECT processor started processorID=%d concurrency=%d" , p .processorID , p .concurrency )
91+
92+ group := ctxgroup .WithContext (ctx )
93+
94+ if p .concurrency == 0 {
95+ return errors .AssertionFailedf ("must have at least one worker" )
96+ }
97+ spanCh := make (chan roachpb.Span , p .concurrency )
98+
99+ // Launch worker goroutines. Each worker reads spans from spanCh and processes
100+ // them.
101+ for i := 0 ; i < p .concurrency ; i ++ {
102+ workerIndex := i
103+ group .GoCtx (func (ctx context.Context ) error {
104+ for {
105+ select {
106+ case <- ctx .Done ():
107+ // If the context is canceled (e.g., due to an error in another worker), exit.
108+ return ctx .Err ()
109+ case span , ok := <- spanCh :
110+ if ! ok {
111+ // Channel is closed, no more spans to process.
112+ return nil
113+ }
114+ if err := p .processSpan (ctx , span , workerIndex ); err != nil {
115+ // On error, return it. ctxgroup will cancel all other goroutines.
116+ return err
117+ }
118+ }
119+ }
120+ })
121+ }
122+
123+ // Producer goroutine: feeds all spans into the channel for workers to consume.
124+ group .GoCtx (func (ctx context.Context ) error {
125+ defer close (spanCh )
126+ for {
127+ span , ok , err := p .spanSrc .NextSpan (ctx )
128+ if err != nil {
129+ return err
130+ }
131+ if ! ok {
132+ return nil // done
133+ }
134+ select {
135+ case <- ctx .Done ():
136+ // Exit early if context is canceled.
137+ return ctx .Err ()
138+ case spanCh <- span :
139+ // Send span to the workers.
140+ }
141+ }
142+ })
143+
144+ // Wait for all goroutines to finish.
145+ return group .Wait ()
146+ }
147+
148+ // getProcessorConcurrency returns the number of concurrent workers to use for
149+ // INSPECT processing. If the cluster setting is non-zero, it uses the minimum
150+ // of the override and GOMAXPROCS. Otherwise, it defaults to GOMAXPROCS.
151+ func getProcessorConcurrency (flowCtx * execinfra.FlowCtx ) int {
152+ override := int (processorConcurrencyOverride .Get (& flowCtx .Cfg .Settings .SV ))
153+ if override > 0 {
154+ return min (runtime .GOMAXPROCS (0 ), override )
155+ }
156+ return runtime .GOMAXPROCS (0 )
157+ }
158+
159+ // processSpan executes all configured inspect checks against a single span.
160+ // It instantiates a fresh set of checks from the configured factories and uses
161+ // an inspectRunner to drive their execution.
162+ func (p * inspectProcessor ) processSpan (
163+ ctx context.Context , span roachpb.Span , workerIndex int ,
164+ ) error {
165+ checks := make ([]inspectCheck , len (p .checkFactories ))
166+ for i , factory := range p .checkFactories {
167+ checks [i ] = factory ()
168+ }
169+ runner := inspectRunner {
170+ checks : checks ,
171+ logger : p .logger ,
172+ }
173+
174+ // Process all checks on the given span.
175+ for {
176+ ok , err := runner .Step (ctx , p .cfg , span , workerIndex )
177+ if err != nil {
178+ return err
179+ }
180+ if ! ok {
181+ break
182+ }
183+ }
64184 return nil
65185}
66186
187+ // newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
188+ // It parses the spec to generate a set of inspectCheck factories, sets up the span source,
189+ // and wires in logging and concurrency controls.
190+ //
191+ // The returned processor is ready for integration into a distributed flow, but will not
192+ // begin execution until Run is called.
67193func newInspectProcessor (
68194 ctx context.Context , flowCtx * execinfra.FlowCtx , processorID int32 , spec execinfrapb.InspectSpec ,
69195) (execinfra.Processor , error ) {
196+ checkFactories , err := buildInspectCheckFactories (spec )
197+ if err != nil {
198+ return nil , err
199+ }
70200 return & inspectProcessor {
71- spec : spec ,
72- processorID : processorID ,
73- flowCtx : flowCtx ,
201+ spec : spec ,
202+ processorID : processorID ,
203+ flowCtx : flowCtx ,
204+ checkFactories : checkFactories ,
205+ cfg : flowCtx .Cfg ,
206+ spanSrc : newSliceSpanSource (spec .Spans ),
207+ // TODO(148301): log to cockroach.log for now, but later log to system.inspect_errors
208+ logger : & logSink {},
209+ concurrency : getProcessorConcurrency (flowCtx ),
74210 }, nil
75211}
76212
213+ // buildInspectCheckFactories returns a slice of factory functions that create
214+ // inspectCheck instances at runtime. Each factory corresponds to one check entry
215+ // defined in the job's InspectSpec.
216+ //
217+ // This indirection ensures that each check instance is freshly created per span,
218+ // avoiding shared state across concurrent workers.
219+ func buildInspectCheckFactories (spec execinfrapb.InspectSpec ) ([]inspectCheckFactory , error ) {
220+ checkFactories := make ([]inspectCheckFactory , 0 , len (spec .InspectDetails .Checks ))
221+ for _ , specCheck := range spec .InspectDetails .Checks {
222+ switch specCheck .Type {
223+ case jobspb .InspectCheckIndexConsistency :
224+ // TODO(148863): implement the index consistency checker. No-op for now.
225+
226+ default :
227+ return nil , errors .AssertionFailedf ("unsupported inspect check type: %v" , specCheck .Type )
228+ }
229+ }
230+ return checkFactories , nil
231+ }
232+
77233func init () {
78234 rowexec .NewInspectProcessor = newInspectProcessor
79235}
0 commit comments