@@ -273,6 +273,68 @@ void streamFiltering() {
273273 assertThat (receivedCount )
274274 .hasValueGreaterThanOrEqualTo (2 * messageWaveCount )
275275 .hasValueLessThan (waveCount * messageWaveCount );
276+
277+ consumer .close ();
278+ }
279+
280+ @ Test
281+ void streamFilteringWithClientSideFiltering () {
282+ int messageTotalCount = 1_000 ;
283+ List <String > filterValues = List .of ("apple" , "orange" , "banana" );
284+ Random random = new Random ();
285+ String selection = filterValues .get (random .nextInt (filterValues .size ()));
286+ AtomicInteger selectionMessageCount = new AtomicInteger (0 );
287+ publish (
288+ messageTotalCount ,
289+ msg -> {
290+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
291+ if (filterValue .equals (selection )) {
292+ selectionMessageCount .incrementAndGet ();
293+ }
294+ return msg .subject (filterValue ).annotation ("x-stream-filter-value" , filterValue );
295+ });
296+
297+ assertThat (selectionMessageCount ).hasPositiveValue ().hasValueLessThan (messageTotalCount );
298+
299+ AtomicInteger receivedCount = new AtomicInteger (0 );
300+ Consumer consumer =
301+ connection .consumerBuilder ().queue (name ).stream ()
302+ .offset (FIRST )
303+ .filterValues (selection )
304+ .builder ()
305+ .messageHandler (
306+ (ctx , msg ) -> {
307+ receivedCount .incrementAndGet ();
308+ ctx .accept ();
309+ })
310+ .build ();
311+ waitUntilStable (receivedCount ::get );
312+ assertThat (receivedCount )
313+ .hasValueGreaterThanOrEqualTo (selectionMessageCount .get ())
314+ .hasValueLessThan (messageTotalCount );
315+ consumer .close ();
316+
317+ receivedCount .set (0 );
318+ AtomicInteger selectedMessageCount = new AtomicInteger (0 );
319+ consumer =
320+ connection .consumerBuilder ().queue (name ).stream ()
321+ .offset (FIRST )
322+ .filterValues (selection )
323+ .builder ()
324+ .messageHandler (
325+ (ctx , msg ) -> {
326+ receivedCount .incrementAndGet ();
327+ if (selection .equals (msg .subject ())) {
328+ selectedMessageCount .incrementAndGet ();
329+ }
330+ ctx .accept ();
331+ })
332+ .build ();
333+ waitUntilStable (receivedCount ::get );
334+ assertThat (selectedMessageCount )
335+ .hasValue (selectionMessageCount .get ())
336+ .hasValueLessThan (receivedCount .get ())
337+ .hasValueLessThan (messageTotalCount );
276338 consumer .close ();
277339 }
278340
0 commit comments