Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit 8f17846

Browse files
committed
improve resends and tests
1 parent e74318e commit 8f17846

File tree

3 files changed

+187
-42
lines changed

3 files changed

+187
-42
lines changed

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
"author": "",
1717
"license": "",
1818
"dependencies": {
19-
"socket.io-client": "1.3.7",
20-
"debug": "*"
19+
"debug": "*",
20+
"sinon": "^1.17.5",
21+
"socket.io-client": "1.3.7"
2122
},
2223
"devDependencies": {
2324
"mocha": "*",

streamr-client.js

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,12 @@
192192
debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg)
193193
}
194194

195-
debug("handleMessage: %o", msg)
196-
debug("handleMessage: lastReceivedOffset %d", this.lastReceivedOffset)
195+
// Gap check
196+
if (previousOffset != null && // previousOffset is required to check for gaps
197+
this.lastReceivedOffset != null && // and we need to know what msg was the previous one
198+
previousOffset > this.lastReceivedOffset && // previous message had larger offset than our previous msg => gap!
199+
!(this.options.resend_last != null && this.resending)) { // don't mind gaps when resending resend_last
197200

198-
// Check for gaps
199-
if (previousOffset != null && this.lastReceivedOffset != null && previousOffset > this.lastReceivedOffset) {
200201
this.queue.push(msg)
201202

202203
if (!this.resending) {
@@ -237,6 +238,34 @@
237238
return this.options.resend_all===true || this.options.resend_from >= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0
238239
}
239240

241+
/**
242+
* Resend needs can change if messages have already been received.
243+
* This function always returns the effective resend options:
244+
*
245+
* If messages have been received:
246+
* - resend_all becomes resend_from
247+
* - resend_from becomes resend_from the latest received message
248+
* - resend_from_time becomes resend_from the latest received message
249+
* - resend_last stays the same
250+
*/
251+
Subscription.prototype.getEffectiveResendOptions = function() {
252+
if (this.hasReceivedMessages() && this.hasResendOptions()) {
253+
if (this.options.resend_all || this.options.resend_from || this.options.resend_from_time) {
254+
return { resend_from: this.lastReceivedOffset + 1 }
255+
}
256+
else if (this.options.resend_last) {
257+
return this.options
258+
}
259+
}
260+
else {
261+
return this.options
262+
}
263+
}
264+
265+
Subscription.prototype.hasReceivedMessages = function() {
266+
return this.lastReceivedOffset != null
267+
}
268+
240269
Subscription.prototype.isSubscribed = function() {
241270
return this.subscribed
242271
}
@@ -585,7 +614,7 @@
585614

586615
StreamrClient.prototype._requestResend = function(sub, resendOptions) {
587616
// If overriding resendOptions are given, need to remove resend options in sub.options
588-
var options = extend({}, sub.options)
617+
var options = extend({}, sub.getEffectiveResendOptions())
589618
if (resendOptions) {
590619
Object.keys(options).forEach(function (key) {
591620
if (key.match(/resend_.*/)) {

test/test.streamr-client.js

Lines changed: 150 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
var assert = require('assert'),
22
events = require('eventemitter2'),
3-
mockery = require('mockery')
3+
mockery = require('mockery'),
4+
sinon = require('sinon')
45

56
var STREAM_KEY = "_S"
67
var COUNTER_KEY = "_C"
@@ -46,7 +47,7 @@ describe('StreamrClient', function() {
4647
streamId,
4748
Date.now(), // timestamp
4849
offset,
49-
undefined, // previousOffset
50+
forcePreviousOffset, // previousOffset
5051
27, // contentType (JSON)
5152
content]
5253

@@ -1033,15 +1034,36 @@ describe('StreamrClient', function() {
10331034
checkResendRequest(request)
10341035

10351036
async(function() {
1036-
if (request.resend_from!=null && request.resend_to!=null)
1037+
console.log("Mock resend handler handling request: %o", request)
1038+
if (request.resend_all) {
1039+
if (resendLimits[request.channel]===undefined) {
1040+
client.socket.emit('no_resend', {channel: request.channel, sub: request.sub})
1041+
}
1042+
else {
1043+
resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to)
1044+
}
1045+
}
1046+
else if (request.resend_last) {
1047+
if (resendLimits[request.channel] === undefined) {
1048+
throw "Testing resend_last needs resendLimits.channel.to"
1049+
}
1050+
resend(request.channel, request.sub, resendLimits[request.channel].to - (request.resend_last - 1), resendLimits[request.channel].to)
1051+
}
1052+
else if (request.resend_from!=null && request.resend_to!=null) {
10371053
resend(request.channel, request.sub, request.resend_from, request.resend_to)
1054+
}
1055+
else if (request.resend_from!=null) {
1056+
if (resendLimits[request.channel] === undefined) {
1057+
throw "Testing resend_from needs resendLimits.channel.to"
1058+
}
1059+
resend(request.channel, request.sub, request.resend_from, resendLimits[request.channel].to)
1060+
}
10381061
else if (request.resend_from_time!=null) {
10391062
resend(request.channel, request.sub, 99, 100)
10401063
}
1041-
else if (resendLimits[request.channel]===undefined)
1042-
client.socket.emit('no_resend', {channel: request.channel, sub: request.sub})
1043-
else
1044-
resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to)
1064+
else {
1065+
throw "Unknown kind of resend request: "+JSON.stringify(request)
1066+
}
10451067
})
10461068
}
10471069
socket.on('resend', socket.defaultResendHandler)
@@ -1265,46 +1287,139 @@ describe('StreamrClient', function() {
12651287
client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11))
12661288
})
12671289
})
1268-
1269-
it('should re-request from the latest counter on reconnect', function(done) {
1270-
var sub1 = client.subscribe("stream1", function(message) {}, {resend_all:true})
1271-
var sub2 = client.subscribe("stream2", function(message) {}, {resend_from:0})
1272-
var sub3 = client.subscribe("stream3", function(message) {}) // no resend for stream3
1273-
client.connect()
12741290

1275-
validResendRequests.push({channel:"stream1", resend_all:true})
1276-
validResendRequests.push({channel:"stream2", resend_from:0})
1291+
describe('on reconnect', function() {
1292+
var msgHandler
1293+
beforeEach(function() {
1294+
msgHandler = sinon.spy()
1295+
})
12771296

1278-
client.socket.on('subscribed', function(response) {
1279-
if (response.channel==='stream1') {
1280-
client.socket.emit('b', msg("stream1", 0))
1281-
client.socket.emit('b', msg("stream1", 1, {}, undefined, 0))
1282-
client.socket.emit('b', msg("stream1", 2, {}, undefined, 1))
1283-
}
1284-
else if (response.channel==='stream2') {
1285-
client.socket.emit('b', msg("stream2", 0))
1297+
it('no resend', function(done) {
1298+
client.subscribe("stream", msgHandler)
1299+
client.connect()
1300+
1301+
client.socket.on('subscribed', function(response) {
1302+
client.socket.emit('b', msg("stream", 0))
1303+
client.socket.emit('disconnect')
1304+
})
1305+
1306+
client.socket.once('disconnect', function() {
1307+
client.connect()
1308+
1309+
socket.on('resend', function() {
1310+
throw "Should not have made a resend request!"
1311+
})
1312+
1313+
socket.on('subscribed', function() {
1314+
assert.equal(msgHandler.callCount, 1)
1315+
done()
1316+
})
1317+
})
1318+
})
1319+
1320+
it('resend_all', function(done) {
1321+
validResendRequests.push({channel:"stream", resend_all: true})
1322+
resendLimits["stream"] = {
1323+
from: 0,
1324+
to: 5
12861325
}
1287-
else if (response.channel==='stream3') {
1288-
client.socket.emit('b', msg("stream3", 0))
1289-
client.socket.emit('disconnect')
1326+
1327+
client.subscribe("stream", msgHandler, { resend_all: true })
1328+
client.connect()
1329+
1330+
client.socket.on('subscribed', function(response) {
1331+
client.socket.emit('disconnect')
1332+
})
1333+
1334+
client.socket.once('disconnect', function() {
1335+
client.connect()
1336+
1337+
socket.on('resend', function(request) {
1338+
assert.equal(request.resend_from, 6)
1339+
assert.equal(request.resend_to, undefined)
1340+
done()
1341+
})
1342+
})
1343+
})
1344+
1345+
it('resend_from', function(done) {
1346+
validResendRequests.push({channel:"stream", resend_from: 3})
1347+
resendLimits["stream"] = {
1348+
from: 0,
1349+
to: 5
12901350
}
1351+
1352+
client.subscribe("stream", msgHandler, { resend_from: 3 })
1353+
client.connect()
1354+
1355+
client.socket.on('subscribed', function(response) {
1356+
client.socket.emit('disconnect')
1357+
})
1358+
1359+
client.socket.once('disconnect', function() {
1360+
client.connect()
1361+
1362+
socket.on('resend', function(request) {
1363+
assert.equal(request.resend_from, 6)
1364+
assert.equal(request.resend_to, undefined)
1365+
done()
1366+
})
1367+
})
12911368
})
12921369

1293-
client.socket.once('disconnect', function() {
1370+
it('resend_last', function(done) {
1371+
validResendRequests.push({channel:"stream", resend_last: 1})
1372+
resendLimits["stream"] = {
1373+
from: 0,
1374+
to: 5
1375+
}
1376+
1377+
client.subscribe("stream", msgHandler, { resend_last: 1 })
12941378
client.connect()
12951379

1296-
socket.on('subscribe', function(request) {
1297-
if (request.channel==='stream1' && request.from !== 3)
1298-
throw "Wrong starting index for "+request.channel+": "+request.from
1299-
else if (request.channel==='stream2' && request.from !== 1)
1300-
throw "Wrong starting index for "+request.channel+": "+request.from
1301-
else if (request.channel==='stream3' && request.from !== undefined)
1302-
throw "Should not have specified the from field for stream3: "+request.from
1380+
client.socket.on('subscribed', function(response) {
1381+
client.socket.emit('disconnect')
1382+
})
1383+
1384+
client.socket.once('disconnect', function() {
1385+
client.connect()
1386+
1387+
socket.on('resend', function(request) {
1388+
assert.equal(request.resend_last, 1)
1389+
done()
1390+
})
13031391
})
1392+
})
1393+
1394+
it('resend_last should accept a gap on reconnect', function(done) {
1395+
validResendRequests.push({channel:"stream", resend_last: 1})
1396+
resendLimits["stream"] = {
1397+
from: 0,
1398+
to: 0
1399+
}
1400+
1401+
client.subscribe("stream", msgHandler, { resend_last: 1 })
1402+
client.connect()
13041403

13051404
client.socket.on('subscribed', function(response) {
1306-
if (sub1.isSubscribed() && sub2.isSubscribed() && sub3.isSubscribed())
1405+
socket.off('resend', socket.defaultResendHandler)
1406+
client.socket.emit('disconnect')
1407+
})
1408+
1409+
client.socket.once('disconnect', function() {
1410+
client.connect()
1411+
1412+
socket.on('resend', function(request) {
1413+
assert.equal(request.resend_last, 1)
1414+
client.socket.emit('resending', {
1415+
channel: request.channel,
1416+
sub: request.sub
1417+
})
1418+
client.socket.emit('u', msg(request.channel, 10, {}, request.sub, 9))
1419+
client.socket.emit('resent', {channel: request.channel, sub: request.sub})
1420+
assert.equal(msgHandler.callCount, 2)
13071421
done()
1422+
})
13081423
})
13091424
})
13101425
})

0 commit comments

Comments
 (0)