Skip to content
This repository was archived by the owner on Oct 12, 2021. It is now read-only.

Commit ff134d1

Browse files
committed
refactor(worker): reduce number of rx operator deps, and replace heavyweight rxjs/Subject with custom LiteSubject
1 parent 7e10722 commit ff134d1

File tree

3 files changed

+46
-17
lines changed

3 files changed

+46
-17
lines changed

service-worker/worker/src/plugins/push/index.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import {
2+
LiteSubject,
23
Operation,
34
Plugin,
45
PluginFactory,
56
VersionWorker
67
} from '@angular/service-worker/worker';
78

8-
import {Subject} from 'rxjs/Subject';
99
import {Observable} from 'rxjs/Observable';
10+
import {Observer} from 'rxjs/Observer';
1011

1112
interface PushManifest {
1213
showNotifications?: boolean;
@@ -40,21 +41,25 @@ export function Push(): PluginFactory<PushImpl> {
4041
export class PushImpl implements Plugin<PushImpl> {
4142

4243
private pushBuffer: any[] = [];
43-
private pushSubject: Subject<any> = new Subject<any>();
44+
private pushSubject: LiteSubject<any> = new LiteSubject<any>();
4445
pushes: Observable<any>;
4546

4647
private get pushManifest(): PushManifest {
4748
return this.worker.manifest['push'] as PushManifest || EMPTY_MANIFEST;
4849
}
4950

5051
constructor(private worker: VersionWorker) {
51-
this.pushes = Observable.create(observer => {
52-
this.pushBuffer.forEach(data => observer.next(data));
52+
this.pushes = Observable.create((observer: Observer<any>) => {
53+
if (this.pushBuffer !== null) {
54+
this.pushBuffer.forEach(data => observer.next(data));
55+
}
5356
this.pushBuffer = null;
54-
const sub = this.pushSubject.subscribe(observer);
57+
const sub = this.pushSubject.observable.subscribe(observer);
5558
return () => {
5659
sub.unsubscribe();
57-
this.pushBuffer = [];
60+
if (!this.pushSubject.hasSubscribers) {
61+
this.pushBuffer = [];
62+
}
5863
};
5964
});
6065
}

service-worker/worker/src/worker/logging.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {Observable} from 'rxjs/Observable';
2-
import {Subject} from 'rxjs/Subject';
2+
import {LiteSubject} from './rxjs';
33

44
export enum Verbosity {
55
INFO = 1,
@@ -16,7 +16,7 @@ interface LogEntry {
1616
let logBuffer: LogEntry[] = [];
1717

1818
// Subject which will be used to broadcast log messages once there's a subscriber.
19-
let logSubject: Subject<LogEntry> = null;
19+
let logSubject: LiteSubject<LogEntry> = null;
2020

2121
let logLevel = Verbosity.INFO;
2222

@@ -25,7 +25,7 @@ let logLevel = Verbosity.INFO;
2525
let logStream = Observable.create(observer => {
2626
// Create the subject if it doesn't exist already.
2727
if (logSubject === null) {
28-
logSubject = new Subject<LogEntry>();
28+
logSubject = new LiteSubject<LogEntry>();
2929
}
3030

3131
// An Observable representing buffered messages. Initialized to empty.
@@ -40,7 +40,7 @@ let logStream = Observable.create(observer => {
4040
// Combine (possibly empty) buffered messages with the subject, and pipe them to the
4141
// subscriber.
4242
return buffered
43-
.concat(logSubject)
43+
.concat(logSubject.observable)
4444
.subscribe(observer);
4545
});
4646

service-worker/worker/src/worker/rxjs.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
1-
import 'rxjs/add/observable/combineLatest';
21
import 'rxjs/add/observable/concat';
32
import 'rxjs/add/observable/defer';
43
import 'rxjs/add/observable/empty';
54
import 'rxjs/add/observable/from';
65
import 'rxjs/add/observable/fromEvent';
76
import 'rxjs/add/observable/fromPromise';
8-
import 'rxjs/add/observable/merge';
97
import 'rxjs/add/observable/of';
10-
import 'rxjs/add/observable/timer';
118

12-
import 'rxjs/add/operator/cache';
13-
import 'rxjs/add/operator/catch';
149
import 'rxjs/add/operator/concat';
1510
import 'rxjs/add/operator/concatMap';
1611
import 'rxjs/add/operator/do';
@@ -20,14 +15,43 @@ import 'rxjs/add/operator/ignoreElements';
2015
import 'rxjs/add/operator/let';
2116
import 'rxjs/add/operator/map';
2217
import 'rxjs/add/operator/mergeMap';
23-
import 'rxjs/add/operator/publishReplay';
2418
import 'rxjs/add/operator/reduce';
25-
import 'rxjs/add/operator/share';
2619
import 'rxjs/add/operator/switchMap';
2720
import 'rxjs/add/operator/take';
2821
import 'rxjs/add/operator/toPromise';
2922

3023
import {Observable} from 'rxjs/Observable';
24+
import {Observer} from 'rxjs/Observer';
25+
26+
export class LiteSubject<T> {
27+
28+
observable: Observable<T>;
29+
private subscribers: Observer<T>[] = [];
30+
31+
constructor() {
32+
this.observable = Observable.create((obs: Observer<T>) => {
33+
this.subscribers.push(obs);
34+
return () => {
35+
let index = this.subscribers.indexOf(obs);
36+
if (index >= 0) {
37+
this.subscribers.splice(index, 1);
38+
}
39+
};
40+
});
41+
}
42+
43+
next(value: T): void {
44+
this.subscribers.forEach(obs => obs.next(value));
45+
}
46+
47+
complete(): void {
48+
this.subscribers.forEach(obs => obs.complete());
49+
}
50+
51+
get hasSubscribers(): boolean {
52+
return this.subscribers.length > 0;
53+
}
54+
}
3155

3256
export function doAsync<T>(fn: (T) => Observable<any>): any {
3357
return (obs: Observable<T>) => obs

0 commit comments

Comments
 (0)