@@ -10,7 +10,7 @@ type WithTimestamp interface {
1010}
1111
1212type Sampler [T WithTimestamp ] struct {
13- mu sync.RWMutex
13+ mu sync.Mutex
1414 minInterval time.Duration
1515 maxSamples int
1616 callback func (ts time.Time ) (T , error )
@@ -26,19 +26,15 @@ type Sub[T WithTimestamp] struct {
2626 first time.Time
2727 last time.Time
2828 samples []T
29- mu sync.RWMutex
3029 err error
3130}
3231
3332func (s * Sub [T ]) Close (captureLast bool ) ([]T , error ) {
3433 s .sampler .mu .Lock ()
3534 delete (s .sampler .subs , s )
36- s .sampler .mu .Unlock ()
37-
38- s .mu .Lock ()
39- defer s .mu .Unlock ()
4035
4136 if s .err != nil {
37+ s .sampler .mu .Unlock ()
4238 return nil , s .err
4339 }
4440 current := s .first
@@ -50,6 +46,7 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
5046 current = ts
5147 }
5248 }
49+ s .sampler .mu .Unlock ()
5350
5451 if captureLast {
5552 v , err := s .sampler .callback (time .Now ())
@@ -98,26 +95,26 @@ func (s *Sampler[T]) run() {
9895 return
9996 case <- ticker .C :
10097 tm := time .Now ()
101- s .mu .RLock ()
98+ s .mu .Lock ()
10299 active := make ([]* Sub [T ], 0 , len (s .subs ))
103100 for ss := range s .subs {
104- ss .mu .Lock ()
105101 if tm .Sub (ss .last ) < ss .interval {
106- ss .mu .Unlock ()
107102 continue
108103 }
109104 ss .last = tm
110- ss .mu .Unlock ()
111105 active = append (active , ss )
112106 }
113- s .mu .RUnlock ()
107+ s .mu .Unlock ()
114108 ticker = time .NewTimer (s .minInterval )
115109 if len (active ) == 0 {
116110 continue
117111 }
118112 value , err := s .callback (tm )
113+ s .mu .Lock ()
119114 for _ , ss := range active {
120- ss .mu .Lock ()
115+ if _ , found := s .subs [ss ]; ! found {
116+ continue // skip if Close() was called while the lock was released
117+ }
121118 if err != nil {
122119 ss .err = err
123120 } else {
@@ -128,8 +125,8 @@ func (s *Sampler[T]) run() {
128125 if time .Duration (ss .interval )* time .Duration (s .maxSamples ) <= dur {
129126 ss .interval *= 2
130127 }
131- ss .mu .Unlock ()
132128 }
129+ s .mu .Unlock ()
133130 }
134131 }
135132}
0 commit comments