Skip to content

Commit 03a3214

Browse files
authored
TS-77 filtering listener's status message by subscribed address. (#327)
* TS-77 filtering listener's status message by subscribed address.
1 parent fee914a commit 03a3214

File tree

4 files changed

+193
-71
lines changed

4 files changed

+193
-71
lines changed

src/infrastructure/Listener.ts

Lines changed: 91 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,29 @@
1414
* limitations under the License.
1515
*/
1616

17-
import {Observable, Subject} from 'rxjs';
18-
import {filter, map, share} from 'rxjs/operators';
17+
import { Observable, Subject } from 'rxjs';
18+
import { filter, map, share } from 'rxjs/operators';
1919
import * as WebSocket from 'ws';
20-
import {Address} from '../model/account/Address';
21-
import {PublicAccount} from '../model/account/PublicAccount';
22-
import {BlockInfo} from '../model/blockchain/BlockInfo';
23-
import {NetworkType} from '../model/blockchain/NetworkType';
24-
import {NamespaceId} from '../model/namespace/NamespaceId';
25-
import {AggregateTransaction} from '../model/transaction/AggregateTransaction';
26-
import {AggregateTransactionCosignature} from '../model/transaction/AggregateTransactionCosignature';
27-
import {CosignatoryModificationAction} from '../model/transaction/CosignatoryModificationAction';
28-
import {CosignatureSignedTransaction} from '../model/transaction/CosignatureSignedTransaction';
29-
import {Deadline} from '../model/transaction/Deadline';
30-
import {InnerTransaction} from '../model/transaction/InnerTransaction';
31-
import {MultisigAccountModificationTransaction} from '../model/transaction/MultisigAccountModificationTransaction';
32-
import {MultisigCosignatoryModification} from '../model/transaction/MultisigCosignatoryModification';
33-
import {Transaction} from '../model/transaction/Transaction';
34-
import {TransactionStatusError} from '../model/transaction/TransactionStatusError';
35-
import {TransferTransaction} from '../model/transaction/TransferTransaction';
36-
import {UInt64} from '../model/UInt64';
37-
import {CreateTransactionFromDTO, extractBeneficiary} from './transaction/CreateTransactionFromDTO';
20+
import { Address } from '../model/account/Address';
21+
import { PublicAccount } from '../model/account/PublicAccount';
22+
import { BlockInfo } from '../model/blockchain/BlockInfo';
23+
import { NamespaceId } from '../model/namespace/NamespaceId';
24+
import { AggregateTransaction } from '../model/transaction/AggregateTransaction';
25+
import { AggregateTransactionCosignature } from '../model/transaction/AggregateTransactionCosignature';
26+
import { CosignatoryModificationAction } from '../model/transaction/CosignatoryModificationAction';
27+
import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction';
28+
import { Deadline } from '../model/transaction/Deadline';
29+
import { InnerTransaction } from '../model/transaction/InnerTransaction';
30+
import { MultisigAccountModificationTransaction } from '../model/transaction/MultisigAccountModificationTransaction';
31+
import { MultisigCosignatoryModification } from '../model/transaction/MultisigCosignatoryModification';
32+
import { Transaction } from '../model/transaction/Transaction';
33+
import { TransactionStatusError } from '../model/transaction/TransactionStatusError';
34+
import { TransferTransaction } from '../model/transaction/TransferTransaction';
35+
import { UInt64 } from '../model/UInt64';
36+
import {
37+
CreateTransactionFromDTO,
38+
extractBeneficiary
39+
} from './transaction/CreateTransactionFromDTO';
3840

3941
enum ListenerChannelName {
4042
block = 'block',
@@ -82,11 +84,11 @@ export class Listener {
8284
constructor(/**
8385
* Listener configuration.
8486
*/
85-
private config: string,
87+
private config: string,
8688
/**
8789
* WebSocket injected when using listeners in client.
8890
*/
89-
private websocketInjected?: any) {
91+
private websocketInjected?: any) {
9092
this.config = config.replace(/\/$/, '');
9193
this.url = `${this.config}/ws`;
9294
this.messageSubject = new Subject();
@@ -114,58 +116,76 @@ export class Listener {
114116
};
115117
this.webSocket.onmessage = (msg) => {
116118
const message = JSON.parse(msg.data as string);
117-
118-
if (message.uid) {
119-
this.uid = message.uid;
120-
resolve();
121-
} else if (message.transaction) {
122-
this.messageSubject.next({channelName: message.meta.channelName, message: CreateTransactionFromDTO(message)});
123-
} else if (message.block) {
124-
const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16);
125-
this.messageSubject.next({
126-
channelName: ListenerChannelName.block, message: new BlockInfo(
127-
message.meta.hash,
128-
message.meta.generationHash,
129-
message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]),
130-
message.meta.numTransactions,
131-
message.block.signature,
132-
PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType),
133-
networkType,
134-
parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version
135-
message.block.type,
136-
UInt64.fromNumericString(message.block.height),
137-
UInt64.fromNumericString(message.block.timestamp),
138-
UInt64.fromNumericString(message.block.difficulty),
139-
message.block.feeMultiplier,
140-
message.block.previousBlockHash,
141-
message.block.blockTransactionsHash,
142-
message.block.blockReceiptsHash,
143-
message.block.stateHash,
144-
extractBeneficiary(message, networkType), // passing `message` as `blockDTO`
145-
),
146-
});
147-
} else if (message.status) {
148-
this.messageSubject.next({
149-
channelName: ListenerChannelName.status, message: new TransactionStatusError(
150-
message.hash,
151-
message.status,
152-
Deadline.createFromDTO(message.deadline)),
153-
});
154-
} else if (message.parentHash) {
155-
this.messageSubject.next({
156-
channelName: ListenerChannelName.cosignature,
157-
message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey),
158-
});
159-
} else if (message.meta && message.meta.hash) {
160-
this.messageSubject.next({channelName: message.meta.channelName, message: message.meta.hash});
161-
}
119+
this.handleMessage(message, resolve);
162120
};
163121
} else {
164122
resolve();
165123
}
166124
});
167125
}
168126

