Skip to content

Commit 141eaee

Browse files
committed
(#8) add rx support to async pipes
1 parent 545fc1b commit 141eaee

File tree

1 file changed

+38
-0
lines changed

1 file changed

+38
-0
lines changed

src/app/pipes/rx/rxpipe.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/// <reference path="../../../typings/_custom.d.ts" />
2+
3+
import {Pipe, PipeFactory} from 'angular2/angular2';
4+
import {ObservablePipe} from 'angular2/pipes';
5+
import {async} from 'angular2/src/change_detection/change_detection';
6+
import * as Rx from 'rx';
7+
8+
export function isObservable(obs) {
9+
return obs && typeof obs.subscribe === 'function';
10+
}
11+
12+
//upgrade async pipe with Rx support
13+
export class RxPipe extends ObservablePipe {
14+
_subscription: any;
15+
_observable: any;
16+
constructor(ref) { super(ref); }
17+
supports(obs) { return isObservable(obs); }
18+
_subscribe(obs) {
19+
this._observable = obs;
20+
this._subscription = obs.subscribe(
21+
value => this._updateLatestValue(value),
22+
e => { throw e; }
23+
);
24+
}
25+
transform(value: any, args?: List<any>): any {
26+
return super.transform(value, args);
27+
}
28+
onDestroy(): void {
29+
return super.onDestroy();
30+
}
31+
}
32+
33+
export class RxPipeFactory implements PipeFactory {
34+
supports(obs) { return isObservable(obs); }
35+
create(cdRef): Pipe { return new RxPipe(cdRef); }
36+
}
37+
38+
export var rxAsync = [ new RxPipeFactory() ].concat(async);

0 commit comments

Comments
 (0)