2020import WebSocketChannel from './ch-websocket' ;
2121import NodeChannel from './ch-node' ;
2222import { Chunker , Dechunker } from './chunking' ;
23- import { Packer , Unpacker } from './packstream' ;
23+ import packStreamUtil from './packstream-util ' ;
2424import { alloc } from './buf' ;
25- import { Node , Path , PathSegment , Relationship , UnboundRelationship } from '../graph-types' ;
2625import { newError } from './../error' ;
2726import ChannelConfig from './ch-config' ;
2827import urlUtil from './url-util' ;
@@ -54,11 +53,6 @@ RECORD = 0x71, // 0111 0001 // RECORD <value>
5453IGNORED = 0x7E , // 0111 1110 // IGNORED <metadata>
5554FAILURE = 0x7F , // 0111 1111 // FAILURE <metadata>
5655
57- // Signature bytes for higher-level graph objects
58- NODE = 0x4E ,
59- RELATIONSHIP = 0x52 ,
60- UNBOUND_RELATIONSHIP = 0x72 ,
61- PATH = 0x50 ,
6256//sent before version negotiation
6357MAGIC_PREAMBLE = 0x6060B017 ,
6458DEBUG = false ;
@@ -85,66 +79,6 @@ let NO_OP_OBSERVER = {
8579 onError : NO_OP
8680} ;
8781
88- /** Maps from packstream structures to Neo4j domain objects */
89- let _mappers = {
90- node : ( unpacker , buf ) => {
91- return new Node (
92- unpacker . unpack ( buf ) , // Identity
93- unpacker . unpack ( buf ) , // Labels
94- unpacker . unpack ( buf ) // Properties
95- ) ;
96- } ,
97- rel : ( unpacker , buf ) => {
98- return new Relationship (
99- unpacker . unpack ( buf ) , // Identity
100- unpacker . unpack ( buf ) , // Start Node Identity
101- unpacker . unpack ( buf ) , // End Node Identity
102- unpacker . unpack ( buf ) , // Type
103- unpacker . unpack ( buf ) // Properties
104- ) ;
105- } ,
106- unboundRel : ( unpacker , buf ) => {
107- return new UnboundRelationship (
108- unpacker . unpack ( buf ) , // Identity
109- unpacker . unpack ( buf ) , // Type
110- unpacker . unpack ( buf ) // Properties
111- ) ;
112- } ,
113- path : ( unpacker , buf ) => {
114- let nodes = unpacker . unpack ( buf ) ,
115- rels = unpacker . unpack ( buf ) ,
116- sequence = unpacker . unpack ( buf ) ;
117- let prevNode = nodes [ 0 ] ,
118- segments = [ ] ;
119-
120- for ( let i = 0 ; i < sequence . length ; i += 2 ) {
121- let relIndex = sequence [ i ] ,
122- nextNode = nodes [ sequence [ i + 1 ] ] ,
123- rel ;
124- if ( relIndex > 0 ) {
125- rel = rels [ relIndex - 1 ] ;
126- if ( rel instanceof UnboundRelationship ) {
127- // To avoid duplication, relationships in a path do not contain
128- // information about their start and end nodes, that's instead
129- // inferred from the path sequence. This is us inferring (and,
130- // for performance reasons remembering) the start/end of a rel.
131- rels [ relIndex - 1 ] = rel = rel . bind ( prevNode . identity , nextNode . identity ) ;
132- }
133- } else {
134- rel = rels [ - relIndex - 1 ] ;
135- if ( rel instanceof UnboundRelationship ) {
136- // See above
137- rels [ - relIndex - 1 ] = rel = rel . bind ( nextNode . identity , prevNode . identity ) ;
138- }
139- }
140- // Done hydrating one path segment.
141- segments . push ( new PathSegment ( prevNode , rel , nextNode ) ) ;
142- prevNode = nextNode ;
143- }
144- return new Path ( nodes [ 0 ] , nodes [ nodes . length - 1 ] , segments ) ;
145- }
146- } ;
147-
14882/**
14983 * A connection manages sending and recieving messages over a channel. A
15084 * connector is very closely tied to the Bolt protocol, it implements the
@@ -175,13 +109,16 @@ class Connection {
175109 this . url = url ;
176110 this . server = { address : url } ;
177111 this . creationTimestamp = Date . now ( ) ;
112+ this . _disableLosslessIntegers = disableLosslessIntegers ;
178113 this . _pendingObservers = [ ] ;
179114 this . _currentObserver = undefined ;
180115 this . _ch = channel ;
181116 this . _dechunker = new Dechunker ( ) ;
182117 this . _chunker = new Chunker ( channel ) ;
183- this . _packer = new Packer ( this . _chunker ) ;
184- this . _unpacker = new Unpacker ( disableLosslessIntegers ) ;
118+
119+ // initially assume that database supports latest Bolt version, create latest packer and unpacker
120+ this . _packer = packStreamUtil . createLatestPacker ( this . _chunker ) ;
121+ this . _unpacker = packStreamUtil . createLatestUnpacker ( disableLosslessIntegers ) ;
185122
186123 this . _isHandlingFailure = false ;
187124 this . _currentFailure = null ;
@@ -191,34 +128,18 @@ class Connection {
191128 // Set to true on fatal errors, to get this out of session pool.
192129 this . _isBroken = false ;
193130
194- // For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
195- this . _unpacker . structMappers [ NODE ] = _mappers . node ;
196- this . _unpacker . structMappers [ RELATIONSHIP ] = _mappers . rel ;
197- this . _unpacker . structMappers [ UNBOUND_RELATIONSHIP ] = _mappers . unboundRel ;
198- this . _unpacker . structMappers [ PATH ] = _mappers . path ;
199-
200- let self = this ;
201131 // TODO: Using `onmessage` and `onerror` came from the WebSocket API,
202132 // it reads poorly and has several annoying drawbacks. Swap to having
203133 // Channel extend EventEmitter instead, then we can use `on('data',..)`
204134 this . _ch . onmessage = ( buf ) => {
205- let proposed = buf . readInt32 ( ) ;
206- if ( proposed == 1 ) {
207- // Ok, protocol running. Simply forward all messages past
208- // this to the dechunker
209- self . _ch . onmessage = ( buf ) => {
210- self . _dechunker . write ( buf ) ;
211- } ;
212-
213- if ( buf . hasRemaining ( ) ) {
214- self . _dechunker . write ( buf . readSlice ( buf . remaining ( ) ) ) ;
215- }
135+ const proposed = buf . readInt32 ( ) ;
136+ if ( proposed == 1 || proposed == 2 ) {
137+ this . _initializeProtocol ( proposed , buf ) ;
216138 } else if ( proposed == 1213486160 ) { //server responded 1213486160 == 0x48545450 == "HTTP"
217- self . _handleFatalError ( newError ( "Server responded HTTP. Make sure you are not trying to connect to the http endpoint " +
218- "(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)" ) ) ;
219- }
220- else {
221- self . _handleFatalError ( newError ( "Unknown Bolt protocol version: " + proposed ) ) ;
139+ this . _handleFatalError ( newError ( 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
140+ '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' ) ) ;
141+ } else {
142+ this . _handleFatalError ( newError ( 'Unknown Bolt protocol version: ' + proposed ) ) ;
222143 }
223144 } ;
224145
@@ -235,21 +156,40 @@ class Connection {
235156 }
236157
237158 this . _dechunker . onmessage = ( buf ) => {
238- self . _handleMessage ( self . _unpacker . unpack ( buf ) ) ;
159+ this . _handleMessage ( this . _unpacker . unpack ( buf ) ) ;
239160 } ;
240161
241162 let handshake = alloc ( 5 * 4 ) ;
242163 //magic preamble
243164 handshake . writeInt32 ( MAGIC_PREAMBLE ) ;
244165 //proposed versions
166+ handshake . writeInt32 ( 2 ) ;
245167 handshake . writeInt32 ( 1 ) ;
246168 handshake . writeInt32 ( 0 ) ;
247169 handshake . writeInt32 ( 0 ) ;
248- handshake . writeInt32 ( 0 ) ;
249170 handshake . reset ( ) ;
250171 this . _ch . write ( handshake ) ;
251172 }
252173
174+ /**
175+ * Complete protocol initialization.
176+ * @param {number } version the selected protocol version.
177+ * @param {BaseBuffer } buffer the handshake response buffer.
178+ * @private
179+ */
180+ _initializeProtocol ( version , buffer ) {
181+ // re-create packer and unpacker because version might be lower than we initially assumed
182+ this . _packer = packStreamUtil . createPackerForProtocolVersion ( version , this . _chunker ) ;
183+ this . _unpacker = packStreamUtil . createUnpackerForProtocolVersion ( version , this . _disableLosslessIntegers ) ;
184+
185+ // Ok, protocol running. Simply forward all messages to the dechunker
186+ this . _ch . onmessage = buf => this . _dechunker . write ( buf ) ;
187+
188+ if ( buffer . hasRemaining ( ) ) {
189+ this . _dechunker . write ( buffer . readSlice ( buffer . remaining ( ) ) ) ;
190+ }
191+ }
192+
253193 /**
254194 * "Fatal" means the connection is dead. Only call this if something
255195 * happens that cannot be recovered from. This will lead to all subscribers
0 commit comments