@@ -34,11 +34,21 @@ class KombuManager(PubSubManager): # pragma: no cover
3434 :param write_only: If set ot ``True``, only initialize to emit events. The
3535 default of ``False`` initializes the class for emitting
3636 and receiving.
37+ :param connection_options: additional keyword arguments to be passed to
38+ ``kombu.Connection()``.
39+ :param exchange_options: additional keyword arguments to be passed to
40+ ``kombu.Exchange()``.
41+ :param queue_options: additional keyword arguments to be passed to
42+ ``kombu.Queue()``.
43+ :param producer_options: additional keyword arguments to be passed to
44+ ``kombu.Producer()``.
3745 """
3846 name = 'kombu'
3947
4048 def __init__ (self , url = 'amqp://guest:guest@localhost:5672//' ,
41- channel = 'socketio' , write_only = False , logger = None ):
49+ channel = 'socketio' , write_only = False , logger = None ,
50+ connection_options = None , exchange_options = None ,
51+ queue_options = None , producer_options = None ):
4252 if kombu is None :
4353 raise RuntimeError ('Kombu package is not installed '
4454 '(Run "pip install kombu" in your '
@@ -47,6 +57,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
4757 write_only = write_only ,
4858 logger = logger )
4959 self .url = url
60+ self .connection_options = connection_options or {}
61+ self .exchange_options = exchange_options or {}
62+ self .queue_options = queue_options or {}
63+ self .producer_options = producer_options or {}
5064 self .producer = self ._producer ()
5165
5266 def initialize (self ):
@@ -65,19 +79,22 @@ def initialize(self):
6579 'with ' + self .server .async_mode )
6680
6781 def _connection (self ):
68- return kombu .Connection (self .url )
82+ return kombu .Connection (self .url , ** self . connection_options )
6983
7084 def _exchange (self ):
71- return kombu .Exchange (self .channel , type = 'fanout' , durable = False )
85+ options = {'type' : 'fanout' , 'durable' : False }
86+ options .update (self .exchange_options )
87+ return kombu .Exchange (self .channel , ** options )
7288
7389 def _queue (self ):
7490 queue_name = 'flask-socketio.' + str (uuid .uuid4 ())
75- return kombu . Queue ( queue_name , self . _exchange (),
76- durable = False ,
77- queue_arguments = { 'x-expires' : 300000 } )
91+ options = { 'durable' : False , 'queue_arguments' : { 'x-expires' : 300000 }}
92+ options . update ( self . queue_options )
93+ return kombu . Queue ( queue_name , self . _exchange (), ** options )
7894
7995 def _producer (self ):
80- return self ._connection ().Producer (exchange = self ._exchange ())
96+ return self ._connection ().Producer (exchange = self ._exchange (),
97+ ** self .producer_options )
8198
8299 def __error_callback (self , exception , interval ):
83100 self ._get_logger ().exception ('Sleeping {}s' .format (interval ))
0 commit comments