decodeStartTime = new AtomicReference<>();
+ private static final boolean leakDetectionDebuggingEnabled = ResourceLeakDetector.getLevel().ordinal() >=
+ ResourceLeakDetector.Level.ADVANCED.ordinal();
+
/**
* Deserialize from an input {@link ByteBuf} to an {@link RntbdResponse} instance.
*
@@ -32,6 +36,11 @@ protected void decode(final ChannelHandlerContext context, final ByteBuf in, fin
decodeStartTime.compareAndSet(null, Instant.now());
+ // BREADCRUMB: Track buffer at decode entry
+ if (leakDetectionDebuggingEnabled) {
+ in.touch("RntbdResponseDecoder.decode: entry");
+ }
+
if (RntbdFramer.canDecodeHead(in)) {
final RntbdResponse response = RntbdResponse.decode(in);
@@ -41,9 +50,34 @@ protected void decode(final ChannelHandlerContext context, final ByteBuf in, fin
response.setDecodeStartTime(decodeStartTime.getAndSet(null));
logger.debug("{} DECODE COMPLETE: {}", context.channel(), response);
+
+ // BREADCRUMB: Track buffer before discard
+ if (leakDetectionDebuggingEnabled) {
+ in.touch("RntbdResponseDecoder.decode: before discardReadBytes");
+ }
+
in.discardReadBytes();
+
+ // BREADCRUMB: Track response before adding to output
+ if (leakDetectionDebuggingEnabled) {
+ response.touch("RntbdResponseDecoder.decode: before retain and adding to output");
+ }
+
out.add(response.retain());
+ } else if (leakDetectionDebuggingEnabled) {
+ logger.info("{} RntbdResponseDecoder: response is null, not enough data to decode yet",
+ context.channel());
}
+ } else if (leakDetectionDebuggingEnabled) {
+ logger.info("{} RntbdResponseDecoder: cannot decode head yet, readableBytes={}",
+ context.channel(), in.readableBytes());
}
}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ // BREADCRUMB: Track exceptions that might lead to leaked buffers
+ logger.warn("{} RntbdResponseDecoder.exceptionCaught: {}", ctx.channel(), cause.getMessage(), cause);
+ super.exceptionCaught(ctx, cause);
+ }
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
index 488596e825d5..5c5bc5355b32 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
@@ -11,6 +11,8 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import io.netty.resolver.DefaultAddressResolverGroup;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ResourceLeakDetector;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@@ -39,7 +41,8 @@
* HttpClient that is implemented using reactor-netty.
*/
public class ReactorNettyClient implements HttpClient {
-
+ private static final boolean leakDetectionDebuggingEnabled = ResourceLeakDetector.getLevel().ordinal() >=
+ ResourceLeakDetector.Level.ADVANCED.ordinal();
private static final String REACTOR_NETTY_REQUEST_RECORD_KEY = "reactorNettyRequestRecordKey";
private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName());
@@ -348,7 +351,17 @@ public HttpHeaders headers() {
public Mono body() {
return ByteBufFlux
.fromInbound(
- bodyIntern().doOnDiscard(ByteBuf.class, io.netty.util.ReferenceCountUtil::safeRelease)
+ bodyIntern().doOnDiscard(
+ ByteBuf.class,
+ buf -> {
+ if (leakDetectionDebuggingEnabled) {
+ buf.touch("ReactorNettyHttpResponse.body - onDiscard - refCnt: " + buf.refCnt());
+ logger.info("ReactorNettyHttpResponse.body - onDiscard - refCnt: {}", buf.refCnt());
+ }
+ if (buf.refCnt() > 0) {
+ ReferenceCountUtil.safeRelease(buf);
+ }
+ })
)
.aggregate()
.doOnSubscribe(this::updateSubscriptionState);
@@ -400,8 +413,21 @@ private void releaseOnNotSubscribedResponse(ReactorNettyResponseState reactorNet
if (logger.isDebugEnabled()) {
logger.debug("Releasing body, not yet subscribed");
}
- this.bodyIntern()
- .doOnNext(io.netty.util.ReferenceCountUtil::safeRelease)
+
+ if (leakDetectionDebuggingEnabled) {
+ logger.info("Releasing body, not yet subscribed");
+ }
+
+ body()
+ .doOnNext(buf -> {
+ if (leakDetectionDebuggingEnabled) {
+ buf.touch("ReactorNettyHttpResponse.releaseOnNotSubscribedResponse - refCnt: " + buf.refCnt());
+ logger.info("ReactorNettyHttpResponse.releaseOnNotSubscribedResponse - refCnt: {}", buf.refCnt());
+ }
+ if (buf.refCnt() > 0) {
+ ReferenceCountUtil.safeRelease(buf);
+ }
+ })
.subscribe(v -> {}, ex -> {}, () -> {});
}
}
diff --git a/sdk/cosmos/fabric-cosmos-spark-auth_3/pom.xml b/sdk/cosmos/fabric-cosmos-spark-auth_3/pom.xml
index 2941fa16041d..75035c197bc1 100644
--- a/sdk/cosmos/fabric-cosmos-spark-auth_3/pom.xml
+++ b/sdk/cosmos/fabric-cosmos-spark-auth_3/pom.xml
@@ -506,6 +506,12 @@
maven-surefire-plugin
3.5.3
+
+ true
+ 1
+ 256
+ paranoid
+
**/*.*
**/*Test.*
@@ -513,6 +519,16 @@
**/*Spec.*
true
+
+
+ surefire.testng.verbose
+ 2
+
+
+ listener
+ com.azure.cosmos.CosmosNettyLeakDetectorFactory
+
+
diff --git a/sdk/cosmos/live-platform-matrix.json b/sdk/cosmos/live-platform-matrix.json
index 7f3e3af80997..613390ddd671 100644
--- a/sdk/cosmos/live-platform-matrix.json
+++ b/sdk/cosmos/live-platform-matrix.json
@@ -14,7 +14,7 @@
"-Pcircuit-breaker-read-all-read-many": "CircuitBreakerReadAllAndReadMany",
"-Pmulti-region": "MultiRegion",
"-Plong": "Long",
- "-DargLine=\"-Dazure.cosmos.directModeProtocol=Tcp\"": "TCP",
+ "-DargLine=\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -Dazure.cosmos.directModeProtocol=Tcp\"": "TCP",
"Session": "",
"ubuntu": "",
"@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }": "",
@@ -38,7 +38,7 @@
}
},
"AdditionalArgs": [
- "-DargLine=\"-Dazure.cosmos.directModeProtocol=Tcp\""
+ "-DargLine=\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -Dazure.cosmos.directModeProtocol=Tcp\""
],
"ProfileFlag": "-Pe2e",
"Agent": {
@@ -52,7 +52,7 @@
"ProfileFlag": [ "-Pcfp-split", "-Psplit", "-Pquery", "-Pfast", "-Pdirect" ],
"ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }",
"AdditionalArgs": [
- "-DargLine=\"-Dio.netty.handler.ssl.noOpenSsl=true\""
+ "-DargLine=\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -Dio.netty.handler.ssl.noOpenSsl=true\""
],
"Agent": {
"ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
@@ -71,7 +71,7 @@
{
"DESIRED_CONSISTENCY": "BoundedStaleness",
"ACCOUNT_CONSISTENCY": "Strong",
- "AdditionalArgs": "-DargLine=\"-Dazure.cosmos.directModeProtocol=Tcp\"",
+ "AdditionalArgs": "-DargLine=\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -Dazure.cosmos.directModeProtocol=Tcp\"",
"ProfileFlag": "-Pe2e",
"ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong' }",
"Agent": {
@@ -109,7 +109,7 @@
},
"PROTOCOLS": "[\"Tcp\"]",
"ProfileFlag": [ "-Pmulti-master" ],
- "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"",
+ "AdditionalArgs": "\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"",
"Agent": {
"ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
}
@@ -125,7 +125,7 @@
},
"PROTOCOLS": "[\"Tcp\"]",
"ProfileFlag": [ "-Pfi-multi-master" ],
- "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"",
+ "AdditionalArgs": "\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=TRUE\"",
"Agent": {
"ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
}
@@ -141,7 +141,7 @@
},
"PROTOCOLS": "[\"Tcp\"]",
"ProfileFlag": [ "-Pmulti-master" ],
- "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=FALSE\"",
+ "AdditionalArgs": "\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=FALSE\"",
"Agent": {
"ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
}
@@ -157,7 +157,7 @@
},
"PROTOCOLS": "[\"Tcp\"]",
"ProfileFlag": [ "-Pfi-multi-master" ],
- "AdditionalArgs": "\"-DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=FALSE\"",
+ "AdditionalArgs": "\"-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DCOSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_DEFAULT_CONFIG_OPT_IN=FALSE\"",
"Agent": {
"ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
}
diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml
index 17af3836ec2d..69d782fcc9a0 100644
--- a/sdk/cosmos/tests.yml
+++ b/sdk/cosmos/tests.yml
@@ -33,7 +33,7 @@ extends:
TestResultsFiles: '**/junitreports/TEST-*.xml'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -65,7 +65,7 @@ extends:
TestResultsFiles: '**/junitreports/TEST-*.xml'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DCOSMOS.HTTP2_ENABLED=true'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -97,7 +97,7 @@ extends:
TestResultsFiles: '**/junitreports/TEST-*.xml'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DACCOUNT_HOST=$(thinclient-test-endpoint) -DACCOUNT_KEY=$(thinclient-test-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thinclient-test-endpoint) -DACCOUNT_KEY=$(thinclient-test-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -129,7 +129,7 @@ extends:
TestResultsFiles: '**/junitreports/TEST-*.xml'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DACCOUNT_HOST=$(thin-client-canary-multi-region-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-region-session-key) -DCOSMOS.THINCLIENT_ENABLED=true'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-region-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-region-session-key) -DCOSMOS.THINCLIENT_ENABLED=true'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -161,7 +161,7 @@ extends:
TestResultsFiles: '**/junitreports/TEST-*.xml'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -183,10 +183,10 @@ extends:
safeName: azurespringdatacosmos
TimeoutInMinutes: 90
TestGoals: 'verify'
- TestOptions: '$(ProfileFlag) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false'
+ TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=false'
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
@@ -212,5 +212,5 @@ extends:
TestOptions: '$(ProfileFlag) $(AdditionalArgs)'
AdditionalVariables:
- name: AdditionalArgs
- value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true'