3333import java .util .concurrent .ScheduledFuture ;
3434import java .util .concurrent .TimeUnit ;
3535import java .util .concurrent .atomic .AtomicInteger ;
36+ import java .util .function .BiFunction ;
3637import java .util .function .Function ;
3738import java .util .function .Supplier ;
3839import org .slf4j .Logger ;
@@ -108,6 +109,8 @@ private long getConsumedCount() {
108109
109110 @ Override
110111 public void start (String description ) throws Exception {
112+ long startTime = System .nanoTime ();
113+
111114 String metricPublished = "rabbitmqStreamPublished" ;
112115 String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" ;
113116 String metricConsumed = "rabbitmqStreamConsumed" ;
@@ -150,14 +153,18 @@ public void start(String description) throws Exception {
150153 .entrySet ()
151154 .forEach (entry -> meters .put (entry .getValue (), registryMeters .get (entry .getKey ())));
152155
153- Map <String , Function <Meter , String >> formatMeter = new HashMap <>();
156+ Map <String , BiFunction <Meter , Duration , String >> formatMeter = new HashMap <>();
154157 metersNamesAndLabels .entrySet ().stream ()
155158 .filter (entry -> !entry .getKey ().contains ("bytes" ))
156159 .forEach (
157160 entry -> {
158161 formatMeter .put (
159162 entry .getValue (),
160- meter -> String .format ("%s %.0f msg/s, " , entry .getValue (), meter .getMeanRate ()));
163+ (meter , duration ) -> {
164+ double rate =
165+ duration .getSeconds () <= 5 ? meter .getMeanRate () : meter .getOneMinuteRate ();
166+ return String .format ("%s %.0f msg/s, " , entry .getValue (), rate );
167+ });
161168 });
162169
163170 metersNamesAndLabels .entrySet ().stream ()
@@ -166,7 +173,11 @@ public void start(String description) throws Exception {
166173 entry -> {
167174 formatMeter .put (
168175 entry .getValue (),
169- meter -> formatByteRate (entry .getValue (), meter .getMeanRate ()) + ", " );
176+ (meter , duration ) -> {
177+ double rate =
178+ duration .getSeconds () <= 5 ? meter .getMeanRate () : meter .getOneMinuteRate ();
179+ return formatByteRate (entry .getValue (), rate ) + ", " ;
180+ });
170181 });
171182
172183 Histogram chunkSize = metricRegistry .getHistograms ().get (metricChunkSize );
@@ -195,6 +206,7 @@ public void start(String description) throws Exception {
195206 () -> {
196207 try {
197208 if (checkActivity ()) {
209+ Duration duration = Duration .ofNanos (System .nanoTime () - startTime );
198210 StringBuilder builder = new StringBuilder ();
199211 builder .append (reportCount .get ()).append (", " );
200212 meters
@@ -203,7 +215,7 @@ public void start(String description) throws Exception {
203215 entry -> {
204216 String meterName = entry .getKey ();
205217 Meter meter = entry .getValue ();
206- builder .append (formatMeter .get (meterName ).apply (meter ));
218+ builder .append (formatMeter .get (meterName ).apply (meter , duration ));
207219 });
208220 builder .append (formatLatency .apply (latency )).append (", " );
209221 builder .append (formatChunkSize .apply (chunkSize ));
@@ -222,8 +234,6 @@ public void start(String description) throws Exception {
222234 1 ,
223235 TimeUnit .SECONDS );
224236
225- long start = System .currentTimeMillis ();
226-
227237 this .closingSequence =
228238 () -> {
229239 consoleReportingTask .cancel (true );
@@ -232,18 +242,19 @@ public void start(String description) throws Exception {
232242
233243 scheduledExecutorService .shutdownNow ();
234244
235- long duration = System .currentTimeMillis () - start ;
245+ Duration d = Duration .ofNanos (System .nanoTime () - startTime );
246+ Duration duration = d .getSeconds () <= 0 ? Duration .ofSeconds (1 ) : d ;
236247
237248 Function <Map .Entry <String , Meter >, String > formatMeterSummary =
238249 entry -> {
239250 if (entry .getKey ().contains ("bytes" )) {
240251 return formatByteRate (
241- entry .getKey (), entry .getValue ().getCount () * 1000 / duration )
252+ entry .getKey (), entry .getValue ().getCount () / duration . getSeconds () )
242253 + ", " ;
243254 } else {
244255 return String .format (
245256 "%s %d msg/s, " ,
246- entry .getKey (), entry .getValue ().getCount () * 1000 / duration );
257+ entry .getKey (), entry .getValue ().getCount () / duration . getSeconds () );
247258 }
248259 };
249260
0 commit comments