Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1a5cf3f
Enable 410-1022 on Head requests to bail out.
jeet1995 Nov 7, 2025
f571a5e
Bail fast on barrier on reads.
jeet1995 Nov 10, 2025
cf65b20
Enhance tests to ensure primary is contacted and barrier request coun…
jeet1995 Nov 11, 2025
617c961
Enhance tests to ensure barrier post the QuorumSelected phase is invo…
jeet1995 Nov 12, 2025
8899ea0
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into fa…
jeet1995 Nov 12, 2025
f0d3331
Code comments
jeet1995 Nov 12, 2025
e3b0db0
Adding a way to run tests against a multi-region Strong account.
jeet1995 Nov 14, 2025
5d87a6f
Adding a way to run tests against a multi-region Strong account.
jeet1995 Nov 14, 2025
d46d999
Validate sub-status code too.
jeet1995 Nov 14, 2025
77c9c5a
Code cleanup.
jeet1995 Nov 14, 2025
6e98e04
Add CHANGELOG.md entry.
jeet1995 Nov 17, 2025
2ef0be6
Modify barrier hit criteria.
jeet1995 Nov 17, 2025
8206806
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into fa…
jeet1995 Nov 17, 2025
42ef429
Modify barrier hit criteria.
jeet1995 Nov 17, 2025
ad13540
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 Nov 18, 2025
41cec42
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 Nov 18, 2025
e4f93b0
Refactoring
jeet1995 Nov 18, 2025
23a8dee
Addressing code comments.
jeet1995 Nov 18, 2025
cff675e
Verify write barrier criteria.
jeet1995 Nov 18, 2025
0fc232b
Verify barrier bail out criteria.
jeet1995 Nov 19, 2025
150f2d0
Verify barrier bail out criteria.
jeet1995 Nov 19, 2025
d3ee96e
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 Nov 19, 2025
84deb31
Managing merge.
jeet1995 Nov 19, 2025
a49950b
Fix compilation errors.
jeet1995 Nov 19, 2025
822ee0f
Fix tests.
jeet1995 Nov 19, 2025
bca781b
Fix tests.
jeet1995 Nov 19, 2025
510e9c4
Addressing comments.
jeet1995 Nov 20, 2025
a7683f2
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 Nov 20, 2025
6fce95b
Addressing comments.
jeet1995 Nov 20, 2025
73be72c
Addressing review comments.
jeet1995 Nov 22, 2025
2729916
Refactoring.
jeet1995 Nov 24, 2025
73cfe16
Addressing review comments.
jeet1995 Nov 24, 2025
abe4a76
Addressing review comments.
jeet1995 Nov 24, 2025
d2a625c
Addressing review comments.
jeet1995 Nov 24, 2025
3bcacbd
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -789,5 +789,26 @@ Licensed under the MIT License.
</plugins>
</build>
</profile>
<profile>
<!-- tests which target a multi-region strong Cosmos DB account -->
<id>multi-region-strong</id>
<properties>
<test.groups>multi-region-strong</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/multi-region-strong.xml</suiteXmlFile>
</suiteXmlFiles>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ProactiveOpenConnectionsProcessor;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.models.CosmosContainerIdentity;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.BiFunction;

public class RntbdTransportClientWithStoreResponseInterceptor extends RntbdTransportClient {
private final RntbdTransportClient underlying;
private BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> responseInterceptor;

public RntbdTransportClientWithStoreResponseInterceptor(RntbdTransportClient underlying) {
super(underlying);
this.underlying = underlying;
}

public void setResponseInterceptor(BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> responseInterceptor) {
this.responseInterceptor = responseInterceptor;
}

@Override
public Mono<StoreResponse> invokeStoreAsync(Uri physicalAddress, RxDocumentServiceRequest request) {
return this.underlying.invokeStoreAsync(physicalAddress, request)
.map(response -> {
if (responseInterceptor != null) {
return responseInterceptor.apply(request, response);
}
return response;
});
}

@Override
public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) {
this.underlying.configureFaultInjectorProvider(injectorProvider);
}

@Override
public GlobalEndpointManager getGlobalEndpointManager() {
return this.underlying.getGlobalEndpointManager();
}

@Override
public ProactiveOpenConnectionsProcessor getProactiveOpenConnectionsProcessor() {
return this.underlying.getProactiveOpenConnectionsProcessor();
}

@Override
public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
this.underlying.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities);
}

@Override
public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
this.underlying.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities);
}

