Skip to content

Commit e418737

Browse files
committed
try to fix the semaphore
1 parent 1dfa805 commit e418737

File tree

1 file changed

+193
-45
lines changed

1 file changed

+193
-45
lines changed

internal/semaphore.go

Lines changed: 193 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,47 @@ var semTimers = sync.Pool{
1515
},
1616
}
1717

18+
// waiter is one node of the semaphore's wait queue: a single goroutine that
// is blocked until a token is handed to it.
type waiter struct {
	// ready is closed by whoever dequeues this waiter; the close is the
	// wake-up signal.
	ready chan struct{}

	// next links to the waiter queued immediately behind this one, or nil.
	next *waiter

	// cancelled is set to true by a waiter that stopped waiting because its
	// context was cancelled or its timeout expired.
	cancelled atomic.Bool
}
24+
1825
// FastSemaphore is a counting semaphore implementation using atomic operations.
1926
// It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation.
2027
//
28+
// This implementation maintains FIFO ordering of waiters using a linked list queue.
29+
// When a token is released, the first waiter in the queue is notified.
30+
//
2131
// Performance characteristics:
2232
// - Fast path (no blocking): Single atomic CAS operation
23-
// - Slow path (blocking): Falls back to channel-based waiting
24-
// - Release: Single atomic decrement + optional channel notification
33+
// - Slow path (blocking): FIFO queue-based waiting
34+
// - Release: Single atomic decrement + wake up first waiter in queue
2535
//
2636
// This is significantly faster than a pure channel-based semaphore because:
2737
// 1. The fast path avoids channel operations entirely (no scheduler involvement)
2838
// 2. Atomic operations are much cheaper than channel send/receive
39+
// 3. FIFO ordering prevents starvation
2940
type FastSemaphore struct {
3041
// Current number of acquired tokens (atomic)
3142
count atomic.Int32
3243

3344
// Maximum number of tokens (capacity)
3445
max int32
3546

36-
// Channel for blocking waiters
37-
// Only used when fast path fails (semaphore is full)
38-
waitCh chan struct{}
47+
// Mutex to protect the waiter queue
48+
lock sync.Mutex
49+
50+
// Head and tail of the waiter queue (FIFO)
51+
head *waiter
52+
tail *waiter
3953
}
4054

4155
// NewFastSemaphore creates a new fast semaphore with the given capacity.
4256
func NewFastSemaphore(capacity int32) *FastSemaphore {
4357
return &FastSemaphore{
44-
max: capacity,
45-
waitCh: make(chan struct{}, capacity),
58+
max: capacity,
4659
}
4760
}
4861

@@ -63,51 +76,153 @@ func (s *FastSemaphore) TryAcquire() bool {
6376
}
6477
}
6578

