@@ -25,14 +25,17 @@ const ignoreStreamId = 0
// Values stored in Connection.state. The numeric values are consecutive,
// starting from zero.
const (
	// connDisconnected is set by closeConnection when the connection is
	// dropped but not closed forever.
	connDisconnected = iota
	// connConnected is set by dial once a network connection is established.
	connConnected
	// connShutdown is set by shutdown while a server graceful shutdown is in
	// progress; newFuture rejects new requests in this state.
	connShutdown
	// connClosed is set by closeConnection when the connection is closed
	// forever.
	connClosed
)
3031
// Names of the supported connection transports.
const (
	// connTransportNone is the empty transport name.
	// NOTE(review): presumably selects a plain, unencrypted connection — confirm.
	connTransportNone = ""
	// connTransportSsl is the "ssl" transport name.
	connTransportSsl = "ssl"
)
3536
// shutdownEventKey is the key of the watcher event the connection subscribes
// to in order to learn that the server has started a graceful shutdown.
const shutdownEventKey = "box.shutdown"
38+
type (
	// ConnEventKind identifies a kind of connection event.
	// NOTE(review): presumably the type of the event constants such as
	// Closed and Shutdown — confirm against the const block below.
	ConnEventKind int

	// ConnLogKind identifies a kind of log event, such as
	// LogReconnectFailed, passed to the logger's Report method.
	ConnLogKind int
)
3841
@@ -45,6 +48,8 @@ const (
4548 ReconnectFailed
4649 // Either reconnect attempts exhausted, or explicit Close is called.
4750 Closed
51+ // Shutdown signals that shutdown callback is processing.
52+ Shutdown
4853
4954 // LogReconnectFailed is logged when reconnect attempt failed.
5055 LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134139// always returns array of array (array of tuples for space related methods).
135140// For Eval* and Call* Tarantool always returns array, but does not forces
136141// array of arrays.
142+ //
143+ // If connected to Tarantool 2.10 or newer, the connection supports server
144+ // graceful shutdown. In this case, the server waits until all client requests
145+ // are finished and the client disconnects before going down (the server may
146+ // also go down by timeout). The client reconnects if the connection options
147+ // enable reconnect. Beware that graceful shutdown event initialization is asynchronous.
148+ //
149+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137150type Connection struct {
138151 addr string
139152 c net.Conn
140153 mutex sync.Mutex
154+ cond * sync.Cond
141155 // Schema contains schema loaded on connection.
142156 Schema * Schema
143157 // requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162176 serverProtocolInfo ProtocolInfo
163177 // watchMap is a map of key -> chan watchState.
164178 watchMap sync.Map
179+
180+ // shutdownWatcher is the "box.shutdown" event watcher.
181+ shutdownWatcher Watcher
182+ // requestCnt is a counter of active requests.
183+ requestCnt int64
165184}
166185
167186var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -387,6 +406,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
387406 conn .opts .Logger = defaultLogger {}
388407 }
389408
409+ conn .cond = sync .NewCond (& conn .mutex )
410+
390411 if err = conn .createConnection (false ); err != nil {
391412 ter , ok := err .(Error )
392413 if conn .opts .Reconnect <= 0 {
@@ -612,10 +633,20 @@ func (conn *Connection) dial() (err error) {
612633 conn .lockShards ()
613634 conn .c = connection
614635 atomic .StoreUint32 (& conn .state , connConnected )
636+ conn .cond .Broadcast ()
615637 conn .unlockShards ()
616638 go conn .writer (w , connection )
617639 go conn .reader (r , connection )
618640
641+ // Subscribe shutdown event to process graceful shutdown.
642+ if conn .shutdownWatcher == nil && isFeatureInSlice (WatchersFeature , conn .serverProtocolInfo .Features ) {
643+ watcher , werr := conn .newWatcherImpl (shutdownEventKey , shutdownEventCallback )
644+ if werr != nil {
645+ return werr
646+ }
647+ conn .shutdownWatcher = watcher
648+ }
649+
619650 return
620651}
621652
@@ -745,10 +776,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
745776 if conn .state != connClosed {
746777 close (conn .control )
747778 atomic .StoreUint32 (& conn .state , connClosed )
779+ conn .cond .Broadcast ()
780+ // Free the resources.
781+ if conn .shutdownWatcher != nil {
782+ go conn .shutdownWatcher .Unregister ()
783+ conn .shutdownWatcher = nil
784+ }
748785 conn .notify (Closed )
749786 }
750787 } else {
751788 atomic .StoreUint32 (& conn .state , connDisconnected )
789+ conn .cond .Broadcast ()
752790 conn .notify (Disconnected )
753791 }
754792 if conn .c != nil {
@@ -767,9 +805,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
767805 return
768806}
769807
770- func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
771- conn .mutex .Lock ()
772- defer conn .mutex .Unlock ()
808+ func (conn * Connection ) reconnectImpl (neterr error , c net.Conn ) {
773809 if conn .opts .Reconnect > 0 {
774810 if c == conn .c {
775811 conn .closeConnection (neterr , false )
@@ -782,6 +818,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
782818 }
783819}
784820
821+ func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
822+ conn .mutex .Lock ()
823+ defer conn .mutex .Unlock ()
824+ conn .reconnectImpl (neterr , c )
825+ conn .cond .Broadcast ()
826+ }
827+
785828func (conn * Connection ) lockShards () {
786829 for i := range conn .shard {
787830 conn .shard [i ].rmut .Lock ()
@@ -1009,6 +1052,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
10091052 fut .done = nil
10101053 shard .rmut .Unlock ()
10111054 return
1055+ case connShutdown :
1056+ fut .err = ClientError {
1057+ ErrConnectionShutdown ,
1058+ "server shutdown in progress" ,
1059+ }
1060+ fut .ready = nil
1061+ fut .done = nil
1062+ shard .rmut .Unlock ()
1063+ return
10121064 }
10131065 pos := (fut .requestId / conn .opts .Concurrency ) & (requestsMap - 1 )
10141066 if ctx != nil {
@@ -1060,11 +1112,25 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
10601112 }
10611113}
10621114
1115+ func (conn * Connection ) incrementRequestCnt () {
1116+ atomic .AddInt64 (& conn .requestCnt , int64 (1 ))
1117+ }
1118+
1119+ func (conn * Connection ) decrementRequestCnt () {
1120+ if atomic .AddInt64 (& conn .requestCnt , int64 (- 1 )) == 0 {
1121+ conn .cond .Broadcast ()
1122+ }
1123+ }
1124+
10631125func (conn * Connection ) send (req Request , streamId uint64 ) * Future {
1126+ conn .incrementRequestCnt ()
1127+
10641128 fut := conn .newFuture (req .Ctx ())
10651129 if fut .ready == nil {
1130+ conn .decrementRequestCnt ()
10661131 return fut
10671132 }
1133+
10681134 if req .Ctx () != nil {
10691135 select {
10701136 case <- req .Ctx ().Done ():
@@ -1075,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10751141 go conn .contextWatchdog (fut , req .Ctx ())
10761142 }
10771143 conn .putFuture (fut , req , streamId )
1144+
10781145 return fut
10791146}
10801147
@@ -1141,6 +1208,7 @@ func (conn *Connection) markDone(fut *Future) {
11411208 if conn .rlimit != nil {
11421209 <- conn .rlimit
11431210 }
1211+ conn .decrementRequestCnt ()
11441212}
11451213
11461214func (conn * Connection ) peekFuture (reqid uint32 ) (fut * Future ) {
@@ -1426,6 +1494,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
14261494 return st , nil
14271495}
14281496
1497+ func isFeatureInSlice (expected ProtocolFeature , actualSlice []ProtocolFeature ) bool {
1498+ for _ , actual := range actualSlice {
1499+ if expected == actual {
1500+ return true
1501+ }
1502+ }
1503+ return false
1504+ }
1505+
14291506// NewWatcher creates a new Watcher object for the connection.
14301507//
14311508// You need to require WatchersFeature to use watchers, see examples for the
@@ -1464,20 +1541,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
14641541 // asynchronous. We do not expect any response from a Tarantool instance
14651542 // That's why we can't just check the Tarantool response for an unsupported
14661543 // request error.
1467- watchersRequired := false
1468- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1469- if feature == WatchersFeature {
1470- watchersRequired = true
1471- break
1472- }
1473- }
1474-
1475- if ! watchersRequired {
1544+ if ! isFeatureInSlice (WatchersFeature , conn .opts .RequiredProtocolInfo .Features ) {
14761545 err := fmt .Errorf ("the feature %s must be required by connection " +
14771546 "options to create a watcher" , WatchersFeature )
14781547 return nil , err
14791548 }
14801549
1550+ return conn .newWatcherImpl (key , callback )
1551+ }
1552+
1553+ func (conn * Connection ) newWatcherImpl (key string , callback WatchCallback ) (Watcher , error ) {
14811554 st , err := subscribeWatchChannel (conn , key )
14821555 if err != nil {
14831556 return nil , err
@@ -1531,7 +1604,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
15311604
15321605 if state .cnt == 0 {
15331606 // The last one sends IPROTO_UNWATCH.
1534- conn .Do (newUnwatchRequest (key )).Get ()
1607+ if ! conn .ClosedNow () {
1608+ // conn.ClosedNow() check is a workaround for calling
1609+ // Unregister from closeConnection().
1610+ conn .Do (newUnwatchRequest (key )).Get ()
1611+ }
15351612 conn .watchMap .Delete (key )
15361613 close (state .unready )
15371614 }
@@ -1637,3 +1714,51 @@ func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
16371714 info .Auth = conn .opts .Auth
16381715 return info
16391716}
1717+
1718+ func shutdownEventCallback (event WatchEvent ) {
1719+ // Receives "true" on server shutdown.
1720+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1721+ // step 2.
1722+ val , ok := event .Value .(bool )
1723+ if ok && val {
1724+ go event .Conn .shutdown ()
1725+ }
1726+ }
1727+
1728+ func (conn * Connection ) shutdown () {
1729+ // Forbid state changes.
1730+ conn .mutex .Lock ()
1731+ defer conn .mutex .Unlock ()
1732+
1733+ if ! atomic .CompareAndSwapUint32 (& (conn .state ), connConnected , connShutdown ) {
1734+ return
1735+ }
1736+ conn .cond .Broadcast ()
1737+ conn .notify (Shutdown )
1738+
1739+ c := conn .c
1740+ for {
1741+ if (atomic .LoadUint32 (& conn .state ) != connShutdown ) || (c != conn .c ) {
1742+ return
1743+ }
1744+ if atomic .LoadInt64 (& conn .requestCnt ) == 0 {
1745+ break
1746+ }
1747+ // Use cond var on conn.mutex since request execution may
1748+ // call reconnect(). It is ok if state changes as part of
1749+ // reconnect since Tarantool server won't allow to reconnect
1750+ // in the middle of shutting down.
1751+ conn .cond .Wait ()
1752+ }
1753+
1754+ // Start to reconnect based on common rules, same as in net.box.
1755+ // Reconnect also closes the connection: server waits until all
1756+ // subscribed connections are terminated.
1757+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1758+ // step 3.
1759+ conn .reconnectImpl (
1760+ ClientError {
1761+ ErrConnectionClosed ,
1762+ "connection closed after server shutdown" ,
1763+ }, conn .c )
1764+ }
0 commit comments