3030import java .util .concurrent .CompletableFuture ;
3131import java .util .concurrent .CompletionStage ;
3232import java .util .concurrent .ConcurrentHashMap ;
33+ import java .util .concurrent .atomic .AtomicBoolean ;
3334
34- import java .util .concurrent .atomic .AtomicReference ;
3535import org .apache .commons .logging .Log ;
3636import org .apache .commons .logging .LogFactory ;
3737import org .apache .commons .pool2 .impl .GenericObjectPool ;
3838import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
39+
3940import org .springframework .beans .factory .DisposableBean ;
40- import org .springframework .context .SmartLifecycle ;
4141import org .springframework .data .redis .connection .PoolException ;
4242import org .springframework .util .Assert ;
4343
6262 * @since 2.0
6363 * @see #getConnection(Class)
6464 */
65- class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean ,
66- SmartLifecycle {
65+ class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean {
6766
6867 private static final Log log = LogFactory .getLog (LettucePoolingConnectionProvider .class );
6968
70- private final AtomicReference < State > state = new AtomicReference <>( State . CREATED );
69+ private final AtomicBoolean disposed = new AtomicBoolean ( );
7170 private final LettuceConnectionProvider connectionProvider ;
7271 private final GenericObjectPoolConfig <StatefulConnection <?, ?>> poolConfig ;
7372 private final Map <StatefulConnection <?, ?>, GenericObjectPool <StatefulConnection <?, ?>>> poolRef = new ConcurrentHashMap <>(
@@ -81,10 +80,6 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
8180 private final Map <Class <?>, AsyncPool <StatefulConnection <?, ?>>> asyncPools = new ConcurrentHashMap <>(32 );
8281 private final BoundedPoolConfig asyncPoolConfig ;
8382
84- enum State {
85- CREATED , STARTING , STARTED , STOPPING , STOPPED , DESTROYED ;
86- }
87-
8883 LettucePoolingConnectionProvider (LettuceConnectionProvider connectionProvider ,
8984 LettucePoolingClientConfiguration clientConfiguration ) {
9085
@@ -215,51 +210,43 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
215210
216211 @ Override
217212 public void destroy () throws Exception {
218- stop ();
219- state .set (State .DESTROYED );
220- }
221-
222213
223- @ Override
224- public void start () {
225- state .set (State .STARTED );
226- }
214+ if (!disposed .compareAndSet (false , true )) {
215+ return ;
216+ }
227217
228- @ Override
229- public void stop () {
230- if (state .compareAndSet (State .STARTED , State .STOPPING )) {
231- List <CompletableFuture <?>> futures = new ArrayList <>();
232- if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
233- log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
234- }
218+ List <CompletableFuture <?>> futures = new ArrayList <>();
219+ if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
220+ log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
221+ }
235222
236- if (!inProgressAsyncPoolRef .isEmpty ()) {
223+ if (!inProgressAsyncPoolRef .isEmpty ()) {
237224
238- log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
239- inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
240- }
225+ log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
226+ inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
227+ }
241228
242- if (!poolRef .isEmpty ()) {
229+ if (!poolRef .isEmpty ()) {
243230
244- poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
245- poolRef .clear ();
246- }
231+ poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
232+ poolRef .clear ();
233+ }
247234
248- if (!asyncPoolRef .isEmpty ()) {
235+ if (!asyncPoolRef .isEmpty ()) {
249236
250- asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
251- asyncPoolRef .clear ();
252- }
237+ asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
238+ asyncPoolRef .clear ();
239+ }
253240
254- pools .forEach ((type , pool ) -> pool .close ());
241+ pools .forEach ((type , pool ) -> pool .close ());
255242
256- CompletableFuture
243+ CompletableFuture
257244 .allOf (futures .stream ().map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ()))
258- .toArray (CompletableFuture []::new )) //
245+ .toArray (CompletableFuture []::new )) //
259246 .thenCompose (ignored -> {
260247
261248 CompletableFuture [] poolClose = asyncPools .values ().stream ().map (AsyncPool ::closeAsync )
262- .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
249+ .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
263250
264251 return CompletableFuture .allOf (poolClose );
265252 }) //
@@ -269,18 +256,7 @@ public void stop() {
269256 }) //
270257 .join ();
271258
272- pools .clear ();
273- }
274- state .set (State .STOPPED );
275- }
276-
277- @ Override
278- public boolean isRunning () {
279- return State .STARTED .equals (this .state .get ());
259+ pools .clear ();
280260 }
281261
282- @ Override
283- public boolean isAutoStartup () {
284- return true ;
285- }
286262}
0 commit comments