Skip to content

Commit 8d2b0b0

Browse files
authored
Avoid NullPointerException when x-region calls are made under high concurrency when using region scoped session container. (#46758)
* Make session container and bloom filter insertions atomic. * Make session container and bloom filter insertions atomic. * Make session container and bloom filter insertions atomic. * Fixing tests. * Ensure once a global session is used to not go back to using region-scoped session tokens. * Ensure once a global session is used to not go back to using region-scoped session tokens. * Ensure bloom filter is updated atomically with session container update. * Add a test which concurrently updates and reads from the RegionScopedSessionContainer which "occasional" cross region calls. * Fixing tests. * Clean up PR. * Add user-agent suffix and try-catch block in PartitionScopedRegionLevelProgress. * Track globalLsn progress when hub region is seen in bloom filter. * Centralize fallback to global session token flows and introduce use hub flow. * Fixing baseSessionToken resolution flow. * Enhancing RegionScopedSessionContainerConcurrencyTest to make lsns decrease from service perspective yet session token resolutions client-side should be monotonically increasing. * Enhancing RegionScopedSessionContainerConcurrencyTest to make lsns decrease from service perspective yet session token resolutions client-side should be monotonically increasing. * Enhancing RegionScopedSessionContainerConcurrencyTest to make lsns decrease from service perspective yet session token resolutions client-side should be monotonically increasing. * Add license header. * Fixing compilation errors. * Addressing review comments. * Addressing review comments. * Addressing review comments. * Updating CHANGELOG.md
1 parent dc2e088 commit 8d2b0b0

File tree

9 files changed

+1119
-347
lines changed

9 files changed

+1119
-347
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerConcurrencyTest.java

Lines changed: 639 additions & 0 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,8 @@ public void setSessionToken_NoSessionTokenForPartitionKeyRangeId() throws Except
480480

481481
assertThat(globalProgress).isNotNull();
482482
assertThat(globalProgress.getSessionToken()).isNotNull();
483-
assertThat(globalProgress.getSessionToken().convertToString()).isNotNull();
484-
assertThat(globalProgress.getSessionToken().convertToString()).isEqualTo(sessionToken);
483+
assertThat(globalProgress.getSessionToken().get().convertToString()).isNotNull();
484+
assertThat(globalProgress.getSessionToken().get().convertToString()).isEqualTo(sessionToken);
485485

486486
RxDocumentServiceRequest request2 = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Read, ResourceType.Document,
487487
collectionName + "/docs", Utils.getUTF8Bytes(""), new HashMap<>());

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#### Bugs Fixed
1010
* Fixed a possible memory leak (Netty buffers) in Gateway mode caused by a race condition when timeouts are happening. - [47228](https://github.com/Azure/azure-sdk-for-java/pull/47228) and [47251](https://github.com/Azure/azure-sdk-for-java/pull/47251)
11+
* Fixed `NullPointerException` in region-scoped session container when false positive regions are reported for session progress tracking. [PR 46758](https://github.com/Azure/azure-sdk-for-java/pull/46758)
1112

1213
#### Other Changes
1314
* Enabled hostname validation for RNTBD connections to backend - [PR 47111](https://github.com/Azure/azure-sdk-for-java/pull/47111)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ public static class SubStatusCodes {
466466
public static final int INVALID_RESULT = 20910;
467467
public static final int CLOSED_CLIENT = 20912;
468468
public static final int PPCB_INVALID_STATE = 20913;
469+
public static final int REGION_SCOPED_SESSION_CONTAINER_IN_BAD_STATE = 20914;
469470

470471
//SDK Codes (Server)
471472
// IMPORTANT - whenever possible use consistency substatus codes that .Net SDK also uses

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionScopedRegionLevelProgress.java

Lines changed: 423 additions & 196 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RegionScopedSessionContainer.java

Lines changed: 40 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919
import java.util.Locale;
2020
import java.util.Map;
21+
import java.util.Optional;
2122
import java.util.concurrent.ConcurrentHashMap;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -201,13 +202,6 @@ public ISessionToken resolvePartitionLocalSessionToken(RxDocumentServiceRequest
201202
this.firstPreferredReadableRegionCached.set(extractFirstEffectivePreferredReadableRegion());
202203
}
203204

204-
boolean shouldUseBloomFilter = shouldUseBloomFilter(
205-
request,
206-
partitionKeyRangeId,
207-
partitionKeyInternal,
208-
partitionKeyDefinition,
209-
partitionScopedRegionLevelProgress);
210-
211205
return SessionTokenHelper.resolvePartitionLocalSessionToken(
212206
request,
213207
this.partitionKeyBasedBloomFilter,
@@ -217,7 +211,7 @@ public ISessionToken resolvePartitionLocalSessionToken(RxDocumentServiceRequest
217211
collectionRid,
218212
partitionKeyRangeId,
219213
this.firstPreferredReadableRegionCached.get(),
220-
shouldUseBloomFilter);
214+
this.globalEndpointManager);
221215
}
222216

223217
@Override
@@ -339,41 +333,6 @@ private void setSessionToken(RxDocumentServiceRequest request, ResourceId resour
339333
}
340334
}
341335

342-
private void recordPartitionKeyInBloomFilter(
343-
RxDocumentServiceRequest request,
344-
Long collectionRid,
345-
String regionRoutedTo,
346-
PartitionKeyInternal partitionKeyInternal,
347-
PartitionKeyDefinition partitionKeyDefinition) {
348-
349-
if (Strings.isNullOrEmpty(this.firstPreferredReadableRegionCached.get())) {
350-
this.firstPreferredReadableRegionCached.set(extractFirstEffectivePreferredReadableRegion());
351-
}
352-
353-
this.partitionKeyBasedBloomFilter.tryRecordPartitionKey(
354-
request,
355-
collectionRid,
356-
this.firstPreferredReadableRegionCached.get(),
357-
regionRoutedTo,
358-
partitionKeyInternal,
359-
partitionKeyDefinition);
360-
}
361-
362-
private void recordRegionScopedSessionToken(
363-
RxDocumentServiceRequest request,
364-
PartitionScopedRegionLevelProgress partitionScopedRegionLevelProgress,
365-
ISessionToken parsedSessionToken,
366-
String partitionKeyRangeId,
367-
String regionRoutedTo) {
368-
369-
partitionScopedRegionLevelProgress.tryRecordSessionToken(
370-
request,
371-
parsedSessionToken,
372-
partitionKeyRangeId,
373-
this.firstPreferredReadableRegionCached.get(),
374-
regionRoutedTo);
375-
}
376-
377336
private void addSessionTokenAndTryRecordEpkInBloomFilter(RxDocumentServiceRequest request, ResourceId resourceId, String partitionKeyRangeId, ISessionToken parsedSessionToken) {
378337

379338
final Long collectionResourceId = resourceId.getUniqueDocumentCollectionId();
@@ -399,27 +358,14 @@ private void addSessionTokenAndTryRecordEpkInBloomFilter(RxDocumentServiceReques
399358

400359
if (partitionScopedRegionLevelProgress != null) {
401360

402-
if (shouldUseBloomFilter(
403-
request,
404-
partitionKeyRangeId,
405-
partitionKeyInternal,
406-
partitionKeyDefinition,
407-
partitionScopedRegionLevelProgress)) {
408-
409-
this.recordPartitionKeyInBloomFilter(
410-
request,
411-
collectionResourceId,
412-
regionRoutedTo,
413-
partitionKeyInternal.v,
414-
partitionKeyDefinition.v);
415-
}
416-
417361
this.recordRegionScopedSessionToken(
418362
request,
419363
partitionScopedRegionLevelProgress,
420364
parsedSessionToken,
365+
collectionResourceId,
421366
partitionKeyRangeId,
422-
regionRoutedTo);
367+
regionRoutedTo
368+
);
423369

424370
} else {
425371
this.collectionResourceIdToPartitionScopedRegionLevelProgress.compute(
@@ -444,25 +390,31 @@ private void addSessionTokenAndTryRecordEpkInBloomFilter(RxDocumentServiceReques
444390
request,
445391
partitionScopedRegionLevelProgress,
446392
parsedSessionToken,
393+
collectionResourceId,
447394
partitionKeyRangeId,
448-
regionRoutedTo);
395+
regionRoutedTo
396+
);
449397
}
398+
}
399+
}
450400

