diff --git a/work/chapter-1/src/test/java/com/example/part_3/Part3MultithreadingParallelizationTest.java b/work/chapter-1/src/test/java/com/example/part_3/Part3MultithreadingParallelizationTest.java index ffe5bee..3d80f8b 100644 --- a/work/chapter-1/src/test/java/com/example/part_3/Part3MultithreadingParallelizationTest.java +++ b/work/chapter-1/src/test/java/com/example/part_3/Part3MultithreadingParallelizationTest.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import static com.example.part_3.Part3MultithreadingParallelization.*; @@ -14,19 +15,27 @@ public class Part3MultithreadingParallelizationTest { @Test public void publishOnParallelThreadSchedulerTest() { - Thread[] threads = new Thread[2]; + AtomicReference subscribeOnThread = new AtomicReference<>(); + AtomicReference publishOnThread = new AtomicReference<>(); StepVerifier - .create(publishOnParallelThreadScheduler(Flux.defer(() -> { - threads[0] = Thread.currentThread(); - return Flux.just("Hello"); - }))) - .expectSubscription() - .expectNext("Hello") - .verifyComplete(); + .create(publishOnParallelThreadScheduler(Flux.defer(() -> { + subscribeOnThread.set(Thread.currentThread()); + return Flux.just("Hello"); + })).map(value -> { + publishOnThread.set(Thread.currentThread()); + return value; + })) + .expectSubscription() + .expectNext("Hello") + .verifyComplete(); - Assert.assertTrue( - "Expected execution on different Threads", - !threads[0].equals(threads[1]) + Assert.assertEquals( + "Expected subscribeOn in the same Thread", + subscribeOnThread.get(), Thread.currentThread() + ); + Assert.assertNotEquals( + "Expected publishOn different Threads", + publishOnThread.get(), Thread.currentThread() ); }