1515 */
1616package org .springframework .data .neo4j .core ;
1717
18- import reactor .core .publisher .Flux ;
19- import reactor .core .publisher .Mono ;
20- import reactor .util .function .Tuple2 ;
21-
22- import java .util .Map ;
23- import java .util .function .BiFunction ;
24- import java .util .function .Function ;
25- import java .util .function .Supplier ;
26-
2718import org .neo4j .driver .Driver ;
2819import org .neo4j .driver .Record ;
2920import org .neo4j .driver .reactive .RxQueryRunner ;
4132import org .springframework .data .neo4j .core .transaction .ReactiveNeo4jTransactionManager ;
4233import org .springframework .lang .Nullable ;
4334import org .springframework .util .Assert ;
35+ import reactor .core .publisher .Flux ;
36+ import reactor .core .publisher .Mono ;
37+ import reactor .util .function .Tuple2 ;
38+
39+ import java .util .Map ;
40+ import java .util .function .BiFunction ;
41+ import java .util .function .Function ;
42+ import java .util .function .Supplier ;
4443
4544/**
4645 * Reactive variant of the {@link Neo4jClient}.
@@ -54,13 +53,15 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
5453
5554 private final Driver driver ;
5655 private final TypeSystem typeSystem ;
56+ private final ReactiveDatabaseSelectionProvider databaseSelectionProvider ;
5757 private final ConversionService conversionService ;
5858 private final Neo4jPersistenceExceptionTranslator persistenceExceptionTranslator = new Neo4jPersistenceExceptionTranslator ();
5959
60- DefaultReactiveNeo4jClient (Driver driver ) {
60+ DefaultReactiveNeo4jClient (Driver driver , @ Nullable ReactiveDatabaseSelectionProvider databaseSelectionProvider ) {
6161
6262 this .driver = driver ;
6363 this .typeSystem = driver .defaultTypeSystem ();
64+ this .databaseSelectionProvider = databaseSelectionProvider ;
6465 this .conversionService = new DefaultConversionService ();
6566 new Neo4jConversions ().registerConvertersIn ((ConverterRegistry ) conversionService );
6667 }
@@ -180,7 +181,7 @@ public Mono<ResultSummary> run() {
180181
181182 class DefaultRecordFetchSpec <T > implements RecordFetchSpec <T >, MappingSpec <T > {
182183
183- private final String targetDatabase ;
184+ private final Mono < DatabaseSelection > targetDatabase ;
184185
185186 private final Supplier <String > cypherSupplier ;
186187
@@ -192,9 +193,18 @@ class DefaultRecordFetchSpec<T> implements RecordFetchSpec<T>, MappingSpec<T> {
192193 this (targetDatabase , cypherSupplier , parameters , null );
193194 }
194195
195- DefaultRecordFetchSpec (String targetDatabase , Supplier <String > cypherSupplier , NamedParameters parameters ,
196+ DefaultRecordFetchSpec (@ Nullable String targetDatabase , Supplier <String > cypherSupplier , NamedParameters parameters ,
196197 @ Nullable BiFunction <TypeSystem , Record , T > mappingFunction ) {
197- this .targetDatabase = targetDatabase ;
198+
199+ this .targetDatabase = Mono .defer (() -> {
200+ if (targetDatabase != null ) {
201+ return ReactiveDatabaseSelectionProvider .createStaticDatabaseSelectionProvider (targetDatabase )
202+ .getDatabaseSelection ();
203+ } else if (databaseSelectionProvider != null ) {
204+ return databaseSelectionProvider .getDatabaseSelection ();
205+ }
206+ return Mono .just (DatabaseSelection .undecided ());
207+ });
198208 this .cypherSupplier = cypherSupplier ;
199209 this .parameters = parameters ;
200210 this .mappingFunction = mappingFunction ;
@@ -229,33 +239,36 @@ Flux<T> executeWith(Tuple2<String, Map<String, Object>> t, RxQueryRunner runner)
229239 @ Override
230240 public Mono <T > one () {
231241
232- return doInQueryRunnerForMono (targetDatabase ,
233- (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ())
242+ return targetDatabase . flatMap ( databaseSelection -> doInQueryRunnerForMono (databaseSelection . getValue () ,
243+ (runner ) -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).singleOrEmpty ()))
234244 .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
235245 }
236246
237247 @ Override
238248 public Mono <T > first () {
239249
240- return doInQueryRunnerForMono (targetDatabase ,
241- runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).next ())
250+ return targetDatabase . flatMap ( databaseSelection -> doInQueryRunnerForMono (databaseSelection . getValue () ,
251+ runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )).next ()))
242252 .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
243253 }
244254
245255 @ Override
246256 public Flux <T > all () {
247257
248- return doInStatementRunnerForFlux (targetDatabase ,
249- runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner ))).onErrorMap (RuntimeException .class ,
250- DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
258+ return targetDatabase .flatMapMany (databaseSelection ->
259+ doInStatementRunnerForFlux (databaseSelection .getValue (),
260+ runner -> prepareStatement ().flatMapMany (t -> executeWith (t , runner )))
261+ )
262+ .onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
251263 }
252264
253265 Mono <ResultSummary > run () {
254266
255- return doInQueryRunnerForMono (targetDatabase , runner -> prepareStatement ().flatMap (t -> {
256- RxResult rxResult = runner .run (t .getT1 (), t .getT2 ());
257- return Flux .from (rxResult .records ()).then (Mono .from (rxResult .consume ()).map (ResultSummaries ::process ));
258- })).onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
267+ return targetDatabase .flatMap (databaseSelection ->
268+ doInQueryRunnerForMono (databaseSelection .getValue (), runner -> prepareStatement ().flatMap (t -> {
269+ RxResult rxResult = runner .run (t .getT1 (), t .getT2 ());
270+ return Flux .from (rxResult .records ()).then (Mono .from (rxResult .consume ()).map (ResultSummaries ::process ));
271+ }))).onErrorMap (RuntimeException .class , DefaultReactiveNeo4jClient .this ::potentiallyConvertRuntimeException );
259272 }
260273 }
261274
0 commit comments