@@ -24999,10 +24999,9 @@ var Driver = (function () {
2499924999 // wrapper around Connection anyway, so it makes little difference.
2500025000
2500125001 // Queue up a 'reset', to ensure the next user gets a clean
25002- // session to work with. No need to flush, this will get sent
25003- // along with whatever the next thing the user wants to do with
25004- // this session ends up being, so we save the network round trip.
25002+ // session to work with.
2500525003 conn.reset();
25004+ conn.sync();
2500625005
2500725006 // Return connection to the pool
2500825007 conn._release();
@@ -27577,8 +27576,9 @@ var Chunker = (function (_buf$BaseBuffer) {
2757727576 }, {
2757827577 key: 'putBytes',
2757927578 value: function putBytes(position, data) {
27580- // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on rather than doing the copy here
27581- // TODO: *however* note that we need some way to find out when the data has been written (and thus the buffer can be re-used) if we take that approach
27579+ // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on
27580+ // rather than doing the copy here TODO: *however* note that we need some way to find out when the data has been
27581+ // written (and thus the buffer can be re-used) if we take that approach
2758227582 while (data.remaining() > 0) {
2758327583 // Ensure there is an open chunk, and that it has at least one byte of space left
2758427584 this._ensure(1);
@@ -27609,10 +27609,10 @@ var Chunker = (function (_buf$BaseBuffer) {
2760927609 return this;
2761027610 }
2761127611
27612- /**
27612+ /**
2761327613 * Bolt messages are encoded in one or more chunks, and the boundary between two messages
2761427614 * is encoded as a 0-length chunk, `00 00`. This inserts such a message boundary, closing
27615- * any currently open chunk as needed
27615+ * any currently open chunk as needed
2761627616 */
2761727617 }, {
2761827618 key: 'messageBoundary',
@@ -28148,7 +28148,19 @@ var Connection = (function () {
2814828148 }, {
2814928149 key: "reset",
2815028150 value: function reset(observer) {
28151- this._queueObserver(observer);
28151+ this._isHandlingFailure = true;
28152+ var self = this;
28153+ var wrappedObs = {
28154+ onNext: observer ? observer.onNext : NO_OP,
28155+ onError: observer ? observer.onError : NO_OP,
28156+ onCompleted: function onCompleted() {
28157+ self._isHandlingFailure = false;
28158+ if (observer) {
28159+ observer.onCompleted();
28160+ }
28161+ }
28162+ };
28163+ this._queueObserver(wrappedObs);
2815228164 this._packer.packStruct(RESET);
2815328165 this._chunker.messageBoundary();
2815428166 }
@@ -29027,10 +29039,10 @@ try {
2902729039 if (buffer instanceof _buf2['default'].NodeBuffer) {
2902829040 var start = buffer.position,
2902929041 end = start + length;
29030- buffer.position = end;
29042+ buffer.position = Math.min( end, buffer.length) ;
2903129043 return buffer._buffer.toString('utf8', start, end);
2903229044 } else if (buffer instanceof _buf2['default'].CombinedBuffer) {
29033- var out = streamDecodeCombinedBuffer(buffer._buffers , length, function (partBuffer) {
29045+ var out = streamDecodeCombinedBuffer(buffer, length, function (partBuffer) {
2903429046 return decoder.write(partBuffer._buffer);
2903529047 }, function () {
2903629048 return decoder.end();
@@ -29059,14 +29071,14 @@ try {
2905929071 },
2906029072 "decode": function decode(buffer, length) {
2906129073 if (buffer instanceof _buf2['default'].HeapBuffer) {
29062- return decoder.decode(buffer.readView(length));
29074+ return decoder.decode(buffer.readView(Math.min( length, buffer.length - buffer.position) ));
2906329075 } else {
2906429076 // Decoding combined buffer is complicated. For simplicity, for now,
2906529077 // we simply copy the combined buffer into a regular buffer and decode that.
2906629078 var tmpBuf = _buf2['default'].alloc(length);
2906729079 for (var i = 0; i < length; i++) {
2906829080 tmpBuf.writeUInt8(buffer.readUInt8());
29069- };
29081+ }
2907029082 tmpBuf.reset();
2907129083 return decoder.decode(tmpBuf.readView(length));
2907229084 }
@@ -29077,20 +29089,24 @@ try {
2907729089
2907829090var streamDecodeCombinedBuffer = function streamDecodeCombinedBuffer(combinedBuffers, length, decodeFn, endFn) {
2907929091 var remainingBytesToRead = length;
29092+ var position = combinedBuffers.position;
29093+ combinedBuffers._updatePos(Math.min(length, combinedBuffers.length - position));
2908029094 // Reduce CombinedBuffers to a decoded string
29081- var out = combinedBuffers.reduce(function (last, partBuffer) {
29095+ var out = combinedBuffers._buffers. reduce(function (last, partBuffer) {
2908229096 if (remainingBytesToRead <= 0) {
2908329097 return last;
29084- }
29085- if (partBuffer.length > remainingBytesToRead) {
29086- // When we don't want the whole buffer
29087- var lastSlice = partBuffer.readSlice(remainingBytesToRead);
29088- partBuffer._updatePos(remainingBytesToRead);
29089- remainingBytesToRead = 0;
29098+ } else if (position >= partBuffer.length) {
29099+ position -= partBuffer.length;
29100+ return '';
29101+ } else {
29102+ partBuffer._updatePos(position - partBuffer.position);
29103+ var bytesToRead = Math.min(partBuffer.length - position, remainingBytesToRead);
29104+ var lastSlice = partBuffer.readSlice(bytesToRead);
29105+ partBuffer._updatePos(bytesToRead);
29106+ remainingBytesToRead = Math.max(remainingBytesToRead - lastSlice.length, 0);
29107+ position = 0;
2909029108 return last + decodeFn(lastSlice);
2909129109 }
29092- remainingBytesToRead -= partBuffer.length;
29093- return last + decodeFn(partBuffer);
2909429110 }, '');
2909529111 return out + endFn();
2909629112};
0 commit comments