Skip to content

Commit f5f8cde

Browse files
committed
Ported FragmentedMessageDeliveryTest to driver
1 parent cd9aa03 commit f5f8cde

File tree

3 files changed

+168
-1
lines changed

3 files changed

+168
-1
lines changed

driver/src/main/java/org/neo4j/driver/v1/util/Functions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
*/
2525
public class Functions
2626
{
27+
@SuppressWarnings( "unchecked" )
2728
public static <T> Function<T,T> identity()
2829
{
2930
return IDENTITY;
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.messaging;
20+
21+
import org.junit.Test;
22+
23+
import java.io.ByteArrayInputStream;
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.channels.Channels;
28+
import java.nio.channels.ReadableByteChannel;
29+
import java.util.ArrayList;
30+
import java.util.Collections;
31+
32+
import org.neo4j.driver.internal.connector.socket.ChunkedOutput;
33+
import org.neo4j.driver.v1.Value;
34+
import org.neo4j.driver.v1.util.DumpMessage;
35+
36+
import static java.util.Arrays.asList;
37+
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static org.hamcrest.Matchers.equalTo;
39+
40+
/**
41+
* This tests network fragmentation of messages. Given a set of messages, it will serialize and chunk the message up
42+
* to a specified chunk size. Then it will split that data into a specified number of fragments, trying every possible
43+
* permutation of fragment sizes for the specified number. For instance, assuming an unfragmented message size of 15,
44+
* and a fragment count of 3, it will create fragment size permutations like:
45+
* <p>
46+
* [1,1,13]
47+
* [1,2,12]
48+
* [1,3,11]
49+
* ..
50+
* [12,1,1]
51+
* <p>
52+
* For each permutation, it delivers the fragments to the protocol implementation, and asserts the protocol handled
53+
* them properly.
54+
*/
55+
public class FragmentedMessageDeliveryTest
56+
{
57+
private final MessageFormat format = new PackStreamMessageFormatV1();
58+
59+
// Only test one chunk size for now, this can be parameterized to test lots of different ones
60+
private int chunkSize = 16;
61+
62+
// Only test one message for now. This can be parameterized later to test lots of different ones
63+
private Message[] messages = new Message[]{ new RunMessage( "Mjölnir", Collections.<String, Value>emptyMap() )};
64+
65+
@Test
66+
public void testFragmentedMessageDelivery() throws Throwable
67+
{
68+
// Given
69+
byte[] unfragmented = serialize( messages );
70+
71+
// When & Then
72+
int n = unfragmented.length;
73+
for ( int i = 1; i < n - 1; i++ )
74+
{
75+
for ( int j = 1; j < n - i; j++ )
76+
{
77+
testPermutation( unfragmented, i, j, n - i - j );
78+
}
79+
}
80+
}
81+
82+
private void testPermutation( byte[] unfragmented, int... sizes ) throws IOException
83+
{
84+
int pos = 0;
85+
ByteBuffer[] fragments = new ByteBuffer[sizes.length];
86+
for ( int i = 0; i < sizes.length; i++ )
87+
{
88+
fragments[i] = ByteBuffer.wrap( unfragmented, pos, sizes[i] );
89+
pos += sizes[i];
90+
}
91+
testPermutation( unfragmented, fragments );
92+
}
93+
94+
private void testPermutation( byte[] unfragmented, ByteBuffer[] fragments ) throws IOException
95+
{
96+
97+
// When data arrives split up according to the current permutation
98+
ReadableByteChannel[] channels = new ReadableByteChannel[fragments.length];
99+
for ( int i = 0; i < fragments.length; i++ )
100+
{
101+
channels[i] = packet( fragments[i] );
102+
}
103+
104+
ReadableByteChannel fragmentedChannel = packets( channels );
105+
MessageFormat.Reader reader = format.newReader( fragmentedChannel );
106+
107+
ArrayList<Message> packedMessages = new ArrayList<>();
108+
DumpMessage.unpack( packedMessages, reader );
109+
110+
assertThat( packedMessages, equalTo(asList(messages)) );
111+
}
112+
113+
private ReadableByteChannel packet( ByteBuffer buffer )
114+
{
115+
//NOTE buffer.array is ok here since we know buffer is backed by array
116+
return Channels.newChannel(
117+
new ByteArrayInputStream( buffer.array() ) );
118+
}
119+
120+
121+
private ReadableByteChannel packets( final ReadableByteChannel... channels )
122+
{
123+
124+
return new ReadableByteChannel()
125+
{
126+
private int index = 0;
127+
128+
@Override
129+
public int read( ByteBuffer dst ) throws IOException
130+
{
131+
return channels[index++].read( dst );
132+
}
133+
134+
@Override
135+
public boolean isOpen()
136+
{
137+
return false;
138+
}
139+
140+
@Override
141+
public void close() throws IOException
142+
{
143+
144+
}
145+
};
146+
}
147+
148+
private byte[] serialize( Message... msgs ) throws IOException
149+
{
150+
151+
final ByteArrayOutputStream out = new ByteArrayOutputStream( 128 );
152+
153+
ChunkedOutput output = new ChunkedOutput( chunkSize + 2 /* for chunk header */, Channels.newChannel( out ) );
154+
155+
PackStreamMessageFormatV1.Writer writer =
156+
new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );
157+
for ( Message message : messages )
158+
{
159+
writer.write( message );
160+
}
161+
writer.flush();
162+
163+
return out.toByteArray();
164+
}
165+
}

examples/src/test/java/org/neo4j/docs/driver/ExamplesIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void retainResultsForLaterProcessing() throws Throwable
150150
{
151151
StdIOCapture stdIO = new StdIOCapture();
152152
try ( AutoCloseable captured = stdIO.capture();
153-
Driver driver = GraphDatabase.driver( "bolt://localhost" ); )
153+
Driver driver = GraphDatabase.driver( "bolt://localhost" ) )
154154
{
155155
try ( Session setup = driver.session() )
156156
{
@@ -195,6 +195,7 @@ public void transactionRollback() throws Throwable
195195
}
196196
}
197197

198+
@SuppressWarnings( "unchecked" )
198199
@Test
199200
public void resultSummary() throws Throwable
200201
{

0 commit comments

Comments
 (0)