3939import java .util .Arrays ;
4040import java .util .List ;
4141import java .util .UUID ;
42+ import java .net .Socket ;
4243
4344import org .apache .commons .cli .CommandLine ;
4445import org .apache .commons .cli .CommandLineParser ;
6162import com .rabbitmq .client .AMQP .Queue ;
6263import com .rabbitmq .client .QueueingConsumer .Delivery ;
6364
64-
6565public class MulticastMain {
6666
6767 public static void main (String [] args ) {
@@ -85,6 +85,7 @@ public static void main(String[] args) {
8585 int minMsgSize = intArg (cmd , 's' , 0 );
8686 int maxRedirects = intArg (cmd , 'd' , 0 );
8787 int timeLimit = intArg (cmd , 'z' , 0 );
88+ final int bufferSize = intArg (cmd , 'b' , -1 );
8889 List flags = lstArg (cmd , 'f' );
8990
9091 //setup
@@ -98,7 +99,15 @@ public static void main(String[] args) {
9899 Connection [] consumerConnections = new Connection [consumerCount ];
99100 for (int i = 0 ; i < consumerCount ; i ++) {
100101 System .out .println ("starting consumer #" + i );
101- Connection conn = new ConnectionFactory (params ).newConnection (addresses , maxRedirects );
102+ Connection conn = new ConnectionFactory (params ){
103+ @ Override public void configureSocket (Socket socket ) throws IOException {
104+ super .configureSocket (socket );
105+ if (bufferSize > 0 ){
106+ socket .setReceiveBufferSize (bufferSize );
107+ socket .setSendBufferSize (bufferSize );
108+ }
109+ }
110+ }.newConnection (addresses , maxRedirects );
102111 consumerConnections [i ] = conn ;
103112 Channel channel = conn .createChannel ();
104113 if (consumerTxSize > 0 ) channel .txSelect ();
@@ -160,6 +169,7 @@ private static Options getOptions() {
160169 Options options = new Options ();
161170 options .addOption (new Option ("h" , "host" , true , "broker host" ));
162171 options .addOption (new Option ("p" , "port" , true , "broker port" ));
172+ options .addOption (new Option ("b" , "buffer" , true , "buffer size" ));
163173 options .addOption (new Option ("t" , "type" , true , "exchange type" ));
164174 options .addOption (new Option ("e" , "exchange" , true , "exchange name" ));
165175 options .addOption (new Option ("i" , "interval" , true , "sampling interval" ));
0 commit comments