|
1 | 1 | from inspect import isasyncgen |
2 | | -from asyncio import ensure_future |
3 | | -from rx import Observable, AnonymousObserver |
4 | | -from rx.core import ObservableBase, Disposable, ObserverBase |
| 2 | +from asyncio import ensure_future, wait, CancelledError |
| 3 | +from rx import AnonymousObservable |
5 | 4 |
|
6 | | -from rx.concurrency import current_thread_scheduler |
7 | 5 |
|
8 | | -from rx.core import Observer, Observable, Disposable |
9 | | -from rx.core.anonymousobserver import AnonymousObserver |
10 | | -from rx.core.autodetachobserver import AutoDetachObserver |
11 | | - |
12 | | - |
13 | | -# class AsyncgenDisposable(Disposable): |
14 | | -# """Represents a Disposable that disposes the asyncgen automatically.""" |
15 | | - |
16 | | -# def __init__(self, asyncgen): |
17 | | -# """Initializes a new instance of the AsyncgenDisposable class.""" |
18 | | - |
19 | | -# self.asyncgen = asyncgen |
20 | | -# self.is_disposed = False |
21 | | - |
22 | | -# super(AsyncgenDisposable, self).__init__() |
23 | | - |
24 | | -# def dispose(self): |
25 | | -# """Sets the status to disposed""" |
26 | | -# self.asyncgen.aclose() |
27 | | -# self.is_disposed = True |
28 | | - |
29 | | - |
30 | | -class AsyncgenObserver(AutoDetachObserver): |
31 | | - def __init__(self, asyncgen, *args, **kwargs): |
32 | | - self._asyncgen = asyncgen |
33 | | - self.is_disposed = False |
34 | | - super(AsyncgenObserver, self).__init__(*args, **kwargs) |
35 | | - |
36 | | - async def dispose_asyncgen(self): |
37 | | - if self.is_disposed: |
38 | | - return |
39 | | - |
40 | | - try: |
41 | | - # await self._asyncgen.aclose() |
42 | | - await self._asyncgen.athrow(StopAsyncIteration) |
43 | | - self.is_disposed = True |
44 | | - except: |
45 | | - pass |
46 | | - |
47 | | - def dispose(self): |
48 | | - if self.is_disposed: |
49 | | - return |
50 | | - disposed = super(AsyncgenObserver, self).dispose() |
51 | | - # print("DISPOSE observer!", disposed) |
52 | | - ensure_future(self.dispose_asyncgen()) |
53 | | - |
54 | | - |
55 | | -class AsyncgenObservable(ObservableBase): |
56 | | - """Class to create an Observable instance from a delegate-based |
57 | | - implementation of the Subscribe method.""" |
58 | | - |
59 | | - def __init__(self, subscribe, asyncgen): |
60 | | - """Creates an observable sequence object from the specified |
61 | | - subscription function. |
62 | | -
|
63 | | - Keyword arguments: |
64 | | - :param types.FunctionType subscribe: Subscribe method implementation. |
65 | | - """ |
66 | | - |
67 | | - self._subscribe = subscribe |
68 | | - self._asyncgen = asyncgen |
69 | | - super(AsyncgenObservable, self).__init__() |
70 | | - |
71 | | - def _subscribe_core(self, observer): |
72 | | - # print("GET SUBSCRIBER", observer) |
73 | | - return self._subscribe(observer) |
74 | | - # print("SUBSCRIBER RESULT", subscriber) |
75 | | - # return subscriber |
76 | | - |
77 | | - def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None): |
78 | | - |
79 | | - if isinstance(on_next, Observer): |
80 | | - observer = on_next |
81 | | - elif hasattr(on_next, "on_next") and callable(on_next.on_next): |
82 | | - observer = on_next |
83 | | - elif not observer: |
84 | | - observer = AnonymousObserver(on_next, on_error, on_completed) |
85 | | - |
86 | | - auto_detach_observer = AsyncgenObserver(self._asyncgen, observer) |
87 | | - |
88 | | - def fix_subscriber(subscriber): |
89 | | - """Fixes subscriber to make sure it returns a Disposable instead |
90 | | - of None or a dispose function""" |
91 | | - |
92 | | - if not hasattr(subscriber, "dispose"): |
93 | | - subscriber = Disposable.create(subscriber) |
94 | | - |
95 | | - return subscriber |
96 | | - |
97 | | - def set_disposable(scheduler=None, value=None): |
98 | | - try: |
99 | | - subscriber = self._subscribe_core(auto_detach_observer) |
100 | | - except Exception as ex: |
101 | | - if not auto_detach_observer.fail(ex): |
102 | | - raise |
103 | | - else: |
104 | | - auto_detach_observer.disposable = fix_subscriber(subscriber) |
| 6 | +def asyncgen_to_observable(asyncgen, loop=None): |
| 7 | + def emit(observer): |
| 8 | + task = ensure_future( |
| 9 | + iterate_asyncgen(asyncgen, observer), |
| 10 | + loop=loop) |
105 | 11 |
|
106 | | - # Subscribe needs to set up the trampoline before for subscribing. |
107 | | - # Actually, the first call to Subscribe creates the trampoline so |
108 | | - # that it may assign its disposable before any observer executes |
109 | | - # OnNext over the CurrentThreadScheduler. This enables single- |
110 | | - # threaded cancellation |
111 | | - # https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27- |
112 | | - # 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche |
113 | | - # dulerschedulerequired-behaves?forum=rx |
114 | | - if current_thread_scheduler.schedule_required(): |
115 | | - current_thread_scheduler.schedule(set_disposable) |
116 | | - else: |
117 | | - set_disposable() |
| 12 | + def dispose(): |
| 13 | + async def await_task(): |
| 14 | + await task |
118 | 15 |
|
119 | | - # Hide the identity of the auto detach observer |
120 | | - return Disposable.create(auto_detach_observer.dispose) |
| 16 | + task.cancel() |
| 17 | + ensure_future(await_task(), loop=loop) |
121 | 18 |
|
| 19 | + return dispose |
122 | 20 |
|
123 | | -def asyncgen_to_observable(asyncgen): |
124 | | - def emit(observer): |
125 | | - ensure_future(iterate_asyncgen(asyncgen, observer)) |
126 | | - return AsyncgenObservable(emit, asyncgen) |
| 21 | + return AnonymousObservable(emit) |
127 | 22 |
|
128 | 23 |
|
129 | 24 | async def iterate_asyncgen(asyncgen, observer): |
130 | 25 | try: |
131 | 26 | async for item in asyncgen: |
132 | 27 | observer.on_next(item) |
133 | 28 | observer.on_completed() |
| 29 | + except CancelledError: |
| 30 | + pass |
134 | 31 | except Exception as e: |
135 | 32 | observer.on_error(e) |
| 33 | + |
0 commit comments