Skip to content

Commit c2f0629

Browse files
author
Your Name
committed
#209 bounty implementation
Both Ring based and LL Deque implementations of the same queue strategy to run the p2p FSM and A/B Results: A (Queue-based original): 19,909ms B (Circular buffer): 20,984ms Circular buffer is 5.4% slower. The queue-based approach with .shift() and .slice() is already optimized by V8. The circular buffer adds modulo arithmetic overhead and buffer resizing complexity that doesn't pay off for this workload. Recommendation: Keep the original queue-based implementation.
1 parent 4c6d55e commit c2f0629

File tree

7 files changed

+773
-0
lines changed

7 files changed

+773
-0
lines changed

__tests__/framecodec.test.js

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
const { FrameEncoder, FrameDecoder, encodeFrame } = require('../src/FrameCodec');
2+
const { once } = require('events');
3+
const crypto = require('crypto');
4+
5+
let _seed = 0xdeadbeef;
6+
function seededRandom() {
7+
_seed = (_seed * 1664525 + 1013904223) >>> 0;
8+
return _seed / 0x100000000;
9+
}
10+
11+
function chunkBuffer(buf, cuts) {
12+
const out = [];
13+
let offset = 0;
14+
for (const len of cuts) {
15+
const end = Math.min(buf.length, offset + len);
16+
out.push(buf.slice(offset, end));
17+
offset = end;
18+
if (offset >= buf.length) break;
19+
}
20+
if (offset < buf.length) out.push(buf.slice(offset));
21+
return out;
22+
}
23+
24+
test('reconstructs frames from arbitrary chunks', (done) => {
25+
const encoder = new FrameEncoder();
26+
const decoder = new FrameDecoder();
27+
28+
const messages = [];
29+
for (let i = 0; i < 50; i++) {
30+
const len = Math.floor(Math.random() * 200);
31+
const payload = Buffer.alloc(len, i % 256);
32+
messages.push(payload);
33+
}
34+
35+
const wire = Buffer.concat(messages.map(m => encodeFrame(m)));
36+
37+
// create random chunk sizes that simulate fragmentation
38+
const cuts = [];
39+
let remaining = wire.length;
40+
while (remaining > 0) {
41+
const c = Math.max(1, Math.floor(Math.random() * 20));
42+
cuts.push(c);
43+
remaining -= c;
44+
}
45+
46+
const chunks = chunkBuffer(wire, cuts);
47+
48+
const out = [];
49+
decoder.on('data', (frame) => out.push(frame));
50+
decoder.on('end', () => {
51+
try {
52+
expect(out.length).toBe(messages.length);
53+
for (let i = 0; i < messages.length; i++) {
54+
expect(out[i]).toEqual(messages[i]);
55+
}
56+
done();
57+
} catch (err) {
58+
done(err);
59+
}
60+
});
61+
62+
// feed fragments with a small async schedule to simulate streaming
63+
(async () => {
64+
for (const c of chunks) {
65+
decoder.write(c);
66+
// occasionally wait
67+
if (Math.random() < 0.1) await new Promise(r => setTimeout(r, 0));
68+
}
69+
decoder.end();
70+
})();
71+
});
72+
73+
function randomInt(min, max) {
74+
return Math.floor(seededRandom() * (max - min + 1)) + min;
75+
}
76+
77+
function randomFramePayload(length) {
78+
return crypto.randomBytes(length);
79+
}
80+
81+
async function decodeWithFrameDecoder(chunks) {
82+
const decoder = new FrameDecoder();
83+
const frames = [];
84+
85+
decoder.on('data', (frame) => {
86+
// Ensure we never mutate the emitted Buffer instance
87+
frames.push(Buffer.from(frame));
88+
});
89+
90+
const errorPromise = once(decoder, 'error').then(([err]) => {
91+
throw err;
92+
});
93+
94+
const endPromise = once(decoder, 'end').then(() => frames);
95+
96+
(async () => {
97+
for (const chunk of chunks) {
98+
if (!decoder.write(chunk)) {
99+
await once(decoder, 'drain');
100+
}
101+
}
102+
decoder.end();
103+
})().catch((err) => decoder.emit('error', err));
104+
105+
return Promise.race([endPromise, errorPromise]);
106+
}
107+
108+
test('fuzzes frame decoding with heavy fragmentation', async () => {
109+
const warmup = 1000;
110+
const iterations = 1000000;
111+
112+
for (let i = 0; i < warmup; i++) {
113+
const frameCount = randomInt(1, 12);
114+
const frames = [];
115+
for (let j = 0; j < frameCount; j++) frames.push(randomFramePayload(randomInt(0, 512)));
116+
const wire = Buffer.concat(frames.map((frame) => encodeFrame(frame)));
117+
const chunks = [];
118+
let offset = 0;
119+
while (offset < wire.length) {
120+
const chunkLen = randomInt(1, Math.min(64, wire.length - offset));
121+
chunks.push(wire.slice(offset, offset + chunkLen));
122+
offset += chunkLen;
123+
}
124+
await decodeWithFrameDecoder(chunks);
125+
}
126+
127+
const start = Date.now();
128+
for (let i = 0; i < iterations; i++) {
129+
const frameCount = randomInt(1, 12);
130+
const frames = [];
131+
132+
for (let j = 0; j < frameCount; j++) {
133+
const payloadLength = randomInt(0, 512);
134+
frames.push(randomFramePayload(payloadLength));
135+
}
136+
137+
const wire = Buffer.concat(frames.map((frame) => encodeFrame(frame)));
138+
139+
const chunks = [];
140+
let offset = 0;
141+
while (offset < wire.length) {
142+
const chunkLen = randomInt(1, Math.min(64, wire.length - offset));
143+
chunks.push(wire.slice(offset, offset + chunkLen));
144+
offset += chunkLen;
145+
}
146+
147+
const decoded = await decodeWithFrameDecoder(chunks);
148+
149+
expect(decoded.length).toBe(frames.length);
150+
for (let k = 0; k < frames.length; k++) {
151+
expect(decoded[k]).toEqual(frames[k]);
152+
}
153+
}
154+
155+
console.log(`1M rounds: ${Date.now() - start}ms`);
156+
});

