Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos-test/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
### 1.0.0-beta.16 (Unreleased)

#### Features Added
* Added support for `FaultInjectionOperationType.HEAD_COLLECTION` - See [PR 47231](https://github.com/Azure/azure-sdk-for-java/pull/47231)

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
* Added support for `CosmosTransportClientInterceptor`, which allows modifying the store response in the direct layer. - See [PR 47231](https://github.com/Azure/azure-sdk-for-java/pull/47231)

### 1.0.0-beta.15 (2025-10-21)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@ public enum FaultInjectionOperationType {
/**
* Read change feed items
*/
READ_FEED_ITEM
READ_FEED_ITEM,
/**
* Head collection request - barrier request for document operation
*/
HEAD_COLLECTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ private OperationType getEffectiveOperationType(FaultInjectionOperationType faul
return OperationType.ReadFeed;
case METADATA_REQUEST_ADDRESS_REFRESH: // address refresh can happen for any operations: read, write, query etc
return null;
case HEAD_COLLECTION:
return OperationType.Head;
default:
throw new IllegalStateException("FaultInjectionOperationType " + faultInjectionOperationType + " is not supported");
}
Expand All @@ -374,6 +376,7 @@ private ResourceType getEffectiveResourceType(FaultInjectionOperationType faultI
case METADATA_REQUEST_QUERY_PLAN:
return ResourceType.Document;
case METADATA_REQUEST_CONTAINER:
case HEAD_COLLECTION:
return ResourceType.DocumentCollection;
case METADATA_REQUEST_DATABASE_ACCOUNT:
return ResourceType.DatabaseAccount;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;

import java.util.function.BiFunction;

/**
 * Static helper for attaching a transport client interceptor to a {@link CosmosAsyncClient}.
 */
public final class CosmosInterceptorHelper {

    // Static-only helper: there is no instance state, so instantiation is disallowed.
    private CosmosInterceptorHelper() {
    }

    /**
     * Wraps the supplied function in a {@link CosmosTransportClientInterceptor} and registers it on the
     * given client through the {@code CosmosAsyncClient} accessor.
     *
     * @param client the client the interceptor is registered on.
     * @param storeResponseInterceptor function that receives the request and its store response and
     * returns a store response; a {@code null} value is rejected by the
     * {@link CosmosTransportClientInterceptor} constructor.
     */
    public static void registerTransportClientInterceptor(
        CosmosAsyncClient client,
        BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {

        CosmosTransportClientInterceptor transportClientInterceptor =
            new CosmosTransportClientInterceptor(storeResponseInterceptor);

        ImplementationBridgeHelpers
            .CosmosAsyncClientHelper
            .getCosmosAsyncClientAccessor()
            .registerTransportClientInterceptor(client, transportClientInterceptor);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor;

import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * {@link ITransportClientInterceptor} implementation that simply holds a caller-supplied
 * store response interceptor function and hands it back on request.
 */
public class CosmosTransportClientInterceptor implements ITransportClientInterceptor {

    private final BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor;

    /**
     * Creates an interceptor around the given function.
     *
     * @param storeResponseInterceptor function taking the request and its store response and returning
     * a store response; must not be {@code null}.
     */
    public CosmosTransportClientInterceptor(
        BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> storeResponseInterceptor) {

        // checkNotNull hands back the validated reference, so validation and assignment are one step.
        this.storeResponseInterceptor =
            checkNotNull(storeResponseInterceptor, "Argument 'storeResponseInterceptor' must not be null.");
    }

    /**
     * @return the function supplied at construction time.
     */
    @Override
    public BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> getStoreResponseInterceptor() {
        return storeResponseInterceptor;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import com.azure.cosmos.test.faultinjection.FaultInjectionRule;
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.azure.cosmos.test.implementation.interceptor.CosmosInterceptorHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class FaultInjectionServerErrorRuleOnDirectTests extends FaultInjectionTe
private CosmosAsyncClient clientWithoutPreferredRegions;
private CosmosAsyncContainer cosmosAsyncContainer;

private DatabaseAccount databaseAccount;
private List<String> accountLevelReadRegions;
private List<String> accountLevelWriteRegions;
private Map<String, String> readRegionMap;
Expand All @@ -90,7 +93,7 @@ public void beforeClass() {
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(clientWithoutPreferredRegions);
GlobalEndpointManager globalEndpointManager = asyncDocumentClient.getGlobalEndpointManager();

DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount();
this.databaseAccount = globalEndpointManager.getLatestDatabaseAccount();
this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(clientWithoutPreferredRegions);

AccountLevelLocationContext accountLevelReadableLocationContext
Expand Down Expand Up @@ -196,6 +199,41 @@ public static Object[] preferredRegionsConfigProvider() {
return new Object[] {false, true};
}

@DataProvider(name = "barrierRequestServerErrorResponseProvider")
public static Object[][] barrierRequestServerErrorResponseProvider() {
    // Every operation type below is paired with every server error row, in this order.
    OperationType[] operationTypes = {
        OperationType.Create,
        OperationType.Read,
        OperationType.Query
    };

    // FaultInjectionErrorType, ErrorStatusCode, ErrorSubStatusCode
    Object[][] serverErrors = {
        { FaultInjectionServerErrorType.LEASE_NOT_FOUND, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND },
        { FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR, HttpConstants.SubStatusCodes.UNKNOWN },
        { FaultInjectionServerErrorType.RETRY_WITH, HttpConstants.StatusCodes.RETRY_WITH, HttpConstants.SubStatusCodes.UNKNOWN },
        { FaultInjectionServerErrorType.TOO_MANY_REQUEST, HttpConstants.StatusCodes.TOO_MANY_REQUESTS, HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE },
        { FaultInjectionServerErrorType.TIMEOUT, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 },
        { FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.COMPLETING_PARTITION_MIGRATION },
        { FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE },
        { FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, HttpConstants.SubStatusCodes.SERVER_GENERATED_503 },
        { FaultInjectionServerErrorType.NAME_CACHE_IS_STALE, HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE }
    };

    // Resulting row layout: OperationType, FaultInjectionErrorType, ErrorStatusCode, ErrorSubStatusCode
    Object[][] testCases = new Object[operationTypes.length * serverErrors.length][];
    int nextRow = 0;
    for (OperationType operationType : operationTypes) {
        for (Object[] serverError : serverErrors) {
            testCases[nextRow++] = new Object[] { operationType, serverError[0], serverError[1], serverError[2] };
        }
    }

    return testCases;
}

@Test(groups = {"multi-region", "long"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_OperationType(OperationType operationType) throws JsonProcessingException {
// Test for SERVER_GONE, the operation type will be ignored after getting the addresses
Expand Down Expand Up @@ -1437,6 +1475,84 @@ public void faultInjectionInjectTcpResponseDelay() throws JsonProcessingExceptio
}
}

@Test(groups = {"fast"}, dataProvider = "barrierRequestServerErrorResponseProvider", timeOut = 2 * TIMEOUT)
public void faultInjection_serverError_barrierRequest(
    OperationType operationType,
    FaultInjectionServerErrorType serverErrorType,
    int statusCode,
    int subStatusCode) throws JsonProcessingException {

    // Verifies that a server error can be injected into barrier (HEAD_COLLECTION) requests.

    // The barrier request flow is only exercised against accounts with strong consistency.
    ConsistencyLevel accountConsistency =
        this.databaseAccount.getConsistencyPolicy().getDefaultConsistencyLevel();
    if (accountConsistency != ConsistencyLevel.STRONG) {
        throw new SkipException(
            String.format("Test is not applicable to %s consistency level!", accountConsistency));
    }

    String ruleId = "barrier-" + serverErrorType + "-" + UUID.randomUUID();
    FaultInjectionRule barrierRule =
        new FaultInjectionRuleBuilder(ruleId)
            .condition(
                new FaultInjectionConditionBuilder()
                    .operationType(FaultInjectionOperationType.HEAD_COLLECTION)
                    .build())
            .result(
                FaultInjectionResultBuilders
                    .getResultBuilder(serverErrorType)
                    .times(2)
                    .build())
            .duration(Duration.ofMinutes(5))
            .build();

    CosmosAsyncClient barrierClient = null;
    try {
        barrierClient = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .contentResponseOnWriteEnabled(true)
            .buildAsyncClient();

        String databaseId = cosmosAsyncContainer.getDatabase().getId();
        String containerId = cosmosAsyncContainer.getId();
        CosmosAsyncContainer targetContainer = barrierClient.getDatabase(databaseId).getContainer(containerId);

        // Seed an item before the rule and the interceptor are installed.
        TestObject seededItem = TestObject.create();
        targetContainer.createItem(seededItem).block();

        CosmosFaultInjectionHelper
            .configureFaultInjectionRules(targetContainer, Arrays.asList(barrierRule))
            .block();

        // A barrier request is only triggered when GCLSN < LSN, so the store response of the
        // original read/write operation is rewritten to simulate replication lag.
        CosmosInterceptorHelper.registerTransportClientInterceptor(
            barrierClient,
            (interceptedRequest, interceptedResponse) -> {
                boolean isOperationUnderTest =
                    interceptedRequest.getResourceType() == ResourceType.Document
                        && interceptedRequest.getOperationType() == operationType;

                if (isOperationUnderTest) {
                    // Push GCLSN below LSN.
                    interceptedResponse.setGCLSN(interceptedResponse.getLSN() - 2L);
                }

                return interceptedResponse;
            });

        CosmosDiagnostics operationDiagnostics =
            this.performDocumentOperation(targetContainer, operationType, seededItem, false);

        validateFaultInjectionRuleAppliedForBarrier(
            operationDiagnostics,
            operationType,
            statusCode,
            subStatusCode,
            barrierRule.getId());
    } finally {
        barrierRule.disable();
        safeClose(barrierClient);
    }
}

private void validateFaultInjectionRuleApplied(
CosmosDiagnostics cosmosDiagnostics,
OperationType operationType,
Expand All @@ -1445,6 +1561,42 @@ private void validateFaultInjectionRuleApplied(
String ruleId,
boolean canRetryOnFaultInjectedError) throws JsonProcessingException {

validateFaultInjectionRuleApplied(
cosmosDiagnostics,
operationType,
statusCode,
subStatusCode,
ruleId,
canRetryOnFaultInjectedError,
false);
}

/**
 * Barrier-request variant of {@code validateFaultInjectionRuleApplied}: delegates to the full overload
 * with both the retry flag and the barrier-validation flag enabled.
 */
private void validateFaultInjectionRuleAppliedForBarrier(
    CosmosDiagnostics cosmosDiagnostics,
    OperationType operationType,
    int statusCode,
    int subStatusCode,
    String ruleId) throws JsonProcessingException {

    // Named flags make the delegated call self-describing.
    final boolean canRetryOnFaultInjectedError = true;
    final boolean validateForBarrier = true;

    validateFaultInjectionRuleApplied(
        cosmosDiagnostics, operationType, statusCode, subStatusCode, ruleId,
        canRetryOnFaultInjectedError, validateForBarrier);
}

private void validateFaultInjectionRuleApplied(
CosmosDiagnostics cosmosDiagnostics,
OperationType operationType,
int statusCode,
int subStatusCode,
String ruleId,
boolean canRetryOnFaultInjectedError,
boolean validateForBarrier) throws JsonProcessingException {

List<ObjectNode> clientSideRequestStatisticsNodes = new ArrayList<>();
assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull();

Expand All @@ -1462,8 +1614,10 @@ private void validateFaultInjectionRuleApplied(
}

List<JsonNode> responseStatisticsNodes = new ArrayList<>();

String diagnosticsNodeName = validateForBarrier ? "supplementalResponseStatisticsList" : "responseStatisticsList";
for (ObjectNode diagnosticNode : clientSideRequestStatisticsNodes) {
JsonNode responseStatisticsList = diagnosticNode.get("responseStatisticsList");
JsonNode responseStatisticsList = diagnosticNode.get(diagnosticsNodeName);
assertThat(responseStatisticsList.isArray()).isTrue();

for (JsonNode responseStatisticsNode : responseStatisticsList) {
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#### Other Changes
* Changed to use incremental change feed to get partition key ranges. - [46810](https://github.com/Azure/azure-sdk-for-java/pull/46810)
* Added support for `ITransportClientInterceptor`, which allows modifying the store response in the direct layer. - See [PR 47231](https://github.com/Azure/azure-sdk-for-java/pull/47231)

### 4.75.0 (2025-10-21)
> [!IMPORTANT]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor;
import com.azure.cosmos.implementation.throughputControl.sdk.config.SDKThroughputControlGroupInternal;
import com.azure.cosmos.implementation.throughputControl.server.config.ServerThroughputControlGroup;
import com.azure.cosmos.models.CosmosAuthorizationTokenResolver;
Expand Down Expand Up @@ -794,6 +795,10 @@ CosmosItemSerializer getEffectiveItemSerializer(CosmosItemSerializer requestOpti
return this.asyncDocumentClient.getEffectiveItemSerializer(requestOptionsItemSerializer);
}

/**
 * Registers a transport client interceptor by forwarding it to the wrapped
 * {@code asyncDocumentClient}.
 *
 * @param transportClientInterceptor the interceptor to register.
 */
void registerTransportClientInterceptor(ITransportClientInterceptor transportClientInterceptor) {
this.asyncDocumentClient.registerTransportClientInterceptor(transportClientInterceptor);
}

boolean isTransportLevelTracingEnabled() {

CosmosClientTelemetryConfig effectiveConfig = this.clientTelemetryConfig != null ?
Expand Down Expand Up @@ -950,6 +955,14 @@ public List<CosmosOperationPolicy> getOperationPolicies(CosmosAsyncClient client
public CosmosItemSerializer getEffectiveItemSerializer(CosmosAsyncClient client, CosmosItemSerializer requestOptionsItemSerializer) {
return client.getEffectiveItemSerializer(requestOptionsItemSerializer);
}

/**
 * Accessor entry point: forwards the interceptor to the given client's own
 * {@code registerTransportClientInterceptor} method.
 *
 * @param client the client the interceptor is registered on.
 * @param transportClientInterceptor the interceptor to register.
 */
@Override
public void registerTransportClientInterceptor(
CosmosAsyncClient client,
ITransportClientInterceptor transportClientInterceptor) {

client.registerTransportClientInterceptor(transportClientInterceptor);
}
}
);
}
Expand Down
Loading
Loading