|
1 | | -import pickle |
2 | | -from abc import abstractmethod |
3 | 1 | from logging import getLogger |
4 | 2 | from typing import Any, AsyncGenerator, Callable, Optional, TypeVar |
5 | 3 |
|
@@ -49,68 +47,66 @@ def __init__( |
49 | 47 |
|
50 | 48 | async def shutdown(self) -> None: |
51 | 49 | """Closes redis connection pool.""" |
| 50 | + await super().shutdown() |
52 | 51 | await self.connection_pool.disconnect() |
53 | 52 |
|
54 | | - async def listen(self) -> AsyncGenerator[BrokerMessage, None]: |
55 | | - """ |
56 | | - Listen redis queue for new messages. |
57 | 53 |
|
58 | | - This function listens to the queue |
59 | | - and yields new messages if they have BrokerMessage type. |
| 54 | +class PubSubBroker(BaseRedisBroker): |
| 55 | + """Broker that works with Redis and broadcasts tasks to all workers.""" |
60 | 56 |
|
61 | | - :yields: broker messages. |
62 | | - """ |
63 | | - async for message in self._listen_to_raw_messages(): |
64 | | - try: |
65 | | - redis_message = pickle.loads(message) |
66 | | - if isinstance(redis_message, BrokerMessage): |
67 | | - yield redis_message |
68 | | - except ( |
69 | | - TypeError, |
70 | | - AttributeError, |
71 | | - pickle.UnpicklingError, |
72 | | - ) as exc: |
73 | | - logger.debug( |
74 | | - "Cannot read broker message %s", |
75 | | - exc, |
76 | | - exc_info=True, |
77 | | - ) |
78 | | - |
79 | | - @abstractmethod |
80 | | - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 57 | + async def kick(self, message: BrokerMessage) -> None: |
81 | 58 | """ |
82 | | - Generator for reading raw data from Redis. |
| 59 | + Publish message over PUBSUB channel. |
83 | 60 |
|
84 | | - :yields: raw data. |
| 61 | + :param message: message to send. |
85 | 62 | """ |
86 | | - yield # type: ignore |
87 | | - |
| 63 | + async with Redis(connection_pool=self.connection_pool) as redis_conn: |
| 64 | + await redis_conn.publish(self.queue_name, message.message) |
88 | 65 |
|
89 | | -class PubSubBroker(BaseRedisBroker): |
90 | | - """Broker that works with Redis and broadcasts tasks to all workers.""" |
| 66 | + async def listen(self) -> AsyncGenerator[bytes, None]: |
| 67 | + """ |
| 68 | + Listen redis queue for new messages. |
91 | 69 |
|
92 | | - async def kick(self, message: BrokerMessage) -> None: # noqa: D102 |
93 | | - async with Redis(connection_pool=self.connection_pool) as redis_conn: |
94 | | - await redis_conn.publish(self.queue_name, pickle.dumps(message)) |
| 70 | + This function listens to the pubsub channel |
| 71 | + and yields all messages with proper types. |
95 | 72 |
|
96 | | - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 73 | + :yields: broker messages. |
| 74 | + """ |
97 | 75 | async with Redis(connection_pool=self.connection_pool) as redis_conn: |
98 | 76 | redis_pubsub_channel = redis_conn.pubsub() |
99 | 77 | await redis_pubsub_channel.subscribe(self.queue_name) |
100 | 78 | async for message in redis_pubsub_channel.listen(): |
101 | 79 | if not message: |
102 | 80 | continue |
| 81 | + if message["type"] != "message": |
| 82 | + logger.debug("Received non-message from redis: %s", message) |
| 83 | + continue |
103 | 84 | yield message["data"] |
104 | 85 |
|
105 | 86 |
|
106 | 87 | class ListQueueBroker(BaseRedisBroker): |
107 | 88 | """Broker that works with Redis and distributes tasks between workers.""" |
108 | 89 |
|
109 | | - async def kick(self, message: BrokerMessage) -> None: # noqa: D102 |
| 90 | + async def kick(self, message: BrokerMessage) -> None: |
| 91 | + """ |
| 92 | + Put a message in a list. |
| 93 | +
|
| 94 | + This method appends a message to the list of all messages. |
| 95 | +
|
| 96 | + :param message: message to append. |
| 97 | + """ |
110 | 98 | async with Redis(connection_pool=self.connection_pool) as redis_conn: |
111 | | - await redis_conn.lpush(self.queue_name, pickle.dumps(message)) |
| 99 | + await redis_conn.lpush(self.queue_name, message.message) |
112 | 100 |
|
113 | | - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 101 | + async def listen(self) -> AsyncGenerator[bytes, None]: |
| 102 | + """ |
| 103 | + Listen redis queue for new messages. |
| 104 | +
|
| 105 | + This function listens to the queue |
| 106 | + and yields new messages if they have BrokerMessage type. |
| 107 | +
|
| 108 | + :yields: broker messages. |
| 109 | + """ |
114 | 110 | redis_brpop_data_position = 1 |
115 | 111 | async with Redis(connection_pool=self.connection_pool) as redis_conn: |
116 | 112 | while True: # noqa: WPS457 |
|
0 commit comments