Skip to content

Commit 614bf66

Browse files
committed
Add --memory-report option to performance tool
This gives the Xmx and max direct memory allocation at the beginning of the run and outputs Netty heap and direct allocation during the run. This can help to diagnose any allocation that can occur, because of some memory leak or more likely in an under-provisioned environment.
1 parent e93bc99 commit 614bf66

File tree

5 files changed

+113
-3
lines changed

5 files changed

+113
-3
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ RUN set -eux; \
2323
if [ "$(uname -m)" = "x86_64" ] ; then JAVA_PATH="/usr/lib/jdk-$JAVA_VERSION"; \
2424
mkdir $JAVA_PATH && \
2525
tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \
26-
$JAVA_PATH/bin/jlink --compress=2 --output /jre --add-modules java.base,java.naming,java.xml,jdk.unsupported,jdk.crypto.cryptoki; \
26+
$JAVA_PATH/bin/jlink --compress=2 --output /jre --add-modules java.base,jdk.management,java.naming,java.xml,jdk.unsupported,jdk.crypto.cryptoki; \
2727
/jre/bin/java -version; \
2828
fi
2929

src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737
import java.util.function.Function;
38+
import java.util.function.Supplier;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
4041

@@ -47,16 +48,19 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
4748
private final boolean summaryFile;
4849
private final PrintWriter out;
4950
private final boolean includeByteRates;
51+
private final Supplier<String> memoryReportSupplier;
5052
private volatile Closeable closingSequence = () -> {};
5153

