@@ -10,7 +10,7 @@ type WithTimestamp interface {
1010}
1111
1212type Sampler [T WithTimestamp ] struct {
13- mu sync.RWMutex
13+ mu sync.Mutex
1414 minInterval time.Duration
1515 maxSamples int
1616 callback func (ts time.Time ) (T , error )
@@ -32,9 +32,9 @@ type Sub[T WithTimestamp] struct {
3232func (s * Sub [T ]) Close (captureLast bool ) ([]T , error ) {
3333 s .sampler .mu .Lock ()
3434 delete (s .sampler .subs , s )
35- s .sampler .mu .Unlock ()
3635
3736 if s .err != nil {
37+ s .sampler .mu .Unlock ()
3838 return nil , s .err
3939 }
4040 current := s .first
@@ -46,6 +46,7 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
4646 current = ts
4747 }
4848 }
49+ s .sampler .mu .Unlock ()
4950
5051 if captureLast {
5152 v , err := s .sampler .callback (time .Now ())
@@ -94,22 +95,26 @@ func (s *Sampler[T]) run() {
9495 return
9596 case <- ticker .C :
9697 tm := time .Now ()
98+ s .mu .Lock ()
9799 active := make ([]* Sub [T ], 0 , len (s .subs ))
98- s .mu .RLock ()
99100 for ss := range s .subs {
100101 if tm .Sub (ss .last ) < ss .interval {
101102 continue
102103 }
103104 ss .last = tm
104105 active = append (active , ss )
105106 }
106- s .mu .RUnlock ()
107+ s .mu .Unlock ()
107108 ticker = time .NewTimer (s .minInterval )
108109 if len (active ) == 0 {
109110 continue
110111 }
111112 value , err := s .callback (tm )
113+ s .mu .Lock ()
112114 for _ , ss := range active {
115+ if _ , found := s .subs [ss ]; ! found {
116+ continue // skip if Close() was called while the lock was released
117+ }
113118 if err != nil {
114119 ss .err = err
115120 } else {
@@ -121,6 +126,7 @@ func (s *Sampler[T]) run() {
121126 ss .interval *= 2
122127 }
123128 }
129+ s .mu .Unlock ()
124130 }
125131 }
126132}
0 commit comments