bench-ab.js

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
const { FrameDecoder, encodeFrame } = require('./src/FrameCodec');
2+
const { FrameDecoderCirc } = require('./src/FrameCodecCirc');
3+
const { once } = require('events');
4+
const crypto = require('crypto');
5+
6+
let _seed = 0xdeadbeef;
7+
function seededRandom() {
8+
_seed = (_seed * 1664525 + 1013904223) >>> 0;
9+
return _seed / 0x100000000;
10+
}
11+
12+
function randomInt(min, max) {
13+
return Math.floor(seededRandom() * (max - min + 1)) + min;
14+
}
15+
16+
function randomFramePayload(length) {
17+
return crypto.randomBytes(length);
18+
}
19+
20+
async function decodeWithFrameDecoder(chunks, DecoderClass) {
21+
const decoder = new DecoderClass();
22+
const frames = [];
23+
24+
decoder.on('data', (frame) => {
25+
frames.push(Buffer.from(frame));
26+
});
27+
28+
const errorPromise = once(decoder, 'error').then(([err]) => {
29+
throw err;
30+
});
31+
32+
const endPromise = once(decoder, 'end').then(() => frames);
33+
34+
(async () => {
35+
for (const chunk of chunks) {
36+
if (!decoder.write(chunk)) {
37+
await once(decoder, 'drain');
38+
}
39+
}
40+
decoder.end();
41+
})().catch((err) => decoder.emit('error', err));
42+
43+
return Promise.race([endPromise, errorPromise]);
44+
}
45+
46+
async function runBench(DecoderClass, name) {
47+
const warmup = 1000;
48+
const iterations = 1000000;
49+
50+
for (let i = 0; i < warmup; i++) {
51+
const frameCount = randomInt(1, 12);
52+
const frames = [];
53+
for (let j = 0; j < frameCount; j++) frames.push(randomFramePayload(randomInt(0, 512)));
54+
const wire = Buffer.concat(frames.map((frame) => encodeFrame(frame)));
55+
const chunks = [];
56+
let offset = 0;
57+
while (offset < wire.length) {
58+
const chunkLen = randomInt(1, Math.min(64, wire.length - offset));
59+
chunks.push(wire.slice(offset, offset + chunkLen));
60+
offset += chunkLen;
61+
}
62+
await decodeWithFrameDecoder(chunks, DecoderClass);
63+
}
64+
65+
const start = Date.now();
66+
for (let i = 0; i < iterations; i++) {
67+
const frameCount = randomInt(1, 12);
68+
const frames = [];
69+
70+
for (let j = 0; j < frameCount; j++) {
71+
const payloadLength = randomInt(0, 512);
72+
frames.push(randomFramePayload(payloadLength));
73+
}
74+
75+
const wire = Buffer.concat(frames.map((frame) => encodeFrame(frame)));
76+
77+
const chunks = [];
78+
let offset = 0;
79+
while (offset < wire.length) {
80+
const chunkLen = randomInt(1, Math.min(64, wire.length - offset));
81+
chunks.push(wire.slice(offset, offset + chunkLen));
82+
offset += chunkLen;
83+
}
84+
85+
const decoded = await decodeWithFrameDecoder(chunks, DecoderClass);
86+
87+
if (decoded.length !== frames.length) throw new Error('length mismatch');
88+
for (let k = 0; k < frames.length; k++) {
89+
if (!decoded[k].equals(frames[k])) throw new Error('frame mismatch');
90+
}
91+
}
92+
93+
const elapsed = Date.now() - start;
94+
console.log(`${name}: ${elapsed}ms`);
95+
return elapsed;
96+
}
97+
98+
(async () => {
99+
console.log('A/B Benchmark: FrameDecoder vs FrameDecoderCirc\n');
100+
101+
_seed = 0xdeadbeef;
102+
const timeA = await runBench(FrameDecoder, 'A: Queue-based (original)');
103+
104+
_seed = 0xdeadbeef;
105+
const timeB = await runBench(FrameDecoderCirc, 'B: Circular buffer');
106+
107+
console.log(`\nDelta: ${timeB - timeA > 0 ? '+' : ''}${timeB - timeA}ms (${((timeB / timeA - 1) * 100).toFixed(1)}%)`);
108+
})();
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
const net = require('react-native-tcp-socket');
2+
const { createLibp2pStream, FrameEncoder, FrameDecoder } = require('react-native-tcp-socket');
3+
4+
/**
5+
* Example 1: Using createLibp2pStream for async iterator compatibility
6+
* This is the recommended approach for libp2p integration
7+
*/
8+
function exampleLibp2pStream() {
9+
const socket = net.createConnection({ host: '127.0.0.1', port: 4001 }, () => {
10+
console.log('Connected to libp2p peer');
11+
});
12+
13+
// Create a libp2p-compatible stream with source/sink pattern
14+
const stream = createLibp2pStream(socket);
15+
16+
// Consume frames via async iterator
17+
(async () => {
18+
for await (const frame of stream) {
19+
console.log('Received frame:', frame.length, 'bytes');
20+
// Handle libp2p protocol frames here
21+
}
22+
})();
23+
24+
// Send frames via sink
25+
(async () => {
26+
await stream.sink((async function* () {
27+
// libp2p multistream-select protocol
28+
yield Buffer.from('/multistream/1.0.0\n');
29+
yield Buffer.from('/libp2p/circuit/relay/0.2.0/hop\n');
30+
})());
31+
})();
32+
33+
socket.on('error', (err) => {
34+
console.error('Socket error:', err);
35+
});
36+
37+
socket.on('close', () => {
38+
console.log('Connection closed');
39+
});
40+
}
41+
42+
/**
43+
* Example 2: Using FrameEncoder/FrameDecoder directly with streams
44+
* This approach gives you more control over the framing process
45+
*/
46+
function exampleDirectFraming() {
47+
const socket = net.createConnection({ host: '127.0.0.1', port: 4001 }, () => {
48+
console.log('Connected using direct framing');
49+
});
50+
51+
const encoder = new FrameEncoder();
52+
const decoder = new FrameDecoder();
53+
54+
// Pipe socket through decoder/encoder
55+
socket.pipe(decoder);
56+
encoder.pipe(socket);
57+
58+
// Handle incoming frames
59+
decoder.on('data', (frame) => {
60+
console.log('Received frame:', frame.length, 'bytes');
61+
// Process libp2p protocol messages
62+
});
63+
64+
// Send frames
65+
encoder.write(Buffer.from('/multistream/1.0.0\n'));
66+
encoder.write(Buffer.from('/libp2p/circuit/relay/0.2.0/hop\n'));
67+
68+
socket.on('error', (err) => {
69+
console.error('Socket error:', err);
70+
});
71+
72+
socket.on('close', () => {
73+
console.log('Connection closed');
74+
encoder.end();
75+
});
76+
}
77+
78+
/**
79+
* Example 3: Server-side libp2p framing
80+
* Shows how to handle framed connections on the server side
81+
*/
82+
function exampleServer() {
83+
const server = net.createServer((socket) => {
84+
console.log('Client connected');
85+
86+
const encoder = new FrameEncoder();
87+
const decoder = new FrameDecoder();
88+
89+
socket.pipe(decoder);
90+
encoder.pipe(socket);
91+
92+
decoder.on('data', (frame) => {
93+
console.log('Server received frame:', frame.toString());
94+
// Echo back the frame
95+
encoder.write(frame);
96+
});
97+
98+
socket.on('error', (err) => {
99+
console.error('Server socket error:', err);
100+
});
101+
102+
socket.on('close', () => {
103+
console.log('Client disconnected');
104+
});
105+
});
106+
107+
server.listen(4002, '127.0.0.1', () => {
108+
console.log('Server listening on port 4002');
109+
});
110+
}
111+
112+
// Uncomment to run the examples:
113+
// exampleLibp2pStream();
114+
// exampleDirectFraming();
115+
// exampleServer();

0 commit comments

Comments
 (0)