1414 * limitations under the License.
1515 */
1616
17- import { Observable , of , OperatorFunction , Subject } from 'rxjs' ;
18- import { catchError , filter , flatMap , map , mergeMap , share , switchMap } from 'rxjs/operators' ;
17+ import { Observable , of , Subject } from 'rxjs' ;
18+ import { catchError , filter , map , mergeMap , share , switchMap } from 'rxjs/operators' ;
1919import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client' ;
2020import * as WebSocket from 'ws' ;
2121import { UnresolvedAddress } from '../model' ;
2222import { Address } from '../model/account/Address' ;
2323import { PublicAccount } from '../model/account/PublicAccount' ;
2424import { FinalizedBlock } from '../model/blockchain/FinalizedBlock' ;
2525import { NewBlock } from '../model/blockchain/NewBlock' ;
26- import { NamespaceName } from '../model/namespace/NamespaceName' ;
2726import { AggregateTransaction } from '../model/transaction/AggregateTransaction' ;
2827import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction' ;
2928import { Deadline } from '../model/transaction/Deadline' ;
@@ -335,24 +334,25 @@ export class Listener implements IListener {
335334 transactionHash ?: string ,
336335 subscribeMultisig = false ,
337336 ) : Observable < T > {
338- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
339- mergeMap ( ( address : Address ) => {
340- return this . subscribeWithMultig ( address , channel , subscribeMultisig ) . pipe (
341- switchMap ( ( subscribers ) => {
342- return this . messageSubject . asObservable ( ) . pipe (
343- filter ( ( listenerMessage ) => listenerMessage . channelName === channel ) ,
344- filter ( ( listenerMessage ) => listenerMessage . message instanceof Transaction ) ,
345- switchMap ( ( _ ) => {
346- const transactionObservable = of ( _ . message as T ) . pipe (
347- filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
348- ) ;
349- if ( subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) {
350- return transactionObservable ;
351- } else {
352- return transactionObservable . pipe ( this . filterByNotifyAccount ( address ) ) ;
353- }
354- } ) ,
337+ return this . subscribeWithMultig ( unresolvedAddress , channel , subscribeMultisig ) . pipe (
338+ switchMap ( ( subscribers ) => {
339+ return this . messageSubject . asObservable ( ) . pipe (
340+ filter ( ( listenerMessage ) => listenerMessage . channelName === channel ) ,
341+ filter ( ( listenerMessage ) => listenerMessage . message instanceof Transaction ) ,
342+ switchMap ( ( _ ) => {
343+ const transactionObservable = of ( _ . message as T ) . pipe (
344+ filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
355345 ) ;
346+ if ( subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) {
347+ return transactionObservable ;
348+ } else {
349+ return transactionObservable . pipe (
350+ filter (
351+ ( transaction ) =>
352+ transaction . isSigned ( unresolvedAddress ) || transaction . shouldNotifyAccount ( unresolvedAddress ) ,
353+ ) ,
354+ ) ;
355+ }
356356 } ) ,
357357 ) ;
358358 } ) ,
@@ -414,18 +414,14 @@ export class Listener implements IListener {
414414 transactionHash : string | undefined ,
415415 subscribeMultisig = false ,
416416 ) : Observable < string > {
417- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
418- mergeMap ( ( address : Address ) => {
419- return this . subscribeWithMultig ( address , channel , subscribeMultisig ) . pipe (
420- switchMap ( ( subscribers ) => {
421- return this . messageSubject . asObservable ( ) . pipe (
422- filter ( ( _ ) => _ . channelName === channel ) ,
423- filter ( ( _ ) => typeof _ . message === 'string' ) ,
424- filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
425- map ( ( _ ) => _ . message as string ) ,
426- filter ( ( _ ) => ! transactionHash || _ . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
427- ) ;
428- } ) ,
417+ return this . subscribeWithMultig ( unresolvedAddress , channel , subscribeMultisig ) . pipe (
418+ switchMap ( ( subscribers ) => {
419+ return this . messageSubject . asObservable ( ) . pipe (
420+ filter ( ( _ ) => _ . channelName === channel ) ,
421+ filter ( ( _ ) => typeof _ . message === 'string' ) ,
422+ filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
423+ map ( ( _ ) => _ . message as string ) ,
424+ filter ( ( _ ) => ! transactionHash || _ . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
429425 ) ;
430426 } ) ,
431427 ) ;
@@ -469,39 +465,6 @@ export class Listener implements IListener {
469465 }
470466 }
471467
472- /**
473- * It filters a transaction by address using the aliases.
474- *
475- * This method delegates the rest loading as much as possible. It tries to filter by signer first.
476- *
477- * Note: this filter performs one extra rest call and it should be down in the pipeline.
478- *
479- * @param address the address.
480- * @return an observable filter.
481- */
482- private filterByNotifyAccount < T extends Transaction > ( address : Address ) : OperatorFunction < T , T > {
483- return ( transactionObservable ) : Observable < T > => {
484- return transactionObservable . pipe (
485- flatMap ( ( transaction ) => {
486- if ( transaction . isSigned ( address ) ) {
487- return of ( transaction ) ;
488- }
489- const namespaceIdsObservable = this . namespaceRepository . getAccountsNames ( [ address ] ) . pipe (
490- map ( ( names ) => {
491- return ( [ ] as NamespaceName [ ] )
492- . concat ( ...Array . from ( names . map ( ( accountName ) => accountName . names ) ) )
493- . map ( ( name ) => name . namespaceId ) ;
494- } ) ,
495- ) ;
496- return namespaceIdsObservable . pipe (
497- filter ( ( namespaceIds ) => transaction . shouldNotifyAccount ( address , namespaceIds ) ) ,
498- map ( ( ) => transaction ) ,
499- ) ;
500- } ) ,
501- ) ;
502- } ;
503- }
504-
505468 /**
506469 * Returns an observable stream of {@link CosignatureSignedTransaction} for specific address.
507470 * Each time a cosigner signs a transaction the address initialized,
@@ -512,17 +475,13 @@ export class Listener implements IListener {
512475 * @return an observable stream of {@link CosignatureSignedTransaction}
513476 */
514477 public cosignatureAdded ( unresolvedAddress : UnresolvedAddress , subscribeMultisig = false ) : Observable < CosignatureSignedTransaction > {
515- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
516- mergeMap ( ( address : Address ) => {
517- return this . subscribeWithMultig ( address , ListenerChannelName . cosignature , subscribeMultisig ) . pipe (
518- switchMap ( ( subscribers ) => {
519- return this . messageSubject . asObservable ( ) . pipe (
520- filter ( ( _ ) => _ . channelName . toUpperCase ( ) === ListenerChannelName . cosignature . toUpperCase ( ) ) ,
521- filter ( ( _ ) => _ . message instanceof CosignatureSignedTransaction ) ,
522- filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
523- map ( ( _ ) => _ . message as CosignatureSignedTransaction ) ,
524- ) ;
525- } ) ,
478+ return this . subscribeWithMultig ( unresolvedAddress , ListenerChannelName . cosignature , subscribeMultisig ) . pipe (
479+ switchMap ( ( subscribers ) => {
480+ return this . messageSubject . asObservable ( ) . pipe (
481+ filter ( ( _ ) => _ . channelName . toUpperCase ( ) === ListenerChannelName . cosignature . toUpperCase ( ) ) ,
482+ filter ( ( _ ) => _ . message instanceof CosignatureSignedTransaction ) ,
483+ filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
484+ map ( ( _ ) => _ . message as CosignatureSignedTransaction ) ,
526485 ) ;
527486 } ) ,
528487 ) ;
@@ -604,22 +563,26 @@ export class Listener implements IListener {
604563 * @param multisig subscribe multisig account
605564 * @returns {string[] }
606565 */
607- private subscribeWithMultig ( cosigner : Address , channel : ListenerChannelName , multisig = false ) : Observable < string [ ] > {
566+ private subscribeWithMultig ( cosigner : UnresolvedAddress , channel : ListenerChannelName , multisig = false ) : Observable < string [ ] > {
608567 if ( ! multisig ) {
609568 this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
610569 return of ( [ cosigner . plain ( ) ] ) ;
611570 }
612- return this . multisigRepository ! . getMultisigAccountInfo ( cosigner ) . pipe (
613- map ( ( multisigInfo ) => {
614- const subscribers = [ cosigner ] . concat ( multisigInfo . multisigAddresses ) ;
615- subscribers . forEach ( ( m ) => {
616- this . subscribeTo ( `${ channel . toString ( ) } /${ m . plain ( ) } ` ) ;
617- } ) ;
618- return subscribers . map ( ( m ) => m . plain ( ) ) ;
619- } ) ,
620- catchError ( ( ) => {
621- this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
622- return of ( [ cosigner . plain ( ) ] ) ;
571+ return this . getResolvedAddress ( cosigner ) . pipe (
572+ mergeMap ( ( address : Address ) => {
573+ return this . multisigRepository ! . getMultisigAccountInfo ( address ) . pipe (
574+ map ( ( multisigInfo ) => {
575+ const subscribers = [ cosigner ] . concat ( multisigInfo . multisigAddresses ) ;
576+ subscribers . forEach ( ( m ) => {
577+ this . subscribeTo ( `${ channel . toString ( ) } /${ m . plain ( ) } ` ) ;
578+ } ) ;
579+ return subscribers . map ( ( m ) => m . plain ( ) ) ;
580+ } ) ,
581+ catchError ( ( ) => {
582+ this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
583+ return of ( [ cosigner . plain ( ) ] ) ;
584+ } ) ,
585+ ) ;
623586 } ) ,
624587 ) ;
625588 }
0 commit comments