79+
// enqueue adds a waiter to the end of the queue.
80+
// Must be called with lock held.
81+
func (s *FastSemaphore) enqueue(w *waiter) {
82+
if s.tail == nil {
83+
s.head = w
84+
s.tail = w
85+
} else {
86+
s.tail.next = w
87+
s.tail = w
88+
}
89+
}
90+
91+
// dequeue removes and returns the first waiter from the queue.
92+
// Must be called with lock held.
93+
// Returns nil if the queue is empty.
94+
func (s *FastSemaphore) dequeue() *waiter {
95+
if s.head == nil {
96+
return nil
97+
}
98+
w := s.head
99+
s.head = w.next
100+
if s.head == nil {
101+
s.tail = nil
102+
}
103+
w.next = nil
104+
return w
105+
}
106+
107+
// notifyOne wakes up the first waiter in the queue if any.
108+
func (s *FastSemaphore) notifyOne() {
109+
s.lock.Lock()
110+
w := s.dequeue()
111+
s.lock.Unlock()
112+
113+
if w != nil {
114+
close(w.ready)
115+
}
116+
}
117+
66118
// Acquire acquires a token, blocking if necessary until one is available or the context is cancelled.
67119
// Returns an error if the context is cancelled or the timeout expires.
68120
// Returns timeoutErr when the timeout expires.
69-
//
70-
// Performance optimization:
71-
// 1. First try fast path (no blocking)
72-
// 2. If that fails, fall back to channel-based waiting
73121
func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, timeoutErr error) error {
74-
// Fast path: try to acquire without blocking
122+
// Check context first
75123
select {
76124
case <-ctx.Done():
77125
return ctx.Err()
78126
default:
79127
}
80128

81-
// Try fast acquire first
129+
// Try fast path first
82130
if s.TryAcquire() {
83131
return nil
84132
}
85133

86-
// Fast path failed, need to wait
134+
// Need to wait - create a waiter and add to queue
135+
w := &waiter{
136+
ready: make(chan struct{}),
137+
}
138+
139+
s.lock.Lock()
140+
s.enqueue(w)
141+
s.lock.Unlock()
142+
87143
// Use timer pool to avoid allocation
88144
timer := semTimers.Get().(*time.Timer)
89145
defer semTimers.Put(timer)
90146
timer.Reset(timeout)
91147

92-
for {
93-
select {
94-
case <-ctx.Done():
95-
if !timer.Stop() {
96-
<-timer.C
97-
}
148+
select {
149+
case <-ctx.Done():
150+
if !timer.Stop() {
151+
<-timer.C
152+
}
153+
// Mark ourselves as cancelled
154+
w.cancelled.Store(true)
155+
// Try to remove ourselves from the queue
156+
s.lock.Lock()
157+
removed := s.removeWaiter(w)
158+
s.lock.Unlock()
159+
160+
if !removed {
161+
// We were already dequeued and notified
162+
// Wait for the notification and then release the token
163+
<-w.ready
164+
s.releaseToPool()
165+
}
166+
return ctx.Err()
167+
case <-w.ready:
168+
// We were notified, check if we were cancelled
169+
if !timer.Stop() {
170+
<-timer.C
171+
}
172+
if w.cancelled.Load() {
173+
// We were cancelled while being notified, release the token
174+
s.releaseToPool()
98175
return ctx.Err()
176+
}
177+
return nil
178+
case <-timer.C:
179+
// Mark ourselves as cancelled
180+
w.cancelled.Store(true)
181+
// Try to remove ourselves from the queue
182+
s.lock.Lock()
183+
removed := s.removeWaiter(w)
184+
s.lock.Unlock()
185+
186+
if !removed {
187+
// We were already dequeued and notified
188+
// Wait for the notification and then release the token
189+
<-w.ready
190+
s.releaseToPool()
191+
}
192+
return timeoutErr
193+
}
194+
}
195+
196+
// removeWaiter removes a waiter from the queue.
197+
// Must be called with lock held.
198+
// Returns true if the waiter was found and removed, false otherwise.
199+
func (s *FastSemaphore) removeWaiter(target *waiter) bool {
200+
if s.head == nil {
201+
return false
202+
}
99203

100-
case <-s.waitCh:
101-
// Someone released a token, we got the spot
102-
// no need to touch the counter
103-
if !timer.Stop() {
104-
<-timer.C
204+
// Special case: removing head
205+
if s.head == target {
206+
s.head = target.next
207+
if s.head == nil {
208+
s.tail = nil
209+
}
210+
return true
211+
}
212+
213+
// Find and remove from middle or tail
214+
prev := s.head
215+
for prev.next != nil {
216+
if prev.next == target {
217+
prev.next = target.next
218+
if prev.next == nil {
219+
s.tail = prev
105220
}
106-
return nil
107-
case <-timer.C:
108-
return timeoutErr
221+
return true
109222
}
223+
prev = prev.next
110224
}
225+
return false
111226
}
112227

113228
// AcquireBlocking acquires a token, blocking indefinitely until one is available.
@@ -119,29 +234,62 @@ func (s *FastSemaphore) AcquireBlocking() {
119234
return
120235
}
121236

122-
// Slow path: wait for a token
123-
for {
124-
<-s.waitCh
125-
// Someone released a token, we got the spot
126-
// no need to touch the counter
127-
return
237+
// Need to wait - create a waiter and add to queue
238+
w := &waiter{
239+
ready: make(chan struct{}),
240+
}
241+
242+
s.lock.Lock()
243+
s.enqueue(w)
244+
s.lock.Unlock()
245+
246+
// Wait to be notified
247+
<-w.ready
248+
}
249+
250+
// releaseToPool releases a token back to the pool.
251+
// This should be called when a waiter was notified but then cancelled/timed out.
252+
// We need to pass the token to another waiter if any, otherwise decrement the counter.
253+
func (s *FastSemaphore) releaseToPool() {
254+
s.lock.Lock()
255+
w := s.dequeue()
256+
s.lock.Unlock()
257+
258+
if w != nil {
259+
// Transfer the token to another waiter
260+
close(w.ready)
261+
} else {
262+
// No waiters, decrement the counter to free the slot
263+
s.count.Add(-1)
128264
}
129265
}
130266

131267
// Release releases a token back to the semaphore.
132-
// This wakes up one waiting goroutine if any are blocked.
268+
// This wakes up the first waiting goroutine if any are blocked.
133269
func (s *FastSemaphore) Release() {
270+
for {
271+
s.lock.Lock()
272+
w := s.dequeue()
273+
s.lock.Unlock()
134274

135-
// Try to wake up a waiter (non-blocking)
136-
// If no one is waiting, this is a no-op
137-
select {
138-
case s.waitCh <- struct{}{}:
139-
// Successfully notified a waiter
140-
// no need to decrement the counter, the waiter will use this spot
141-
default:
142-
// No waiters, that's fine
143-
// decrement the counter
144-
s.count.Add(-1)
275+
if w == nil {
276+
// No waiters, decrement the counter to free the slot
277+
s.count.Add(-1)
278+
return
279+
}
280+
281+
// Check if this waiter was cancelled before we notify them
282+
if w.cancelled.Load() {
283+
// This waiter was cancelled, skip them and try the next one
284+
// We still have the token, so continue the loop
285+
close(w.ready) // Still need to close to unblock them
286+
continue
287+
}
288+
289+
// Transfer the token directly to the waiter
290+
// Don't decrement the counter - the waiter takes over this slot
291+
close(w.ready)
292+
return
145293
}
146294
}
147295

0 commit comments

Comments
 (0)