@@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
1818 name string
1919 cfg queue.Cfg
2020
21- uuid uuid.UUID
22- registered bool
23- err error
24- mutex sync.Mutex
25- masterUpdated chan struct {}
26- masterCnt int32
21+ uuid uuid.UUID
22+ registered bool
23+ err error
24+ mutex sync.Mutex
25+ updated chan struct {}
26+ masterCnt int32
2727}
2828
2929// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
3232// NewQueueConnectionHandler creates a QueueConnectionHandler object.
3333func NewQueueConnectionHandler (name string , cfg queue.Cfg ) * QueueConnectionHandler {
3434 return & QueueConnectionHandler {
35- name : name ,
36- cfg : cfg ,
37- masterUpdated : make (chan struct {}, 10 ),
35+ name : name ,
36+ cfg : cfg ,
37+ updated : make (chan struct {}, 10 ),
3838 }
3939}
4040
@@ -53,15 +53,24 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
5353 }
5454
5555 master := role == connection_pool .MasterRole
56- if master {
57- defer func () {
58- h .masterUpdated <- struct {}{}
59- }()
60- }
6156
62- // Set up a queue module configuration for an instance.
6357 q := queue .New (conn , h .name )
6458
59+ // Check is queue ready to work.
60+ if state , err := q .State (); err != nil {
61+ h .updated <- struct {}{}
62+ h .err = err
63+ return err
64+ } else if master && state != queue .RunningState {
65+ return fmt .Errorf ("queue state is not RUNNING: %d" , state )
66+ } else if ! master && state != queue .InitState && state != queue .WaitingState {
67+ return fmt .Errorf ("queue state is not INIT and not WAITING: %d" , state )
68+ }
69+
70+ defer func () {
71+ h .updated <- struct {}{}
72+ }()
73+
6574 // Set up a queue module configuration for an instance. Ideally, this
6675 // should be done before box.cfg({}) or you need to wait some time
6776 // before start a work.
@@ -79,10 +88,6 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
7988 return nil
8089 }
8190
82- if h .err = q .Create (h .cfg ); h .err != nil {
83- return h .err
84- }
85-
8691 if ! h .registered {
8792 // We register a shared session at the first time.
8893 if h .uuid , h .err = q .Identify (nil ); h .err != nil {
@@ -96,6 +101,10 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
96101 }
97102 }
98103
104+ if h .err = q .Create (h .cfg ); h .err != nil {
105+ return h .err
106+ }
107+
99108 fmt .Printf ("Master %s is ready to work!\n " , conn .Addr ())
100109 atomic .AddInt32 (& h .masterCnt , 1 )
101110
@@ -113,7 +122,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
113122
114123// Closes closes a QueueConnectionHandler object.
115124func (h * QueueConnectionHandler ) Close () {
116- close (h .masterUpdated )
125+ close (h .updated )
117126}
118127
119128// Example demonstrates how to use the queue package with the connection_pool
@@ -162,8 +171,10 @@ func Example_connectionPool() {
162171 }
163172 defer connPool .Close ()
164173
165- // Wait for a master instance identification in the queue.
166- <- h .masterUpdated
174+ // Wait for a queue initialization and master instance identification in
175+ // the queue.
176+ <- h .updated
177+ <- h .updated
167178 if h .err != nil {
168179 fmt .Printf ("Unable to identify in the pool: %s" , h .err )
169180 return
@@ -184,14 +195,17 @@ func Example_connectionPool() {
184195
185196 // Switch a master instance in the pool.
186197 roles := []bool {true , false }
187- err = test_helpers .SetClusterRO (servers , connOpts , roles )
188- if err != nil {
189- fmt .Printf ("Unable to set cluster roles: %s" , err )
190- return
198+ for {
199+ err := test_helpers .SetClusterRO (servers , connOpts , roles )
200+ if err == nil {
201+ break
202+ }
191203 }
192204
193- // Wait for a new master instance re-identification.
194- <- h .masterUpdated
205+ // Wait for a replica instance connection and a new master instance
206+ // re-identification.
207+ <- h .updated
208+ <- h .updated
195209 h .mutex .Lock ()
196210 err = h .err
197211 h .mutex .Unlock ()
@@ -211,17 +225,24 @@ func Example_connectionPool() {
211225 time .Sleep (poolOpts .CheckTimeout )
212226 }
213227
214- // Take a data from the new master instance.
215- task , err := q .Take ()
216- if err != nil {
217- fmt .Println ("Unable to got task:" , err )
218- } else if task == nil {
219- fmt .Println ("task == nil" )
220- } else if task .Data () == nil {
221- fmt .Println ("task.Data() == nil" )
222- } else {
223- task .Ack ()
224- fmt .Println ("Got data:" , task .Data ())
228+ for {
229+ // Take a data from the new master instance.
230+ task , err := q .Take ()
231+
232+ if err == connection_pool .ErrNoRwInstance {
233+ // It may be not registered yet by the pool.
234+ continue
235+ } else if err != nil {
236+ fmt .Println ("Unable to got task:" , err )
237+ } else if task == nil {
238+ fmt .Println ("task == nil" )
239+ } else if task .Data () == nil {
240+ fmt .Println ("task.Data() == nil" )
241+ } else {
242+ task .Ack ()
243+ fmt .Println ("Got data:" , task .Data ())
244+ }
245+ break
225246 }
226247
227248 // Output:
0 commit comments