11from typing import List , Dict , Set
22
3- import redis
4- import valkey
5-
6- from .connection_types import RedisSentinel , BrokerConnectionClass
3+ from .broker_types import ConnectionErrorTypes , BrokerMetaData
74from .rq_classes import JobExecution , DjangoQueue , DjangoWorker
85from .settings import SCHEDULER_CONFIG
96from .settings import logger , Broker
@@ -28,31 +25,32 @@ class QueueNotFoundError(Exception):
2825 pass
2926
3027
31- def _get_redis_connection (config , use_strict_redis = False ):
28+ def _get_broker_connection (config , use_strict_broker = False ):
3229 """
3330 Returns a redis connection from a connection config
3431 """
3532 if SCHEDULER_CONFIG .BROKER == Broker .FAKEREDIS :
3633 import fakeredis
3734
38- redis_cls = fakeredis .FakeRedis if use_strict_redis else fakeredis .FakeStrictRedis
35+ broker_cls = fakeredis .FakeRedis if not use_strict_broker else fakeredis .FakeStrictRedis
3936 else :
40- redis_cls = BrokerConnectionClass [(SCHEDULER_CONFIG .BROKER , use_strict_redis ) ]
37+ broker_cls = BrokerMetaData [(SCHEDULER_CONFIG .BROKER , use_strict_broker )][ 0 ]
4138 logger .debug (f"Getting connection for { config } " )
4239 if "URL" in config :
43- if config .get ("SSL" ) or config .get ("URL" ).startswith ("rediss://" ):
44- return redis_cls .from_url (
40+ ssl_url_protocol = BrokerMetaData [(SCHEDULER_CONFIG .BROKER , use_strict_broker )][2 ]
41+ if config .get ("SSL" ) or config .get ("URL" ).startswith (f"{ ssl_url_protocol } ://" ):
42+ return broker_cls .from_url (
4543 config ["URL" ],
4644 db = config .get ("DB" ),
4745 ssl_cert_reqs = config .get ("SSL_CERT_REQS" , "required" ),
4846 )
4947 else :
50- return redis_cls .from_url (
48+ return broker_cls .from_url (
5149 config ["URL" ],
5250 db = config .get ("DB" ),
5351 )
5452 if "UNIX_SOCKET_PATH" in config :
55- return redis_cls (unix_socket_path = config ["UNIX_SOCKET_PATH" ], db = config ["DB" ])
53+ return broker_cls (unix_socket_path = config ["UNIX_SOCKET_PATH" ], db = config ["DB" ])
5654
5755 if "SENTINELS" in config :
5856 connection_kwargs = {
@@ -63,13 +61,14 @@ def _get_redis_connection(config, use_strict_redis=False):
6361 }
6462 connection_kwargs .update (config .get ("CONNECTION_KWARGS" , {}))
6563 sentinel_kwargs = config .get ("SENTINEL_KWARGS" , {})
66- sentinel = RedisSentinel (config ["SENTINELS" ], sentinel_kwargs = sentinel_kwargs , ** connection_kwargs )
64+ SentinelClass = BrokerMetaData [(SCHEDULER_CONFIG .BROKER , use_strict_broker )][1 ]
65+ sentinel = SentinelClass (config ["SENTINELS" ], sentinel_kwargs = sentinel_kwargs , ** connection_kwargs )
6766 return sentinel .master_for (
6867 service_name = config ["MASTER_NAME" ],
69- redis_class = redis_cls ,
68+ redis_class = broker_cls ,
7069 )
7170
72- return redis_cls (
71+ return broker_cls (
7372 host = config ["HOST" ],
7473 port = config ["PORT" ],
7574 db = config .get ("DB" , 0 ),
@@ -82,8 +81,8 @@ def _get_redis_connection(config, use_strict_redis=False):
8281
8382
8483def get_connection (queue_settings , use_strict_redis = False ):
85- """Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86- return _get_redis_connection (queue_settings , use_strict_redis )
84+ """Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
85+ return _get_broker_connection (queue_settings , use_strict_redis )
8786
8887
8988def get_queue (
@@ -116,7 +115,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116115 try :
117116 curr_workers : Set [DjangoWorker ] = set (DjangoWorker .all (connection = connection ))
118117 workers_set .update (curr_workers )
119- except ( redis . ConnectionError , valkey . ConnectionError ) as e :
118+ except ConnectionErrorTypes as e :
120119 logger .error (f"Could not connect for queue { queue_name } : { e } " )
121120 return workers_set
122121
@@ -142,7 +141,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142141 for name in queue_names [1 :]:
143142 if not _queues_share_connection_params (queue_params , QUEUES [name ]):
144143 raise ValueError (
145- f'Queues must have the same redis connection. "{ name } " and'
144+ f'Queues must have the same broker connection. "{ name } " and'
146145 f' "{ queue_names [0 ]} " have different connections'
147146 )
148147 queue = get_queue (name , ** kwargs )
0 commit comments