|
480 | 480 | return id.toString() |
481 | 481 | }; |
482 | 482 |
|
483 | | - function Subscription(streamId, callback, options) { |
| 483 | + function Subscription(streamId, streamPartition, authKey, callback, options) { |
484 | 484 | EventEmitter.call(this); // call parent constructor |
485 | 485 |
|
486 | 486 | if (!streamId) |
|
492 | 492 |
|
493 | 493 | this.id = generateSubscriptionId() |
494 | 494 | this.streamId = streamId |
| 495 | + this.streamPartition = streamPartition |
| 496 | + this.authKey = authKey |
495 | 497 | this.callback = callback |
496 | 498 | this.options = options || {} |
497 | 499 | this.queue = [] |
|
607 | 609 | // Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset |
608 | 610 | else { |
609 | 611 | this.lastReceivedOffset = offset |
610 | | - this.callback(content, this.streamId, timestamp, offset) |
| 612 | + this.callback(content, msg) |
611 | 613 | if (content[BYE_KEY]) { |
612 | 614 | this.emit('done') |
613 | 615 | } |
|
679 | 681 | // Automatically connect on first subscribe |
680 | 682 | autoConnect: true, |
681 | 683 | // Automatically disconnect on last unsubscribe |
682 | | - autoDisconnect: true |
| 684 | + autoDisconnect: true, |
| 685 | + authKey: null |
683 | 686 | } |
684 | 687 | this.subsByStream = {} |
685 | 688 | this.subById = {} |
|
720 | 723 | return this.subsByStream[streamId] || [] |
721 | 724 | } |
722 | 725 |
|
723 | | - StreamrClient.prototype.subscribe = function(streamId, callback, options) { |
| 726 | + StreamrClient.prototype.subscribe = function(options, callback, legacyOptions) { |
724 | 727 | var _this = this |
725 | 728 |
|
726 | | - if (!streamId) |
727 | | - throw "subscribe: Invalid arguments: stream id is required!" |
728 | | - else if (typeof streamId !== 'string') |
729 | | - throw "subscribe: stream id must be a string!" |
730 | | - |
731 | | - if (!callback) |
| 729 | + if (!options) { |
| 730 | + throw "subscribe: Invalid arguments: subscription options is required!" |
| 731 | + } else if (!callback) { |
732 | 732 | throw "subscribe: Invalid arguments: callback is required!" |
| 733 | + } |
| 734 | + |
| 735 | + // Backwards compatibility for giving a streamId as first argument |
| 736 | + if (typeof options === 'string') { |
| 737 | + options = { |
| 738 | + stream: options |
| 739 | + } |
| 740 | + } else if (typeof options !== 'object') { |
| 741 | + throw "subscribe: options must be an object" |
| 742 | + } |
| 743 | + |
| 744 | + // Backwards compatibility for giving an options object as third argument |
| 745 | + extend(options, legacyOptions) |
| 746 | + |
| 747 | + if (!options.stream) { |
| 748 | + throw "subscribe: Invalid arguments: options.stream is not given" |
| 749 | + } |
733 | 750 |
|
734 | 751 | // Create the Subscription object and bind handlers |
735 | | - var sub = new Subscription(streamId, callback, options) |
| 752 | + var sub = new Subscription(options.stream, options.partition || 0, options.authKey || this.options.authKey, callback, options) |
736 | 753 | sub.on('gap', function(from, to) { |
737 | 754 | _this._requestResend(sub, {resend_from: from, resend_to: to}) |
738 | 755 | }) |
|
985 | 1002 |
|
986 | 1003 | // If this is the first subscription for this stream, send a subscription request to the server |
987 | 1004 | if (!subs._subscribing && subscribedSubs.length === 0) { |
988 | | - var req = extend({}, sub.options, {type: 'subscribe', stream: sub.streamId}) |
| 1005 | + var req = extend({}, sub.options, { type: 'subscribe', stream: sub.streamId, authKey: sub.authKey }) |
989 | 1006 | debug("_requestSubscribe: subscribing client: %o", req) |
990 | 1007 | subs._subscribing = true |
991 | 1008 | _this.connection.send(req) |
|
1021 | 1038 |
|
1022 | 1039 | sub.resending = true |
1023 | 1040 |
|
1024 | | - var request = extend({}, options, resendOptions, {type: 'resend', stream: sub.streamId, sub: sub.id}) |
| 1041 | + var request = extend({}, options, resendOptions, {type: 'resend', stream: sub.streamId, partition: sub.streamPartition, authKey: sub.authKey, sub: sub.id}) |
1025 | 1042 | debug("_requestResend: %o", request) |
1026 | 1043 | this.connection.send(request) |
1027 | 1044 | } |
|
0 commit comments