Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Commit e74318e

Browse files
committed
tests passing
1 parent f5bcdfc commit e74318e

File tree

2 files changed

+50
-60
lines changed

2 files changed

+50
-60
lines changed

streamr-client.js

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,22 @@
186186
var content = getMessageField('content', msg)
187187
var timestamp = getMessageField('timestamp', msg)
188188
var offset = getMessageField('offset', msg)
189-
var prevOffset = getMessageField('prevOffset', msg)
189+
var previousOffset = getMessageField('previousOffset', msg)
190190

191-
if (prevOffset == null) {
191+
if (previousOffset == null) {
192192
debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg)
193193
}
194194

195195
debug("handleMessage: %o", msg)
196196
debug("handleMessage: lastReceivedOffset %d", this.lastReceivedOffset)
197197

198198
// Check for gaps
199-
if (prevOffset != null && this.lastReceivedOffset != null && prevOffset > this.lastReceivedOffset) {
199+
if (previousOffset != null && this.lastReceivedOffset != null && previousOffset > this.lastReceivedOffset) {
200200
this.queue.push(msg)
201201

202202
if (!this.resending) {
203203
var from = this.lastReceivedOffset + 1
204-
var to = prevOffset
204+
var to = previousOffset
205205
debug("Gap detected, requesting resend for stream %s from %d to %d", this.streamId, from, to)
206206
this.trigger('gap', from, to)
207207
}
@@ -225,21 +225,11 @@
225225
debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId)
226226

227227
var i
228-
for (i=0;i<this.queue.length;i++) {
228+
var length = this.queue.length
229+
for (i=0; i<length; i++) {
229230
var msg = this.queue[i]
230231
this.handleMessage(msg)
231232
}
232-
233-
// All messages in queue were processed
234-
if (i===this.queue.length) {
235-
this.queue = []
236-
}
237-
// Some messages could not be processed, so compact the queue
238-
// and request another resend for the gap!
239-
else {
240-
this.queue.splice(0, i)
241-
this.trigger('gap', this.lastReceivedOffset + 1, getMessageField('previousOffset', msg) - 1)
242-
}
243233
}
244234
}
245235

