3131import java .util .concurrent .CompletionStage ;
3232import java .util .concurrent .ConcurrentHashMap ;
3333
34+ import java .util .concurrent .atomic .AtomicReference ;
3435import org .apache .commons .logging .Log ;
3536import org .apache .commons .logging .LogFactory ;
3637import org .apache .commons .pool2 .impl .GenericObjectPool ;
3738import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
3839import org .springframework .beans .factory .DisposableBean ;
40+ import org .springframework .context .SmartLifecycle ;
3941import org .springframework .data .redis .connection .PoolException ;
4042import org .springframework .util .Assert ;
4143
5658 * @author Mark Paluch
5759 * @author Christoph Strobl
5860 * @author Asmir Mustafic
61+ * @author UHyeon Jeong
5962 * @since 2.0
6063 * @see #getConnection(Class)
6164 */
62- class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean {
65+ class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean ,
66+ SmartLifecycle {
6367
6468 private static final Log log = LogFactory .getLog (LettucePoolingConnectionProvider .class );
6569
70+ private final AtomicReference <State > state = new AtomicReference <>(State .CREATED );
6671 private final LettuceConnectionProvider connectionProvider ;
6772 private final GenericObjectPoolConfig <StatefulConnection <?, ?>> poolConfig ;
6873 private final Map <StatefulConnection <?, ?>, GenericObjectPool <StatefulConnection <?, ?>>> poolRef = new ConcurrentHashMap <>(
@@ -76,6 +81,10 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
7681 private final Map <Class <?>, AsyncPool <StatefulConnection <?, ?>>> asyncPools = new ConcurrentHashMap <>(32 );
7782 private final BoundedPoolConfig asyncPoolConfig ;
7883
84+ enum State {
85+ CREATED , STARTING , STARTED , STOPPING , STOPPED , DESTROYED ;
86+ }
87+
7988 LettucePoolingConnectionProvider (LettuceConnectionProvider connectionProvider ,
8089 LettucePoolingClientConfiguration clientConfiguration ) {
8190
@@ -206,39 +215,51 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
206215
207216 @ Override
208217 public void destroy () throws Exception {
218+ stop ();
219+ state .set (State .DESTROYED );
220+ }
209221
210- List <CompletableFuture <?>> futures = new ArrayList <>();
211- if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
212- log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
213- }
214222
215- if (!inProgressAsyncPoolRef .isEmpty ()) {
223+ @ Override
224+ public void start () {
225+ state .set (State .STARTED );
226+ }
216227
217- log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
218- inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
219- }
228+ @ Override
229+ public void stop () {
230+ if (state .compareAndSet (State .STARTED , State .STOPPING )) {
231+ List <CompletableFuture <?>> futures = new ArrayList <>();
232+ if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
233+ log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
234+ }
220235
221- if (!poolRef .isEmpty ()) {
236+ if (!inProgressAsyncPoolRef .isEmpty ()) {
222237
223- poolRef . forEach (( connection , pool ) -> pool . returnObject ( connection ) );
224- poolRef . clear ( );
225- }
238+ log . warn ( "LettucePoolingConnectionProvider has active connection retrievals" );
239+ inProgressAsyncPoolRef . forEach (( k , v ) -> futures . add ( k . thenApply ( StatefulConnection :: closeAsync )) );
240+ }
226241
227- if (!asyncPoolRef .isEmpty ()) {
242+ if (!poolRef .isEmpty ()) {
228243
229- asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
230- asyncPoolRef .clear ();
231- }
244+ poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
245+ poolRef .clear ();
246+ }
247+
248+ if (!asyncPoolRef .isEmpty ()) {
249+
250+ asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
251+ asyncPoolRef .clear ();
252+ }
232253
233- pools .forEach ((type , pool ) -> pool .close ());
254+ pools .forEach ((type , pool ) -> pool .close ());
234255
235- CompletableFuture
256+ CompletableFuture
236257 .allOf (futures .stream ().map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ()))
237- .toArray (CompletableFuture []::new )) //
258+ .toArray (CompletableFuture []::new )) //
238259 .thenCompose (ignored -> {
239260
240261 CompletableFuture [] poolClose = asyncPools .values ().stream ().map (AsyncPool ::closeAsync )
241- .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
262+ .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
242263
243264 return CompletableFuture .allOf (poolClose );
244265 }) //
@@ -248,6 +269,18 @@ public void destroy() throws Exception {
248269 }) //
249270 .join ();
250271
251- pools .clear ();
272+ pools .clear ();
273+ }
274+ state .set (State .STOPPED );
275+ }
276+
277+ @ Override
278+ public boolean isRunning () {
279+ return State .STARTED .equals (this .state .get ());
280+ }
281+
282+ @ Override
283+ public boolean isAutoStartup () {
284+ return true ;
252285 }
253286}
0 commit comments