Skip to content

Commit 405c3d5

Browse files
authored
Merge pull request #96 from platform-engineering-labs/discovery-fixes
fix(discovery): prevent ResumeScanning crash when message arrives in Idle state
2 parents c070da4 + efeb991 commit 405c3d5

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

internal/metastructure/discovery/discovery.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ type DiscoveryData struct {
104104
// will not start new list operations, but will allow outstanding operations
105105
// to complete. Uses reference counting to support multiple concurrent pausers.
106106
pauseCount int
107+
// hasPendingResumeScan tracks whether a delayed ResumeScanning message is in flight.
108+
// This prevents sending redundant delayed messages when one is already pending.
109+
hasPendingResumeScan bool
107110
}
108111

109112
func (d *DiscoveryData) SetTargets(targets []*pkgmodel.Target) {
@@ -238,6 +241,7 @@ func resumeDiscovery(state gen.Atom, data DiscoveryData, message messages.Resume
238241
func discover(state gen.Atom, data DiscoveryData, message Discover, proc gen.Process) (gen.Atom, DiscoveryData, []statemachine.Action, error) {
239242
data.isScheduledDiscovery = !message.Once
240243
data.timeStarted = time.Now()
244+
data.summary = make(map[string]int)
241245

242246
if state == StateDiscovering {
243247
proc.Log().Debug("Discovery already running, consider configuring a longer interval")
@@ -310,6 +314,9 @@ func discover(state gen.Atom, data DiscoveryData, message Discover, proc gen.Pro
310314
}
311315

312316
func resumeScanning(state gen.Atom, data DiscoveryData, message ResumeScanning, proc gen.Process) (gen.Atom, DiscoveryData, []statemachine.Action, error) {
317+
// The delayed message has arrived - clear the pending flag
318+
data.hasPendingResumeScan = false
319+
313320
// Don't start new list operations if Discovery is paused
314321
if data.pauseCount > 0 {
315322
proc.Log().Debug("Discovery is paused, skipping resumeScanning", "pauseCount", data.pauseCount)
@@ -344,11 +351,13 @@ func resumeScanning(state gen.Atom, data DiscoveryData, message ResumeScanning,
344351
}
345352
for namespace, done := range finished {
346353
if !done {
354+
// Send a delayed message to continue processing
347355
_, err := proc.SendAfter(proc.PID(), ResumeScanning{}, 1*time.Second)
348356
if err != nil {
349357
proc.Log().Error("Discovery failed to send ResumeScanning message: %v", err)
350358
return state, data, nil, gen.TerminateReasonPanic
351359
}
360+
data.hasPendingResumeScan = true
352361
return state, data, nil, nil
353362
}
354363
delete(data.queuedListOperations, namespace)
@@ -604,10 +613,14 @@ func discoverChildren(op ListOperation, data DiscoveryData, proc gen.Process) er
604613
ParentKSUID: parent.Ksuid,
605614
ListParams: util.MapToString(listParams),
606615
})
607-
err = proc.Send(proc.PID(), ResumeScanning{})
608-
if err != nil {
609-
proc.Log().Error("Failed to send ResumeScanning", "error", err)
610-
return fmt.Errorf("failed to send ResumeScanning: %w", err)
616+
// Only send immediate ResumeScanning if no delayed message is pending.
617+
// If a delayed message is pending, it will process the queued work when it arrives.
618+
if !data.hasPendingResumeScan {
619+
err = proc.Send(proc.PID(), ResumeScanning{})
620+
if err != nil {
621+
proc.Log().Error("Failed to send ResumeScanning", "error", err)
622+
return fmt.Errorf("failed to send ResumeScanning: %w", err)
623+
}
611624
}
612625
}
613626
}

0 commit comments

Comments
 (0)