|
1 | 1 | import asyncio |
2 | 2 | import inspect |
3 | | -from typing import TYPE_CHECKING, Any, Coroutine, Deque, Generic, TypeVar, Union |
| 3 | +from typing import TYPE_CHECKING, Any, Coroutine, Generic, Tuple, TypeVar, Union |
4 | 4 |
|
5 | 5 | _T = TypeVar("_T") # noqa: WPS111 |
6 | 6 |
|
@@ -38,70 +38,52 @@ def remove_suffix(text: str, suffix: str) -> str: |
38 | 38 | return text |
39 | 39 |
|
40 | 40 |
|
41 | | -class DequeQueue( |
42 | | - asyncio.Queue, # type: ignore |
43 | | - Generic[_T], |
44 | | -): |
45 | | - """Deque based Queue.""" |
46 | | - |
47 | | - if TYPE_CHECKING: # noqa: WPS604 |
48 | | - _loop: asyncio.BaseEventLoop |
49 | | - _queue: Deque[_T] |
50 | | - _putters: Deque[Any] |
51 | | - _getters: Deque[Any] |
52 | | - _unfinished_tasks: int |
53 | | - _finished: asyncio.Event |
54 | | - _wakeup_next: Any |
| 41 | +class PriorityQueue(asyncio.PriorityQueue, Generic[_T]): # type: ignore |
| 42 | + """PriorityQueue based Queue.""" |
55 | 43 |
|
56 | 44 | async def put_first(self, item: _T) -> None: |
57 | 45 | """ |
58 | | - Wait till queue is not full. Put item in Queue as soon as possible. LIFO style. |
| 46 | + Put item in Queue with highest priority. |
59 | 47 |
|
60 | 48 | :param item: value to prepend |
61 | | - :raises BaseException: something goes wrong |
62 | | - :returns: nothing |
63 | 49 | """ |
64 | | - while self.full(): |
65 | | - putter = self._loop.create_future() |
66 | | - self._putters.appendleft(putter) |
67 | | - try: |
68 | | - await putter |
69 | | - except BaseException: # noqa: WPS424 |
70 | | - putter.cancel() # Just in case putter is not done yet. |
71 | | - try: # noqa: WPS505 |
72 | | - # Clean self._putters from canceled putters. |
73 | | - self._putters.remove(putter) |
74 | | - except ValueError: |
75 | | - # The putter could be removed from self._putters by a |
76 | | - # previous get_nowait call. |
77 | | - pass # noqa: WPS420 |
78 | | - if not self.full() and not putter.cancelled(): |
79 | | - # We were woken up by get_nowait(), but can't take |
80 | | - # the call. Wake up the next in line. |
81 | | - self._wakeup_next(self._putters) |
82 | | - raise |
83 | | - |
84 | | - return self.put_first_nowait(item) |
85 | | - |
86 | | - def put_first_nowait(self, item: _T) -> None: |
| 50 | + self.counter += 1 |
| 51 | + await self.put((0, self.counter, item)) |
| 52 | + |
| 53 | + async def put_last(self, item: _T) -> None: |
87 | 54 | """ |
88 | | - Put item in Queue as soon as possible. LIFO style. |
| 55 | + Put item in Queue with lowest priority. |
89 | 56 |
|
90 | | - :param item: value to prepend |
91 | | - :raises QueueFull: queue is full |
| 57 | + :param item: value to append |
92 | 58 | """ |
93 | | - if self.full(): |
94 | | - raise asyncio.QueueFull() |
| 59 | + self.counter += 1 |
| 60 | + await self.put((1, self.counter, item)) |
95 | 61 |
|
96 | | - self._put_first(item) |
97 | | - self._unfinished_tasks += 1 |
98 | | - self._finished.clear() |
99 | | - self._wakeup_next(self._getters) |
| 62 | + def _init(self, maxsize: int) -> None: |
| 63 | + super()._init(maxsize) |
| 64 | + self.counter = 0 |
100 | 65 |
|
101 | | - def _put_first(self, item: _T) -> None: |
102 | | - """ |
103 | | - Prepend item. |
104 | 66 |
|
105 | | - :param item: value to prepend |
106 | | - """ |
107 | | - self._queue.appendleft(item) |
| 67 | +if TYPE_CHECKING: # pragma: no cover |
| 68 | + |
| 69 | + class PriorityQueue( # type: ignore # noqa: F811 |
| 70 | + asyncio.PriorityQueue[Tuple[int, int, _T]], |
| 71 | + Generic[_T], |
| 72 | + ): |
| 73 | + """PriorityQueue based Queue.""" |
| 74 | + |
| 75 | + async def put_first(self, item: _T) -> None: |
| 76 | + """ |
| 77 | + Put item in Queue with highest priority. |
| 78 | +
|
| 79 | + :param item: value to prepend |
| 80 | + """ |
| 81 | + ... |
| 82 | + |
| 83 | + async def put_last(self, item: _T) -> None: |
| 84 | + """ |
| 85 | + Put item in Queue with lowest priority. |
| 86 | +
|
| 87 | + :param item: value to append |
| 88 | + """ |
| 89 | + ... |
0 commit comments