Skip to content

Commit 8cc7785

Browse files
committed
As per celery#5605, the Consul backend does not cleanly associate responses
from Consul with the outbound Celery request that caused it. This leaves it prone to mistaking the (final) response from an operation N as the response to an (early) part of operation N + 1. This changes fix that by using a separate connection for each request. That of course has the downside of (a) being relatively expensive and (b) increasing the rate of connection requests into Consul: - The former is annoying, but at least the backend works reliably. - The latter can cause Consul to reject excessive connection attempt, but if it does, at least it returns a clear indication of this (IIRC, it responds with an HTTP 429"too many connections" indication). Additionally, this issue can be ameliorated by enabling retries in the python-consul2 (which I believe should be turned on regards less to handle transient network issues). This is fixed by the PR in https:/github.com/poppyred/python-consul2/pull/31. Note that we have never seen (b) outside a test specifically trying to hammer the system, but we see (a) all the time in our normal system tests. To opt-out from the new behaviour add a parameter "one_client=1" to the connection URL.
1 parent 117cd9c commit 8cc7785

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

celery/backends/consul.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ class ConsulBackend(KeyValueStoreBackend):
3131

3232
supports_autoexpire = True
3333

34-
client = None
3534
consistency = 'consistent'
3635
path = None
3736

@@ -40,15 +39,33 @@ def __init__(self, *args, **kwargs):
4039

4140
if self.consul is None:
4241
raise ImproperlyConfigured(CONSUL_MISSING)
43-
42+
#
43+
# By default, for correctness, we use a client connection per
44+
# operation. If set, self.one_client will be used for all operations.
45+
# This provides for the original behaviour to be selected, and is
46+
# also convenient for mocking in the unit tests.
47+
#
48+
self.one_client = None
4449
self._init_from_params(**parse_url(self.url))
4550

4651
def _init_from_params(self, hostname, port, virtual_host, **params):
4752
logger.debug('Setting on Consul client to connect to %s:%d',
4853
hostname, port)
4954
self.path = virtual_host
50-
self.client = consul.Consul(host=hostname, port=port,
51-
consistency=self.consistency)
55+
self.hostname = hostname
56+
self.port = port
57+
#
58+
# Optionally, allow a single client connection to be used to reduce
59+
# the connection load on Consul by adding a "one_client=1" parameter
60+
# to the URL.
61+
#
62+
if params.get('one_client', None):
63+
self.one_client = self.client()
64+
65+
def client(self):
66+
return self.one_client or consul.Consul(host=self.hostname,
67+
port=self.port,
68+
consistency=self.consistency)
5269

5370
def _key_to_consul_key(self, key):
5471
key = bytes_to_str(key)
@@ -58,7 +75,7 @@ def get(self, key):
5875
key = self._key_to_consul_key(key)
5976
logger.debug('Trying to fetch key %s from Consul', key)
6077
try:
61-
_, data = self.client.kv.get(key)
78+
_, data = self.client().kv.get(key)
6279
return data['Value']
6380
except TypeError:
6481
pass
@@ -84,17 +101,16 @@ def set(self, key, value):
84101

85102
logger.debug('Trying to create Consul session %s with TTL %d',
86103
session_name, self.expires)
87-
session_id = self.client.session.create(name=session_name,
88-
behavior='delete',
89-
ttl=self.expires)
104+
client = self.client()
105+
session_id = client.session.create(name=session_name,
106+
behavior='delete',
107+
ttl=self.expires)
90108
logger.debug('Created Consul session %s', session_id)
91109

92110
logger.debug('Writing key %s to Consul', key)
93-
return self.client.kv.put(key=key,
94-
value=value,
95-
acquire=session_id)
111+
return client.kv.put(key=key, value=value, acquire=session_id)
96112

97113
def delete(self, key):
98114
key = self._key_to_consul_key(key)
99115
logger.debug('Removing key %s from Consul', key)
100-
return self.client.kv.delete(key)
116+
return self.client().kv.delete(key)

t/unit/backends/test_consul.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ def test_consul_consistency(self):
2222
def test_get(self):
2323
index = 100
2424
data = {'Key': 'test-consul-1', 'Value': 'mypayload'}
25-
self.backend.client = Mock(name='c.client')
26-
self.backend.client.kv.get.return_value = (index, data)
25+
self.backend.one_client = Mock(name='c.client')
26+
self.backend.one_client.kv.get.return_value = (index, data)
2727
assert self.backend.get(data['Key']) == 'mypayload'
2828

2929
def test_index_bytes_key(self):

0 commit comments

Comments
 (0)