1515 */
1616package org .springframework .data .r2dbc .repository .query ;
1717
18+ import org .reactivestreams .Publisher ;
1819import reactor .core .publisher .Flux ;
1920import reactor .core .publisher .Mono ;
20- import reactor .core .publisher .MonoProcessor ;
2121
2222import java .util .ArrayList ;
2323import java .util .List ;
24+ import java .util .Map ;
25+ import java .util .Optional ;
26+ import java .util .concurrent .ConcurrentHashMap ;
2427
2528import org .springframework .data .relational .repository .query .RelationalParametersParameterAccessor ;
2629import org .springframework .data .repository .util .ReactiveWrapperConverters ;
3134 * to reactive parameter wrapper types upon creation. This class performs synchronization when accessing parameters.
3235 *
3336 * @author Mark Paluch
37+ * @author Christoph Strobl
3438 */
3539class R2dbcParameterAccessor extends RelationalParametersParameterAccessor {
3640
3741 private final Object [] values ;
38- private final List < MonoProcessor <?>> subscriptions ;
42+ private final R2dbcQueryMethod method ;
3943
4044 /**
4145 * Creates a new {@link R2dbcParameterAccessor}.
@@ -45,37 +49,7 @@ public R2dbcParameterAccessor(R2dbcQueryMethod method, Object... values) {
4549 super (method , values );
4650
4751 this .values = values ;
48- this .subscriptions = new ArrayList <>(values .length );
49-
50- for (int i = 0 ; i < values .length ; i ++) {
51-
52- Object value = values [i ];
53-
54- if (value == null || !ReactiveWrappers .supports (value .getClass ())) {
55- subscriptions .add (null );
56- continue ;
57- }
58-
59- if (ReactiveWrappers .isSingleValueType (value .getClass ())) {
60- subscriptions .add (ReactiveWrapperConverters .toWrapper (value , Mono .class ).toProcessor ());
61- } else {
62- subscriptions .add (ReactiveWrapperConverters .toWrapper (value , Flux .class ).collectList ().toProcessor ());
63- }
64- }
65- }
66-
67- /* (non-Javadoc)
68- * @see org.springframework.data.repository.query.ParametersParameterAccessor#getValue(int)
69- */
70- @ SuppressWarnings ("unchecked" )
71- @ Override
72- protected <T > T getValue (int index ) {
73-
74- if (subscriptions .get (index ) != null ) {
75- return (T ) subscriptions .get (index ).block ();
76- }
77-
78- return super .getValue (index );
52+ this .method = method ;
7953 }
8054
8155 /* (non-Javadoc)
@@ -97,4 +71,61 @@ public Object[] getValues() {
9771 public Object getBindableValue (int index ) {
9872 return getValue (getParameters ().getBindableParameter (index ).getIndex ());
9973 }
74+
75+ /**
76+ * Resolve parameters that were provided through reactive wrapper types. Flux is collected into a list, values from
77+ * Mono's are used directly.
78+ *
79+ * @return
80+ */
81+ @ SuppressWarnings ("unchecked" )
82+ public Mono <R2dbcParameterAccessor > resolveParameters () {
83+
84+ boolean hasReactiveWrapper = false ;
85+
86+ for (Object value : values ) {
87+ if (value == null || !ReactiveWrappers .supports (value .getClass ())) {
88+ continue ;
89+ }
90+
91+ hasReactiveWrapper = true ;
92+ break ;
93+ }
94+
95+ if (!hasReactiveWrapper ) {
96+ return Mono .just (this );
97+ }
98+
99+ Object [] resolved = new Object [values .length ];
100+ Map <Integer , Optional <?>> holder = new ConcurrentHashMap <>();
101+ List <Publisher <?>> publishers = new ArrayList <>();
102+
103+ for (int i = 0 ; i < values .length ; i ++) {
104+
105+ Object value = resolved [i ] = values [i ];
106+ if (value == null || !ReactiveWrappers .supports (value .getClass ())) {
107+ continue ;
108+ }
109+
110+ if (ReactiveWrappers .isSingleValueType (value .getClass ())) {
111+
112+ int index = i ;
113+ publishers .add (ReactiveWrapperConverters .toWrapper (value , Mono .class ) //
114+ .map (Optional ::of ) //
115+ .defaultIfEmpty (Optional .empty ()) //
116+ .doOnNext (it -> holder .put (index , (Optional <?>) it )));
117+ } else {
118+
119+ int index = i ;
120+ publishers .add (ReactiveWrapperConverters .toWrapper (value , Flux .class ) //
121+ .collectList () //
122+ .doOnNext (it -> holder .put (index , Optional .of (it ))));
123+ }
124+ }
125+
126+ return Flux .merge (publishers ).then ().thenReturn (resolved ).map (values -> {
127+ holder .forEach ((index , v ) -> values [index ] = v .orElse (null ));
128+ return new R2dbcParameterAccessor (method , values );
129+ });
130+ }
100131}
0 commit comments