@Override
public void close() {
this.underlying.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

public class StoreResponseInterceptorUtils {

public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceBarrierFollowedByBarrierFailure(
ConsistencyLevel operationConsistencyLevel,
String regionName,
int maxAllowedFailureCount,
AtomicInteger failureCount,
int statusCode,
int subStatusCode) {

return (request, storeResponse) -> {

if (OperationType.Create.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) {

long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN));
long manipulatedGCLSN = localLsn - 1;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN));

return storeResponse;
}

if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) {

if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) {

long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN));
long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN));
long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN));

return storeResponse;
} else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) {

long manipulatedItemLSN = -1;
long manipulatedGlobalLSN = 0;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN));

return storeResponse;
}

return storeResponse;
}

if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) {
if (failureCount.incrementAndGet() <= maxAllowedFailureCount) {
throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null);
}
}

return storeResponse;
};
}

public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceSuccessfulBarriersOnReadUntilQuorumSelectionThenForceBarrierFailures(
ConsistencyLevel operationConsistencyLevel,
String regionName,
int allowedSuccessfulHeadRequestsWithoutBarrierBeingMet,
AtomicInteger successfulHeadRequestCount,
int maxAllowedFailureCount,
AtomicInteger failureCount,
int statusCode,
int subStatusCode) {
return (request, storeResponse) -> {

if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) {

if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) {

long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN));
long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN));
long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN));

return storeResponse;
} else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) {

long manipulatedItemLSN = -1;
long manipulatedGlobalLSN = 0;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN));

return storeResponse;
}

return storeResponse;
}

if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) {

if (successfulHeadRequestCount.incrementAndGet() <= allowedSuccessfulHeadRequestsWithoutBarrierBeingMet) {

if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) {

long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN));
long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN));
long manipulatedGCLSN = Math.min(localLsn, itemLsn) - 1;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN));

return storeResponse;
} else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) {

long manipulatedItemLSN = -1;
long manipulatedGlobalLSN = -1;

storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN));
storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN));

return storeResponse;
}

return storeResponse;
}

if (failureCount.incrementAndGet() <= maxAllowedFailureCount) {
throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null);
}
}

return storeResponse;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ public static StoreReader getStoreReader(ConsistencyReader consistencyReader) {
return get(StoreReader.class, consistencyReader, "storeReader");
}

public static StoreReader getStoreReader(ConsistencyWriter consistencyWriter) {
return get(StoreReader.class, consistencyWriter, "storeReader");
}

public static void setStoreReader(ConsistencyReader consistencyReader, StoreReader storeReader) {
set(consistencyReader, storeReader, "storeReader");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public CosmosAsyncDatabase getDatabase(String id) {

@BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator",
"emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master"}, timeOut = SUITE_SETUP_TIMEOUT)
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SETUP_TIMEOUT)
public void beforeSuite() {

logger.info("beforeSuite Started");
Expand All @@ -230,7 +230,7 @@ public void parallelizeUnitTests(ITestContext context) {

@AfterSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master",
"emulator", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SHUTDOWN_TIMEOUT)
public void afterSuite() {

logger.info("afterSuite Started");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<!--
~ The MIT License (MIT)
~ Copyright (c) 2018 Microsoft Corporation
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="multi-region-strong">
<listeners>
<listener class-name="com.azure.cosmos.CosmosNettyLeakDetectorFactory"/>
</listeners>
<test name="multi-region-strong" group-by-instances="true">
<groups>
<run>
<include name="multi-region-strong"/>
</run>
</groups>
<packages>
<package name="com.azure.cosmos.*"/>
</packages>
</test>
</suite>
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
* Fixed a possible memory leak (Netty buffers) in Gateway mode caused by a race condition when timeouts are happening. - [47228](https://github.com/Azure/azure-sdk-for-java/pull/47228) and [47251](https://github.com/Azure/azure-sdk-for-java/pull/47251)

#### Other Changes
* Changed to use incremental change feed to get partition key ranges. - [46810](https://github.com/Azure/azure-sdk-for-java/pull/46810)
* Changed to use incremental change feed to get partition key ranges. - [PR 46810](https://github.com/Azure/azure-sdk-for-java/pull/46810)
* Optimized 410 `Lease Not Found` handling for Strong Consistency account by avoiding unnecessary retries in the barrier attainment flow. - [PR 47232](https://github.com/Azure/azure-sdk-for-java/pull/47232)

### 4.75.0 (2025-10-21)
> [!IMPORTANT]
Expand Down
Loading
Loading