|
15 | 15 | */ |
16 | 16 |
|
17 | 17 | import { Observable, of, Subject } from 'rxjs'; |
18 | | -import { catchError, filter, map, mergeMap, share, switchMap } from 'rxjs/operators'; |
| 18 | +import { catchError, distinctUntilChanged, filter, map, mergeMap, share, switchMap } from 'rxjs/operators'; |
19 | 19 | import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client'; |
20 | 20 | import * as WebSocket from 'ws'; |
21 | 21 | import { UnresolvedAddress } from '../model'; |
@@ -74,6 +74,8 @@ export class Listener implements IListener { |
74 | 74 | */ |
75 | 75 | private uid: string; |
76 | 76 |
|
| 77 | + private SIGINT = false; |
| 78 | + |
77 | 79 | /** |
78 | 80 | * Constructor |
79 | 81 | * @param url - Listener websocket server url. default: rest-gateway's url with ''/ws'' suffix. (e.g. http://localhost:3000/ws). |
@@ -120,6 +122,18 @@ export class Listener implements IListener { |
120 | 122 | this.webSocket.onerror = (err: Error): void => { |
121 | 123 | reject(err); |
122 | 124 | }; |
| 125 | + this.webSocket.onclose = (closeEvent?: any): void => { |
| 126 | + if (this.SIGINT) { |
| 127 | + return; |
| 128 | + } |
| 129 | + if (closeEvent) { |
| 130 | + reject({ |
| 131 | + client: this.uid, |
| 132 | + code: closeEvent.code, |
| 133 | + reason: closeEvent.reason, |
| 134 | + }); |
| 135 | + } |
| 136 | + }; |
123 | 137 | this.webSocket.onmessage = (msg: any): void => { |
124 | 138 | const message = JSON.parse(msg.data as string); |
125 | 139 | this.handleMessage(message, resolve); |
@@ -232,6 +246,7 @@ export class Listener implements IListener { |
232 | 246 | this.webSocket && |
233 | 247 | (this.webSocket.readyState === this.webSocket.OPEN || this.webSocket.readyState === this.webSocket.CONNECTING) |
234 | 248 | ) { |
| 249 | + this.SIGINT = true; |
235 | 250 | this.webSocket.close(); |
236 | 251 | } |
237 | 252 | } |
@@ -339,6 +354,11 @@ export class Listener implements IListener { |
339 | 354 | return this.messageSubject.asObservable().pipe( |
340 | 355 | filter((listenerMessage) => listenerMessage.channelName === channel), |
341 | 356 | filter((listenerMessage) => listenerMessage.message instanceof Transaction), |
| 357 | + distinctUntilChanged((prev, curr) => { |
| 358 | + const currentHash = (curr.message as Transaction).transactionInfo!.hash; |
| 359 | + const previousHash = (prev.message as Transaction).transactionInfo!.hash; |
| 360 | + return (currentHash && previousHash && previousHash === currentHash) || !currentHash || !previousHash; |
| 361 | + }), |
342 | 362 | switchMap((_) => { |
343 | 363 | const transactionObservable = of(_.message as T).pipe( |
344 | 364 | filter((transaction) => this.filterHash(transaction, transactionHash)), |
@@ -421,7 +441,8 @@ export class Listener implements IListener { |
421 | 441 | filter((_) => typeof _.message === 'string'), |
422 | 442 | filter((_) => subscribers.includes(_.channelParam.toUpperCase())), |
423 | 443 | map((_) => _.message as string), |
424 | | - filter((_) => !transactionHash || _.toUpperCase() == transactionHash.toUpperCase()), |
| 444 | + filter((_) => !transactionHash || _.toUpperCase() === transactionHash.toUpperCase()), |
| 445 | + distinctUntilChanged(), |
425 | 446 | ); |
426 | 447 | }), |
427 | 448 | ); |
|
0 commit comments