|
15 | 15 | */ |
16 | 16 | package org.springframework.data.r2dbc.core; |
17 | 17 |
|
| 18 | +import static io.netty.buffer.ByteBufUtil.*; |
18 | 19 | import static org.assertj.core.api.Assertions.*; |
19 | 20 | import static org.springframework.data.relational.core.query.Criteria.*; |
20 | 21 |
|
| 22 | +import io.netty.buffer.ByteBufUtil; |
| 23 | +import io.netty.buffer.Unpooled; |
21 | 24 | import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; |
22 | 25 | import io.r2dbc.postgresql.PostgresqlConnectionFactory; |
23 | 26 | import io.r2dbc.postgresql.codec.Box; |
|
30 | 33 | import io.r2dbc.postgresql.codec.Point; |
31 | 34 | import io.r2dbc.postgresql.codec.Polygon; |
32 | 35 | import io.r2dbc.postgresql.extension.CodecRegistrar; |
| 36 | +import io.r2dbc.spi.Blob; |
33 | 37 | import io.r2dbc.spi.ConnectionFactory; |
34 | 38 | import lombok.AllArgsConstructor; |
35 | 39 | import lombok.Data; |
| 40 | +import reactor.core.publisher.Flux; |
| 41 | +import reactor.core.publisher.Mono; |
36 | 42 | import reactor.test.StepVerifier; |
37 | 43 |
|
| 44 | +import java.nio.ByteBuffer; |
| 45 | +import java.nio.charset.StandardCharsets; |
38 | 46 | import java.time.Duration; |
39 | 47 | import java.util.Arrays; |
40 | 48 | import java.util.Collections; |
41 | 49 | import java.util.List; |
| 50 | +import java.util.concurrent.CompletableFuture; |
42 | 51 | import java.util.function.Consumer; |
43 | 52 |
|
44 | 53 | import javax.sql.DataSource; |
45 | 54 |
|
46 | 55 | import org.junit.jupiter.api.BeforeEach; |
47 | 56 | import org.junit.jupiter.api.Test; |
48 | 57 | import org.junit.jupiter.api.extension.RegisterExtension; |
49 | | - |
50 | 58 | import org.springframework.dao.DataAccessException; |
51 | 59 | import org.springframework.data.annotation.Id; |
52 | 60 | import org.springframework.data.r2dbc.convert.EnumWriteSupport; |
@@ -82,6 +90,13 @@ void before() { |
82 | 90 | + "primitive_array INT[]," // |
83 | 91 | + "multidimensional_array INT[]," // |
84 | 92 | + "collection_array INT[][])"); |
| 93 | + |
| 94 | + template.execute("DROP TABLE IF EXISTS with_blobs"); |
| 95 | + template.execute("CREATE TABLE with_blobs (" // |
| 96 | + + "id serial PRIMARY KEY," // |
| 97 | + + "byte_array bytea," // |
| 98 | + + "byte_buffer bytea," // |
| 99 | + + "byte_blob bytea)"); |
85 | 100 | } |
86 | 101 |
|
87 | 102 | @Test // gh-30 |
@@ -255,9 +270,9 @@ void shouldReadAndWriteInterval() { |
255 | 270 |
|
256 | 271 | template.execute("DROP TABLE IF EXISTS with_interval"); |
257 | 272 | template.execute("CREATE TABLE with_interval (" // |
258 | | - + "id serial PRIMARY KEY," // |
259 | | - + "interval INTERVAL" // |
260 | | - + ")"); |
| 273 | + + "id serial PRIMARY KEY," // |
| 274 | + + "interval INTERVAL" // |
| 275 | + + ")"); |
261 | 276 |
|
262 | 277 | R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, |
263 | 278 | new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); |
@@ -289,6 +304,62 @@ private void selectAndAssert(Consumer<? super EntityWithArrays> assertion) { |
289 | 304 | .consumeNextWith(assertion).verifyComplete(); |
290 | 305 | } |
291 | 306 |
|
| 307 | + @Test // gh-1408 |
| 308 | + void shouldReadAndWriteBlobs() { |
| 309 | + |
| 310 | + R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, |
| 311 | + new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); |
| 312 | + |
| 313 | + WithBlobs withBlobs = new WithBlobs(); |
| 314 | + byte[] content = "123ä".getBytes(StandardCharsets.UTF_8); |
| 315 | + |
| 316 | + withBlobs.byteArray = content; |
| 317 | + withBlobs.byteBuffer = ByteBuffer.wrap(content); |
| 318 | + withBlobs.byteBlob = Blob.from(Mono.just(ByteBuffer.wrap(content))); |
| 319 | + |
| 320 | + template.insert(withBlobs) // |
| 321 | + .as(StepVerifier::create) // |
| 322 | + .expectNextCount(1) // |
| 323 | + .verifyComplete(); |
| 324 | + |
| 325 | + template.selectOne(Query.empty(), WithBlobs.class) // |
| 326 | + .flatMap(it -> { |
| 327 | + return Flux.from(it.byteBlob.stream()).last().map(blob -> { |
| 328 | + it.byteBlob = Blob.from(Mono.just(blob)); |
| 329 | + return it; |
| 330 | + }); |
| 331 | + }).as(StepVerifier::create) // |
| 332 | + .consumeNextWith(actual -> { |
| 333 | + |
| 334 | + CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) |
| 335 | + .map(ByteBufUtil::getBytes).toFuture(); |
| 336 | + assertThat(actual.getByteArray()).isEqualTo(content); |
| 337 | + assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); |
| 338 | + assertThat(cf.join()).isEqualTo(content); |
| 339 | + }).verifyComplete(); |
| 340 | + |
| 341 | + template.selectOne(Query.empty(), WithBlobs.class) |
| 342 | + .doOnNext(it -> it.byteArray = "foo".getBytes(StandardCharsets.UTF_8)).flatMap(template::update) // |
| 343 | + .as(StepVerifier::create) // |
| 344 | + .expectNextCount(1).verifyComplete(); |
| 345 | + |
| 346 | + template.selectOne(Query.empty(), WithBlobs.class) // |
| 347 | + .flatMap(it -> { |
| 348 | + return Flux.from(it.byteBlob.stream()).last().map(blob -> { |
| 349 | + it.byteBlob = Blob.from(Mono.just(blob)); |
| 350 | + return it; |
| 351 | + }); |
| 352 | + }).as(StepVerifier::create) // |
| 353 | + .consumeNextWith(actual -> { |
| 354 | + |
| 355 | + CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) |
| 356 | + .map(ByteBufUtil::getBytes).toFuture(); |
| 357 | + assertThat(actual.getByteArray()).isEqualTo("foo".getBytes(StandardCharsets.UTF_8)); |
| 358 | + assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); |
| 359 | + assertThat(cf.join()).isEqualTo(content); |
| 360 | + }).verifyComplete(); |
| 361 | + } |
| 362 | + |
292 | 363 | @Data |
293 | 364 | @AllArgsConstructor |
294 | 365 | static class EntityWithEnum { |
@@ -336,4 +407,16 @@ static class EntityWithInterval { |
336 | 407 |
|
337 | 408 | } |
338 | 409 |
|
| 410 | + @Data |
| 411 | + @Table("with_blobs") |
| 412 | + static class WithBlobs { |
| 413 | + |
| 414 | + @Id Integer id; |
| 415 | + |
| 416 | + byte[] byteArray; |
| 417 | + ByteBuffer byteBuffer; |
| 418 | + Blob byteBlob; |
| 419 | + |
| 420 | + } |
| 421 | + |
339 | 422 | } |
0 commit comments