Skip to content

Commit 140bd20

Browse files
committed
Refactor concurrency package to use "Permit" instead of "Token" for better clarity and consistency
1 parent c4bb058 commit 140bd20

File tree

3 files changed

+100
-83
lines changed

3 files changed

+100
-83
lines changed

concurrency/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type ConcurrencyMetrics struct {
2323
TotalRequests int64 // Total number of requests made
2424
TotalRetries int64 // Total number of retry attempts
2525
TotalRateLimitErrors int64 // Total number of rate limit errors encountered
26-
TokenWaitTime time.Duration // Total time spent waiting for tokens
26+
PermitWaitTime time.Duration // Total time spent waiting for tokens
2727
TTFB struct { // Metrics related to Time to First Byte (TTFB)
2828
Total time.Duration // Total Time to First Byte (TTFB) for all requests
2929
Count int64 // Count of requests used for calculating TTFB

concurrency/semaphore.go

Lines changed: 89 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,107 +12,124 @@ import (
1212
"go.uber.org/zap"
1313
)
1414

15-
// AcquireConcurrencyToken acquires a concurrency token to regulate the number of concurrent
16-
// operations within predefined limits, ensuring system stability and adherence to concurrency policies.
17-
// This function initiates a token acquisition process that involves generating a unique request ID
18-
// for tracking purposes and attempting to acquire a token within a specified timeout to prevent
19-
// indefinite blocking. Successful acquisition updates performance metrics and associates the
20-
// unique request ID with the provided context for enhanced traceability and management of
21-
// concurrent requests.
15+
// AcquireConcurrencyPermit acquires a concurrency permit to manage the number of simultaneous
16+
// operations within predefined limits. This method ensures system stability and compliance
17+
// with concurrency policies by regulating the execution of concurrent operations.
2218
//
2319
// Parameters:
24-
// - ctx: A parent context used as the basis for the token acquisition attempt, facilitating
25-
// appropriate cancellation and timeout handling in line with best practices for concurrency control.
20+
// - ctx: A parent context which is used as the basis for permit acquisition. This allows
21+
// for proper handling of timeouts and cancellation in line with best practices.
2622
//
2723
// Returns:
28-
// - context.Context: A derived context that includes the unique request ID, offering a mechanism
29-
// for associating subsequent operations with the acquired concurrency token and facilitating
30-
// effective request tracking and management.
31-
// - uuid.UUID: The unique request ID generated as part of the token acquisition process, serving
32-
// as an identifier for the acquired token and enabling detailed tracking and auditing of
33-
// concurrent operations.
34-
// - error: An error that signals failure to acquire a concurrency token within the allotted timeout,
35-
// or due to other encountered issues, ensuring that potential problems in concurrency control
36-
// are surfaced and can be addressed.
24+
// - context.Context: A new context derived from the original, including a unique request ID.
25+
// This context is used to trace and manage operations under the acquired concurrency permit.
26+
// - uuid.UUID: The unique request ID generated during the permit acquisition process.
27+
// - error: An error object that indicates failure to acquire a permit within the allotted
28+
// timeout, or other system-related issues.
3729
//
3830
// Usage:
39-
// This function is a critical component of concurrency control and should be invoked prior to
40-
// executing operations that require regulation of concurrency. The returned context, enhanced
41-
// with the unique request ID, should be utilized in the execution of these operations to maintain
42-
// consistency in tracking and managing concurrency tokens. The unique request ID also facilitates
43-
// detailed auditing and troubleshooting of the concurrency control mechanism.
44-
//
45-
// Example:
46-
// ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())
47-
//
48-
// if err != nil {
49-
// // Handle token acquisition failure
50-
// }
51-
//
52-
// defer concurrencyHandler.Release(requestID)
53-
// // Proceed with the operation using the modified context
54-
func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error) {
31+
// This function should be used before initiating any operation that requires concurrency control.
32+
// The returned context should be passed to subsequent operations to maintain consistency in
33+
// concurrency tracking.
34+
func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error) {
5535
log := ch.logger
5636

57-
// Measure the token acquisition start time
37+
// Start measuring the permit acquisition time.
5838
tokenAcquisitionStart := time.Now()
5939

60-
// Generate a unique request ID for this acquisition
40+
// Generate a unique request ID for this permit acquisition.
6141
requestID := uuid.New()
6242

63-
// Create a new context with a timeout for acquiring the concurrency token
43+
// Create a new context with a specified timeout for acquiring the permit.
6444
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
65-
defer cancel()
45+
defer cancel() // Ensure to free up resources by cancelling the context after use.
6646

67-
// Attempt to acquire a token from the semaphore within the given context timeout
6847
select {
69-
case ch.sem <- struct{}{}: // Successfully acquired a token
70-
// Calculate the duration it took to acquire the token and record it
48+
case ch.sem <- struct{}{}: // permit acquisition was successful.
49+
// Record the time taken to acquire the permit.
7150
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
72-
ch.lock.Lock()
73-
ch.AcquisitionTimes = append(ch.AcquisitionTimes, tokenAcquisitionDuration)
74-
ch.Metrics.Lock.Lock()
75-
ch.Metrics.TokenWaitTime += tokenAcquisitionDuration
76-
ch.Metrics.TotalRequests++ // Increment total requests count
77-
ch.Metrics.Lock.Unlock()
78-
ch.lock.Unlock()
79-
80-
// Logging the acquisition
81-
utilizedTokens := len(ch.sem)
82-
availableTokens := cap(ch.sem) - utilizedTokens
83-
log.Debug("Acquired concurrency token", zap.String("RequestID", requestID.String()), zap.Duration("AcquisitionTime", tokenAcquisitionDuration), zap.Int("UtilizedTokens", utilizedTokens), zap.Int("AvailableTokens", availableTokens))
84-
85-
// Add the acquired request ID to the context for use in subsequent operations
86-
ctxWithRequestID := context.WithValue(ctx, RequestIDKey{}, requestID)
51+
ch.trackResourceAcquisition(tokenAcquisitionDuration, requestID) // Track and log metrics.
8752

88-
// Return the updated context, the request ID, and nil error to indicate success
53+
// Create a new context that includes the unique request ID.
54+
ctxWithRequestID := context.WithValue(ctx, RequestIDKey{}, requestID)
8955
return ctxWithRequestID, requestID, nil
9056

91-
case <-ctxWithTimeout.Done(): // Failed to acquire a token within the timeout
92-
log.Error("Failed to acquire concurrency token", zap.Error(ctxWithTimeout.Err()))
57+
case <-ctxWithTimeout.Done(): // Timeout occurred before a permit could be acquired.
58+
log.Error("Failed to acquire concurrency permit", zap.Error(ctxWithTimeout.Err()))
9359
return ctx, requestID, ctxWithTimeout.Err()
9460
}
9561
}
9662

97-
// ReleaseConcurrencyToken returns a token back to the semaphore pool, allowing other
98-
// operations to proceed. It uses the provided requestID for structured logging,
99-
// aiding in tracking and debugging the release of concurrency tokens.
100-
func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID) {
101-
<-ch.sem // Release a token back to the semaphore
63+
// trackResourceAcquisition logs and updates metrics associated with the acquisition of concurrency tokens.
64+
// This method centralizes the logic for updating metrics and logging acquisition details, promoting code
65+
// reusability and cleaner main logic in the permit acquisition method.
66+
//
67+
// Parameters:
68+
// - duration: The time duration it took to acquire the permit.
69+
// - requestID: The unique identifier for the request associated with this permit.
70+
//
71+
// This method locks the concurrency handler to safely update shared metrics and logs detailed
72+
// information about the permit acquisition for debugging and monitoring purposes.
73+
func (ch *ConcurrencyHandler) trackResourceAcquisition(duration time.Duration, requestID uuid.UUID) {
74+
ch.lock.Lock()
75+
defer ch.lock.Unlock()
76+
77+
// Record the time taken to acquire the permit and update related metrics.
78+
ch.AcquisitionTimes = append(ch.AcquisitionTimes, duration)
79+
ch.Metrics.Lock.Lock()
80+
ch.Metrics.PermitWaitTime += duration
81+
ch.Metrics.TotalRequests++ // Increment the count of total requests handled.
82+
ch.Metrics.Lock.Unlock()
83+
84+
// Calculate and log the current state of permit utilization.
85+
utilizedPermits := len(ch.sem)
86+
availablePermits := cap(ch.sem) - utilizedPermits
87+
ch.logger.Debug("Resource acquired", zap.String("RequestID", requestID.String()), zap.Duration("Duration", duration), zap.Int("UtilizedPermits", utilizedPermits), zap.Int("AvailablePermits", availablePermits))
88+
}
89+
90+
// ReleaseConcurrencyPermit releases a concurrency permit back to the semaphore, making it available for other
91+
// operations. This function is essential for maintaining the health and efficiency of the application's concurrency
92+
// control system by ensuring that resources are properly recycled and available for use by subsequent operations.
93+
//
94+
// Parameters:
95+
// - requestID: The unique identifier for the request associated with the permit being released. This ID is used
96+
// for structured logging to aid in tracking and debugging permit lifecycle events.
97+
//
98+
// Usage:
99+
// This method should be called as soon as a request or operation that required a concurrency permit is completed.
100+
// It ensures that concurrency limits are adhered to and helps prevent issues such as permit leakage or semaphore saturation,
101+
// which could lead to degraded performance or deadlock conditions.
102+
//
103+
// Example:
104+
// defer concurrencyHandler.ReleaseConcurrencyPermit(requestID)
105+
// This usage ensures that the permit is released in a deferred manner at the end of the operation, regardless of
106+
// how the operation exits (normal completion or error path).
107+
func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID) {
108+
// Safely remove a permit from the semaphore to make it available for other operations.
109+
select {
110+
case <-ch.sem:
111+
// Continue to process after successfully retrieving a permit from the semaphore.
112+
default:
113+
// Log an error if no permit was available to release, indicating a potential synchronization issue.
114+
ch.logger.Error("Attempted to release a non-existent concurrency permit", zap.String("RequestID", requestID.String()))
115+
return
116+
}
102117

103118
ch.lock.Lock()
104119
defer ch.lock.Unlock()
105120

106-
// Update the list of acquisition times by removing the time related to the released token
107-
// This step is optional and depends on whether you track acquisition times per token or not
121+
// Update metrics related to permit release.
122+
ch.Metrics.Lock.Lock()
123+
ch.Metrics.TotalRequests-- // Decrement the count of total requests handled, if applicable.
124+
ch.Metrics.Lock.Unlock()
108125

109-
utilizedTokens := len(ch.sem) // Tokens currently in use
110-
availableTokens := cap(ch.sem) - utilizedTokens // Tokens available for use
126+
utilizedPermits := len(ch.sem) // Calculate tokens currently in use.
127+
availablePermits := cap(ch.sem) - utilizedPermits // Calculate tokens that are available for use.
111128

112-
// Log the release of the concurrency token for auditing and debugging purposes
113-
ch.logger.Debug("Released concurrency token",
129+
// Log the release of the concurrency permit for auditing and debugging purposes.
130+
ch.logger.Debug("Released concurrency permit",
114131
zap.String("RequestID", requestID.String()),
115-
zap.Int("UtilizedTokens", utilizedTokens),
116-
zap.Int("AvailableTokens", availableTokens),
132+
zap.Int("UtilizedPermits", utilizedPermits),
133+
zap.Int("AvailablePermits", availablePermits),
117134
)
118135
}

httpclient/request.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ func (c *Client) executeRequestWithRetries(method, endpoint string, body, out in
128128
return nil, err
129129
}
130130

131-
// Acquire a concurrency token along with a unique request ID
132-
ctx, requestID, err := c.ConcurrencyHandler.AcquireConcurrencyToken(context.Background())
131+
// Acquire a concurrency permit along with a unique request ID
132+
ctx, requestID, err := c.ConcurrencyHandler.AcquireConcurrencyPermit(context.Background())
133133
if err != nil {
134-
return nil, c.Logger.Error("Failed to acquire concurrency token", zap.Error(err))
134+
return nil, c.Logger.Error("Failed to acquire concurrency permit", zap.Error(err))
135135
}
136136

137-
// Ensure the token is released after the function exits
137+
// Ensure the permit is released after the function exits
138138
defer func() {
139-
c.ConcurrencyHandler.ReleaseConcurrencyToken(requestID)
139+
c.ConcurrencyHandler.ReleaseConcurrencyPermit(requestID)
140140
}()
141141

142142
// Marshal Request with correct encoding defined in api handler
@@ -291,15 +291,15 @@ func (c *Client) executeRequest(method, endpoint string, body, out interface{})
291291
return nil, err
292292
}
293293

294-
// Acquire a concurrency token along with a unique request ID
295-
ctx, requestID, err := c.ConcurrencyHandler.AcquireConcurrencyToken(context.Background())
294+
// Acquire a concurrency permit along with a unique request ID
295+
ctx, requestID, err := c.ConcurrencyHandler.AcquireConcurrencyPermit(context.Background())
296296
if err != nil {
297-
return nil, c.Logger.Error("Failed to acquire concurrency token", zap.Error(err))
297+
return nil, c.Logger.Error("Failed to acquire concurrency permit", zap.Error(err))
298298
}
299299

300-
// Ensure the token is released after the function exits
300+
// Ensure the permit is released after the function exits
301301
defer func() {
302-
c.ConcurrencyHandler.ReleaseConcurrencyToken(requestID)
302+
c.ConcurrencyHandler.ReleaseConcurrencyPermit(requestID)
303303
}()
304304

305305
// Determine which set of encoding and content-type request rules to use

0 commit comments

Comments
 (0)