2020import static org .springframework .data .redis .util .ByteUtils .*;
2121
2222import reactor .core .Disposable ;
23- import reactor .core .publisher .DirectProcessor ;
2423import reactor .core .publisher .Flux ;
2524import reactor .core .publisher .Mono ;
26- import reactor .core .publisher .MonoProcessor ;
25+ import reactor .core .publisher .Sinks ;
2726import reactor .test .StepVerifier ;
2827
2928import java .nio .ByteBuffer ;
@@ -92,8 +91,7 @@ void shouldSubscribeToMultiplePatterns() {
9291 container = createContainer ();
9392
9493 container .receive (PatternTopic .of ("foo*" ), PatternTopic .of ("bar*" )).as (StepVerifier ::create ).thenRequest (1 )
95- .thenAwait ()
96- .thenCancel ().verify ();
94+ .thenAwait ().thenCancel ().verify ();
9795
9896 verify (subscriptionMock ).pSubscribe (getByteBuffer ("foo*" ), getByteBuffer ("bar*" ));
9997 }
@@ -124,15 +122,15 @@ void shouldSubscribeToMultipleChannels() {
124122 @ Test // DATAREDIS-612
125123 void shouldEmitChannelMessage () {
126124
127- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
125+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
128126
129- when (subscriptionMock .receive ()).thenReturn (processor );
127+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
130128 container = createContainer ();
131129
132130 Flux <Message <String , String >> messageStream = container .receive (ChannelTopic .of ("foo" ));
133131
134132 messageStream .as (StepVerifier ::create ).then (() -> {
135- processor . onNext (createChannelMessage ("foo" , "message" ));
133+ sink . tryEmitNext (createChannelMessage ("foo" , "message" ));
136134 }).assertNext (msg -> {
137135
138136 assertThat (msg .getChannel ()).isEqualTo ("foo" );
@@ -143,15 +141,15 @@ void shouldEmitChannelMessage() {
143141 @ Test // DATAREDIS-612
144142 void shouldEmitPatternMessage () {
145143
146- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
144+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
147145
148- when (subscriptionMock .receive ()).thenReturn (processor );
146+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
149147 container = createContainer ();
150148
151149 Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
152150
153151 messageStream .as (StepVerifier ::create ).then (() -> {
154- processor . onNext (createPatternMessage ("foo*" , "foo" , "message" ));
152+ sink . tryEmitNext (createPatternMessage ("foo*" , "foo" , "message" ));
155153 }).assertNext (msg -> {
156154
157155 assertThat (msg .getPattern ()).isEqualTo ("foo*" );
@@ -163,20 +161,22 @@ void shouldEmitPatternMessage() {
163161 @ Test // DATAREDIS-612
164162 void shouldRegisterSubscription () {
165163
166- MonoProcessor <Void > subscribeMono = MonoProcessor .create ();
164+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().multicast ().onBackpressureBuffer ();
165+
166+ Sinks .One <Void > subscribeMono = Sinks .one ();
167167
168168 reset (subscriptionMock );
169- when (subscriptionMock .subscribe (any ())).thenReturn (subscribeMono );
169+ when (subscriptionMock .subscribe (any ())).thenReturn (subscribeMono . asMono () );
170170 when (subscriptionMock .unsubscribe ()).thenReturn (Mono .empty ());
171- when (subscriptionMock .receive ()).thenReturn (DirectProcessor . create ());
171+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux ());
172172 container = createContainer ();
173173
174174 Flux <Message <String , String >> messageStream = container .receive (ChannelTopic .of ("foo*" ));
175175
176176 Disposable subscription = messageStream .subscribe ();
177177
178178 assertThat (container .getActiveSubscriptions ()).isEmpty ();
179- subscribeMono .onComplete ();
179+ subscribeMono .tryEmitEmpty ();
180180 assertThat (container .getActiveSubscriptions ()).isNotEmpty ();
181181 subscription .dispose ();
182182 assertThat (container .getActiveSubscriptions ()).isEmpty ();
@@ -185,10 +185,12 @@ void shouldRegisterSubscription() {
185185 @ Test // DATAREDIS-612, GH-1622
186186 void shouldRegisterSubscriptionMultipleSubscribers () {
187187
188+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().multicast ().onBackpressureBuffer ();
189+
188190 reset (subscriptionMock );
189191 when (subscriptionMock .subscribe (any ())).thenReturn (Mono .empty ());
190192 when (subscriptionMock .unsubscribe ()).thenReturn (Mono .empty ());
191- when (subscriptionMock .receive ()).thenReturn (DirectProcessor . create ());
193+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux ());
192194 container = createContainer ();
193195
194196 Flux <Message <String , String >> messageStream = container .receive (new ChannelTopic ("foo*" ));
@@ -210,7 +212,8 @@ void shouldRegisterSubscriptionMultipleSubscribers() {
210212 @ Test // DATAREDIS-612, GH-1622
211213 void shouldUnsubscribeOnCancel () {
212214
213- when (subscriptionMock .receive ()).thenReturn (DirectProcessor .create ());
215+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().unicast ().onBackpressureBuffer ();
216+ when (subscriptionMock .receive ()).thenReturn (sink .asFlux ());
214217 container = createContainer ();
215218
216219 Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
@@ -227,12 +230,12 @@ void shouldUnsubscribeOnCancel() {
227230 @ Test // DATAREDIS-612
228231 void shouldTerminateSubscriptionsOnShutdown () {
229232
230- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
233+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
231234
232- when (subscriptionMock .receive ()).thenReturn (processor );
235+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
233236 when (subscriptionMock .cancel ()).thenReturn (Mono .defer (() -> {
234237
235- processor . onError (new CancellationException ());
238+ sink . tryEmitError (new CancellationException ());
236239 return Mono .empty ();
237240 }));
238241 container = createContainer ();
@@ -247,19 +250,19 @@ void shouldTerminateSubscriptionsOnShutdown() {
247250 @ Test // DATAREDIS-612
248251 void shouldCleanupDownstream () {
249252
250- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
253+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
251254
252- when (subscriptionMock .receive ()).thenReturn (processor );
255+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
253256 container = createContainer ();
254257
255258 Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
256259
257260 messageStream .as (StepVerifier ::create ).then (() -> {
258- assertThat (processor . hasDownstreams ()).isTrue ( );
259- processor . onNext (createPatternMessage ("foo*" , "foo" , "message" ));
261+ assertThat (sink . currentSubscriberCount ()).isGreaterThan ( 0 );
262+ sink . tryEmitNext (createPatternMessage ("foo*" , "foo" , "message" ));
260263 }).expectNextCount (1 ).thenCancel ().verify ();
261264
262- assertThat (processor . hasDownstreams ()).isFalse ( );
265+ assertThat (sink . currentSubscriberCount ()).isEqualTo ( 0 );
263266 }
264267
265268 private ReactiveRedisMessageListenerContainer createContainer () {
0 commit comments