3535import java .io .DataOutputStream ;
3636import java .io .EOFException ;
3737import java .io .IOException ;
38+ import java .io .PrintStream ;
3839import java .net .ServerSocket ;
3940import java .net .Socket ;
4041import java .util .HashMap ;
42+ import java .util .concurrent .LinkedBlockingQueue ;
4143
4244import com .rabbitmq .client .AMQP ;
4345import com .rabbitmq .client .impl .AMQCommand ;
4648import com .rabbitmq .client .impl .Frame ;
4749import com .rabbitmq .utility .BlockingCell ;
4850
51+
4952/**
5053 * AMQP Protocol Analyzer program. Listens on a configurable port and when a
5154 * connection arrives, makes an outbound connection to a configurable host and
@@ -66,6 +69,53 @@ public class Tracer implements Runnable {
6669 new Boolean (System .getProperty ("com.rabbitmq.tools.Tracer.NO_DECODE_FRAMES" ))
6770 .booleanValue ();
6871
72+ static class AsyncLogger extends Thread {
73+ final PrintStream ps ;
74+ final LinkedBlockingQueue <Object > queue = new LinkedBlockingQueue <Object >();
75+ private static final Object CLOSE = new Object ();
76+
77+ public AsyncLogger (PrintStream ps ){
78+ this .ps = ps ;
79+ start ();
80+ }
81+
82+ void printMessage (Object message ){
83+ if (message instanceof Throwable ){
84+ ((Throwable )message ).printStackTrace (ps );
85+ } else if (message instanceof String ){
86+ ps .println (message );
87+ } else {
88+ throw new RuntimeException ("Unrecognised object " + message );
89+ }
90+ }
91+
92+ @ Override public void run (){
93+ try {
94+ while (true ){
95+ Object message = queue .take ();
96+ if (message == CLOSE ){
97+ while ((message = queue .poll ()) != null ){
98+ if (message != CLOSE ){
99+ printMessage (message );
100+ }
101+ }
102+ break ;
103+ }
104+ printMessage (message );
105+ }
106+ } catch (InterruptedException interrupt ){
107+ }
108+ }
109+
110+ public void log (Object message ){
111+ queue .add (message );
112+ }
113+
114+ public void close (){
115+ queue .add (CLOSE );
116+ }
117+ }
118+
69119 public static void main (String [] args ) {
70120 int listenPort = args .length > 0 ? Integer .parseInt (args [0 ]) : 5673 ;
71121 String connectHost = args .length > 1 ? args [1 ] : "localhost" ;
@@ -85,9 +135,10 @@ public static void main(String[] args) {
85135 try {
86136 ServerSocket server = new ServerSocket (listenPort );
87137 int counter = 0 ;
138+ AsyncLogger logger = new AsyncLogger (System .out );
88139 while (true ) {
89140 Socket conn = server .accept ();
90- new Tracer (conn , counter ++, connectHost , connectPort );
141+ new Tracer (conn , counter ++, connectHost , connectPort , logger );
91142 }
92143 } catch (IOException ioe ) {
93144 ioe .printStackTrace ();
@@ -109,7 +160,9 @@ public static void main(String[] args) {
109160
110161 public DataOutputStream oos ;
111162
112- public Tracer (Socket sock , int id , String host , int port ) throws IOException {
163+ public AsyncLogger logger ;
164+
165+ public Tracer (Socket sock , int id , String host , int port , AsyncLogger logger ) throws IOException {
113166 this .inSock = sock ;
114167 this .outSock = new Socket (host , port );
115168 this .id = id ;
@@ -118,6 +171,7 @@ public Tracer(Socket sock, int id, String host, int port) throws IOException {
118171 this .ios = new DataOutputStream (inSock .getOutputStream ());
119172 this .ois = new DataInputStream (outSock .getInputStream ());
120173 this .oos = new DataOutputStream (outSock .getOutputStream ());
174+ this .logger = logger ;
121175
122176 new Thread (this ).start ();
123177 }
@@ -135,18 +189,18 @@ public void run() {
135189 new Thread (outHandler ).start ();
136190 Object result = w .uninterruptibleGet ();
137191 if (result instanceof Exception ) {
138- (( Exception ) result ). printStackTrace ( );
192+ logger . log ( result );
139193 }
140194 } catch (EOFException eofe ) {
141- eofe . printStackTrace ( );
195+ logger . log ( eofe );
142196 } catch (IOException ioe ) {
143- ioe . printStackTrace ( );
197+ logger . log ( ioe );
144198 } finally {
145199 try {
146200 inSock .close ();
147201 outSock .close ();
148202 } catch (IOException ioe2 ) {
149- ioe2 . printStackTrace ( );
203+ logger . log ( ioe2 );
150204 }
151205 }
152206 }
@@ -174,7 +228,7 @@ public Frame readFrame() throws IOException {
174228 }
175229
176230 public void report (int channel , Object object ) {
177- System . out . println ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
231+ logger . log ("" + System .currentTimeMillis () + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- " ) + object );
178232 }
179233
180234 public void reportFrame (Frame f )
0 commit comments