@@ -24,9 +24,7 @@ import (
2424)
2525
2626var (
27- ErrEmptyInstances = errors .New ("instances (second argument) should not be empty" )
2827 ErrWrongCheckTimeout = errors .New ("wrong check timeout, must be greater than 0" )
29- ErrNoConnection = errors .New ("no active connections" )
3028 ErrTooManyArgs = errors .New ("too many arguments" )
3129 ErrIncorrectResponse = errors .New ("incorrect response format" )
3230 ErrIncorrectStatus = errors .New ("incorrect instance status: status should be `running`" )
@@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
155153// opts. Instances must have unique names.
156154func ConnectWithOpts (ctx context.Context , instances []Instance ,
157155 opts Opts ) (* ConnectionPool , error ) {
158- if len (instances ) == 0 {
159- return nil , ErrEmptyInstances
160- }
161156 unique := make (map [string ]bool )
162157 for _ , instance := range instances {
163158 if _ , ok := unique [instance .Name ]; ok {
@@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
178173 connPool := & ConnectionPool {
179174 ends : make (map [string ]* endpoint ),
180175 opts : opts ,
181- state : unknownState ,
176+ state : connectedState ,
182177 done : make (chan struct {}),
183178 rwPool : rwPool ,
184179 roPool : roPool ,
185180 anyPool : anyPool ,
186181 }
187182
188- somebodyAlive , ctxCanceled := connPool .fillPools (ctx , instances )
189- if ! somebodyAlive {
183+ canceled := connPool .fillPools (ctx , instances )
184+ if canceled {
190185 connPool .state .set (closedState )
191- if ctxCanceled {
192- return nil , ErrContextCanceled
193- }
194- return nil , ErrNoConnection
186+ return nil , ErrContextCanceled
195187 }
196188
197- connPool .state .set (connectedState )
198-
199- for _ , s := range connPool .ends {
189+ for _ , endpoint := range connPool .ends {
200190 endpointCtx , cancel := context .WithCancel (context .Background ())
201- s .cancel = cancel
202- go connPool .controller (endpointCtx , s )
191+ endpoint .cancel = cancel
192+ go connPool .controller (endpointCtx , endpoint )
203193 }
204194
205195 return connPool , nil
@@ -252,8 +242,12 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
252242 return conn .ConfiguredTimeout (), nil
253243}
254244
255- // Add adds a new instance into the pool. This function adds the instance
256- // only after successful connection.
245+ // Add adds a new instance into the pool. The pool will try to connect to the
246+ // instance later if it is unable to establish a connection.
247+ //
248+ // The function may return an error and don't add the instance into the pool
249+ // if the context has been cancelled or on concurrent Close()/CloseGraceful()
250+ // call.
257251func (p * ConnectionPool ) Add (ctx context.Context , instance Instance ) error {
258252 e := newEndpoint (instance .Name , instance .Dialer , instance .Opts )
259253
@@ -268,19 +262,34 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
268262 return ErrExists
269263 }
270264
271- endpointCtx , cancel := context .WithCancel (context .Background ())
272- e .cancel = cancel
265+ endpointCtx , endpointCancel := context .WithCancel (context .Background ())
266+ connectCtx , connectCancel := context .WithCancel (ctx )
267+ e .cancel = func () {
268+ connectCancel ()
269+ endpointCancel ()
270+ }
273271
274272 p .ends [instance .Name ] = e
275273 p .endsMutex .Unlock ()
276274
277- if err := p .tryConnect (ctx , e ); err != nil {
278- p .endsMutex .Lock ()
279- delete (p .ends , instance .Name )
280- p .endsMutex .Unlock ()
281- e .cancel ()
282- close (e .closed )
283- return err
275+ if err := p .tryConnect (connectCtx , e ); err != nil {
276+ var canceled bool
277+ select {
278+ case <- connectCtx .Done ():
279+ canceled = true
280+ case <- endpointCtx .Done ():
281+ canceled = true
282+ default :
283+ canceled = false
284+ }
285+ if canceled {
286+ p .endsMutex .Lock ()
287+ delete (p .ends , instance .Name )
288+ p .endsMutex .Unlock ()
289+ e .cancel ()
290+ close (e .closed )
291+ return err
292+ }
284293 }
285294
286295 go p .controller (endpointCtx , e )
@@ -1145,64 +1154,30 @@ func (p *ConnectionPool) deactivateConnections() {
11451154 }
11461155}
11471156
1148- func (p * ConnectionPool ) processConnection (conn * tarantool.Connection ,
1149- name string , end * endpoint ) bool {
1150- role , err := p .getConnectionRole (conn )
1151- if err != nil {
1152- conn .Close ()
1153- log .Printf ("tarantool: storing connection to %s failed: %s\n " , name , err )
1154- return false
1155- }
1156-
1157- if ! p .handlerDiscovered (name , conn , role ) {
1158- conn .Close ()
1159- return false
1160- }
1161- if p .addConnection (name , conn , role ) != nil {
1162- conn .Close ()
1163- p .handlerDeactivated (name , conn , role )
1164- return false
1165- }
1166-
1167- end .conn = conn
1168- end .role = role
1169- return true
1170- }
1171-
1172- func (p * ConnectionPool ) fillPools (ctx context.Context ,
1173- instances []Instance ) (bool , bool ) {
1174- somebodyAlive := false
1175- ctxCanceled := false
1176-
1157+ func (p * ConnectionPool ) fillPools (ctx context.Context , instances []Instance ) bool {
11771158 // It is called before controller() goroutines, so we don't expect
11781159 // concurrency issues here.
11791160 for _ , instance := range instances {
11801161 end := newEndpoint (instance .Name , instance .Dialer , instance .Opts )
11811162 p .ends [instance .Name ] = end
1182- connOpts := instance .Opts
1183- connOpts .Notify = end .notify
1184- conn , err := tarantool .Connect (ctx , instance .Dialer , connOpts )
1185- if err != nil {
1163+
1164+ if err := p .tryConnect (ctx , end ); err != nil {
11861165 log .Printf ("tarantool: connect to %s failed: %s\n " ,
11871166 instance .Name , err )
11881167 select {
11891168 case <- ctx .Done ():
1190- ctxCanceled = true
1191-
11921169 p .ends [instance .Name ] = nil
11931170 log .Printf ("tarantool: operation was canceled" )
11941171
11951172 p .deactivateConnections ()
11961173
1197- return false , ctxCanceled
1174+ return true
11981175 default :
11991176 }
1200- } else if p .processConnection (conn , instance .Name , end ) {
1201- somebodyAlive = true
12021177 }
12031178 }
12041179
1205- return somebodyAlive , ctxCanceled
1180+ return false
12061181}
12071182
12081183func (p * ConnectionPool ) updateConnection (e * endpoint ) {
0 commit comments