451-
if (shouldUseBloomFilter(
452-
request,
453-
partitionKeyRangeId,
454-
partitionKeyInternal,
455-
partitionKeyDefinition,
456-
partitionScopedRegionLevelProgress)) {
401+
private void recordRegionScopedSessionToken(
402+
RxDocumentServiceRequest request,
403+
PartitionScopedRegionLevelProgress partitionScopedRegionLevelProgress,
404+
ISessionToken parsedSessionToken,
405+
Long collectionRid,
406+
String partitionKeyRangeId,
407+
String regionRoutedTo) {
457408

458-
this.recordPartitionKeyInBloomFilter(
459-
request,
460-
collectionResourceId,
461-
regionRoutedTo,
462-
partitionKeyInternal.v,
463-
partitionKeyDefinition.v);
464-
}
465-
}
409+
partitionScopedRegionLevelProgress.tryRecordSessionToken(
410+
request,
411+
parsedSessionToken,
412+
collectionRid,
413+
partitionKeyRangeId,
414+
this.firstPreferredReadableRegionCached.get(),
415+
regionRoutedTo,
416+
this.partitionKeyBasedBloomFilter,
417+
this.globalEndpointManager);
466418
}
467419

468420
private String getCombinedSessionToken(PartitionScopedRegionLevelProgress partitionScopedRegionLevelProgress) {
@@ -472,53 +424,28 @@ private String getCombinedSessionToken(PartitionScopedRegionLevelProgress partit
472424
StringBuilder result = new StringBuilder();
473425
if (tokens != null) {
474426
for (Iterator<Map.Entry<String, ConcurrentHashMap<String, PartitionScopedRegionLevelProgress.RegionLevelProgress>>> iterator = tokens.entrySet().iterator(); iterator.hasNext(); ) {
475-
Map.Entry<String, ConcurrentHashMap<String, PartitionScopedRegionLevelProgress.RegionLevelProgress>> entry = iterator.next();
427+
Map.Entry<String, ConcurrentHashMap<String, PartitionScopedRegionLevelProgress.RegionLevelProgress>> entry = iterator.next();
476428

477429
String partitionKeyRangeId = entry.getKey();
478-
String sessionTokenAsString = entry.getValue().get(PartitionScopedRegionLevelProgress.GLOBAL_PROGRESS_KEY).getSessionToken().convertToString();
479-
480-
result = result.append(partitionKeyRangeId).append(":").append(sessionTokenAsString);
481-
if (iterator.hasNext()) {
482-
result = result.append(",");
483-
}
484-
}
485-
}
486430

487-
return result.toString();
488-
}
489-
490-
// validate whether the request can be scoped to a logical partition
491-
// along with whether multi-write is enabled for the request / account
492-
// multi-write setup needs to be verified since multiple regions can make progress
493-
// independently as multiple regions can process writes
494-
private boolean shouldUseBloomFilter(
495-
RxDocumentServiceRequest request,
496-
String partitionKeyRangeId,
497-
Utils.ValueHolder<PartitionKeyInternal> partitionKeyInternal,
498-
Utils.ValueHolder<PartitionKeyDefinition> partitionKeyDefinition,
499-
PartitionScopedRegionLevelProgress partitionScopedRegionLevelProgress) {
431+
Optional<ISessionToken> sessionToken = entry.getValue().get(PartitionScopedRegionLevelProgress.GLOBAL_PROGRESS_KEY).getSessionToken();
500432

501-
checkNotNull(request, "request cannot be null!");
502-
checkNotNull(this.globalEndpointManager, "globalEndpointManager cannot be nulL!");
503-
checkNotNull(partitionScopedRegionLevelProgress, "partitionScopedRegionLevelProgress cannot be null!");
433+
if (sessionToken.isPresent()) {
504434

505-
partitionKeyInternal.v = request.getPartitionKeyInternal();
435+
ISessionToken sessionTokenInner = sessionToken.get();
436+
String sessionTokenAsString = sessionTokenInner.convertToString();
506437

507-
if (partitionKeyInternal.v == null) {
508-
return false;
509-
}
510-
511-
partitionKeyDefinition.v = request.getPartitionKeyDefinition();
512-
513-
if (partitionKeyDefinition.v == null) {
514-
return false;
515-
}
516-
517-
if (partitionScopedRegionLevelProgress.getHasPartitionSeenNonPointRequestsForDocuments(partitionKeyRangeId)) {
518-
return false;
438+
result = result.append(partitionKeyRangeId).append(":").append(sessionTokenAsString);
439+
if (iterator.hasNext()) {
440+
result = result.append(",");
441+
}
442+
} else {
443+
logger.warn("No session token present for partitionKeyRangeId {} in global progress", partitionKeyRangeId);
444+
}
445+
}
519446
}
520447

521-
return globalEndpointManager.canUseMultipleWriteLocations(request);
448+
return result.toString();
522449
}
523450

