@@ -192,21 +192,29 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean
192192
193193 Assert .notNull (action , "Callback object must not be null" );
194194
195- return Flux .usingWhen (Mono .fromSupplier (() -> {
196-
197- ReactiveRedisConnectionFactory factory = getConnectionFactory ();
198- ReactiveRedisConnection conn = factory .getReactiveConnection ();
199- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
200-
201- return (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
202- }), conn -> {
195+ return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
203196 Publisher <T > result = action .doInRedis (conn );
204197
205198 return postProcessResult (result , conn , false );
206199
207200 }, ReactiveRedisConnection ::closeLater );
208201 }
209202
203+ /**
204+ * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
205+ * the default behaviour.
206+ *
207+ * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
208+ * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
209+ */
210+ protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
211+ ReactiveRedisConnectionFactory factory = getConnectionFactory ();
212+ ReactiveRedisConnection conn = factory .getReactiveConnection ();
213+ ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
214+
215+ return Mono .just (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
216+ }
217+
210218 /*
211219 * (non-Javadoc)
212220 * @see org.springframework.data.redis.core.ReactiveRedisOperations#convertAndSend(java.lang.String, java.lang.Object)
0 commit comments