Add support for success reward and fractional token mgmt #4394
base: arc-token-bucket
Conversation
aajtodd left a comment:
Couple small comments/questions.
```rust
timeout_retry_cost: 0,
retry_cost: 0,
success_reward: 0.0,
fractional_tokens: Arc::new(Mutex::new(0.0)),
```
Can probably avoid a mutex here with something like rust-lang/rust#72353 (comment)
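For reference, the trick in that thread stores the float's raw bits in an AtomicU64. A minimal sketch, assuming Relaxed ordering is acceptable for this counter (the type name mirrors the AtomicF64 that appears later in this review):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Lock-free f64 cell: the float is stored as its bit pattern in an
// AtomicU64 and converted back on load.
pub(crate) struct AtomicF64 {
    bits: AtomicU64,
}

impl AtomicF64 {
    pub(crate) fn new(value: f64) -> Self {
        Self { bits: AtomicU64::new(value.to_bits()) }
    }

    pub(crate) fn load(&self) -> f64 {
        f64::from_bits(self.bits.load(Ordering::Relaxed))
    }

    pub(crate) fn store(&self, value: f64) {
        self.bits.store(value.to_bits(), Ordering::Relaxed);
    }
}
```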
This looks great - I'll set this up and run some tests.
```rust
// on success release any retry quota held by previous attempts and award success tokens
if !ctx.is_failed() {
    // When a request succeeds, we grant an award, if present
    token_bucket.reward_success();
```
This will reward every successful attempt, even retries - is that what you were going for?
Yes, that is the intention
```rust
// When a request succeeds, we grant an award, if present
token_bucket.reward_success();

if let NoPermitWasReleased = self.release_retry_permit() {
```
Do we need to worry about add_permits panicking if we exceed MAX_PERMITS? e.g. since we are now rewarding on success (or, I believe, eventually you want to refill on time as well), then on Drop a released permit puts its tokens back.
Yes, the CI testing revealed that `let tokens_to_add = amount.min(self.max_permits - self.semaphore.available_permits());` was causing panics when available > max (the subtraction underflows). I've updated this in a new commit so that when available > max, we just return without adding permits.
In the case where a race condition leaves the number of available permits greater than max after a Drop occurs, it will self-correct once a sufficient number of tokens are acquired. My initial assessment is that this is a risk regardless of where the drop and add_permits happen relative to each other, due to multi-threading.
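A minimal sketch of the guard described here, assuming the field names from the snippets quoted in this review:

```rust
fn add_tokens(&self, amount: usize) {
    let available = self.semaphore.available_permits();
    // A racing Drop may have already returned permits past the configured
    // max; bail out instead of letting the subtraction below underflow.
    if available >= self.max_permits {
        return;
    }
    // Never add more permits than the remaining headroom.
    let tokens_to_add = amount.min(self.max_permits - available);
    self.semaphore.add_permits(tokens_to_add);
}
```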
annahay4 left a comment:
Thank you for the comments! Will make changes and put out a new commit.
aajtodd left a comment:
I don't know that we can ship this without accounting for this somehow. You're bound to hit it at some point otherwise.
Just throwing out ideas, but if we wrap the permit returned from acquire in our own type and add logic to Drop, we might be able to avoid it with some combination of split and forget to avoid exceeding the max permits.
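A rough sketch of that idea (the wrapper type and its fields are hypothetical; tokio's OwnedSemaphorePermit::forget consumes the permit without returning it to the semaphore):

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct BucketPermit {
    permit: Option<OwnedSemaphorePermit>,
    semaphore: Arc<Semaphore>,
    max_permits: usize,
}

impl Drop for BucketPermit {
    fn drop(&mut self) {
        if let Some(permit) = self.permit.take() {
            if self.semaphore.available_permits() >= self.max_permits {
                // Bucket is already full (e.g. topped up by success rewards):
                // forget the permit instead of returning it, so we never push
                // the semaphore past the configured max.
                permit.forget();
            }
            // Otherwise `permit` is dropped here and returned normally.
            // Note this check-then-act is still racy; it narrows the window
            // rather than eliminating it.
        }
    }
}
```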
```rust
    fractional_tokens: AtomicF64,
}

pub struct AtomicF64 {
```
I'd keep this pub(crate) at most
ack, will update
```rust
if !self.fractional_tokens.load().is_finite() {
    tracing::error!("Fractional tokens corrupted to: {}", self.fractional_tokens.load());
    // If corrupted, reset to the number of permits the bucket was created with
    self.fractional_tokens.store(self.max_permits as f64);
```
Why wouldn't you reset to zero?
The initial state of the bucket is the last known healthy state. Because the corruption would have happened through no fault of the user, we don't want to punish them by not allowing retries. Therefore, returning to the last known healthy state is the safest path.
```rust
let full_tokens_accumulated = self.fractional_tokens.load().floor();
if full_tokens_accumulated >= 1.0 {
    self.add_tokens(full_tokens_accumulated as usize);
    self.fractional_tokens.store(self.fractional_tokens.load() - full_tokens_accumulated);
```
You call load three different times in this function; just call load once:

```rust
let fractional_tokens = self.fractional_tokens.load();
let full_tokens_accumulated = fractional_tokens.floor();
```
ack, will update
```rust
}

if self.success_reward > 0.0 {
    self.fractional_tokens.store(self.fractional_tokens.load() + self.success_reward);
```
Could make this a single store as well by doing the calculations up front; then you have a single load and a single store.
ack, will update
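Putting both suggestions together, the updated function might look something like the sketch below (names follow the reward_success/add_tokens snippets quoted above; note the load/store pair is still not a single atomic read-modify-write):

```rust
fn reward_success(&self) {
    if self.success_reward > 0.0 {
        // Single load: accumulate the reward, then peel off whole tokens.
        let fractional_tokens = self.fractional_tokens.load() + self.success_reward;
        let full_tokens_accumulated = fractional_tokens.floor();
        if full_tokens_accumulated >= 1.0 {
            self.add_tokens(full_tokens_accumulated as usize);
        }
        // Single store: keep only the fractional remainder.
        self.fractional_tokens.store(fractional_tokens - full_tokens_accumulated);
    }
}
```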
```diff
 fn add_tokens(&self, amount: usize) {
-    let tokens_to_add = amount.min(self.max_permits - self.semaphore.available_permits());
+    let available = self.semaphore.available_permits();
+    if available >= self.max_permits {
```
@aajtodd this addresses the concern you brought up in your comment ("I don't know that we can ship this without accounting for this somehow. You're bound to hit it at some point otherwise.") - with this check, we ensure we don't do any math that will result in an overflow/underflow error.
add_tokens is only called by our internal logic, though, not when an OwnedSemaphorePermit is dropped (which internally calls add_permits).
If we accumulate some fractional tokens, then at some point, as permits are returned, we could overflow, right?
To make sure I'm answering the right question - are you worried about the overflow on L155, or overflow as in the bucket having more permits than it should?
I was worried about add_permits panicking if we exceeded the maximum permits for the semaphore, but I can see now I misread it (it panics if you exceed Semaphore::MAX_PERMITS, not the amount configured when the bucket was created).
I see. In that case, I think we should put a cap on max size - as is, usize::MAX is greater than Semaphore::MAX_PERMITS (~1.8x10^19 vs ~2.3x10^18), so we will need a cap either way. I'd be shocked if someone needed a token bucket anywhere near that big. Do you have any opinions on what a reasonable max would be? I'd propose 1 million tokens max (10^6).
How would you enforce a cap? Isn't max_permits already our cap?
We need to enforce a max value for max_permits
Right, how would you enforce it given how owned permits work on Drop? You can enforce it here in add_tokens, but that isn't called when dropping permits. Feel free to ping me offline if you need/want to discuss it in detail.
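For illustration, the construction-time clamp being discussed might look like this (the constant name is hypothetical and the value comes from the 10^6 proposal above); as the last comment notes, this bounds max_permits but does not by itself prevent dropped permits from exceeding it:

```rust
// Hypothetical cap on bucket size; the 10^6 value comes from the proposal above.
const MAX_BUCKET_PERMITS: usize = 1_000_000;

fn clamped_max_permits(configured: usize) -> usize {
    // Clamp the configured size so max_permits stays far below
    // Semaphore::MAX_PERMITS (the bound at which add_permits panics).
    configured.min(MAX_BUCKET_PERMITS)
}
```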
…al token management
Adding support for atomic float
```diff
 [package]
 name = "aws-smithy-mocks"
-version = "0.2.1"
+version = "0.2.2"
```
Why did this change?
remove?
Yes - didn't notice this snuck in :(
```diff
 [package]
 name = "aws-smithy-runtime"
-version = "1.9.4"
+version = "1.9.5"
```
unchanged?
```diff
 self.called.fetch_add(1, Ordering::Relaxed);
 let token_bucket = cfg.load::<#{TokenBucket}>().unwrap();
-let expected = format!("permits: {}", tokio::sync::Semaphore::MAX_PERMITS);
+let expected = format!("permits: 500000000");
```
Should we expose this as a public constant MAX_TOKENS/MAX_PERMITS so we don't have to have magic numbers in tests?
Yes
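Something like the following sketch (the constant name is illustrative; the value assumes 500000000 is the cap the test above expects):

```rust
/// Maximum number of permits a token bucket may hold; exposed so tests can
/// reference it instead of a magic number.
pub const MAX_PERMITS: usize = 500_000_000;
```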
```rust
pub(crate) fn acquire(&self, err: &ErrorKind) -> Option<OwnedSemaphorePermit> {
    // We have to handle the case where the number of permits in the semaphore exceeds the intended
    // max. This can occur when the bucket is already at max capacity and then an OwnedSemaphorePermit
```
Suggested comment wording: "this can occur when the bucket is already at max capacity (success reward is > 0), ..."
```rust
if !calc_fractional_tokens.is_finite() {
    tracing::error!("Fractional tokens corrupted to: {}", calc_fractional_tokens);
    // If corrupted, reset to the number of permits the bucket was created with
    self.fractional_tokens.store(self.max_permits as f32);
```
I still think this is "wrong" - we're rewarding success, and every whole token is converted and added to our actual tokens/permits. Setting this to max permits is just going to immediately, on the next go-around, fill the bucket up to max permits. I think this should be zero.
I see what you're saying! Thank you for explaining more, I'll change this
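The agreed-upon change, sketched against the snippet above:

```rust
if !calc_fractional_tokens.is_finite() {
    tracing::error!("Fractional tokens corrupted to: {}", calc_fractional_tokens);
    // Reset to zero rather than max_permits: resetting to max would let the
    // next reward cycle refill the bucket to capacity for free.
    self.fractional_tokens.store(0.0);
}
```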
…ity to accommodate multiple architectures
Motivation and Context
Provides the ability to grant reward tokens for all successful requests.
Adds fractional token management, which is required for cases where a customer wants to grant a fraction of a token for a success.
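For example (illustrative numbers, not a default): with a success reward of 0.25, four consecutive successful requests accumulate one whole token, which is then converted into a permit:

```rust
// success_reward = 0.25 (hypothetical value)
// after success 1: fractional_tokens = 0.25
// after success 2: fractional_tokens = 0.50
// after success 3: fractional_tokens = 0.75
// after success 4: fractional_tokens = 1.00 -> add_tokens(1), remainder 0.0
```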
Description
This PR adds configurable token bucket success rewards and fractional token management to the retry strategy. Key changes include:
- A configurable success_reward that returns tokens to the bucket on every successful attempt.
- Fractional token accumulation (an atomic f64 in place of a mutex), with whole tokens converted into permits once accumulated.
- A guard in add_tokens so the bucket never adds permits past its configured maximum.
Testing
Ran unit tests.
Added unit tests covering the new success reward and fractional token behavior.
Checklist
- … .changelog directory, specifying "client," "server," or both in the applies_to key.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.