@@ -232,21 +232,29 @@ <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeC
232232
233233 Assert .notNull (action , "Callback object must not be null" );
234234
235- return Flux .usingWhen (Mono .fromSupplier (() -> {
236-
237- ReactiveRedisConnectionFactory factory = getConnectionFactory ();
238- ReactiveRedisConnection conn = factory .getReactiveConnection ();
239- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
240-
241- return (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
242- }), conn -> {
235+ return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
243236 Publisher <T > result = action .doInRedis (conn );
244237
245238 return postProcessResult (result , conn , false );
246239
247240 }, ReactiveRedisConnection ::closeLater );
248241 }
249242
243+ /**
244+ * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
245+ * the default behaviour.
246+ *
247+ * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
248+ * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
249+ */
250+ protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
251+ ReactiveRedisConnectionFactory factory = getConnectionFactory ();
252+ ReactiveRedisConnection conn = factory .getReactiveConnection ();
253+ ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
254+
255+ return Mono .just (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
256+ }
257+
250258 /*
251259 * (non-Javadoc)
252260 * @see org.springframework.data.redis.core.ReactiveRedisOperations#convertAndSend(java.lang.String, java.lang.Object)
0 commit comments