1515 */
1616
1717import { Observable , of , OperatorFunction , Subject } from 'rxjs' ;
18- import { filter , flatMap , map , share } from 'rxjs/operators' ;
18+ import { filter , flatMap , map , share , switchMap } from 'rxjs/operators' ;
1919import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client' ;
2020import * as WebSocket from 'ws' ;
2121import { Address } from '../model/account/Address' ;
@@ -46,6 +46,7 @@ export enum ListenerChannelName {
4646
4747interface ListenerMessage {
4848 readonly channelName : ListenerChannelName ;
49+ readonly channelParam : string ;
4950 readonly message : Transaction | string | NewBlock | TransactionStatusError | CosignatureSignedTransaction ;
5051}
5152
@@ -132,36 +133,61 @@ export class Listener implements IListener {
132133 if ( message . uid ) {
133134 this . uid = message . uid ;
134135 resolve ( ) ;
135- } else if ( message . transaction ) {
136- this . messageSubject . next ( {
137- channelName : message . meta . channelName ,
138- message : CreateTransactionFromDTO ( message ) ,
139- } ) ;
140- } else if ( message . block ) {
141- this . messageSubject . next ( {
142- channelName : ListenerChannelName . block ,
143- message : this . toNewBlock ( message ) ,
144- } ) ;
145- } else if ( message . code ) {
146- this . messageSubject . next ( {
147- channelName : ListenerChannelName . status ,
148- message : new TransactionStatusError (
149- Address . createFromEncoded ( message . address ) ,
150- message . hash ,
151- message . code ,
152- Deadline . createFromDTO ( message . deadline ) ,
153- ) ,
154- } ) ;
155- } else if ( message . parentHash ) {
156- this . messageSubject . next ( {
157- channelName : ListenerChannelName . cosignature ,
158- message : new CosignatureSignedTransaction ( message . parentHash , message . signature , message . signerPublicKey ) ,
159- } ) ;
160- } else if ( message . meta && message . meta . hash ) {
161- this . messageSubject . next ( {
162- channelName : message . meta . channelName ,
163- message : message . meta . hash ,
164- } ) ;
136+ return ;
137+ }
138+ const topic = message . topic as string ;
139+ const channelName = topic . indexOf ( '/' ) >= 0 ? topic . substr ( 0 , topic . indexOf ( '/' ) ) : topic ;
140+ const channelParam = topic . indexOf ( '/' ) >= 0 ? topic . split ( '/' ) [ 1 ] : '' ;
141+ switch ( channelName ) {
142+ case ListenerChannelName . confirmedAdded :
143+ case ListenerChannelName . unconfirmedAdded :
144+ case ListenerChannelName . partialAdded :
145+ this . messageSubject . next ( {
146+ channelName : ListenerChannelName [ channelName ] ,
147+ channelParam : channelParam ,
148+ message : CreateTransactionFromDTO ( message . data ) ,
149+ } ) ;
150+ break ;
151+ case ListenerChannelName . block :
152+ this . messageSubject . next ( {
153+ channelName : ListenerChannelName [ channelName ] ,
154+ channelParam : channelParam ,
155+ message : this . toNewBlock ( message . data ) ,
156+ } ) ;
157+ break ;
158+ case ListenerChannelName . status :
159+ this . messageSubject . next ( {
160+ channelName : ListenerChannelName [ channelName ] ,
161+ channelParam : channelParam ,
162+ message : new TransactionStatusError (
163+ Address . createFromRawAddress ( channelParam ) ,
164+ message . data . hash ,
165+ message . data . code ,
166+ Deadline . createFromDTO ( message . data . deadline ) ,
167+ ) ,
168+ } ) ;
169+ break ;
170+ case ListenerChannelName . cosignature :
171+ this . messageSubject . next ( {
172+ channelName : ListenerChannelName [ channelName ] ,
173+ channelParam : channelParam ,
174+ message : new CosignatureSignedTransaction (
175+ message . data . parentHash ,
176+ message . data . signature ,
177+ message . data . signerPublicKey ,
178+ ) ,
179+ } ) ;
180+ break ;
181+ case ListenerChannelName . partialRemoved :
182+ case ListenerChannelName . unconfirmedRemoved :
183+ this . messageSubject . next ( {
184+ channelName : ListenerChannelName [ channelName ] ,
185+ channelParam : channelParam ,
186+ message : message . data . meta . hash ,
187+ } ) ;
188+ break ;
189+ default :
190+ throw new Error ( `Channel: ${ channelName } is not supported.` ) ;
165191 }
166192 }
167193
@@ -258,9 +284,16 @@ export class Listener implements IListener {
258284 return this . messageSubject . asObservable ( ) . pipe (
259285 filter ( ( listenerMessage ) => listenerMessage . channelName === channel ) ,
260286 filter ( ( listenerMessage ) => listenerMessage . message instanceof Transaction ) ,
261- map ( ( listenerMessage ) => listenerMessage . message as T ) ,
262- filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
263- this . filterByNotifyAccount ( address ) ,
287+ switchMap ( ( _ ) => {
288+ const transactionObservable = of ( _ . message as T ) . pipe (
289+ filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
290+ ) ;
291+ if ( _ . channelParam . toUpperCase ( ) === address . plain ( ) ) {
292+ return transactionObservable ;
293+ } else {
294+ return transactionObservable . pipe ( this . filterByNotifyAccount ( address ) ) ;
295+ }
296+ } ) ,
264297 ) ;
265298 }
266299
@@ -306,6 +339,7 @@ export class Listener implements IListener {
306339 return this . messageSubject . asObservable ( ) . pipe (
307340 filter ( ( _ ) => _ . channelName === channel ) ,
308341 filter ( ( _ ) => typeof _ . message === 'string' ) ,
342+ filter ( ( _ ) => _ . channelParam . toUpperCase ( ) === address . plain ( ) ) ,
309343 map ( ( _ ) => _ . message as string ) ,
310344 filter ( ( _ ) => ! transactionHash || _ . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
311345 ) ;
@@ -325,9 +359,9 @@ export class Listener implements IListener {
325359 return this . messageSubject . asObservable ( ) . pipe (
326360 filter ( ( _ ) => _ . channelName === ListenerChannelName . status ) ,
327361 filter ( ( _ ) => _ . message instanceof TransactionStatusError ) ,
362+ filter ( ( _ ) => _ . channelParam . toUpperCase ( ) === address . plain ( ) ) ,
328363 map ( ( _ ) => _ . message as TransactionStatusError ) ,
329364 filter ( ( _ ) => ! transactionHash || _ . hash . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
330- filter ( ( _ ) => address . equals ( _ . address ) ) ,
331365 ) ;
332366 }
333367
@@ -391,6 +425,7 @@ export class Listener implements IListener {
391425 return this . messageSubject . asObservable ( ) . pipe (
392426 filter ( ( _ ) => _ . channelName . toUpperCase ( ) === ListenerChannelName . cosignature . toUpperCase ( ) ) ,
393427 filter ( ( _ ) => _ . message instanceof CosignatureSignedTransaction ) ,
428+ filter ( ( _ ) => _ . channelParam . toUpperCase ( ) === address . plain ( ) ) ,
394429 map ( ( _ ) => _ . message as CosignatureSignedTransaction ) ,
395430 ) ;
396431 }
0 commit comments