Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=com.azure.cosmos.kafka.connect
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.interceptor=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos-test/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
### 1.0.0-beta.16 (Unreleased)

#### Features Added
* Added support for `FaultInjectionOperationType.HEAD_COLLECTION` - See [PR 47231](https://github.com/Azure/azure-sdk-for-java/pull/47231)

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
* Added support for `CosmosTransportClientInterceptor`, which allows modifying the store response in the direct layer. - See [PR 47231](https://github.com/Azure/azure-sdk-for-java/pull/47231)

### 1.0.0-beta.15 (2025-10-21)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@ public enum FaultInjectionOperationType {
/**
* Read change feed items
*/
READ_FEED_ITEM
READ_FEED_ITEM,
/**
* Head collection request - the barrier request issued for document operations
*/
HEAD_COLLECTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ private OperationType getEffectiveOperationType(FaultInjectionOperationType faul
return OperationType.ReadFeed;
case METADATA_REQUEST_ADDRESS_REFRESH: // address refresh can happen for any operations: read, write, query etc
return null;
case HEAD_COLLECTION:
return OperationType.Head;
default:
throw new IllegalStateException("FaultInjectionOperationType " + faultInjectionOperationType + " is not supported");
}
Expand All @@ -374,6 +376,7 @@ private ResourceType getEffectiveResourceType(FaultInjectionOperationType faultI
case METADATA_REQUEST_QUERY_PLAN:
return ResourceType.Document;
case METADATA_REQUEST_CONTAINER:
case HEAD_COLLECTION:
return ResourceType.DocumentCollection;
case METADATA_REQUEST_DATABASE_ACCOUNT:
return ResourceType.DatabaseAccount;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;

import java.util.function.BiFunction;

/**
 * Test helper for attaching a transport client interceptor to a {@link CosmosAsyncClient}.
 */
public class CosmosInterceptorHelper {
    /**
     * Wraps {@code storeResponseInterceptor} in a {@link CosmosTransportClientInterceptor} and registers it on
     * {@code client} through the {@code CosmosAsyncClient} bridge accessor.
     *
     * @param client the client to register the interceptor on; must not be {@code null}.
     * @param storeResponseInterceptor function mapping a request and its store response to a store response;
     *                                 must not be {@code null}.
     * @throws NullPointerException if {@code client} or {@code storeResponseInterceptor} is {@code null}.
     */
    public static void registerTransportClientInterceptor(
        CosmosAsyncClient client,
        BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {

        // Fail fast with a clear message instead of surfacing a null client somewhere inside the accessor.
        java.util.Objects.requireNonNull(client, "Argument 'client' must not be null.");

        // The interceptor constructor validates that storeResponseInterceptor is non-null.
        CosmosTransportClientInterceptor transportClientInterceptor =
            new CosmosTransportClientInterceptor(storeResponseInterceptor);

        ImplementationBridgeHelpers
            .CosmosAsyncClientHelper
            .getCosmosAsyncClientAccessor()
            .registerTransportClientInterceptor(client, transportClientInterceptor);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor;

import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * {@link ITransportClientInterceptor} implementation that simply holds a caller-supplied
 * store response interceptor function and hands it back on request.
 */
public class CosmosTransportClientInterceptor implements ITransportClientInterceptor {

    private final BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor;

    /**
     * Creates an interceptor backed by the given function.
     *
     * @param storeResponseInterceptor function taking the request and its store response and returning a
     *                                 store response; must not be {@code null}.
     */
    public CosmosTransportClientInterceptor(
        BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {

        // checkNotNull returns its argument once validated, so validation and assignment are one step.
        this.storeResponseInterceptor =
            checkNotNull(storeResponseInterceptor, "Argument 'storeResponseInterceptor' must not be null.");
    }

    /**
     * @return the function supplied at construction time.
     */
    @Override
    public BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> getStoreResponseInterceptor() {
        return storeResponseInterceptor;
    }
}
22 changes: 22 additions & 0 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -789,5 +789,27 @@ Licensed under the MIT License.
</plugins>
</build>
</profile>
<profile>
<!-- Integration tests for the "fault-injection-barrier" TestNG group; requires a live Cosmos DB endpoint. -->
<!-- Activate with -Pfault-injection-barrier; failsafe then runs the suite file configured below. -->
<id>fault-injection-barrier</id>
<properties>
<test.groups>fault-injection-barrier</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/fault-injection-barrier-testng.xml</suiteXmlFile>
</suiteXmlFiles>

</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import com.azure.cosmos.test.faultinjection.FaultInjectionRule;
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.azure.cosmos.test.implementation.interceptor.CosmosInterceptorHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class FaultInjectionServerErrorRuleOnDirectTests extends FaultInjectionTe
private CosmosAsyncClient clientWithoutPreferredRegions;
private CosmosAsyncContainer cosmosAsyncContainer;

private DatabaseAccount databaseAccount;
private List<String> accountLevelReadRegions;
private List<String> accountLevelWriteRegions;
private Map<String, String> readRegionMap;
Expand All @@ -81,7 +84,7 @@ public FaultInjectionServerErrorRuleOnDirectTests(CosmosClientBuilder clientBuil
this.subscriberValidationTimeout = TIMEOUT;
}

@BeforeClass(groups = {"multi-region", "long", "fast", "fi-multi-master"}, timeOut = TIMEOUT)
@BeforeClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "fault-injection-barrier"}, timeOut = TIMEOUT)
public void beforeClass() {
clientWithoutPreferredRegions = getClientBuilder()
.preferredRegions(new ArrayList<>())
Expand All @@ -90,7 +93,7 @@ public void beforeClass() {
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(clientWithoutPreferredRegions);
GlobalEndpointManager globalEndpointManager = asyncDocumentClient.getGlobalEndpointManager();

DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount();
this.databaseAccount = globalEndpointManager.getLatestDatabaseAccount();
this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(clientWithoutPreferredRegions);

AccountLevelLocationContext accountLevelReadableLocationContext
Expand Down Expand Up @@ -196,6 +199,41 @@ public static Object[] preferredRegionsConfigProvider() {
return new Object[] {false, true};
}

/**
 * Supplies the rows for the barrier request fault injection test: every listed document operation type
 * combined with every listed server error and its expected status / sub-status code.
 *
 * @return rows of the form { OperationType, FaultInjectionErrorType, ErrorStatusCode, ErrorSubStatusCode }.
 */
@DataProvider(name = "barrierRequestServerErrorResponseProvider")
public static Object[][] barrierRequestServerErrorResponseProvider() {
    // Operation types exercised, in the order their rows are emitted.
    OperationType[] documentOperations = { OperationType.Create, OperationType.Read, OperationType.Query };

    // only include exceptions which can be applied by operation type
    // Each entry: FaultInjectionErrorType, ErrorStatusCode, ErrorSubStatusCode
    Object[][] serverErrors = {
        { FaultInjectionServerErrorType.LEASE_NOT_FOUND, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND },
        { FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR, HttpConstants.SubStatusCodes.UNKNOWN },
        { FaultInjectionServerErrorType.RETRY_WITH, HttpConstants.StatusCodes.RETRY_WITH, HttpConstants.SubStatusCodes.UNKNOWN },
        { FaultInjectionServerErrorType.TOO_MANY_REQUEST, HttpConstants.StatusCodes.TOO_MANY_REQUESTS, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE },
        { FaultInjectionServerErrorType.TIMEOUT, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 },
        { FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.COMPLETING_PARTITION_MIGRATION },
        { FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE },
        { FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, HttpConstants.SubStatusCodes.SERVER_GENERATED_503 },
        { FaultInjectionServerErrorType.NAME_CACHE_IS_STALE, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE }
    };

    // Cross product, grouped by operation type so the row order matches a hand-written table.
    Object[][] rows = new Object[documentOperations.length * serverErrors.length][];
    int rowIndex = 0;
    for (OperationType documentOperation : documentOperations) {
        for (Object[] serverError : serverErrors) {
            rows[rowIndex++] = new Object[] { documentOperation, serverError[0], serverError[1], serverError[2] };
        }
    }

    return rows;
}

@Test(groups = {"multi-region", "long"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_OperationType(OperationType operationType) throws JsonProcessingException {
// Test for SERVER_GONE, the operation type will be ignored after getting the addresses
Expand Down Expand Up @@ -1019,7 +1057,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit() throws JsonProcessingE
}
}

@AfterClass(groups = {"multi-region", "long", "fast", "fi-multi-master"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
@AfterClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "fault-injection-barrier"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeClose(clientWithoutPreferredRegions);
}
Expand Down Expand Up @@ -1437,6 +1475,85 @@ public void faultInjectionInjectTcpResponseDelay() throws JsonProcessingExceptio
}
}

/**
 * Verifies that a server error can be injected into barrier (HEAD collection) requests.
 *
 * The test is skipped unless the account's default consistency level is STRONG. It creates an item, configures
 * a fault injection rule targeting {@code HEAD_COLLECTION}, rewrites the GCLSN of the matching document
 * operation's store response so it is lower than the LSN, then runs the operation and validates the diagnostics.
 *
 * @param operationType the document operation to perform.
 * @param serverErrorType the server error to inject.
 * @param statusCode the status code expected in the diagnostics.
 * @param subStatusCode the sub-status code expected in the diagnostics.
 */
@Test(groups = {"fault-injection-barrier"}, dataProvider = "barrierRequestServerErrorResponseProvider", timeOut = 2 * TIMEOUT)
public void faultInjection_serverError_barrierRequest(
    OperationType operationType,
    FaultInjectionServerErrorType serverErrorType,
    int statusCode,
    int subStatusCode) throws JsonProcessingException {

    // The barrier request flow is only tested against strong consistency accounts.
    ConsistencyLevel accountConsistencyLevel =
        this.databaseAccount.getConsistencyPolicy().getDefaultConsistencyLevel();
    if (accountConsistencyLevel != ConsistencyLevel.STRONG) {
        throw new SkipException(
            String.format("Test is not applicable to %s consistency level!", accountConsistencyLevel));
    }

    CosmosAsyncClient barrierClient = null;

    // Rule that injects the requested server error into HEAD collection requests.
    String ruleId = "barrier-" + serverErrorType + "-" + UUID.randomUUID();
    FaultInjectionRule barrierRule =
        new FaultInjectionRuleBuilder(ruleId)
            .condition(
                new FaultInjectionConditionBuilder()
                    .operationType(FaultInjectionOperationType.HEAD_COLLECTION)
                    .build())
            .result(
                FaultInjectionResultBuilders
                    .getResultBuilder(serverErrorType)
                    .times(2)
                    .build())
            .duration(Duration.ofMinutes(5))
            .build();

    try {
        barrierClient = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .contentResponseOnWriteEnabled(true)
            .buildAsyncClient();

        String databaseId = cosmosAsyncContainer.getDatabase().getId();
        String containerId = cosmosAsyncContainer.getId();
        CosmosAsyncContainer barrierContainer = barrierClient.getDatabase(databaseId).getContainer(containerId);

        TestObject seededItem = TestObject.create();
        barrierContainer.createItem(seededItem).block();

        CosmosFaultInjectionHelper
            .configureFaultInjectionRules(barrierContainer, Arrays.asList(barrierRule))
            .block();

        // A barrier request is only triggered when the original read/write response reports GCLSN < LSN,
        // so the store response of the matching document operation is rewritten to look that way.
        CosmosInterceptorHelper.registerTransportClientInterceptor(
            barrierClient,
            (request, storeResponse) -> {
                if (request.getResourceType() != ResourceType.Document
                    || request.getOperationType() != operationType) {
                    // Not the operation under test: pass the response through untouched.
                    return storeResponse;
                }

                // Lower GCLSN below LSN to simulate replication lag.
                logger.info("faultInjection_serverError_barrierRequest reducing gclsn");
                storeResponse.setGCLSN(storeResponse.getLSN() - 2L);
                return storeResponse;
            });

        CosmosDiagnostics operationDiagnostics =
            this.performDocumentOperation(barrierContainer, operationType, seededItem, false);

        validateFaultInjectionRuleAppliedForBarrier(
            operationDiagnostics,
            operationType,
            statusCode,
            subStatusCode,
            barrierRule.getId());
    } finally {
        // Always disable the rule and release the dedicated client, even when validation fails.
        barrierRule.disable();
        safeClose(barrierClient);
    }
}

private void validateFaultInjectionRuleApplied(
CosmosDiagnostics cosmosDiagnostics,
OperationType operationType,
Expand All @@ -1445,6 +1562,42 @@ private void validateFaultInjectionRuleApplied(
String ruleId,
boolean canRetryOnFaultInjectedError) throws JsonProcessingException {

validateFaultInjectionRuleApplied(
cosmosDiagnostics,
operationType,
statusCode,
subStatusCode,
ruleId,
canRetryOnFaultInjectedError,
false);
}

/**
 * Barrier request variant of the fault injection validation. Delegates to
 * {@code validateFaultInjectionRuleApplied} with both the retry flag and the barrier flag set to {@code true}.
 *
 * @param cosmosDiagnostics diagnostics of the operation under test.
 * @param operationType the operation type that was performed.
 * @param statusCode the expected status code.
 * @param subStatusCode the expected sub-status code.
 * @param ruleId the id of the fault injection rule expected to have been applied.
 * @throws JsonProcessingException propagated from the delegated validation.
 */
private void validateFaultInjectionRuleAppliedForBarrier(
    CosmosDiagnostics cosmosDiagnostics,
    OperationType operationType,
    int statusCode,
    int subStatusCode,
    String ruleId) throws JsonProcessingException {

    // Name the two flags so the delegated call reads unambiguously.
    final boolean canRetryOnFaultInjectedError = true;
    final boolean validateForBarrier = true;

    validateFaultInjectionRuleApplied(
        cosmosDiagnostics,
        operationType,
        statusCode,
        subStatusCode,
        ruleId,
        canRetryOnFaultInjectedError,
        validateForBarrier);
}

private void validateFaultInjectionRuleApplied(
CosmosDiagnostics cosmosDiagnostics,
OperationType operationType,
int statusCode,
int subStatusCode,
String ruleId,
boolean canRetryOnFaultInjectedError,
boolean validateForBarrier) throws JsonProcessingException {

List<ObjectNode> clientSideRequestStatisticsNodes = new ArrayList<>();
assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull();

Expand All @@ -1462,8 +1615,10 @@ private void validateFaultInjectionRuleApplied(
}

List<JsonNode> responseStatisticsNodes = new ArrayList<>();

String diagnosticsNodeName = validateForBarrier ? "supplementalResponseStatisticsList" : "responseStatisticsList";
for (ObjectNode diagnosticNode : clientSideRequestStatisticsNodes) {
JsonNode responseStatisticsList = diagnosticNode.get("responseStatisticsList");
JsonNode responseStatisticsList = diagnosticNode.get(diagnosticsNodeName);
assertThat(responseStatisticsList.isArray()).isTrue();

for (JsonNode responseStatisticsNode : responseStatisticsList) {
Expand Down
Loading
Loading