Skip to content

Commit 2da541c

Browse files
committed
Merge pull request #150 from pontusmelke/1.0-chunking-state-machine
Buffer channel reads
2 parents 67b400c + f5f8cde commit 2da541c

File tree

15 files changed

+1074
-79
lines changed

15 files changed

+1074
-79
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java

Lines changed: 430 additions & 0 deletions
Large diffs are not rendered by default.

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public static ByteChannel create( String host, int port, Config config, Logger l
228228
}
229229
case NONE:
230230
{
231-
channel = new AllOrNothingChannel( soChannel );
231+
channel = soChannel;
232232
break;
233233
}
234234
default:

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketProtocolV1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public SocketProtocolV1( ByteChannel channel ) throws IOException
3737
messageFormat = new PackStreamMessageFormatV1();
3838

3939
ChunkedOutput output = new ChunkedOutput( channel );
40-
ChunkedInput input = new ChunkedInput( channel );
40+
BufferingChunkedInput input = new BufferingChunkedInput( channel );
4141

4242
this.writer = new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );
4343
this.reader = new PackStreamMessageFormatV1.Reader( input, input.messageBoundaryHook() );

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.neo4j.driver.v1.exceptions.DatabaseException;
3434
import org.neo4j.driver.v1.exceptions.Neo4jException;
3535
import org.neo4j.driver.v1.exceptions.TransientException;
36+
import org.neo4j.driver.v1.summary.Notification;
3637
import org.neo4j.driver.v1.summary.StatementType;
38+
import org.neo4j.driver.v1.util.Function;
3739

