Skip to content

Commit 358a8e1

Browse files
xargmiguelgrinberg
authored andcommitted
Made queues non-durable. Always retry after disconnect. (#120)
1 parent 97f4faa commit 358a8e1

File tree

1 file changed

+21
-6
lines changed

1 file changed

+21
-6
lines changed

socketio/kombu_manager.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,33 @@ def _exchange(self):
7171
def _queue(self):
7272
queue_name = 'flask-socketio.' + str(uuid.uuid4())
7373
return kombu.Queue(queue_name, self._exchange(),
74+
durable=False,
7475
queue_arguments={'x-expires': 300000})
7576

7677
def _producer(self):
7778
return self._connection().Producer(exchange=self._exchange())
7879

80+
def __error_callback(self, exception, interval):
81+
self.server.logger.exception('Sleeping {}s'.format(interval))
82+
7983
def _publish(self, data):
80-
self.producer.publish(pickle.dumps(data))
84+
connection = self._connection()
85+
publish = connection.ensure(self.producer, self.producer.publish,
86+
errback=self.__error_callback)
87+
publish(pickle.dumps(data))
8188

8289
def _listen(self):
8390
reader_queue = self._queue()
84-
with self._connection().SimpleQueue(reader_queue) as queue:
85-
while True:
86-
message = queue.get(block=True)
87-
message.ack()
88-
yield message.payload
91+
92+
while True:
93+
connection = self._connection().ensure_connection(
94+
errback=self.__error_callback)
95+
try:
96+
with connection.SimpleQueue(reader_queue) as queue:
97+
while True:
98+
message = queue.get(block=True)
99+
message.ack()
100+
yield message.payload
101+
except connection.connection_errors:
102+
self.server.logger.exception("Connection error "
103+
"while reading from queue")

0 commit comments

Comments
 (0)