127+
/**
128+
* @internal
129+
*
130+
* This method handles one incoming message from the web socket and it dispatches it to the message subject listener.
131+
*
132+
* @param message the object payload.
133+
* @param resolve the method to notify when the uid has been resolved and the listener connection has been stablished.
134+
*/
135+
handleMessage(message: any, resolve) {
136+
if (message.uid) {
137+
this.uid = message.uid;
138+
resolve();
139+
} else if (message.transaction) {
140+
this.messageSubject.next({
141+
channelName: message.meta.channelName,
142+
message: CreateTransactionFromDTO(message)
143+
});
144+
} else if (message.block) {
145+
const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16);
146+
this.messageSubject.next({
147+
channelName: ListenerChannelName.block, message: new BlockInfo(
148+
message.meta.hash,
149+
message.meta.generationHash,
150+
message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]),
151+
message.meta.numTransactions,
152+
message.block.signature,
153+
PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType),
154+
networkType,
155+
parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version
156+
message.block.type,
157+
UInt64.fromNumericString(message.block.height),
158+
UInt64.fromNumericString(message.block.timestamp),
159+
UInt64.fromNumericString(message.block.difficulty),
160+
message.block.feeMultiplier,
161+
message.block.previousBlockHash,
162+
message.block.blockTransactionsHash,
163+
message.block.blockReceiptsHash,
164+
message.block.stateHash,
165+
extractBeneficiary(message, networkType), // passing `message` as `blockDTO`
166+
),
167+
});
168+
} else if (message.status) {
169+
this.messageSubject.next({
170+
channelName: ListenerChannelName.status, message: new TransactionStatusError(
171+
Address.createFromEncoded(message.address),
172+
message.hash,
173+
message.status,
174+
Deadline.createFromDTO(message.deadline)),
175+
});
176+
} else if (message.parentHash) {
177+
this.messageSubject.next({
178+
channelName: ListenerChannelName.cosignature,
179+
message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey),
180+
});
181+
} else if (message.meta && message.meta.hash) {
182+
this.messageSubject.next({
183+
channelName: message.meta.channelName,
184+
message: message.meta.hash
185+
});
186+
}
187+
}
188+
169189
/**
170190
* returns a boolean that repressents the open state
171191
* @returns a boolean
@@ -207,7 +227,7 @@ export class Listener {
207227
public newBlock(): Observable<BlockInfo> {
208228
this.subscribeTo('block');
209229
return this.messageSubject
210-
.asObservable().pipe(
230+
.asObservable().pipe(
211231
share(),
212232
filter((_) => _.channelName === ListenerChannelName.block),
213233
filter((_) => _.message instanceof BlockInfo),
@@ -310,7 +330,8 @@ export class Listener {
310330
return this.messageSubject.asObservable().pipe(
311331
filter((_) => _.channelName === ListenerChannelName.status),
312332
filter((_) => _.message instanceof TransactionStatusError),
313-
map((_) => _.message as TransactionStatusError));
333+
map((_) => _.message as TransactionStatusError),
334+
filter((_) => address.equals(_.address)));
314335
}
315336

316337
/**
@@ -394,7 +415,7 @@ export class Listener {
394415
}
395416

396417
return transaction.signer!.address.equals(address) || (
397-
transaction instanceof TransferTransaction
418+
transaction instanceof TransferTransaction
398419
&& (transaction.recipientAddress as Address).equals(address)
399420
);
400421
}

src/model/transaction/TransactionStatusError.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import {Deadline} from './Deadline';
18+
import {Address} from "../account/Address";
1819

1920
/**
2021
* Transaction status error model returned by listeners
@@ -23,11 +24,17 @@ export class TransactionStatusError {
2324

2425
/**
2526
* @internal
27+
* @param address
2628
* @param hash
2729
* @param status
2830
* @param deadline
2931
*/
3032
constructor(
33+
/**
34+
* The address of the account that signed the invalid transaction.
35+
* It's the address listened when calling Lister.status.
36+
*/
37+
public readonly address: Address,
3138
/**
3239
* The transaction hash.
3340
*/

test/infrastructure/Listener.spec.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
import {expect} from 'chai';
1818
import {Listener} from '../../src/infrastructure/Listener';
19+
import {Address} from "../../src/model/account/Address";
20+
import {deepEqual} from "assert";
21+
import {UInt64} from "../../src/model/UInt64";
22+
import {timeout} from "rxjs/operators";
1923

2024
describe('Listener', () => {
2125
it('should createComplete a WebSocket instance given url parameter', () => {
@@ -32,6 +36,93 @@ describe('Listener', () => {
3236
});
3337
});
3438

39+
describe('onStatusWhenAddressIsTheSame', () => {
40+
it('Should forward status', (done) => {
41+
42+
43+
const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB';
44+
45+
const errorAddress = Address.createFromEncoded(errorEncodedAddress);
46+
47+
class WebSocketMock {
48+
constructor(public readonly url: string) {
49+
}
50+
51+
send(payload: string) {
52+
expect(payload).to.be.eq(`{"subscribe":"status/${errorAddress.plain()}"}`);
53+
}
54+
}
55+
56+
const statusInfoErrorDTO = {
57+
address: errorEncodedAddress,
58+
deadline: '1010',
59+
hash: 'transaction-hash',
60+
status: 'error-message',
61+
};
62+
63+
const listener = new Listener('ws://localhost:3000', WebSocketMock);
64+
65+
listener.open();
66+
67+
listener.status(errorAddress).pipe(timeout(2000)).subscribe((transactionStatusError) => {
68+
expect(transactionStatusError.address).to.deep.equal(errorAddress);
69+
expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash);
70+
expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status);
71+
deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO());
72+
done();
73+
}, err => {
74+
done('Should have not timed out!');
75+
});
76+
77+
listener.handleMessage(statusInfoErrorDTO, null);
78+
79+
80+
});
81+
});
82+
83+
describe('onStatusWhenAddressIsDifferentAddress', () => {
84+
it('Should not forward status', (done) => {
85+
86+
87+
const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB';
88+
89+
const subscribedEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96AAAAA';
90+
const subscribedAddress = Address.createFromEncoded(subscribedEncodedAddress);
91+
92+
class WebSocketMock {
93+
94+
constructor(public readonly url: string) {
95+
}
96+
97+
send(payload: string) {
98+
expect(payload).to.be.eq(`{"subscribe":"status/${subscribedAddress.plain()}"}`);
99+
}
100+
}
101+
102+
const statusInfoErrorDTO = {
103+
address: errorEncodedAddress,
104+
deadline: '1010',
105+
hash: 'transaction-hash',
106+
status: 'error-message',
107+
};
108+
109+
const listener = new Listener('ws://localhost:3000', WebSocketMock);
110+
111+
listener.open();
112+
113+
listener.status(subscribedAddress).pipe(timeout(100)).subscribe(status => {
114+
done('Should have timed out!');
115+
}, err => {
116+
expect(err.name).to.be.eq('TimeoutError');
117+
done();
118+
});
119+
120+
listener.handleMessage(statusInfoErrorDTO, null);
121+
122+
123+
});
124+
});
125+
35126
describe('onerror', () => {
36127
it('should reject because of wrong server url', async () => {
37128
const listener = new Listener('https://notcorrecturl:0000');

test/model/transaction/TransactionStatusError.spec.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,24 @@ import {expect} from 'chai';
1919
import {TransactionStatusError} from '../../../src/model/transaction/TransactionStatusError';
2020
import {Deadline} from '../../../src/model/transaction/Deadline';
2121
import { UInt64 } from '../../../src/model/UInt64';
22+
import {Address} from "../../../src/model/account/Address";
2223

2324
describe('TransactionStatusError', () => {
2425

2526
it('should createComplete an TransactionStatusError object', () => {
2627
const statusInfoErrorDTO = {
28+
address: Address.createFromRawAddress('SBILTA367K2LX2FEXG5TFWAS7GEFYAGY7QLFBYKC'),
2729
deadline: '1010',
2830
hash: 'transaction-hash',
2931
status: 'error-message',
3032
};
31-
3233
const transactionStatusError = new TransactionStatusError(
34+
statusInfoErrorDTO.address,
3335
statusInfoErrorDTO.hash,
3436
statusInfoErrorDTO.status,
3537
Deadline.createFromDTO(statusInfoErrorDTO.deadline));
3638

39+
expect(transactionStatusError.address).to.be.equal(statusInfoErrorDTO.address);
3740
expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash);
3841
expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status);
3942
deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO());

0 commit comments

Comments
 (0)