|
| 1 | +package com.rabbitmq.examples; |
| 2 | + |
| 3 | +import com.rabbitmq.client.ConnectionFactory; |
| 4 | +import com.rabbitmq.client.Connection; |
| 5 | +import com.rabbitmq.client.Channel; |
| 6 | +import com.rabbitmq.client.GetResponse; |
| 7 | +import com.rabbitmq.client.MessageProperties; |
| 8 | +import java.util.Arrays; |
| 9 | +import java.net.Socket; |
| 10 | +import java.io.IOException; |
| 11 | +import java.util.Random; |
| 12 | + |
| 13 | +/** |
| 14 | + * Class to explore how performance of sending and receiving messages varies with the buffer size and |
| 15 | + * enabling/disabling Nagle's algorithm. |
| 16 | + */ |
| 17 | +public class BufferPerformanceMetrics{ |
| 18 | + |
| 19 | + public static final String QUEUE = "performance-test-queue"; |
| 20 | + public static final String EXCHANGE = "performance-test-exchange"; |
| 21 | + public static final String ROUTING_KEY = "performance-test-rk"; |
| 22 | + public static final int MESSAGE_COUNT = 10000; |
| 23 | + public static final byte[] MESSAGE = "Hello world".getBytes(); |
| 24 | + public static double NANOSECONDS_PER_SECOND = 1000 * 1000 * 1000; |
| 25 | + public static final int REPEATS = 1000000; |
| 26 | + public static final int PEAK_SIZE = 20 * 1024; |
| 27 | + |
| 28 | + public static void main(String[] args) throws Exception{ |
| 29 | + Random rnd = new Random(); |
| 30 | + |
| 31 | + System.out.println("buffer size, publish rate with nagle, get rate with nagle," + |
| 32 | + " publish rate without nagle, get rate without nagle"); |
| 33 | + for(int repeat = 0; repeat < REPEATS; repeat++){ |
| 34 | + final int bufferSize = 1 + rnd.nextInt(PEAK_SIZE); |
| 35 | + |
| 36 | + double publishRateNagle = 0, publishRateNoNagle = 0, getRateNagle = 0, getRateNoNagle = 0; |
| 37 | + |
| 38 | + for(final boolean useNagle : new boolean[]{ false, true }){ |
| 39 | + ConnectionFactory factory = new ConnectionFactory(){ |
| 40 | + @Override public void configureSocket(Socket socket) throws IOException{ |
| 41 | + socket.setTcpNoDelay(!useNagle); |
| 42 | + socket.setReceiveBufferSize(bufferSize); |
| 43 | + socket.setSendBufferSize(bufferSize); |
| 44 | + } |
| 45 | + }; |
| 46 | + Connection connection = factory.newConnection("localhost"); |
| 47 | + Channel channel = connection.createChannel(); |
| 48 | + channel.exchangeDeclare(EXCHANGE, "direct"); |
| 49 | + channel.queueDeclare(QUEUE); |
| 50 | + channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY); |
| 51 | + |
| 52 | + long start; |
| 53 | + |
| 54 | + start = System.nanoTime(); |
| 55 | + for(int i = 0; i < MESSAGE_COUNT; i++){ |
| 56 | + channel.basicPublish(EXCHANGE, QUEUE, MessageProperties.BASIC, MESSAGE); |
| 57 | + } |
| 58 | + long publishTime = System.nanoTime() - start; |
| 59 | + |
| 60 | + start = System.nanoTime(); |
| 61 | + for(int i = 0; i < MESSAGE_COUNT; i++){ |
| 62 | + GetResponse response = channel.basicGet(QUEUE, true); |
| 63 | + assert(Arrays.equals(MESSAGE, response.getBody())); |
| 64 | + } |
| 65 | + long getTime = System.nanoTime() - start; |
| 66 | + |
| 67 | + double publishRate = MESSAGE_COUNT / (publishTime / NANOSECONDS_PER_SECOND); |
| 68 | + double getRate = MESSAGE_COUNT / (getTime / NANOSECONDS_PER_SECOND); |
| 69 | + if(useNagle){ |
| 70 | + publishRateNagle = publishRate; |
| 71 | + getRateNagle = getRate; |
| 72 | + } else { |
| 73 | + publishRateNoNagle = publishRate; |
| 74 | + getRateNoNagle = getRate; |
| 75 | + } |
| 76 | + |
| 77 | + connection.close(); |
| 78 | + // Small sleep to remove noise from hammering the server. |
| 79 | + Thread.sleep(100); |
| 80 | + } |
| 81 | + |
| 82 | + System.out.println(bufferSize + ", " + publishRateNagle + ", " + getRateNagle + ", " + publishRateNoNagle + ", " + getRateNoNagle); |
| 83 | + } |
| 84 | + } |
| 85 | +} |
0 commit comments