-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Bail out from barriers when barriers hit 410 Lease Not Found.
#47232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
1a5cf3f
Enable 410-1022 on Head requests to bail out.
jeet1995 f571a5e
Bail fast on barrier on reads.
jeet1995 cf65b20
Enhance tests to ensure primary is contacted and barrier request counβ¦
jeet1995 617c961
Enhance tests to ensure barrier post the QuorumSelected phase is invoβ¦
jeet1995 8899ea0
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into faβ¦
jeet1995 f0d3331
Code comments
jeet1995 e3b0db0
Adding a way to run tests against a multi-region Strong account.
jeet1995 5d87a6f
Adding a way to run tests against a multi-region Strong account.
jeet1995 d46d999
Validate sub-status code too.
jeet1995 77c9c5a
Code cleanup.
jeet1995 6e98e04
Add CHANGELOG.md entry.
jeet1995 2ef0be6
Modify barrier hit criteria.
jeet1995 8206806
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into faβ¦
jeet1995 42ef429
Modify barrier hit criteria.
jeet1995 ad13540
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 41cec42
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 e4f93b0
Refactoring
jeet1995 23a8dee
Addressing code comments.
jeet1995 cff675e
Verify write barrier criteria.
jeet1995 0fc232b
Verify barrier bail out criteria.
jeet1995 150f2d0
Verify barrier bail out criteria.
jeet1995 d3ee96e
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-javaβ¦
jeet1995 84deb31
Managing merge.
jeet1995 a49950b
Fix compilation errors.
jeet1995 822ee0f
Fix tests.
jeet1995 bca781b
Fix tests.
jeet1995 510e9c4
Addressing comments.
jeet1995 a7683f2
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-javaβ¦
jeet1995 6fce95b
Addressing comments.
jeet1995 73be72c
Addressing review comments.
jeet1995 2729916
Refactoring.
jeet1995 73cfe16
Addressing review comments.
jeet1995 abe4a76
Addressing review comments.
jeet1995 d2a625c
Addressing review comments.
jeet1995 3bcacbd
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-javaβ¦
jeet1995 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
596 changes: 596 additions & 0 deletions
596
...smos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ExitFromConsistencyLayerTests.java
Large diffs are not rendered by default.
Oops, something went wrong.
72 changes: 72 additions & 0 deletions
72
...ests/src/test/java/com/azure/cosmos/RntbdTransportClientWithStoreResponseInterceptor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| import com.azure.cosmos.implementation.GlobalEndpointManager; | ||
| import com.azure.cosmos.implementation.RxDocumentServiceRequest; | ||
| import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; | ||
| import com.azure.cosmos.implementation.directconnectivity.StoreResponse; | ||
| import com.azure.cosmos.implementation.directconnectivity.Uri; | ||
| import com.azure.cosmos.implementation.directconnectivity.rntbd.ProactiveOpenConnectionsProcessor; | ||
| import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; | ||
| import com.azure.cosmos.models.CosmosContainerIdentity; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| import java.util.List; | ||
| import java.util.function.BiFunction; | ||
|
|
||
| public class RntbdTransportClientWithStoreResponseInterceptor extends RntbdTransportClient { | ||
| private final RntbdTransportClient underlying; | ||
| private BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> responseInterceptor; | ||
|
|
||
| public RntbdTransportClientWithStoreResponseInterceptor(RntbdTransportClient underlying) { | ||
| super(underlying); | ||
| this.underlying = underlying; | ||
| } | ||
|
|
||
| public void setResponseInterceptor(BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> responseInterceptor) { | ||
| this.responseInterceptor = responseInterceptor; | ||
| } | ||
|
|
||
| @Override | ||
| public Mono<StoreResponse> invokeStoreAsync(Uri physicalAddress, RxDocumentServiceRequest request) { | ||
| return this.underlying.invokeStoreAsync(physicalAddress, request) | ||
| .map(response -> { | ||
| if (responseInterceptor != null) { | ||
| return responseInterceptor.apply(request, response); | ||
| } | ||
| return response; | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) { | ||
| this.underlying.configureFaultInjectorProvider(injectorProvider); | ||
| } | ||
|
|
||
| @Override | ||
| public GlobalEndpointManager getGlobalEndpointManager() { | ||
| return this.underlying.getGlobalEndpointManager(); | ||
| } | ||
|
|
||
| @Override | ||
| public ProactiveOpenConnectionsProcessor getProactiveOpenConnectionsProcessor() { | ||
| return this.underlying.getProactiveOpenConnectionsProcessor(); | ||
| } | ||
|
|
||
| @Override | ||
| public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) { | ||
| this.underlying.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities); | ||
| } | ||
|
|
||
| @Override | ||
| public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) { | ||
| this.underlying.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| this.underlying.close(); | ||
| } | ||
| } |
149 changes: 149 additions & 0 deletions
149
...smos/azure-cosmos-tests/src/test/java/com/azure/cosmos/StoreResponseInterceptorUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| import com.azure.cosmos.implementation.OperationType; | ||
| import com.azure.cosmos.implementation.RxDocumentServiceRequest; | ||
| import com.azure.cosmos.implementation.Utils; | ||
| import com.azure.cosmos.implementation.directconnectivity.StoreResponse; | ||
| import com.azure.cosmos.implementation.directconnectivity.WFConstants; | ||
|
|
||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.BiFunction; | ||
|
|
||
| public class StoreResponseInterceptorUtils { | ||
|
|
||
| public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceBarrierFollowedByBarrierFailure( | ||
| ConsistencyLevel operationConsistencyLevel, | ||
| String regionName, | ||
| int maxAllowedFailureCount, | ||
| AtomicInteger failureCount, | ||
| int statusCode, | ||
| int subStatusCode) { | ||
|
|
||
| return (request, storeResponse) -> { | ||
|
|
||
| if (OperationType.Create.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); | ||
| long manipulatedGCLSN = localLsn - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = 0; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
| if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { | ||
| throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); | ||
| } | ||
| } | ||
|
|
||
| return storeResponse; | ||
| }; | ||
| } | ||
|
|
||
| public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceSuccessfulBarriersOnReadUntilQuorumSelectionThenForceBarrierFailures( | ||
| ConsistencyLevel operationConsistencyLevel, | ||
| String regionName, | ||
| int allowedSuccessfulHeadRequestsWithoutBarrierBeingMet, | ||
| AtomicInteger successfulHeadRequestCount, | ||
| int maxAllowedFailureCount, | ||
| AtomicInteger failureCount, | ||
| int statusCode, | ||
| int subStatusCode) { | ||
jeet1995 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return (request, storeResponse) -> { | ||
|
|
||
| if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = 0; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (successfulHeadRequestCount.incrementAndGet() <= allowedSuccessfulHeadRequestsWithoutBarrierBeingMet) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGCLSN = Math.min(localLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = -1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { | ||
| throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); | ||
| } | ||
| } | ||
|
|
||
| return storeResponse; | ||
| }; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
sdk/cosmos/azure-cosmos-tests/src/test/resources/multi-region-strong.xml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| <!-- | ||
| ~ The MIT License (MIT) | ||
| ~ Copyright (c) 2018 Microsoft Corporation | ||
| ~ | ||
| ~ Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| ~ of this software and associated documentation files (the "Software"), to deal | ||
| ~ in the Software without restriction, including without limitation the rights | ||
| ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| ~ copies of the Software, and to permit persons to whom the Software is | ||
| ~ furnished to do so, subject to the following conditions: | ||
| ~ | ||
| ~ The above copyright notice and this permission notice shall be included in all | ||
| ~ copies or substantial portions of the Software. | ||
| ~ | ||
| ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| ~ SOFTWARE. | ||
| --> | ||
| <!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd"> | ||
| <suite name="multi-region-strong"> | ||
| <listeners> | ||
| <listener class-name="com.azure.cosmos.CosmosNettyLeakDetectorFactory"/> | ||
| </listeners> | ||
| <test name="multi-region-strong" group-by-instances="true"> | ||
| <groups> | ||
| <run> | ||
| <include name="multi-region-strong"/> | ||
| </run> | ||
| </groups> | ||
| <packages> | ||
| <package name="com.azure.cosmos.*"/> | ||
| </packages> | ||
| </test> | ||
| </suite> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.