|
17 | 17 |
|
18 | 18 | import java.util.Collection; |
19 | 19 | import java.util.Collections; |
20 | | -import java.util.List; |
21 | 20 | import java.util.Map; |
22 | | -import java.util.concurrent.CompletableFuture; |
23 | 21 | import java.util.concurrent.atomic.AtomicLong; |
24 | | -import java.util.function.Consumer; |
25 | | -import java.util.function.Function; |
26 | | -import java.util.stream.Stream; |
27 | 22 |
|
28 | 23 | import org.junit.jupiter.api.BeforeAll; |
29 | 24 | import org.junit.jupiter.api.BeforeEach; |
30 | 25 | import org.junit.jupiter.api.Disabled; |
31 | 26 | import org.junit.jupiter.api.Test; |
32 | 27 | import org.neo4j.cypherdsl.core.Cypher; |
33 | 28 | import org.neo4j.cypherdsl.core.Node; |
34 | | -import org.neo4j.cypherdsl.core.Statement; |
35 | | -import org.neo4j.cypherdsl.core.executables.ExecutableResultStatement; |
36 | 29 | import org.neo4j.driver.Driver; |
37 | | -import org.neo4j.driver.Query; |
38 | | -import org.neo4j.driver.Record; |
39 | 30 | import org.neo4j.driver.Session; |
40 | | -import org.neo4j.driver.SimpleQueryRunner; |
41 | 31 | import org.neo4j.driver.Transaction; |
42 | | -import org.neo4j.driver.async.AsyncQueryRunner; |
43 | | -import org.neo4j.driver.reactivestreams.ReactiveQueryRunner; |
44 | 32 | import org.neo4j.driver.reactivestreams.ReactiveResult; |
45 | 33 | import org.neo4j.driver.reactivestreams.ReactiveSession; |
46 | | -import org.neo4j.driver.summary.ResultSummary; |
47 | | -import org.reactivestreams.Publisher; |
48 | 34 | import reactor.blockhound.BlockHound; |
49 | 35 | import reactor.core.publisher.Flux; |
50 | 36 | import reactor.core.publisher.Mono; |
@@ -154,15 +140,15 @@ void clientShouldIntegrateWithCypherDSL(@Autowired TransactionalOperator transac |
154 | 140 | Cypher.mapOf("value", |
155 | 141 | Cypher.literalOf(23).multiply(Cypher.literalOf(2)).subtract(Cypher.literalOf(4)))) |
156 | 142 | .named("n"); |
157 | | - NewReactiveExecutableResultStatement statement = new NewReactiveExecutableResultStatement(namedAnswer); |
| 143 | + var cypher = Cypher.create(namedAnswer).returning(namedAnswer).build().getCypher(); |
158 | 144 |
|
159 | 145 | AtomicLong vanishedId = new AtomicLong(); |
160 | 146 | transactionalOperator.execute(transaction -> { |
161 | | - Flux<Long> inner = client.getQueryRunner() |
162 | | - .flatMapMany(statement::fetchWith) |
| 147 | + var inner = client.getQueryRunner() |
| 148 | + .flatMap(qr -> Mono.from(qr.run(cypher))) |
| 149 | + .flatMapMany(r -> Flux.from(r.records())) |
163 | 150 | .doOnNext(r -> vanishedId.set(TestIdentitySupport.getInternalId(r.get("n").asNode()))) |
164 | 151 | .map(record -> record.get("n").get("value").asLong()); |
165 | | - |
166 | 152 | transaction.setRollbackOnly(); |
167 | 153 | return inner; |
168 | 154 | }).as(StepVerifier::create).expectNext(42L).verifyComplete(); |
@@ -221,70 +207,4 @@ public boolean isCypher5Compatible() { |
221 | 207 |
|
222 | 208 | } |
223 | 209 |
|
224 | | - private static class NewReactiveExecutableResultStatement implements ExecutableResultStatement { |
225 | | - |
226 | | - private final Statement delegate; |
227 | | - |
228 | | - NewReactiveExecutableResultStatement(Node namedAnswer) { |
229 | | - this.delegate = Cypher.create(namedAnswer).returning(namedAnswer).build(); |
230 | | - } |
231 | | - |
232 | | - /** |
233 | | - * This method should move into a future Cypher-DSL version. |
234 | | - * @param reactiveQueryRunner The runner to run the statement with |
235 | | - * @return a publisher of records |
236 | | - */ |
237 | | - Publisher<Record> fetchWith(ReactiveQueryRunner reactiveQueryRunner) { |
238 | | - return Mono.fromCallable(this::createQuery) |
239 | | - .flatMapMany(reactiveQueryRunner::run) |
240 | | - .flatMap(ReactiveResult::records); |
241 | | - } |
242 | | - |
243 | | - @Override |
244 | | - public <T> List<T> fetchWith(SimpleQueryRunner queryRunner, Function<Record, T> function) { |
245 | | - throw new UnsupportedOperationException(); |
246 | | - } |
247 | | - |
248 | | - @Override |
249 | | - public <T> CompletableFuture<List<T>> fetchWith(AsyncQueryRunner asyncQueryRunner, |
250 | | - Function<Record, T> function) { |
251 | | - throw new UnsupportedOperationException(); |
252 | | - } |
253 | | - |
254 | | - @Override |
255 | | - public ResultSummary streamWith(SimpleQueryRunner queryRunner, Consumer<Stream<Record>> consumer) { |
256 | | - throw new UnsupportedOperationException(); |
257 | | - } |
258 | | - |
259 | | - @Override |
260 | | - public ResultSummary executeWith(SimpleQueryRunner queryRunner) { |
261 | | - throw new UnsupportedOperationException(); |
262 | | - } |
263 | | - |
264 | | - @Override |
265 | | - public CompletableFuture<ResultSummary> executeWith(AsyncQueryRunner queryRunner) { |
266 | | - throw new UnsupportedOperationException(); |
267 | | - } |
268 | | - |
269 | | - Query createQuery() { |
270 | | - return new Query(this.delegate.getCypher(), this.delegate.getCatalog().getParameters()); |
271 | | - } |
272 | | - |
273 | | - @Override |
274 | | - public Map<String, Object> getParameters() { |
275 | | - return this.delegate.getCatalog().getParameters(); |
276 | | - } |
277 | | - |
278 | | - @Override |
279 | | - public Collection<String> getParameterNames() { |
280 | | - return this.delegate.getCatalog().getParameterNames(); |
281 | | - } |
282 | | - |
283 | | - @Override |
284 | | - public String getCypher() { |
285 | | - return this.delegate.getCypher(); |
286 | | - } |
287 | | - |
288 | | - } |
289 | | - |
290 | 210 | } |
0 commit comments