|
1 | | -from asyncio import ensure_future |
| 1 | +from asyncio import ensure_future, wait, CancelledError |
2 | 2 | from inspect import isasyncgen |
3 | 3 |
|
4 | 4 | from rx import AnonymousObserver, Observable |
|
7 | 7 | ObserverBase) |
8 | 8 | from rx.core.anonymousobserver import AnonymousObserver |
9 | 9 | from rx.core.autodetachobserver import AutoDetachObserver |
| 10 | +from rx import AnonymousObservable |
10 | 11 |
|
11 | 12 |
|
12 | 13 | # class AsyncgenDisposable(Disposable): |
@@ -102,33 +103,24 @@ def set_disposable(scheduler=None, value=None): |
102 | 103 | else: |
103 | 104 | auto_detach_observer.disposable = fix_subscriber(subscriber) |
104 | 105 |
|
105 | | - # Subscribe needs to set up the trampoline before for subscribing. |
106 | | - # Actually, the first call to Subscribe creates the trampoline so |
107 | | - # that it may assign its disposable before any observer executes |
108 | | - # OnNext over the CurrentThreadScheduler. This enables single- |
109 | | - # threaded cancellation |
110 | | - # https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27- |
111 | | - # 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche |
112 | | - # dulerschedulerequired-behaves?forum=rx |
113 | | - if current_thread_scheduler.schedule_required(): |
114 | | - current_thread_scheduler.schedule(set_disposable) |
115 | | - else: |
116 | | - set_disposable() |
| 106 | + def dispose(): |
| 107 | + async def await_task(): |
| 108 | + await task |
117 | 109 |
|
118 | | - # Hide the identity of the auto detach observer |
119 | | - return Disposable.create(auto_detach_observer.dispose) |
| 110 | + task.cancel() |
| 111 | + ensure_future(await_task(), loop=loop) |
120 | 112 |
|
| 113 | + return dispose |
121 | 114 |
|
122 | | -def asyncgen_to_observable(asyncgen): |
123 | | - def emit(observer): |
124 | | - ensure_future(iterate_asyncgen(asyncgen, observer)) |
125 | | - return AsyncgenObservable(emit, asyncgen) |
| 115 | + return AnonymousObservable(emit) |
126 | 116 |
|
127 | 117 |
|
128 | 118 | async def iterate_asyncgen(asyncgen, observer): |
129 | 119 | try: |
130 | 120 | async for item in asyncgen: |
131 | 121 | observer.on_next(item) |
132 | 122 | observer.on_completed() |
| 123 | + except CancelledError: |
| 124 | + pass |
133 | 125 | except Exception as e: |
134 | 126 | observer.on_error(e) |
0 commit comments