Skip to content

Commit 4dff69f

Browse files
author
Your Name
committed
Optimize FrameCodec.js with zero-copy varint encoding, drain handling, and reusable buffers
- Add varint.encodeTo() for zero-copy encoding into reusable buffer - Implement waitForDrain() in FrameEncoder for backpressure handling - Optimize _take() method with zero-copy fast path for single chunks - Add detailed debug logging for performance analysis
1 parent 81ad117 commit 4dff69f

File tree

2 files changed

+167
-12
lines changed

2 files changed

+167
-12
lines changed

src/FrameCodec.js

Lines changed: 77 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@ const { Transform } = require('stream');
33
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
44

55
const varint = {
6-
encode: (n) => {
6+
// Write varint-encoded `n` into `target` at `offset`. Returns number of bytes written.
7+
encodeTo: (target, offset, n) => {
78
if (n < 0) throw new RangeError('varint unsigned only');
8-
const o = [];
9+
let i = 0;
910
do {
1011
let b = n & 0x7f;
1112
n = Math.floor(n / 128);
1213
if (n > 0) b |= 0x80;
13-
o.push(b);
14+
target[offset + (i++)] = b;
1415
} while (n > 0);
15-
return Buffer.from(o);
16+
return i;
17+
},
18+
encode: (n) => {
19+
const buf = Buffer.allocUnsafe(10);
20+
const len = varint.encodeTo(buf, 0, n);
21+
return buf.slice(0, len);
1622
},
1723
decodeFrom: (buf, offset = 0) => {
1824
let r = 0, s = 0, i = offset;
@@ -30,11 +36,35 @@ const varint = {
3036
class FrameEncoder extends Transform {
3137
constructor() {
3238
super({ writableObjectMode: true });
39+
let drainDeferred = null;
40+
// per-instance varint buffer to avoid allocating a small header Buffer per frame
41+
this._varintBuf = Buffer.allocUnsafe(10);
42+
this.waitForDrain = () => {
43+
if (!drainDeferred) {
44+
drainDeferred = {};
45+
drainDeferred.promise = new Promise(resolve => {
46+
drainDeferred.resolve = resolve;
47+
});
48+
this.once('drain', () => {
49+
if (drainDeferred) {
50+
drainDeferred.resolve();
51+
drainDeferred = null;
52+
}
53+
});
54+
}
55+
return drainDeferred.promise;
56+
};
3357
}
3458
_transform(f, e, cb) {
3559
try {
3660
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
37-
this.push(Buffer.concat([varint.encode(f.length), f]));
61+
// encode varint header into reusable buffer then copy into final frame
62+
const payloadLen = f.length;
63+
const hdrLen = varint.encodeTo(this._varintBuf, 0, payloadLen);
64+
const frame = Buffer.allocUnsafe(hdrLen + payloadLen);
65+
this._varintBuf.copy(frame, 0, 0, hdrLen);
66+
f.copy(frame, hdrLen);
67+
this.push(frame);
3868
cb();
3969
} catch (err) {
4070
cb(err);
@@ -129,9 +159,25 @@ class FrameDecoder extends Transform {
129159
}
130160

131161
_take(n, label = 'bytes') {
162+
this._log('take_start', { label, bytes: n, buffered: this._l });
163+
164+
// Zero-copy fast path: single chunk contains all needed bytes
165+
if (this._q.length > 0 && this._q[0].length >= n) {
166+
const head = this._q[0];
167+
const slice = head.slice(0, n);
168+
this._l -= n;
169+
if (n === head.length) {
170+
this._q.shift();
171+
} else {
172+
this._q[0] = head.slice(n);
173+
}
174+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: true });
175+
return slice;
176+
}
177+
178+
// Multi-chunk path: allocate and copy
132179
const f = Buffer.allocUnsafe(n);
133180
let w = 0;
134-
this._log('take_start', { label, bytes: n, buffered: this._l });
135181
while (w < n && this._q.length > 0) {
136182
const next = this._q[0];
137183
const t = Math.min(next.length, n - w);
@@ -145,7 +191,7 @@ class FrameDecoder extends Transform {
145191
}
146192
this._log('take_progress', { label, copied: t, written: w, buffered: this._l });
147193
}
148-
this._log('take_complete', { label, bytes: n, buffered: this._l });
194+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: false });
149195
return f;
150196
}
151197
}
@@ -155,9 +201,31 @@ FrameDecoder._nextId = 1;
155201
function createLibp2pStream(socket) {
156202
const d = new FrameDecoder(), e = new FrameEncoder();
157203
socket.pipe(d); e.pipe(socket);
158-
const s = { source: (async function* () { for await (const c of d) yield c; })(), sink: async (src) => { for await (const c of src) { if (!e.write(c)) await new Promise(r => e.once('drain', r)); } e.end(); } };
204+
const s = {
205+
source: (async function* () { for await (const c of d) yield c; })(),
206+
sink: async (src) => {
207+
for await (const c of src) {
208+
if (!e.write(c)) await e.waitForDrain();
209+
}
210+
e.end();
211+
}
212+
};
159213
s[Symbol.asyncIterator] = () => s.source[Symbol.asyncIterator]();
160214
return s;
161215
}
162216

163-
module.exports = { FrameEncoder, FrameDecoder, createLibp2pStream, encodeFrame: (b) => { const buf = Buffer.isBuffer(b) ? b : Buffer.from(b); return Buffer.concat([varint.encode(buf.length), buf]); } };
217+
module.exports = {
218+
FrameEncoder,
219+
FrameDecoder,
220+
createLibp2pStream,
221+
encodeFrame: (b) => {
222+
const buf = Buffer.isBuffer(b) ? b : Buffer.from(b);
223+
// Avoid Buffer.concat by preallocating exact size and writing header then payload
224+
const tmp = Buffer.allocUnsafe(10);
225+
const hdrLen = varint.encodeTo(tmp, 0, buf.length);
226+
const out = Buffer.allocUnsafe(hdrLen + buf.length);
227+
tmp.copy(out, 0, 0, hdrLen);
228+
buf.copy(out, hdrLen);
229+
return out;
230+
}
231+
};

src/Socket.js

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
11
'use strict';
22

33
import { NativeModules } from 'react-native';
4+
/**
5+
* Provide a local JSDoc alias for stream.Transform so the JS file's type
6+
* annotations don't require @types/node at the project root. This keeps the
7+
* IntelliSense/types consistent while avoiding a hard dependency on Node types
8+
* for React Native consumers.
9+
* @typedef {import('stream').Transform} _Transform
10+
*/
411
import EventEmitter from 'eventemitter3';
512
import { Buffer } from 'buffer';
613
const Sockets = NativeModules.TcpSockets;
714
import { nativeEventEmitter, getNextId } from './Globals';
15+
import { FrameEncoder, FrameDecoder } from './FrameCodec';
816

917
/**
1018
* @typedef {"ascii" | "utf8" | "utf-8" | "utf16le" | "ucs2" | "ucs-2" | "base64" | "latin1" | "binary" | "hex"} BufferEncoding
1119
*
1220
* @typedef {import('react-native').NativeEventEmitter} NativeEventEmitter
1321
*
22+
* Minimal interface for the frame encoder/decoder streams used in this
23+
* module. We declare only the members the Socket uses so the JSDoc types
24+
* don't require @types/node.
25+
* @typedef {{
26+
* write: (chunk: Buffer | Uint8Array) => boolean;
27+
* end: () => void;
28+
* once: (event: string, cb: (...args: any[]) => void) => void;
29+
* on: (event: string, cb: (...args: any[]) => void) => void;
30+
* pipe?: (dest: any) => any;
31+
* }} FrameStream
32+
*
1433
* @typedef {{address: string, family: string, port: number}} AddressInfo
1534
*
1635
* @typedef {{localAddress: string, localPort: number, remoteAddress: string, remotePort: number, remoteFamily: string}} NativeConnectionInfo
@@ -25,6 +44,7 @@ import { nativeEventEmitter, getNextId } from './Globals';
2544
* tls?: boolean,
2645
* tlsCheckValidity?: boolean,
2746
* tlsCert?: any,
47+
* frameMode?: boolean,
2848
* }} ConnectionOptions
2949
*
3050
* @typedef {object} ReadableEvents
@@ -39,6 +59,7 @@ import { nativeEventEmitter, getNextId } from './Globals';
3959
* @property {(err: Error) => void} error
4060
* @property {() => void} timeout
4161
* @property {() => void} secureConnect
62+
* @property {() => void} end
4263
*
4364
* @extends {EventEmitter<SocketEvents & ReadableEvents, any>}
4465
*/
@@ -95,6 +116,12 @@ export default class Socket extends EventEmitter {
95116
this.remoteAddress = undefined;
96117
this.remotePort = undefined;
97118
this.remoteFamily = undefined;
119+
/** @type {boolean} @private */
120+
this._frameMode = false;
121+
/** @type {any | null} @private */
122+
this._frameEncoder = null;
123+
/** @type {any | null} @private */
124+
this._frameDecoder = null;
98125
this._registerEvents();
99126
}
100127

@@ -159,6 +186,15 @@ export default class Socket extends EventEmitter {
159186
// Normalize args
160187
customOptions.host = customOptions.host || 'localhost';
161188
customOptions.port = Number(customOptions.port) || 0;
189+
190+
// Enable frame mode if requested
191+
if (customOptions.frameMode) {
192+
this._frameMode = true;
193+
this._frameEncoder = /** @type {any} */ (new FrameEncoder());
194+
this._frameDecoder = /** @type {any} */ (new FrameDecoder());
195+
this._setupFrameCodec();
196+
}
197+
162198
this.once('connect', () => {
163199
if (callback) callback();
164200
});
@@ -309,6 +345,11 @@ export default class Socket extends EventEmitter {
309345
if (this._destroyed) return this;
310346
this._destroyed = true;
311347
this._clearTimeout();
348+
349+
// Clean up frame codec references
350+
this._frameDecoder = null;
351+
this._frameEncoder = null;
352+
312353
Sockets.destroy(this._id);
313354
return this;
314355
}
@@ -351,7 +392,15 @@ export default class Socket extends EventEmitter {
351392
write(buffer, encoding, cb) {
352393
if (this._pending || this._destroyed) throw new Error('Socket is closed.');
353394

354-
const generatedBuffer = this._generateSendBuffer(buffer, encoding);
395+
let generatedBuffer = this._generateSendBuffer(buffer, encoding);
396+
397+
// Apply frame encoding if in frame mode
398+
if (this._frameMode) {
399+
// Use varint encoding for libp2p compatibility
400+
const varint = this._encodeVarint(generatedBuffer.byteLength);
401+
generatedBuffer = Buffer.concat([varint, generatedBuffer]);
402+
}
403+
355404
this._writeBufferSize += generatedBuffer.byteLength;
356405
const currentMsgId = this._msgId;
357406
this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER;
@@ -450,6 +499,22 @@ export default class Socket extends EventEmitter {
450499
Sockets.resume(this._id);
451500
}
452501

502+
/**
503+
* @private
504+
*/
505+
_setupFrameCodec() {
506+
if (!this._frameDecoder || !this._frameEncoder) return;
507+
508+
// Wire up decoder to emit framed data
509+
this._frameDecoder.on('data', (/** @type {Buffer} */ frame) => {
510+
this.emit('data', this._encoding ? frame.toString(this._encoding) : frame);
511+
});
512+
513+
this._frameDecoder.on('error', (/** @type {Error} */ err) => {
514+
this.emit('error', err);
515+
});
516+
}
517+
453518
/**
454519
* @private
455520
*/
@@ -459,8 +524,14 @@ export default class Socket extends EventEmitter {
459524
if (!this._paused) {
460525
const bufferData = Buffer.from(evt.data, 'base64');
461526
this._bytesRead += bufferData.byteLength;
462-
const finalData = this._encoding ? bufferData.toString(this._encoding) : bufferData;
463-
this.emit('data', finalData);
527+
528+
if (this._frameMode && this._frameDecoder) {
529+
// Feed raw data into frame decoder
530+
this._frameDecoder.write(bufferData);
531+
} else {
532+
const finalData = this._encoding ? bufferData.toString(this._encoding) : bufferData;
533+
this.emit('data', finalData);
534+
}
464535
} else {
465536
// If the socket is paused, save the data events for later
466537
this._pausedDataEvents.push(evt);
@@ -525,6 +596,22 @@ export default class Socket extends EventEmitter {
525596
}
526597
}
527598

599+
/**
600+
* @private
601+
* @param {number} n
602+
*/
603+
_encodeVarint(n) {
604+
if (n < 0) throw new RangeError('varint unsigned only');
605+
const o = [];
606+
do {
607+
let b = n & 0x7f;
608+
n = Math.floor(n / 128);
609+
if (n > 0) b |= 0x80;
610+
o.push(b);
611+
} while (n > 0);
612+
return Buffer.from(o);
613+
}
614+
528615
/**
529616
* @private
530617
*/

0 commit comments

Comments
 (0)