|
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | 16 |
|
17 | | -import {Observable, Subject} from 'rxjs'; |
18 | | -import {filter, map, share} from 'rxjs/operators'; |
| 17 | +import { Observable, Subject } from 'rxjs'; |
| 18 | +import { filter, map, share } from 'rxjs/operators'; |
19 | 19 | import * as WebSocket from 'ws'; |
20 | | -import {Address} from '../model/account/Address'; |
21 | | -import {PublicAccount} from '../model/account/PublicAccount'; |
22 | | -import {BlockInfo} from '../model/blockchain/BlockInfo'; |
23 | | -import {NetworkType} from '../model/blockchain/NetworkType'; |
24 | | -import {NamespaceId} from '../model/namespace/NamespaceId'; |
25 | | -import {AggregateTransaction} from '../model/transaction/AggregateTransaction'; |
26 | | -import {AggregateTransactionCosignature} from '../model/transaction/AggregateTransactionCosignature'; |
27 | | -import {CosignatoryModificationAction} from '../model/transaction/CosignatoryModificationAction'; |
28 | | -import {CosignatureSignedTransaction} from '../model/transaction/CosignatureSignedTransaction'; |
29 | | -import {Deadline} from '../model/transaction/Deadline'; |
30 | | -import {InnerTransaction} from '../model/transaction/InnerTransaction'; |
31 | | -import {MultisigAccountModificationTransaction} from '../model/transaction/MultisigAccountModificationTransaction'; |
32 | | -import {MultisigCosignatoryModification} from '../model/transaction/MultisigCosignatoryModification'; |
33 | | -import {Transaction} from '../model/transaction/Transaction'; |
34 | | -import {TransactionStatusError} from '../model/transaction/TransactionStatusError'; |
35 | | -import {TransferTransaction} from '../model/transaction/TransferTransaction'; |
36 | | -import {UInt64} from '../model/UInt64'; |
37 | | -import {CreateTransactionFromDTO, extractBeneficiary} from './transaction/CreateTransactionFromDTO'; |
| 20 | +import { Address } from '../model/account/Address'; |
| 21 | +import { PublicAccount } from '../model/account/PublicAccount'; |
| 22 | +import { BlockInfo } from '../model/blockchain/BlockInfo'; |
| 23 | +import { NamespaceId } from '../model/namespace/NamespaceId'; |
| 24 | +import { AggregateTransaction } from '../model/transaction/AggregateTransaction'; |
| 25 | +import { AggregateTransactionCosignature } from '../model/transaction/AggregateTransactionCosignature'; |
| 26 | +import { CosignatoryModificationAction } from '../model/transaction/CosignatoryModificationAction'; |
| 27 | +import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction'; |
| 28 | +import { Deadline } from '../model/transaction/Deadline'; |
| 29 | +import { InnerTransaction } from '../model/transaction/InnerTransaction'; |
| 30 | +import { MultisigAccountModificationTransaction } from '../model/transaction/MultisigAccountModificationTransaction'; |
| 31 | +import { MultisigCosignatoryModification } from '../model/transaction/MultisigCosignatoryModification'; |
| 32 | +import { Transaction } from '../model/transaction/Transaction'; |
| 33 | +import { TransactionStatusError } from '../model/transaction/TransactionStatusError'; |
| 34 | +import { TransferTransaction } from '../model/transaction/TransferTransaction'; |
| 35 | +import { UInt64 } from '../model/UInt64'; |
| 36 | +import { |
| 37 | + CreateTransactionFromDTO, |
| 38 | + extractBeneficiary |
| 39 | +} from './transaction/CreateTransactionFromDTO'; |
38 | 40 |
|
39 | 41 | enum ListenerChannelName { |
40 | 42 | block = 'block', |
@@ -82,11 +84,11 @@ export class Listener { |
82 | 84 | constructor(/** |
83 | 85 | * Listener configuration. |
84 | 86 | */ |
85 | | - private config: string, |
| 87 | + private config: string, |
86 | 88 | /** |
87 | 89 | * WebSocket injected when using listeners in client. |
88 | 90 | */ |
89 | | - private websocketInjected?: any) { |
| 91 | + private websocketInjected?: any) { |
90 | 92 | this.config = config.replace(/\/$/, ''); |
91 | 93 | this.url = `${this.config}/ws`; |
92 | 94 | this.messageSubject = new Subject(); |
@@ -114,58 +116,76 @@ export class Listener { |
114 | 116 | }; |
115 | 117 | this.webSocket.onmessage = (msg) => { |
116 | 118 | const message = JSON.parse(msg.data as string); |
117 | | - |
118 | | - if (message.uid) { |
119 | | - this.uid = message.uid; |
120 | | - resolve(); |
121 | | - } else if (message.transaction) { |
122 | | - this.messageSubject.next({channelName: message.meta.channelName, message: CreateTransactionFromDTO(message)}); |
123 | | - } else if (message.block) { |
124 | | - const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16); |
125 | | - this.messageSubject.next({ |
126 | | - channelName: ListenerChannelName.block, message: new BlockInfo( |
127 | | - message.meta.hash, |
128 | | - message.meta.generationHash, |
129 | | - message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]), |
130 | | - message.meta.numTransactions, |
131 | | - message.block.signature, |
132 | | - PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType), |
133 | | - networkType, |
134 | | - parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version |
135 | | - message.block.type, |
136 | | - UInt64.fromNumericString(message.block.height), |
137 | | - UInt64.fromNumericString(message.block.timestamp), |
138 | | - UInt64.fromNumericString(message.block.difficulty), |
139 | | - message.block.feeMultiplier, |
140 | | - message.block.previousBlockHash, |
141 | | - message.block.blockTransactionsHash, |
142 | | - message.block.blockReceiptsHash, |
143 | | - message.block.stateHash, |
144 | | - extractBeneficiary(message, networkType), // passing `message` as `blockDTO` |
145 | | - ), |
146 | | - }); |
147 | | - } else if (message.status) { |
148 | | - this.messageSubject.next({ |
149 | | - channelName: ListenerChannelName.status, message: new TransactionStatusError( |
150 | | - message.hash, |
151 | | - message.status, |
152 | | - Deadline.createFromDTO(message.deadline)), |
153 | | - }); |
154 | | - } else if (message.parentHash) { |
155 | | - this.messageSubject.next({ |
156 | | - channelName: ListenerChannelName.cosignature, |
157 | | - message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey), |
158 | | - }); |
159 | | - } else if (message.meta && message.meta.hash) { |
160 | | - this.messageSubject.next({channelName: message.meta.channelName, message: message.meta.hash}); |
161 | | - } |
| 119 | + this.handleMessage(message, resolve); |
162 | 120 | }; |
163 | 121 | } else { |
164 | 122 | resolve(); |
165 | 123 | } |
166 | 124 | }); |
167 | 125 | } |
168 | 126 |
|
| 127 | + /** |
| 128 | + * @internal |
| 129 | + * |
| 130 | + * This method handles one incoming message from the web socket and it dispatches it to the message subject listener. |
| 131 | + * |
| 132 | + * @param message the object payload. |
| 133 | + * @param resolve the method to notify when the uid has been resolved and the listener connection has been stablished. |
| 134 | + */ |
| 135 | + handleMessage(message: any, resolve) { |
| 136 | + if (message.uid) { |
| 137 | + this.uid = message.uid; |
| 138 | + resolve(); |
| 139 | + } else if (message.transaction) { |
| 140 | + this.messageSubject.next({ |
| 141 | + channelName: message.meta.channelName, |
| 142 | + message: CreateTransactionFromDTO(message) |
| 143 | + }); |
| 144 | + } else if (message.block) { |
| 145 | + const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16); |
| 146 | + this.messageSubject.next({ |
| 147 | + channelName: ListenerChannelName.block, message: new BlockInfo( |
| 148 | + message.meta.hash, |
| 149 | + message.meta.generationHash, |
| 150 | + message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]), |
| 151 | + message.meta.numTransactions, |
| 152 | + message.block.signature, |
| 153 | + PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType), |
| 154 | + networkType, |
| 155 | + parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version |
| 156 | + message.block.type, |
| 157 | + UInt64.fromNumericString(message.block.height), |
| 158 | + UInt64.fromNumericString(message.block.timestamp), |
| 159 | + UInt64.fromNumericString(message.block.difficulty), |
| 160 | + message.block.feeMultiplier, |
| 161 | + message.block.previousBlockHash, |
| 162 | + message.block.blockTransactionsHash, |
| 163 | + message.block.blockReceiptsHash, |
| 164 | + message.block.stateHash, |
| 165 | + extractBeneficiary(message, networkType), // passing `message` as `blockDTO` |
| 166 | + ), |
| 167 | + }); |
| 168 | + } else if (message.status) { |
| 169 | + this.messageSubject.next({ |
| 170 | + channelName: ListenerChannelName.status, message: new TransactionStatusError( |
| 171 | + Address.createFromEncoded(message.address), |
| 172 | + message.hash, |
| 173 | + message.status, |
| 174 | + Deadline.createFromDTO(message.deadline)), |
| 175 | + }); |
| 176 | + } else if (message.parentHash) { |
| 177 | + this.messageSubject.next({ |
| 178 | + channelName: ListenerChannelName.cosignature, |
| 179 | + message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey), |
| 180 | + }); |
| 181 | + } else if (message.meta && message.meta.hash) { |
| 182 | + this.messageSubject.next({ |
| 183 | + channelName: message.meta.channelName, |
| 184 | + message: message.meta.hash |
| 185 | + }); |
| 186 | + } |
| 187 | + } |
| 188 | + |
169 | 189 | /** |
170 | 190 | * returns a boolean that repressents the open state |
171 | 191 | * @returns a boolean |
@@ -207,7 +227,7 @@ export class Listener { |
207 | 227 | public newBlock(): Observable<BlockInfo> { |
208 | 228 | this.subscribeTo('block'); |
209 | 229 | return this.messageSubject |
210 | | - .asObservable().pipe( |
| 230 | + .asObservable().pipe( |
211 | 231 | share(), |
212 | 232 | filter((_) => _.channelName === ListenerChannelName.block), |
213 | 233 | filter((_) => _.message instanceof BlockInfo), |
@@ -310,7 +330,8 @@ export class Listener { |
310 | 330 | return this.messageSubject.asObservable().pipe( |
311 | 331 | filter((_) => _.channelName === ListenerChannelName.status), |
312 | 332 | filter((_) => _.message instanceof TransactionStatusError), |
313 | | - map((_) => _.message as TransactionStatusError)); |
| 333 | + map((_) => _.message as TransactionStatusError), |
| 334 | + filter((_) => address.equals(_.address))); |
314 | 335 | } |
315 | 336 |
|
316 | 337 | /** |
@@ -394,7 +415,7 @@ export class Listener { |
394 | 415 | } |
395 | 416 |
|
396 | 417 | return transaction.signer!.address.equals(address) || ( |
397 | | - transaction instanceof TransferTransaction |
| 418 | + transaction instanceof TransferTransaction |
398 | 419 | && (transaction.recipientAddress as Address).equals(address) |
399 | 420 | ); |
400 | 421 | } |
|
0 commit comments