1616 * See the License for the specific language governing permissions and
1717 * limitations under the License.
1818 */
19-
19+
2020import buf from './buf' ;
2121
2222let
23- _CHUNK_HEADER_SIZE = 2 ,
23+ _CHUNK_HEADER_SIZE = 2 ,
2424 _MESSAGE_BOUNDARY = 0x00 ,
2525 _DEFAULT_BUFFER_SIZE = 1400 ; // http://stackoverflow.com/questions/2613734/maximum-packet-size-for-a-tcp-connection
2626
2929 * @access private
3030 */
3131class Chunker extends buf . BaseBuffer {
32- constructor ( channel , bufferSize ) {
33- super ( 0 ) ;
32+ constructor ( channel , bufferSize ) {
33+ super ( 0 ) ;
3434 this . _bufferSize = bufferSize || _DEFAULT_BUFFER_SIZE ;
3535 this . _ch = channel ;
36- this . _buffer = buf . alloc ( this . _bufferSize ) ;
36+ this . _buffer = buf . alloc ( this . _bufferSize ) ;
3737 this . _currentChunkStart = 0 ;
3838 this . _chunkOpen = false ;
3939 }
4040
41- putUInt8 ( position , val ) {
41+ putUInt8 ( position , val ) {
4242 this . _ensure ( 1 ) ;
43- this . _buffer . writeUInt8 ( val ) ;
43+ this . _buffer . writeUInt8 ( val ) ;
4444 }
4545
46- putInt8 ( position , val ) {
46+ putInt8 ( position , val ) {
4747 this . _ensure ( 1 ) ;
48- this . _buffer . writeInt8 ( val ) ;
48+ this . _buffer . writeInt8 ( val ) ;
4949 }
5050
51- putFloat64 ( position , val ) {
51+ putFloat64 ( position , val ) {
5252 this . _ensure ( 8 ) ;
53- this . _buffer . writeFloat64 ( val ) ;
53+ this . _buffer . writeFloat64 ( val ) ;
5454 }
5555
56- putBytes ( position , data ) {
57- // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on rather than doing the copy here
58- // TODO: *however* note that we need some way to find out when the data has been written (and thus the buffer can be re-used) if we take that approach
59- while ( data . remaining ( ) > 0 )
60- {
56+ putBytes ( position , data ) {
57+ // TODO: If data is larger than our chunk size or so, we're very likely better off just passing this buffer on
58+ // rather than doing the copy here TODO: *however* note that we need some way to find out when the data has been
59+ // written (and thus the buffer can be re-used) if we take that approach
60+ while ( data . remaining ( ) > 0 ) {
6161 // Ensure there is an open chunk, and that it has at least one byte of space left
62- this . _ensure ( 1 ) ;
63- if ( this . _buffer . remaining ( ) > data . remaining ( ) ) {
64- this . _buffer . writeBytes ( data ) ;
62+ this . _ensure ( 1 ) ;
63+ if ( this . _buffer . remaining ( ) > data . remaining ( ) ) {
64+ this . _buffer . writeBytes ( data ) ;
6565 } else {
66- this . _buffer . writeBytes ( data . readSlice ( this . _buffer . remaining ( ) ) ) ;
66+ this . _buffer . writeBytes ( data . readSlice ( this . _buffer . remaining ( ) ) ) ;
6767 }
6868 }
6969 return this ;
7070 }
7171
72- flush ( ) {
73- if ( this . _buffer . position > 0 ) {
72+ flush ( ) {
73+ if ( this . _buffer . position > 0 ) {
7474 this . _closeChunkIfOpen ( ) ;
7575
7676 // Local copy and clear the buffer field. This ensures that the buffer is not re-released if the flush call fails
7777 let out = this . _buffer ;
7878 this . _buffer = null ;
7979
80- this . _ch . write ( out . getSlice ( 0 , out . position ) ) ;
80+ this . _ch . write ( out . getSlice ( 0 , out . position ) ) ;
8181
8282 // Alloc a new output buffer. We assume we're using NodeJS's buffer pooling under the hood here!
83- this . _buffer = buf . alloc ( this . _bufferSize ) ;
83+ this . _buffer = buf . alloc ( this . _bufferSize ) ;
8484 this . _chunkOpen = false ;
8585 }
8686 return this ;
8787 }
8888
89- /**
89+ /**
9090 * Bolt messages are encoded in one or more chunks, and the boundary between two messages
9191 * is encoded as a 0-length chunk, `00 00`. This inserts such a message boundary, closing
92- * any currently open chunk as needed
92+ * any currently open chunk as needed
9393 */
94- messageBoundary ( ) {
95-
94+ messageBoundary ( ) {
95+
9696 this . _closeChunkIfOpen ( ) ;
9797
98- if ( this . _buffer . remaining ( ) < _CHUNK_HEADER_SIZE ) {
98+ if ( this . _buffer . remaining ( ) < _CHUNK_HEADER_SIZE ) {
9999 this . flush ( ) ;
100100 }
101101
102102 // Write message boundary
103- this . _buffer . writeInt16 ( _MESSAGE_BOUNDARY ) ;
103+ this . _buffer . writeInt16 ( _MESSAGE_BOUNDARY ) ;
104104 }
105105
106106 /** Ensure at least the given size is available for writing */
107- _ensure ( size ) {
107+ _ensure ( size ) {
108108 let toWriteSize = this . _chunkOpen ? size : size + _CHUNK_HEADER_SIZE ;
109- if ( this . _buffer . remaining ( ) < toWriteSize ) {
109+ if ( this . _buffer . remaining ( ) < toWriteSize ) {
110110 this . flush ( ) ;
111111 }
112112
113- if ( ! this . _chunkOpen ) {
113+ if ( ! this . _chunkOpen ) {
114114 this . _currentChunkStart = this . _buffer . position ;
115115 this . _buffer . position = this . _buffer . position + _CHUNK_HEADER_SIZE ;
116116 this . _chunkOpen = true ;
117117 }
118118 }
119119
120- _closeChunkIfOpen ( ) {
121- if ( this . _chunkOpen ) {
120+ _closeChunkIfOpen ( ) {
121+ if ( this . _chunkOpen ) {
122122 let chunkSize = this . _buffer . position - ( this . _currentChunkStart + _CHUNK_HEADER_SIZE ) ;
123- this . _buffer . putUInt16 ( this . _currentChunkStart , chunkSize ) ;
123+ this . _buffer . putUInt16 ( this . _currentChunkStart , chunkSize ) ;
124124 this . _chunkOpen = false ;
125125 }
126126 }
@@ -139,67 +139,67 @@ class Dechunker {
139139 this . _state = this . AWAITING_CHUNK ;
140140 }
141141
142- AWAITING_CHUNK ( buf ) {
143- if ( buf . remaining ( ) >= 2 ) {
142+ AWAITING_CHUNK ( buf ) {
143+ if ( buf . remaining ( ) >= 2 ) {
144144 // Whole header available, read that
145- return this . _onHeader ( buf . readUInt16 ( ) ) ;
145+ return this . _onHeader ( buf . readUInt16 ( ) ) ;
146146 } else {
147147 // Only one byte available, read that and wait for the second byte
148148 this . _partialChunkHeader = buf . readUInt8 ( ) << 8 ;
149149 return this . IN_HEADER ;
150150 }
151151 }
152152
153- IN_HEADER ( buf ) {
153+ IN_HEADER ( buf ) {
154154 // First header byte read, now we read the next one
155- return this . _onHeader ( ( this . _partialChunkHeader | buf . readUInt8 ( ) ) & 0xFFFF ) ;
155+ return this . _onHeader ( ( this . _partialChunkHeader | buf . readUInt8 ( ) ) & 0xFFFF ) ;
156156 }
157157
158- IN_CHUNK ( buf ) {
159- if ( this . _chunkSize <= buf . remaining ( ) ) {
158+ IN_CHUNK ( buf ) {
159+ if ( this . _chunkSize <= buf . remaining ( ) ) {
160160 // Current packet is larger than current chunk, or same size:
161- this . _currentMessage . push ( buf . readSlice ( this . _chunkSize ) ) ;
161+ this . _currentMessage . push ( buf . readSlice ( this . _chunkSize ) ) ;
162162 return this . AWAITING_CHUNK ;
163163 } else {
164164 // Current packet is smaller than the chunk we're reading, split the current chunk itself up
165165 this . _chunkSize -= buf . remaining ( ) ;
166- this . _currentMessage . push ( buf . readSlice ( buf . remaining ( ) ) ) ;
166+ this . _currentMessage . push ( buf . readSlice ( buf . remaining ( ) ) ) ;
167167 return this . IN_CHUNK ;
168168 }
169169 }
170170
171- CLOSED ( buf ) {
171+ CLOSED ( buf ) {
172172 // no-op
173173 }
174174
175175 /** Called when a complete chunk header has been recieved */
176- _onHeader ( header ) {
177- if ( header == 0 ) {
176+ _onHeader ( header ) {
177+ if ( header == 0 ) {
178178 // Message boundary
179179 let message ;
180- if ( this . _currentMessage . length == 1 ) {
180+ if ( this . _currentMessage . length == 1 ) {
181181 message = this . _currentMessage [ 0 ] ;
182182 } else {
183183 message = new buf . CombinedBuffer ( this . _currentMessage ) ;
184184 }
185185 this . _currentMessage = [ ] ;
186- this . onmessage ( message ) ;
186+ this . onmessage ( message ) ;
187187 return this . AWAITING_CHUNK ;
188188 } else {
189189 this . _chunkSize = header ;
190190 return this . IN_CHUNK ;
191191 }
192192 }
193193
194- write ( buf ) {
195- while ( buf . hasRemaining ( ) ) {
196- this . _state = this . _state ( buf ) ;
194+ write ( buf ) {
195+ while ( buf . hasRemaining ( ) ) {
196+ this . _state = this . _state ( buf ) ;
197197 }
198198 }
199199}
200200
201201
202202export default {
203- Chunker : Chunker ,
204- Dechunker : Dechunker
203+ Chunker : Chunker ,
204+ Dechunker : Dechunker
205205}
0 commit comments