@@ -3,6 +3,7 @@ package queue_test
33import (
44 "fmt"
55 "sync"
6+ "sync/atomic"
67 "time"
78
89 "github.com/google/uuid"
@@ -22,6 +23,7 @@ type QueueConnectionHandler struct {
2223 err error
2324 mutex sync.Mutex
2425 masterUpdated chan struct {}
26+ masterCnt int32
2527}
2628
2729// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -87,6 +89,7 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
8789 }
8890 }
8991
92+ atomic .AddInt32 (& h .masterCnt , 1 )
9093 fmt .Printf ("Master %s is ready to work!\n " , conn .Addr ())
9194
9295 return nil
@@ -95,6 +98,9 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
9598// Deactivated doesn't do anything useful for the example.
9699func (h * QueueConnectionHandler ) Deactivated (conn * tarantool.Connection ,
97100 role connection_pool.Role ) error {
101+ if role == connection_pool .MasterRole {
102+ atomic .AddInt32 (& h .masterCnt , - 1 )
103+ }
98104 return nil
99105}
100106
@@ -184,8 +190,18 @@ func Example_connectionPool() {
184190 return
185191 }
186192
193+ for i := 0 ; i < 2 && atomic .LoadInt32 (& h .masterCnt ) != 1 ; i ++ {
194+ // The pool does not immediately detect role switching. It may happen
195+ // that requests will be sent to RO instances. In that case q.Take()
196+ // method will return a nil value.
197+ //
198+ // We need to make the example test output deterministic so we need to
199+ // avoid it here. But in real life, you need to take this into account.
200+ time .Sleep (poolOpts .CheckTimeout )
201+ }
202+
187203 // Take a data from the new master instance.
188- task , err := q .TakeTimeout ( 1 * time . Second )
204+ task , err := q .Take ( )
189205 if err != nil {
190206 fmt .Println ("Unable to got task:" , err )
191207 } else if task == nil {
0 commit comments