Skip to content

Commit f6d28a9

Browse files
ShaheedHaquejeyrce
authored andcommitted
The Consul backend must correctly associate requests and responses (celery#6823)
* As per celery#5605, the Consul backend does not cleanly associate responses from Consul with the outbound Celery request that caused it. This leaves it prone to mistaking the (final) response from an operation N as the response to an (early) part of operation N + 1. This changes fix that by using a separate connection for each request. That of course has the downside of (a) being relatively expensive and (b) increasing the rate of connection requests into Consul: - The former is annoying, but at least the backend works reliably. - The latter can cause Consul to reject excessive connection attempt, but if it does, at least it returns a clear indication of this (IIRC, it responds with an HTTP 429"too many connections" indication). Additionally, this issue can be ameliorated by enabling retries in the python-consul2 (which I believe should be turned on regards less to handle transient network issues). This is fixed by the PR in https:/github.com/poppyred/python-consul2/pull/31. Note that we have never seen (b) outside a test specifically trying to hammer the system, but we see (a) all the time in our normal system tests. To opt-out from the new behaviour add a parameter "one_client=1" to the connection URL. * Increase code coverage. * Rewrite Consul backend documentation, and describe the options now available.
1 parent ad2d961 commit f6d28a9

File tree

3 files changed

+83
-18
lines changed

3 files changed

+83
-18
lines changed

celery/backends/consul.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ class ConsulBackend(KeyValueStoreBackend):
3131

3232
supports_autoexpire = True
3333

34-
client = None
3534
consistency = 'consistent'
3635
path = None
3736

@@ -40,15 +39,33 @@ def __init__(self, *args, **kwargs):
4039

4140
if self.consul is None:
4241
raise ImproperlyConfigured(CONSUL_MISSING)
43-
42+
#
43+
# By default, for correctness, we use a client connection per
44+
# operation. If set, self.one_client will be used for all operations.
45+
# This provides for the original behaviour to be selected, and is
46+
# also convenient for mocking in the unit tests.
47+
#
48+
self.one_client = None
4449
self._init_from_params(**parse_url(self.url))
4550

4651
def _init_from_params(self, hostname, port, virtual_host, **params):
4752
logger.debug('Setting on Consul client to connect to %s:%d',
4853
hostname, port)
4954
self.path = virtual_host
50-
self.client = consul.Consul(host=hostname, port=port,
51-
consistency=self.consistency)
55+
self.hostname = hostname
56+
self.port = port
57+
#
58+
# Optionally, allow a single client connection to be used to reduce
59+
# the connection load on Consul by adding a "one_client=1" parameter
60+
# to the URL.
61+
#
62+
if params.get('one_client', None):
63+
self.one_client = self.client()
64+
65+
def client(self):
66+
return self.one_client or consul.Consul(host=self.hostname,
67+
port=self.port,
68+
consistency=self.consistency)
5269

5370
def _key_to_consul_key(self, key):
5471
key = bytes_to_str(key)
@@ -58,7 +75,7 @@ def get(self, key):
5875
key = self._key_to_consul_key(key)
5976
logger.debug('Trying to fetch key %s from Consul', key)
6077
try:
61-
_, data = self.client.kv.get(key)
78+
_, data = self.client().kv.get(key)
6279
return data['Value']
6380
except TypeError:
6481
pass
@@ -84,17 +101,16 @@ def set(self, key, value):
84101

85102
logger.debug('Trying to create Consul session %s with TTL %d',
86103
session_name, self.expires)
87-
session_id = self.client.session.create(name=session_name,
88-
behavior='delete',
89-
ttl=self.expires)
104+
client = self.client()
105+
session_id = client.session.create(name=session_name,
106+
behavior='delete',
107+
ttl=self.expires)
90108
logger.debug('Created Consul session %s', session_id)
91109

92110
logger.debug('Writing key %s to Consul', key)
93-
return self.client.kv.put(key=key,
94-
value=value,
95-
acquire=session_id)
111+
return client.kv.put(key=key, value=value, acquire=session_id)
96112

97113
def delete(self, key):
98114
key = self._key_to_consul_key(key)
99115
logger.debug('Removing key %s from Consul', key)
100-
return self.client.kv.delete(key)
116+
return self.client().kv.delete(key)

docs/userguide/configuration.rst

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,14 +2016,52 @@ without any further configuration. For larger clusters you could use NFS,
20162016
Consul K/V store backend settings
20172017
---------------------------------
20182018

2019-
The Consul backend can be configured using a URL, for example:
2019+
.. note::
2020+
2021+
The Consul backend requires the :pypi:`python-consul2` library:
2022+
2023+
To install this package use :command:`pip`:
2024+
2025+
.. code-block:: console
2026+
2027+
$ pip install python-consul2
2028+
2029+
The Consul backend can be configured using a URL, for example::
20202030

20212031
CELERY_RESULT_BACKEND = 'consul://localhost:8500/'
20222032

2023-
The backend will storage results in the K/V store of Consul
2024-
as individual keys.
2033+
or::
2034+
2035+
result_backend = 'consul://localhost:8500/'
2036+
2037+
The backend will store results in the K/V store of Consul
2038+
as individual keys. The backend supports auto expire of results using TTLs in
2039+
Consul. The full syntax of the URL is::
2040+
2041+
consul://host:port[?one_client=1]
2042+
2043+
The URL is formed out of the following parts:
2044+
2045+
* ``host``
2046+
2047+
Host name of the Consul server.
2048+
2049+
* ``port``
2050+
2051+
The port the Consul server is listening to.
2052+
2053+
* ``one_client``
2054+
2055+
By default, for correctness, the backend uses a separate client connection
2056+
per operation. In cases of extreme load, the rate of creation of new
2057+
connections can cause HTTP 429 "too many connections" error responses from
2058+
the Consul server when under load. The recommended way to handle this is to
2059+
enable retries in ``python-consul2`` using the patch at
2060+
https://github.com/poppyred/python-consul2/pull/31.
20252061

2026-
The backend supports auto expire of results using TTLs in Consul.
2062+
Alternatively, if ``one_client`` is set, a single client connection will be
2063+
used for all operations instead. This should eliminate the HTTP 429 errors,
2064+
but the storage of results in the backend can become unreliable.
20272065

20282066
.. _conf-messaging:
20292067

t/unit/backends/test_consul.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,21 @@ def test_consul_consistency(self):
2222
def test_get(self):
2323
index = 100
2424
data = {'Key': 'test-consul-1', 'Value': 'mypayload'}
25-
self.backend.client = Mock(name='c.client')
26-
self.backend.client.kv.get.return_value = (index, data)
25+
self.backend.one_client = Mock(name='c.client')
26+
self.backend.one_client.kv.get.return_value = (index, data)
2727
assert self.backend.get(data['Key']) == 'mypayload'
2828

29+
def test_set(self):
30+
self.backend.one_client = Mock(name='c.client')
31+
self.backend.one_client.session.create.return_value = 'c8dfa770-4ea3-2ee9-d141-98cf0bfe9c59'
32+
self.backend.one_client.kv.put.return_value = True
33+
assert self.backend.set('Key', 'Value') is True
34+
35+
def test_delete(self):
36+
self.backend.one_client = Mock(name='c.client')
37+
self.backend.one_client.kv.delete.return_value = True
38+
assert self.backend.delete('Key') is True
39+
2940
def test_index_bytes_key(self):
3041
key = 'test-consul-2'
3142
assert self.backend._key_to_consul_key(key) == key

0 commit comments

Comments
 (0)