test/test.streamr-client.js

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ describe('StreamrClient', function() {
3636

3737
var previousOffsetByStreamId = {}
3838

39-
function msg(streamId, offset, content, subId) {
39+
function msg(streamId, offset, content, subId, forcePreviousOffset) {
4040
content = content || {}
4141

4242
// unicast message to subscription
@@ -57,7 +57,7 @@ describe('StreamrClient', function() {
5757
}
5858
// broadcast message to all subscriptions
5959
else {
60-
var previousOffset = previousOffsetByStreamId[streamId]
60+
var previousOffset = forcePreviousOffset || previousOffsetByStreamId[streamId]
6161
previousOffsetByStreamId[streamId] = offset
6262

6363
return [
@@ -88,7 +88,7 @@ describe('StreamrClient', function() {
8888

8989
s.defaultSubscribeHandler = function(request) {
9090
async(function() {
91-
s.emit('subscribed', {channel: request.channel, from: 0})
91+
s.emit('subscribed', {channel: request.channel})
9292
})
9393
}
9494
s.on('subscribe', s.defaultSubscribeHandler)
@@ -1114,32 +1114,33 @@ describe('StreamrClient', function() {
11141114
})
11151115
})
11161116

1117-
// TODO: jatka tästä
1118-
it('should emit a resend request if the first message is not the expected one', function(done) {
1119-
client.subscribe("stream1", function(message) {})
1120-
client.connect()
1121-
1122-
validResendRequests.push({channel:"stream1", resend_from:0, resend_to:1})
1117+
it('should not emit a resend request if there is no gap in messages', function(done) {
1118+
client.subscribe("stream1", function(message) {
1119+
if (message.done) {
1120+
done()
1121+
}
1122+
})
1123+
client.connect()
11231124

1124-
client.socket.once('subscribed', function() {
1125-
assert(client.socket.defaultResendHandler!=null)
1126-
client.socket.emit('b', msg("stream1",2))
1127-
})
1125+
socket.once('resend', function(req) {
1126+
throw "Should not have made a resend request:" + JSON.stringify(req)
1127+
})
1128+
1129+
client.socket.once('subscribed', function() {
1130+
client.socket.emit('b', msg("stream1", 0))
1131+
client.socket.emit('b', msg("stream1", 10, {done: true}, undefined, 0))
1132+
})
1133+
})
11281134

1129-
client.socket.once('resent', function() {
1130-
done()
1131-
})
1132-
})
1133-
11341135
it('should emit a resend request if there is a gap in messages', function(done) {
11351136
client.subscribe("stream1", function(message) {})
11361137
client.connect()
11371138

11381139
validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9})
11391140

11401141
client.socket.once('subscribed', function() {
1141-
client.socket.emit('b', msg("stream1",0))
1142-
client.socket.emit('b', msg("stream1",10))
1142+
client.socket.emit('b', msg("stream1", 0))
1143+
client.socket.emit('b', msg("stream1", 10, {}, undefined, 9))
11431144
})
11441145

11451146
client.socket.once('resent', function() {
@@ -1154,8 +1155,8 @@ describe('StreamrClient', function() {
11541155
validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9})
11551156

11561157
client.socket.once('subscribed', function() {
1157-
client.socket.emit('b', msg("stream1",0))
1158-
client.socket.emit('b', msg("stream1",10))
1158+
client.socket.emit('b', msg("stream1", 0))
1159+
client.socket.emit('b', msg("stream1", 10, {}, undefined, 9))
11591160
})
11601161

11611162
client.socket.once('resend', function(request) {
@@ -1175,8 +1176,8 @@ describe('StreamrClient', function() {
11751176
validResendRequests.push({channel:"stream1", resend_from:1, resend_to:1})
11761177

11771178
client.socket.once('subscribed', function() {
1178-
client.socket.emit('b', msg("stream1",0))
1179-
client.socket.emit('b', msg("stream1",2))
1179+
client.socket.emit('b', msg("stream1", 0))
1180+
client.socket.emit('b', msg("stream1", 2, {}, undefined, 1))
11801181
})
11811182

11821183
client.socket.on('resend', function(request) {
@@ -1196,9 +1197,9 @@ describe('StreamrClient', function() {
11961197
validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9})
11971198

11981199
client.socket.once('subscribed', function() {
1199-
client.socket.emit('b', msg("stream1",0))
1200-
client.socket.emit('b', msg("stream1",10))
1201-
client.socket.emit('b', msg("stream1",11))
1200+
client.socket.emit('b', msg("stream1", 0))
1201+
client.socket.emit('b', msg("stream1", 10, {}, undefined, 9))
1202+
client.socket.emit('b', msg("stream1", 11, {}, undefined, 10))
12021203
})
12031204

12041205
var counter = 0
@@ -1223,7 +1224,7 @@ describe('StreamrClient', function() {
12231224

12241225
client.socket.once('subscribed', function() {
12251226
client.socket.emit('b', msg("stream1", 0, {counter: 0}))
1226-
client.socket.emit('b', msg("stream1",10, {counter: 10}))
1227+
client.socket.emit('b', msg("stream1",10, {counter: 10}, undefined, 9))
12271228
client.socket.emit('b', msg("stream1",11, {counter: 11}))
12281229
client.socket.emit('b', msg("stream1",12, {counter: 12}))
12291230
})
@@ -1240,16 +1241,16 @@ describe('StreamrClient', function() {
12401241

12411242
client.socket.once('subscribed', function() {
12421243
client.socket.emit('b', msg("stream1", 0, {counter: 0}))
1243-
client.socket.emit('b', msg("stream1", 10, {counter: 10}))
1244-
client.socket.emit('b', msg("stream1", 11, {counter: 11}))
1245-
client.socket.emit('b', msg("stream1", 11, {counter: 11})) // bogus message
1246-
client.socket.emit('b', msg("stream1", 5, {counter: 5})) // bogus message
1247-
client.socket.emit('b', msg("stream1", 12, {counter: 12}))
1244+
client.socket.emit('b', msg("stream1", 10, {counter: 10}, undefined, 9))
1245+
client.socket.emit('b', msg("stream1", 11, {counter: 11}, undefined, 10))
1246+
client.socket.emit('b', msg("stream1", 11, {counter: 11}, undefined, 10)) // bogus message
1247+
client.socket.emit('b', msg("stream1", 5, {counter: 5}, undefined, 4)) // bogus message
1248+
client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11))
12481249
})
12491250
})
12501251

12511252
it('should do another resend request if there are gaps in the queue', function(done) {
1252-
var subscription = client.subscribe("stream1", function(message, streamId, timetamp, counter) {
1253+
client.subscribe("stream1", function(message, streamId, timetamp, counter) {
12531254
if (counter===12)
12541255
done()
12551256
})
@@ -1260,8 +1261,8 @@ describe('StreamrClient', function() {
12601261

12611262
client.socket.once('subscribed', function() {
12621263
client.socket.emit('b', msg("stream1", 0, {counter: 0}))
1263-
client.socket.emit('b', msg("stream1", 10, {counter: 10}))
1264-
client.socket.emit('b', msg("stream1", 12, {counter: 12}))
1264+
client.socket.emit('b', msg("stream1", 10, {counter: 10}, undefined, 9))
1265+
client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11))
12651266
})
12661267
})
12671268

@@ -1276,23 +1277,23 @@ describe('StreamrClient', function() {
12761277

12771278
client.socket.on('subscribed', function(response) {
12781279
if (response.channel==='stream1') {
1279-
client.socket.emit('b', msg("stream1",0))
1280-
client.socket.emit('b', msg("stream1",1))
1281-
client.socket.emit('b', msg("stream1",2))
1280+
client.socket.emit('b', msg("stream1", 0))
1281+
client.socket.emit('b', msg("stream1", 1, {}, undefined, 0))
1282+
client.socket.emit('b', msg("stream1", 2, {}, undefined, 1))
12821283
}
12831284
else if (response.channel==='stream2') {
1284-
client.socket.emit('b', msg("stream2",0))
1285+
client.socket.emit('b', msg("stream2", 0))
12851286
}
12861287
else if (response.channel==='stream3') {
1287-
client.socket.emit('b', msg("stream3",0))
1288+
client.socket.emit('b', msg("stream3", 0))
1289+
client.socket.emit('disconnect')
12881290
}
1289-
client.socket.emit('disconnect')
12901291
})
12911292

12921293
client.socket.once('disconnect', function() {
12931294
client.connect()
12941295

1295-
client.socket.on('subscribe', function(request) {
1296+
socket.on('subscribe', function(request) {
12961297
if (request.channel==='stream1' && request.from !== 3)
12971298
throw "Wrong starting index for "+request.channel+": "+request.from
12981299
else if (request.channel==='stream2' && request.from !== 1)
@@ -1308,7 +1309,6 @@ describe('StreamrClient', function() {
13081309
})
13091310
})
13101311

1311-
13121312
})
13131313

13141314
describe("Subscription", function() {

0 commit comments

Comments
 (0)