Skip to content

Commit 0366de7

Browse files
author
David R. MacIver
committed
a great big pile of buffering and a bounded queue means that the VM can run indefinitely without backing up. Occasional slowdowns as we wait for the buffers to clear, but run at high speed for most of the time and the slow speed is still 2x the old one from printing directly
1 parent d2c00b0 commit 0366de7

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/com/rabbitmq/tools/Tracer.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,11 @@
3131

3232
package com.rabbitmq.tools;
3333

34-
import java.io.DataInputStream;
35-
import java.io.DataOutputStream;
36-
import java.io.EOFException;
37-
import java.io.IOException;
38-
import java.io.PrintStream;
34+
import java.io.*;
3935
import java.net.ServerSocket;
4036
import java.net.Socket;
4137
import java.util.HashMap;
42-
import java.util.concurrent.LinkedBlockingQueue;
38+
import java.util.concurrent.*;
4339

4440
import com.rabbitmq.client.AMQP;
4541
import com.rabbitmq.client.impl.AMQCommand;
@@ -73,11 +69,14 @@ public class Tracer implements Runnable {
7369
new Boolean(System.getProperty("com.rabbitmq.tools.Tracer.SILENT_MODE"))
7470
.booleanValue();
7571

72+
final static int LOG_QUEUE_SIZE = 1024 * 1024;
73+
final static int BUFFER_SIZE = 10 * 1024 * 1024;
74+
7675
private static class AsyncLogger extends Thread{
7776
final PrintStream ps;
78-
final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
77+
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(LOG_QUEUE_SIZE, true);
7978
AsyncLogger(PrintStream ps){
80-
this.ps = ps;
79+
this.ps = new PrintStream(new BufferedOutputStream(ps, BUFFER_SIZE), false);
8180
start();
8281
}
8382

@@ -94,14 +93,21 @@ void printMessage(Object message){
9493
@Override public void run(){
9594
try {
9695
while(true){
97-
printMessage(queue.take());
96+
Object message = queue.poll(50, TimeUnit.MILLISECONDS);
97+
if(message != null) printMessage(message);
98+
else ps.flush();
99+
98100
}
99101
} catch (InterruptedException interrupt){
100102
}
101103
}
102104

103105
void log(Object message){
104-
queue.add(message);
106+
try {
107+
queue.put(message);
108+
} catch(InterruptedException ex){
109+
throw new RuntimeException(ex);
110+
}
105111
}
106112
}
107113

0 commit comments

Comments
 (0)