524451
private String extractFirstEffectivePreferredReadableRegion() {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,10 @@ private void addUserAgentSuffix(UserAgentContainer userAgentContainer, Set<UserA
15351535
}
15361536
}
15371537

1538+
if (!(this.sessionContainer instanceof RegionScopedSessionContainer)) {
1539+
userAgentFeatureFlags.remove(UserAgentFeatureFlags.RegionScopedSessionCapturing);
1540+
}
1541+
15381542
userAgentContainer.setFeatureEnabledFlagsAsSuffix(userAgentFeatureFlags);
15391543
}
15401544

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenHelper.java

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -173,50 +173,21 @@ static ISessionToken resolvePartitionLocalSessionToken(RxDocumentServiceRequest
173173
Long collectionRid,
174174
String partitionKeyRangeId,
175175
String firstEffectivePreferredReadableRegion,
176-
boolean canUseBloomFilter) {
176+
GlobalEndpointManager globalEndpointManager) {
177177

178178
if (partitionScopedRegionLevelProgress != null) {
179179

180-
Set<String> partitionKeyPossibleRegions = new HashSet<>();
181-
182180
if (partitionScopedRegionLevelProgress.isPartitionKeyRangeIdPresent(partitionKeyRangeId)) {
183-
184-
if (canUseBloomFilter) {
185-
partitionKeyPossibleRegions = partitionKeyBasedBloomFilter
186-
.tryGetPossibleRegionsLogicalPartitionResolvedTo(
187-
request,
188-
collectionRid,
189-
partitionKey,
190-
partitionKeyDefinition);
191-
192-
return partitionScopedRegionLevelProgress
193-
.tryResolveSessionToken(
194-
request,
195-
partitionKeyPossibleRegions,
196-
partitionKeyRangeId,
197-
firstEffectivePreferredReadableRegion,
198-
true);
199-
200-
}
201-
202181
return partitionScopedRegionLevelProgress
203182
.tryResolveSessionToken(
204183
request,
205-
partitionKeyPossibleRegions,
184+
collectionRid,
206185
partitionKeyRangeId,
207186
firstEffectivePreferredReadableRegion,
208-
false);
187+
partitionKeyBasedBloomFilter,
188+
globalEndpointManager);
209189

210190
} else {
211-
if (canUseBloomFilter) {
212-
partitionKeyPossibleRegions = partitionKeyBasedBloomFilter
213-
.tryGetPossibleRegionsLogicalPartitionResolvedTo(
214-
request,
215-
collectionRid,
216-
partitionKey,
217-
partitionKeyDefinition);
218-
}
219-
220191
ISessionToken parentSessionToken = null;
221192

222193
Collection<String> parents = request.requestContext.resolvedPartitionKeyRange.getParents();
@@ -232,10 +203,11 @@ static ISessionToken resolvePartitionLocalSessionToken(RxDocumentServiceRequest
232203
resolvedSessionTokenForParentPkRangeId = partitionScopedRegionLevelProgress
233204
.tryResolveSessionToken(
234205
request,
235-
partitionKeyPossibleRegions,
206+
collectionRid,
236207
parentPkRangeId,
237208
firstEffectivePreferredReadableRegion,
238-
canUseBloomFilter);
209+
partitionKeyBasedBloomFilter,
210+
globalEndpointManager);
239211

240212
if (resolvedSessionTokenForParentPkRangeId != null) {
241213
parentSessionToken = parentSessionToken != null ?

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/UserAgentFeatureFlags.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ public enum UserAgentFeatureFlags {
1919
PerPartitionCircuitBreaker(1 << 1),
2020
ThinClient(1 << 2),
2121
// BinaryEncoding(1 << 3),
22-
Http2(1 << 4);
22+
Http2(1 << 4),
23+
RegionScopedSessionCapturing(1 << 5);
2324

2425
private final int value;
2526

0 commit comments

Comments
 (0)