Skip to content

Commit e7abb9a

Browse files
committed
Correctly release connection after switching to Pub/Sub mode.
LettuceConnection.switchToPubSub now correctly releases its underlying connection when switching to Pub/Sub. Also, we improved safeguards to avoid using closed connections. Closes #2331
1 parent 14eebce commit e7abb9a

File tree

6 files changed

+39
-25
lines changed

6 files changed

+39
-25
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,11 +331,18 @@ public void close() throws DataAccessException {
331331

332332
super.close();
333333

334+
JedisSubscription subscription = this.subscription;
335+
if (subscription != null) {
336+
subscription.close();
337+
this.subscription = null;
338+
}
339+
334340
// return the connection to the pool
335341
if (pool != null) {
336342
jedis.close();
337343
return;
338344
}
345+
339346
// else close the connection normally (doing the try/catch dance)
340347
Exception exc = null;
341348
try {
@@ -348,8 +355,10 @@ public void close() throws DataAccessException {
348355
} catch (Exception ex) {
349356
exc = ex;
350357
}
351-
if (exc != null)
358+
359+
if (exc != null) {
352360
throw convertJedisAccessException(exc);
361+
}
353362
}
354363

355364
/*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.springframework.dao.QueryTimeoutException;
6464
import org.springframework.data.redis.ExceptionTranslationStrategy;
6565
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
66+
import org.springframework.data.redis.RedisSystemException;
6667
import org.springframework.data.redis.connection.*;
6768
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
6869
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
@@ -423,22 +424,29 @@ public void close() throws DataAccessException {
423424

424425
isClosed = true;
425426

427+
reset();
428+
}
429+
430+
private void reset() {
431+
426432
if (asyncDedicatedConn != null) {
427433
try {
428434
if (customizedDatabaseIndex()) {
429435
potentiallySelectDatabase(defaultDbIndex);
430436
}
431437
connectionProvider.release(asyncDedicatedConn);
438+
asyncDedicatedConn = null;
432439
} catch (RuntimeException ex) {
433440
throw convertLettuceAccessException(ex);
434441
}
435442
}
436443

444+
LettuceSubscription subscription = this.subscription;
437445
if (subscription != null) {
438446
if (subscription.isAlive()) {
439447
subscription.doClose();
440448
}
441-
subscription = null;
449+
this.subscription = null;
442450
}
443451

444452
this.dbIndex = defaultDbIndex;
@@ -461,7 +469,8 @@ public boolean isClosed() {
461469
public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
462470

463471
LettuceSubscription subscription = this.subscription;
464-
return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());
472+
return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async()
473+
: getAsyncConnection());
465474
}
466475

467476
/*
@@ -609,8 +618,8 @@ public List<Object> exec() {
609618
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
610619
new LinkedList<>(txResults), exceptionConverter);
611620

612-
pipeline(newLettuceResult(exec, source -> resultConverter
613-
.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
621+
pipeline(newLettuceResult(exec,
622+
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
614623
return null;
615624
}
616625

@@ -813,7 +822,8 @@ public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy
813822
@SuppressWarnings("unchecked")
814823
protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
815824

816-
close();
825+
checkSubscription();
826+
reset();
817827
return connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
818828
}
819829

@@ -988,6 +998,10 @@ RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {
988998

989999
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
9901000

1001+
if (isClosed()) {
1002+
throw new RedisSystemException("Connection is closed", null);
1003+
}
1004+
9911005
StatefulConnection<byte[], byte[]> connection = getOrCreateDedicatedConnection();
9921006

9931007
if (connection instanceof StatefulRedisConnection) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ protected LettuceSubscription(MessageListener listener,
6060

6161
this.connection = pubsubConnection;
6262
this.listener = new LettuceMessageListener(listener,
63-
listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
63+
listener instanceof SubscriptionListener ? (SubscriptionListener) listener
64+
: SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
6465
this.connectionProvider = connectionProvider;
6566
this.pubsub = connection.sync();
6667
this.pubSubAsync = connection.async();
@@ -79,6 +80,10 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> getNativeConnection() {
7980
@Override
8081
protected void doClose() {
8182

83+
if (!isAlive()) {
84+
return;
85+
}
86+
8287
List<CompletableFuture<?>> futures = new ArrayList<>();
8388

8489
if (!getChannels().isEmpty()) {

src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ protected AbstractSubscription(MessageListener listener, @Nullable byte[][] chan
104104
@Override
105105
public void close() {
106106
doClose();
107+
alive.set(false);
107108
}
108109

109110
/**
@@ -275,8 +276,7 @@ private void checkPulse() {
275276

276277
private void closeIfUnsubscribed() {
277278
if (channels.isEmpty() && patterns.isEmpty()) {
278-
alive.set(false);
279-
doClose();
279+
close();
280280
}
281281
}
282282

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import io.lettuce.core.KqueueProvider;
2323
import io.lettuce.core.ReadFrom;
2424
import io.lettuce.core.RedisException;
25-
import io.lettuce.core.RedisFuture;
2625
import io.lettuce.core.api.async.RedisAsyncCommands;
2726
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
2827
import reactor.test.StepVerifier;
2928

3029
import java.io.File;
3130
import java.time.Duration;
32-
import java.util.concurrent.ExecutionException;
3331
import java.util.function.Consumer;
3432

3533
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -225,19 +223,11 @@ void testDisableSharedConnection() throws Exception {
225223
factory.setShareNativeConnection(false);
226224
RedisConnection conn2 = factory.getConnection();
227225
assertThat(conn2.getNativeConnection()).isNotSameAs(connection.getNativeConnection());
228-
// Give some time for native connection to asynchronously initialize, else close doesn't work
229226
Thread.sleep(100);
230227
conn2.close();
231228
assertThat(conn2.isClosed()).isTrue();
232-
// Give some time for native connection to asynchronously close
233-
Thread.sleep(100);
234-
RedisFuture<String> future = ((RedisAsyncCommands<byte[], byte[]>) conn2.getNativeConnection()).ping();
235-
try {
236-
future.get();
237-
fail("The native connection should be closed");
238-
} catch (ExecutionException e) {
239-
// expected, Lettuce async failures are signalled on the Future
240-
}
229+
230+
assertThatExceptionOfType(RedisSystemException.class).isThrownBy(conn2::getNativeConnection);
241231
}
242232

243233
@SuppressWarnings("unchecked")

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,6 @@ void testClosePooledConnectionNotShared() {
154154
RedisConnection connection = factory2.getConnection();
155155
// Use the connection to make sure the channel is initialized, else nothing happens on close
156156
connection.ping();
157-
connection.close();
158-
// The dedicated connection should not be closed
159-
connection.ping();
160-
161157
connection.close();
162158
factory2.destroy();
163159
pool.destroy();

0 commit comments

Comments
 (0)