5254
DefaultPerformanceMetrics(
5355
CompositeMeterRegistry meterRegistry,
5456
String metricsPrefix,
5557
boolean summaryFile,
5658
boolean includeByteRates,
59+
Supplier<String> memoryReportSupplier,
5760
PrintWriter out) {
5861
this.summaryFile = summaryFile;
5962
this.includeByteRates = includeByteRates;
63+
this.memoryReportSupplier = memoryReportSupplier;
6064
this.out = out;
6165
DropwizardConfig dropwizardConfig =
6266
new DropwizardConfig() {
@@ -205,6 +209,10 @@ public void start(String description) throws Exception {
205209
builder.append(formatLatency.apply(latency)).append(", ");
206210
builder.append(formatChunkSize.apply(chunkSize));
207211
this.out.println(builder);
212+
String memoryReport = this.memoryReportSupplier.get();
213+
if (!memoryReport.isEmpty()) {
214+
this.out.println(memoryReport);
215+
}
208216
}
209217
reportCount.incrementAndGet();
210218
} catch (Exception e) {
@@ -259,7 +267,7 @@ static String formatByteRate(String label, double bytes) {
259267
// based on
260268
// https://stackoverflow.com/questions/3758606/how-can-i-convert-byte-size-into-a-human-readable-format-in-java
261269
if (-1000 < bytes && bytes < 1000) {
262-
return bytes + " B";
270+
return bytes + " B/s";
263271
}
264272
CharacterIterator ci = new StringCharacterIterator("kMGTPE");
265273
while (bytes <= -999_950 || bytes >= 999_950) {

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@
4242
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
4343
import io.micrometer.core.instrument.Counter;
4444
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
45+
import io.netty.buffer.ByteBufAllocator;
46+
import io.netty.buffer.ByteBufAllocatorMetric;
47+
import io.netty.buffer.ByteBufAllocatorMetricProvider;
4548
import io.netty.handler.ssl.SslContextBuilder;
49+
import io.netty.util.internal.PlatformDependent;
4650
import java.io.PrintStream;
4751
import java.io.PrintWriter;
4852
import java.net.URI;
@@ -65,6 +69,7 @@
6569
import java.util.concurrent.atomic.AtomicInteger;
6670
import java.util.concurrent.atomic.AtomicLong;
6771
import java.util.function.BiFunction;
72+
import java.util.function.Supplier;
6873
import java.util.stream.Collectors;
6974
import java.util.stream.IntStream;
7075
import org.slf4j.Logger;
@@ -286,6 +291,12 @@ public class StreamPerfTest implements Callable<Integer> {
286291
defaultValue = "false")
287292
private boolean includeByteRates;
288293

294+
@CommandLine.Option(
295+
names = {"--memory-report", "-mr"},
296+
description = "report information on memory settings and usage",
297+
defaultValue = "false")
298+
private boolean memoryReport;
299+
289300
private MetricsCollector metricsCollector;
290301
private PerformanceMetrics performanceMetrics;
291302

@@ -370,15 +381,61 @@ public Integer call() throws Exception {
370381
// FIXME assign codec
371382
this.codec = createCodec(this.codecClass);
372383

384+
ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
385+
373386
CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
374387
String metricsPrefix = "rabbitmq.stream";
375388
this.metricsCollector = new MicrometerMetricsCollector(meterRegistry, metricsPrefix);
376389

377390
Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed");
378391

392+
Supplier<String> memoryReportSupplier;
393+
if (this.memoryReport) {
394+
long physicalMemory = Utils.physicalMemory();
395+
String physicalMemoryReport =
396+
physicalMemory == 0
397+
? ""
398+
: format(
399+
", physical memory %s (%d bytes)",
400+
Utils.formatByte(physicalMemory), physicalMemory);
401+
this.out.println(
402+
format(
403+
"Max memory %s (%d bytes), max direct memory %s (%d bytes)%s",
404+
Utils.formatByte(Runtime.getRuntime().maxMemory()),
405+
Runtime.getRuntime().maxMemory(),
406+
Utils.formatByte(PlatformDependent.maxDirectMemory()),
407+
PlatformDependent.maxDirectMemory(),
408+
physicalMemoryReport));
409+
410+
if (byteBufAllocator instanceof ByteBufAllocatorMetricProvider) {
411+
ByteBufAllocatorMetric allocatorMetric =
412+
((ByteBufAllocatorMetricProvider) byteBufAllocator).metric();
413+
memoryReportSupplier =
414+
() -> {
415+
long usedHeapMemory = allocatorMetric.usedHeapMemory();
416+
long usedDirectMemory = allocatorMetric.usedDirectMemory();
417+
return format(
418+
"Used heap memory %s (%d bytes), used direct memory %s (%d bytes)",
419+
Utils.formatByte(usedHeapMemory),
420+
usedHeapMemory,
421+
Utils.formatByte(usedDirectMemory),
422+
usedDirectMemory);
423+
};
424+
} else {
425+
memoryReportSupplier = () -> "";
426+
}
427+
} else {
428+
memoryReportSupplier = () -> "";
429+
}
430+
379431
this.performanceMetrics =
380432
new DefaultPerformanceMetrics(
381-
meterRegistry, metricsPrefix, this.summaryFile, this.includeByteRates, this.out);
433+
meterRegistry,
434+
metricsPrefix,
435+
this.summaryFile,
436+
this.includeByteRates,
437+
memoryReportSupplier,
438+
this.out);
382439

383440
this.messageSize = this.messageSize < 8 ? 8 : this.messageSize; // we need to store a long in it
384441

@@ -430,6 +487,7 @@ public Integer call() throws Exception {
430487
.addressResolver(addressResolver)
431488
.scheduledExecutorService(envExecutor)
432489
.metricsCollector(metricsCollector)
490+
.byteBufAllocator(byteBufAllocator)
433491
.maxProducersByConnection(this.producersByConnection)
434492
.maxTrackingConsumersByConnection(this.trackingConsumersByConnection)
435493
.maxConsumersByConnection(this.consumersByConnection);

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
1919
import com.rabbitmq.stream.compression.Compression;
2020
import java.security.cert.X509Certificate;
21+
import java.text.CharacterIterator;
22+
import java.text.StringCharacterIterator;
2123
import java.time.Duration;
2224
import java.time.Instant;
2325
import java.time.format.DateTimeFormatter;
@@ -102,6 +104,33 @@ static List<String> streams(String range, List<String> streams) {
102104
}
103105
}
104106

107+
static String formatByte(double bytes) {
108+
// based on
109+
// https://stackoverflow.com/questions/3758606/how-can-i-convert-byte-size-into-a-human-readable-format-in-java
110+
if (-1000 < bytes && bytes < 1000) {
111+
return String.valueOf(bytes);
112+
}
113+
CharacterIterator ci = new StringCharacterIterator("kMGTPE");
114+
while (bytes <= -999_950 || bytes >= 999_950) {
115+
bytes /= 1000;
116+
ci.next();
117+
}
118+
return String.format("%.1f %cB", bytes / 1000.0, ci.current());
119+
}
120+
121+
static long physicalMemory() {
122+
try {
123+
com.sun.management.OperatingSystemMXBean os =
124+
(com.sun.management.OperatingSystemMXBean)
125+
java.lang.management.ManagementFactory.getOperatingSystemMXBean();
126+
return os.getTotalPhysicalMemorySize();
127+
} catch (Throwable e) {
128+
// we can get NoClassDefFoundError, so we catch from Throwable and below
129+
LOGGER.warn("Could not get physical memory", e);
130+
return 0;
131+
}
132+
}
133+
105134
private static void throwConversionException(String format, String... arguments) {
106135
throw new CommandLine.TypeConversionException(String.format(format, (Object[]) arguments));
107136
}

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,16 @@ void byteRatesShouldBeIncludedWhenOptionIsEnabled() throws Exception {
250250
assertThat(consoleOutput()).contains("written bytes").contains("read bytes");
251251
}
252252

253+
@Test
254+
void memoryReportShouldBeIncludedWhenOptionIsEnabled() throws Exception {
255+
Future<?> run = run(builder().memoryReport());
256+
waitUntilStreamExists(s);
257+
waitOneSecond();
258+
run.cancel(true);
259+
waitRunEnds();
260+
assertThat(consoleOutput()).contains("Max memory").contains("max direct memory");
261+
}
262+
253263
@Test
254264
void subEntriesWithCompressionShouldRun() throws Exception {
255265
Future<?> run = run(builder().subEntrySize(10).compression(Compression.GZIP));
@@ -311,6 +321,11 @@ ArgumentsBuilder help() {
311321
return this;
312322
}
313323

324+
ArgumentsBuilder memoryReport() {
325+
arguments.put("memory-report", "");
326+
return this;
327+
}
328+
314329
ArgumentsBuilder maxLengthBytes(ByteCapacity capacity) {
315330
arguments.put("max-length-bytes", capacity.toString());
316331
return this;

0 commit comments

Comments
 (0)