
Conversation

@annahay4 (Contributor)

Motivation and Context

Provides the ability to grant reward tokens for all successful requests.
Adds fractional token management, which is required for cases where a customer wants to grant a fraction of a token for a success.

Description

This PR adds configurable token bucket success rewards and fractional token management to the retry strategy. Key changes include:

  • Added success_reward field to TokenBucket with a default value of 0.0
  • Implemented reward_success() method that accumulates fractional tokens and converts them to full tokens when they reach 1.0 or more (see the sketch after this list)
  • Modified the standard retry strategy to call reward_success() on successful requests
  • Added success_reward() builder method to configure the reward amount
  • Implemented fractional token tracking using Arc<Mutex> to safely accumulate partial tokens across requests
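
Below is a minimal sketch of the reward path described above, assuming the Arc<Mutex<f64>> representation listed in the bullets; the struct is simplified and add_tokens is stubbed, so this is illustrative rather than the PR's actual code:

```rust
use std::sync::{Arc, Mutex};

// Simplified TokenBucket showing only the reward path.
struct TokenBucket {
    success_reward: f64,                // fraction of a token granted per success
    fractional_tokens: Arc<Mutex<f64>>, // accumulated partial tokens
}

impl TokenBucket {
    fn reward_success(&self) {
        if self.success_reward <= 0.0 {
            return;
        }
        let mut fractional = self.fractional_tokens.lock().unwrap();
        *fractional += self.success_reward;
        // Once at least one whole token has accumulated, convert it to
        // real permits and keep only the remainder.
        let whole = fractional.floor();
        if whole >= 1.0 {
            *fractional -= whole;
            self.add_tokens(whole as usize);
        }
    }

    fn add_tokens(&self, _amount: usize) {
        // In the PR this releases permits back to the bucket's semaphore,
        // clamped to its max capacity.
    }
}
```

With success_reward = 0.4, three successes accumulate 1.2 fractional tokens, yielding one whole token and a 0.2 remainder, which matches the test described below.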

Testing

Ran unit tests.
Added unit tests covering:

  • Fractional token accumulation and conversion to full tokens (test with 0.4 reward requiring 3 successes to generate 1 token)
  • Max capacity enforcement when adding reward tokens
  • Builder configuration with custom success reward values

Checklist

  • For changes to the smithy-rs codegen or runtime crates, I have created a changelog entry Markdown file in the .changelog directory, specifying "client," "server," or both in the applies_to key.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@annahay4 requested review from a team as code owners on November 13, 2025 at 19:11.
@aajtodd (Contributor) left a comment:


A couple of small comments/questions.

timeout_retry_cost: 0,
retry_cost: 0,
success_reward: 0.0,
fractional_tokens: Arc::new(Mutex::new(0.0)),
@aajtodd (Contributor):

Can probably avoid a mutex here with something like rust-lang/rust#72353 (comment)

@annahay4 (Contributor Author):

This looks great - I'll set this up and run some tests.
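
For reference, the pattern from the linked comment represents the float by its bit pattern inside an AtomicU64; a minimal sketch (names and the Relaxed ordering are assumptions):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Mutex-free f64: store the bit pattern in an AtomicU64.
pub(crate) struct AtomicF64 {
    bits: AtomicU64,
}

impl AtomicF64 {
    pub(crate) fn new(value: f64) -> Self {
        Self { bits: AtomicU64::new(value.to_bits()) }
    }

    pub(crate) fn load(&self) -> f64 {
        f64::from_bits(self.bits.load(Ordering::Relaxed))
    }

    pub(crate) fn store(&self, value: f64) {
        self.bits.store(value.to_bits(), Ordering::Relaxed)
    }
}
```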

// on success release any retry quota held by previous attempts and award success tokens
if !ctx.is_failed() {
// When a request succeeds, we grant an award, if present
token_bucket.reward_success();
@aajtodd (Contributor):

This will reward every successful attempt, even retries. Is that what you were going for?

@annahay4 (Contributor Author):

Yes, that is the intention

// When a request succeeds, we grant an award, if present
token_bucket.reward_success();

if let NoPermitWasReleased = self.release_retry_permit() {
@aajtodd (Contributor):

Do we need to worry about add_permits panicking if we exceed MAX_PERMITS? E.g., since we are now rewarding on success (and I believe you eventually want to refill on time as well), a released permit puts its tokens back on Drop.

@annahay4 (Contributor Author):

Yes, the CI testing revealed that let tokens_to_add = amount.min(self.max_permits - self.semaphore.available_permits()); was causing panics when available > max (the usize subtraction underflows). I've updated this in a new commit so that when available > max, we just return without adding permits.

In the case where a race condition results in the number of available permits being greater than max after a Drop occurs, it will self-correct once a sufficient number of tokens are acquired. My initial assessment is that we are at risk of this regardless of the relative ordering of the Drop and add_permits calls, due to multi-threading.
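
A sketch of the guard described here, assuming the semaphore and max_permits fields seen in the snippets quoted elsewhere in this review:

```rust
fn add_tokens(&self, amount: usize) {
    let available = self.semaphore.available_permits();
    // If a racing permit Drop already pushed us to (or past) capacity,
    // return without adding; the bucket self-corrects as permits are acquired.
    if available >= self.max_permits {
        return;
    }
    // Safe: available < max_permits, so the subtraction cannot underflow.
    let tokens_to_add = amount.min(self.max_permits - available);
    self.semaphore.add_permits(tokens_to_add);
}
```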

@annahay4 (Contributor Author) left a comment:

Thank you for the comments! Will make changes and put out a new commit.


@aajtodd (Contributor) left a comment, replying to the race-condition point above:

I don't know that we can ship this without accounting for this somehow. You're bound to hit it at some point otherwise.


Just throwing out ideas, but if we wrap the permit returned from acquire in our own type and add logic in Drop, we might be able to avoid exceeding the max permits with some combination of split and forget.
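
A rough sketch of that idea; BucketPermit and all field names are hypothetical, and the capacity check in Drop is itself racy, so this is a direction rather than a fix:

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Wrapper whose Drop decides whether the permit may return to the semaphore.
struct BucketPermit {
    permit: Option<OwnedSemaphorePermit>,
    semaphore: Arc<Semaphore>,
    max_permits: usize,
}

impl Drop for BucketPermit {
    fn drop(&mut self) {
        if let Some(permit) = self.permit.take() {
            if self.semaphore.available_permits() >= self.max_permits {
                // Bucket already full (e.g. topped up by success rewards):
                // consume the permit instead of returning it.
                permit.forget();
            }
            // Otherwise dropping `permit` returns it to the semaphore.
        }
    }
}
```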

fractional_tokens: AtomicF64,
}

pub struct AtomicF64 {
@aajtodd (Contributor):

I'd keep this pub(crate) at most

@annahay4 (Contributor Author):

ack, will update

if !self.fractional_tokens.load().is_finite() {
tracing::error!("Fractional tokens corrupted to: {}", self.fractional_tokens.load());
// If corrupted, reset to the number of permits the bucket was created with
self.fractional_tokens.store(self.max_permits as f64);
@aajtodd (Contributor):

Why wouldn't you reset to zero?

@annahay4 (Contributor Author):

The initial state of the bucket is the last known healthy state. Because the corruption would have happened through no fault of the user, we don't want to punish them by not allowing retries. Therefore, returning to the last known healthy state is the safest path.

let full_tokens_accumulated = self.fractional_tokens.load().floor();
if full_tokens_accumulated >= 1.0 {
self.add_tokens(full_tokens_accumulated as usize);
self.fractional_tokens.store(self.fractional_tokens.load() - full_tokens_accumulated);
@aajtodd (Contributor):

You call load three times in this function; just call it once:

let fractional_tokens = self.fractional_tokens.load();
let full_tokens_accumulated = fractional_tokens.floor();

@annahay4 (Contributor Author):

ack, will update

}

if self.success_reward > 0.0 {
self.fractional_tokens.store(self.fractional_tokens.load() + self.success_reward);
@aajtodd (Contributor):

Could make this a single store as well by doing the calculation up front; then you have a single load and a single store.

@annahay4 (Contributor Author):

ack, will update
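
Putting both suggestions together, reward_success might take a shape like this (a sketch against the AtomicF64 sketch above, not the PR's final code; note the load/store pair is still not atomic as a whole):

```rust
fn reward_success(&self) {
    if self.success_reward <= 0.0 {
        return;
    }
    // Single load, do the arithmetic up front, single store.
    let updated = self.fractional_tokens.load() + self.success_reward;
    let whole = updated.floor();
    if whole >= 1.0 {
        self.add_tokens(whole as usize);
    }
    self.fractional_tokens.store(updated - whole);
}
```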

fn add_tokens(&self, amount: usize) {
let tokens_to_add = amount.min(self.max_permits - self.semaphore.available_permits());
let available = self.semaphore.available_permits();
if available >= self.max_permits {
@annahay4 (Contributor Author):

@aajtodd this addresses the concern you brought up in your comment ("I don't know that we can ship this without accounting for this somehow. You're bound to hit it at some point otherwise."): with this check, we ensure we don't do any math that would result in an overflow/underflow error.

@aajtodd (Contributor):

add_tokens is only called by our internal logic, though, not when an OwnedSemaphorePermit is dropped (which internally calls add_permits).

If we accumulate some fractional tokens, then at some point, as permits are returned, we could overflow, right?

@annahay4 (Contributor Author):

To make sure I'm answering the right question: are you worried about the overflow on L155, or overflow as in the bucket having more permits than it should?

@aajtodd (Contributor):

I was worried about add_permits panicking if we exceeded the maximum permits for the semaphore, but I can see now that I misread it (it panics if you exceed Semaphore::MAX_PERMITS, not the amount configured when the bucket was created).

@annahay4 (Contributor Author), Nov 17, 2025:

I see. In that case, I think we should put a cap on max size. As is, the maximum usize is greater than the semaphore's maximum permits, so we will need a cap either way (roughly 1.8×10^19 for usize::MAX vs. roughly 2.3×10^18 for Semaphore::MAX_PERMITS). I'd be shocked if someone needed a token bucket anywhere near that big. Do you have any opinions on what a reasonable max would be? I'd propose a cap of 1 million tokens (10^6).

@aajtodd (Contributor):

How would you enforce a cap? Isn't max_permits already our cap?

@annahay4 (Contributor Author):

We need to enforce a max value for max_permits.

@aajtodd (Contributor):

Right, how would you enforce it, given how owned permits work on Drop? You can enforce it here in add_tokens, but that isn't called when dropping permits. Feel free to ping me offline if you want to discuss it in detail.
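
For concreteness, the cap under discussion would presumably be applied when the bucket is constructed; a hypothetical sketch (MAX_TOKENS and the constructor shape are assumptions, and per the comment above this alone does not address permits returned on Drop):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical cap, far below Semaphore::MAX_PERMITS (~2.3e18).
const MAX_TOKENS: usize = 1_000_000;

struct TokenBucket {
    semaphore: Arc<Semaphore>,
    max_permits: usize,
}

impl TokenBucket {
    fn new(initial_quota: usize) -> Self {
        // Clamp so permit arithmetic stays well away from Semaphore::MAX_PERMITS.
        let max_permits = initial_quota.min(MAX_TOKENS);
        Self {
            semaphore: Arc::new(Semaphore::new(max_permits)),
            max_permits,
        }
    }
}
```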

@annahay4 changed the base branch from main to arc-token-bucket on November 18, 2025 at 18:38.
[package]
name = "aws-smithy-mocks"
version = "0.2.1"
version = "0.2.2"
@aajtodd (Contributor):

Why did this change?

@aajtodd (Contributor):

remove?

@annahay4 (Contributor Author):

Yes, I didn't notice this had snuck in :(

[package]
name = "aws-smithy-runtime"
version = "1.9.4"
version = "1.9.5"
@aajtodd (Contributor):

unchanged?

self.called.fetch_add(1, Ordering::Relaxed);
let token_bucket = cfg.load::<#{TokenBucket}>().unwrap();
let expected = format!("permits: {}", tokio::sync::Semaphore::MAX_PERMITS);
let expected = format!("permits: 500000000");
@aajtodd (Contributor):

Should we expose this as a public constant (MAX_TOKENS/MAX_PERMITS) so we don't have magic numbers in tests?

@annahay4 (Contributor Author):

Yes
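
For example, a hypothetical associated constant the test could reference instead of the literal:

```rust
impl TokenBucket {
    // Hypothetical: expose the cap so tests avoid magic numbers.
    pub const MAX_TOKENS: usize = 500_000_000;
}

// In the test:
let expected = format!("permits: {}", TokenBucket::MAX_TOKENS);
```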


pub(crate) fn acquire(&self, err: &ErrorKind) -> Option<OwnedSemaphorePermit> {
// We have to handle the case where the number of permits in the semaphore exceeds the intended
// max. This can occur when the bucket is already at max capacity and then an OwnedSemaphorePermit
@aajtodd (Contributor):

this can occur when the bucket is already at max capacity (success reward is > 0), ...

if !calc_fractional_tokens.is_finite() {
tracing::error!("Fractional tokens corrupted to: {}", calc_fractional_tokens);
// If corrupted, reset to the number of permits the bucket was created with
self.fractional_tokens.store(self.max_permits as f32);
@aajtodd (Contributor):

I still think this is "wrong": we're rewarding success, and every whole token is converted and added to our actual tokens/permits. Setting this to max permits is just going to fill the bucket back up to max permits on the next go-around. I think this should be zero.

@annahay4 (Contributor Author):

I see what you're saying! Thank you for explaining more; I'll change this.
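
The agreed change would presumably look like this (a sketch based on the snippet above):

```rust
if !calc_fractional_tokens.is_finite() {
    tracing::error!("Fractional tokens corrupted to: {}", calc_fractional_tokens);
    // Reset to zero: fractional tokens are purely a bonus accumulator,
    // so the safe recovery is to drop the corrupted remainder.
    self.fractional_tokens.store(0.0);
}
```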
