Skip to content

Commit 42b5dc6

Browse files
authored
RateLimiter.Period is now a time.Duration (#124)
1 parent 5714e17 commit 42b5dc6

File tree

3 files changed

+7
-9
lines changed

3 files changed

+7
-9
lines changed

dbos/queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ const (
2222
// RateLimiter configures rate limiting for workflow queue execution.
2323
// Rate limits prevent overwhelming external services and provide backpressure.
2424
type RateLimiter struct {
25-
Limit int // Maximum number of workflows to start within the period
26-
Period float64 // Time period in seconds for the rate limit
25+
Limit int // Maximum number of workflows to start within the period
26+
Period time.Duration // Time period for the rate limit
2727
}
2828

2929
// WorkflowQueue defines a named queue for workflow execution.
@@ -90,7 +90,7 @@ func WithMaxTasksPerIteration(maxTasks int) QueueOption {
9090
// dbos.WithWorkerConcurrency(5),
9191
// dbos.WithRateLimiter(&dbos.RateLimiter{
9292
// Limit: 100,
93-
// Period: 60.0, // 100 workflows per minute
93+
// Period: 60 * time.Second, // 100 workflows per minute
9494
// }),
9595
// dbos.WithPriorityEnabled(),
9696
// )

dbos/queues_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ func rateLimiterTestWorkflow(ctx DBOSContext, _ string) (time.Time, error) {
833833
func TestQueueRateLimiter(t *testing.T) {
834834
dbosCtx := setupDBOS(t, true, true)
835835

836-
rateLimiterQueue := NewWorkflowQueue(dbosCtx, "test-rate-limiter-queue", WithRateLimiter(&RateLimiter{Limit: 5, Period: 1.8}))
836+
rateLimiterQueue := NewWorkflowQueue(dbosCtx, "test-rate-limiter-queue", WithRateLimiter(&RateLimiter{Limit: 5, Period: time.Duration(1800 * time.Millisecond)}))
837837

838838
// Create workflow with dbosContext
839839
RegisterWorkflow(dbosCtx, rateLimiterTestWorkflow)
@@ -842,7 +842,7 @@ func TestQueueRateLimiter(t *testing.T) {
842842
require.NoError(t, err, "failed to launch DBOS instance")
843843

844844
limit := 5
845-
period := 1.8
845+
periodSeconds := 1.8
846846
numWaves := 3
847847

848848
var handles []WorkflowHandle[time.Time]
@@ -889,7 +889,7 @@ func TestQueueRateLimiter(t *testing.T) {
889889
// Group workflows into waves based on their start time
890890
for _, workflowTime := range sortedTimes {
891891
timeSinceBase := workflowTime.Sub(baseTime).Seconds()
892-
waveIndex := int(timeSinceBase / period)
892+
waveIndex := int(timeSinceBase / periodSeconds)
893893
waveMap[waveIndex] = append(waveMap[waveIndex], workflowTime)
894894
}
895895
// Verify each wave has fewer than the limit

dbos/system_database.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2093,10 +2093,8 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
20932093
// First check the rate limiter
20942094
var numRecentQueries int
20952095
if input.queue.RateLimit != nil {
2096-
limiterPeriod := time.Duration(input.queue.RateLimit.Period * float64(time.Second))
2097-
20982096
// Calculate the cutoff time: current time minus limiter period
2099-
cutoffTimeMs := time.Now().Add(-limiterPeriod).UnixMilli()
2097+
cutoffTimeMs := time.Now().Add(-input.queue.RateLimit.Period).UnixMilli()
21002098

21012099
// Count workflows that have started in the limiter period
21022100
limiterQuery := `

0 commit comments

Comments
 (0)