3840
public class SocketResponseHandler implements MessageHandler
3941
{
@@ -95,8 +97,9 @@ private void collectNotifications( StreamCollector collector, Value notification
9597
{
9698
if ( notifications != null )
9799
{
98-
collector.notifications( notifications.asList( InternalNotification
99-
.VALUE_TO_NOTIFICATION ) );
100+
Function<Value,Notification> notification = InternalNotification
101+
.VALUE_TO_NOTIFICATION;
102+
collector.notifications( notifications.asList( notification ) );
100103
}
101104
}
102105

driver/src/main/java/org/neo4j/driver/internal/connector/socket/TLSSocketChannel.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class TLSSocketChannel implements ByteChannel
6464
private ByteBuffer plainIn;
6565
private ByteBuffer plainOut;
6666

67+
private static final ByteBuffer DUMMY_BUFFER = ByteBuffer.allocateDirect( 0 );
68+
6769
public TLSSocketChannel( String host, int port, SocketChannel channel, Logger logger,
6870
TrustStrategy trustStrategy )
6971
throws GeneralSecurityException, IOException
@@ -123,7 +125,7 @@ private void runHandshake() throws IOException
123125
break;
124126
case NEED_UNWRAP:
125127
// Unwrap the ssl packet to value ssl handshake information
126-
handshakeStatus = unwrap( null );
128+
handshakeStatus = unwrap( DUMMY_BUFFER );
127129
plainIn.clear();
128130
break;
129131
case NEED_WRAP:
@@ -329,17 +331,17 @@ private HandshakeStatus wrap( ByteBuffer buffer ) throws IOException
329331
*/
330332
static int bufferCopy( ByteBuffer from, ByteBuffer to )
331333
{
332-
if ( from == null || to == null )
333-
{
334-
return 0;
335-
}
334+
int maxTransfer = Math.min( to.remaining(), from.remaining() );
336335

337-
int i;
338-
for ( i = 0; to.remaining() > 0 && from.remaining() > 0; i++ )
339-
{
340-
to.put( from.get() );
341-
}
342-
return i;
336+
//use a temp buffer and move all data in one go
337+
ByteBuffer temporaryBuffer = from.duplicate();
338+
temporaryBuffer.limit( temporaryBuffer.position() + maxTransfer );
339+
to.put( temporaryBuffer );
340+
341+
//move postion so it appears as if we read the buffer
342+
from.position( from.position() + maxTransfer );
343+
344+
return maxTransfer;
343345
}
344346

345347
/**
@@ -391,23 +393,18 @@ public int read( ByteBuffer dst ) throws IOException
391393
*/
392394
int toRead = dst.remaining();
393395
plainIn.flip();
394-
if ( plainIn.remaining() >= toRead )
396+
if ( plainIn.hasRemaining() )
395397
{
396398
bufferCopy( plainIn, dst );
397399
plainIn.compact();
398400
}
399401
else
400402
{
401-
dst.put( plainIn ); // Copy whatever left in the plainIn to dst
402-
do
403-
{
404-
plainIn.clear(); // Clear plainIn
405-
unwrap( dst ); // Read more data from the underline channel and save the data read into dst
406-
}
407-
while ( dst.remaining() > 0 ); // If enough bytes read then return otherwise continue reading from channel
403+
plainIn.clear(); // Clear plainIn
404+
unwrap( dst ); // Read more data from the underline channel and save the data read into dst
408405
}
409406

410-
return toRead;
407+
return toRead - dst.remaining();
411408
}
412409

413410
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.neo4j.driver.internal.InternalNode;
3232
import org.neo4j.driver.internal.InternalPath;
3333
import org.neo4j.driver.internal.InternalRelationship;
34-
import org.neo4j.driver.internal.connector.socket.ChunkedInput;
34+
import org.neo4j.driver.internal.connector.socket.BufferingChunkedInput;
3535
import org.neo4j.driver.internal.connector.socket.ChunkedOutput;
3636
import org.neo4j.driver.internal.packstream.PackInput;
3737
import org.neo4j.driver.internal.packstream.PackOutput;
@@ -44,12 +44,12 @@
4444
import org.neo4j.driver.internal.value.NodeValue;
4545
import org.neo4j.driver.internal.value.PathValue;
4646
import org.neo4j.driver.internal.value.RelationshipValue;
47+
import org.neo4j.driver.v1.Value;
48+
import org.neo4j.driver.v1.exceptions.ClientException;
4749
import org.neo4j.driver.v1.types.Entity;
4850
import org.neo4j.driver.v1.types.Node;
4951
import org.neo4j.driver.v1.types.Path;
5052
import org.neo4j.driver.v1.types.Relationship;
51-
import org.neo4j.driver.v1.Value;
52-
import org.neo4j.driver.v1.exceptions.ClientException;
5353

5454
import static org.neo4j.driver.v1.Values.value;
5555

@@ -87,7 +87,7 @@ public MessageFormat.Writer newWriter( WritableByteChannel ch )
8787
@Override
8888
public MessageFormat.Reader newReader( ReadableByteChannel ch )
8989
{
90-
ChunkedInput input = new ChunkedInput( ch );
90+
BufferingChunkedInput input = new BufferingChunkedInput( ch );
9191
return new Reader( input, input.messageBoundaryHook() );
9292
}
9393

@@ -534,7 +534,8 @@ private Value unpackValue() throws IOException
534534
{
535535
case NODE:
536536
ensureCorrectStructSize( "NODE", NODE_FIELDS, size );
537-
return new NodeValue( unpackNode() );
537+
InternalNode adapted = unpackNode();
538+
return new NodeValue( adapted );
538539
case RELATIONSHIP:
539540
ensureCorrectStructSize( "RELATIONSHIP", 5, size );
540541
return unpackRelationship();
@@ -555,7 +556,8 @@ private Value unpackRelationship() throws IOException
555556
String relType = unpacker.unpackString();
556557
Map<String,Value> props = unpackMap();
557558

558-
return new RelationshipValue( new InternalRelationship( urn, startUrn, endUrn, relType, props ) );
559+
InternalRelationship adapted = new InternalRelationship( urn, startUrn, endUrn, relType, props );
560+
return new RelationshipValue( adapted );
559561
}
560562

561563
private InternalNode unpackNode() throws IOException

driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ public class BufferedChannelInput implements PackInput
3030
{
3131
private final ByteBuffer buffer;
3232
private ReadableByteChannel channel;
33+
private static final int DEFAULT_BUFFER_CAPACITY = 8192;
3334

34-
public BufferedChannelInput( int bufferCapacity )
35+
public BufferedChannelInput(ReadableByteChannel ch )
3536
{
36-
this( bufferCapacity, null );
37+
this( DEFAULT_BUFFER_CAPACITY, ch );
3738
}
3839

3940
public BufferedChannelInput( int bufferCapacity, ReadableByteChannel ch )

driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.packstream;
2020

2121
import java.io.IOException;
22-
import java.nio.channels.ReadableByteChannel;
2322
import java.nio.channels.WritableByteChannel;
2423
import java.nio.charset.Charset;
2524
import java.util.List;
@@ -147,8 +146,6 @@ public class PackStream
147146
private static final String EMPTY_STRING = "";
148147
private static final Charset UTF_8 = Charset.forName( "UTF-8" );
149148

150-
private static final int DEFAULT_BUFFER_CAPACITY = 8192;
151-
152149
private PackStream() {}
153150

154151
public static class Packer
@@ -428,29 +425,11 @@ public static class Unpacker
428425
{
429426
private PackInput in;
430427

431-
public Unpacker( ReadableByteChannel channel )
432-
{
433-
this( DEFAULT_BUFFER_CAPACITY );
434-
reset( channel );
435-
}
436-
437-
public Unpacker( int bufferCapacity )
438-
{
439-
assert bufferCapacity >= 8 : "Buffer must be at least 8 bytes.";
440-
this.in = new BufferedChannelInput( bufferCapacity );
441-
}
442-
443428
public Unpacker( PackInput in )
444429
{
445430
this.in = in;
446431
}
447432

448-
public Unpacker reset( ReadableByteChannel ch )
449-
{
450-
((BufferedChannelInput)in).reset( ch );
451-
return this;
452-
}
453-
454433
public boolean hasNext() throws IOException
455434
{
456435
return in.hasMoreData();

driver/src/main/java/org/neo4j/driver/v1/util/Functions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
*/
2525
public class Functions
2626
{
27+
@SuppressWarnings( "unchecked" )
2728
public static <T> Function<T,T> identity()
2829
{
2930
return IDENTITY;

0 commit comments

Comments
 (0)