55package kotlinx.coroutines.reactive
66
77import kotlinx.coroutines.*
8- import org.junit.*
9- import org.junit.runner.*
10- import org.junit.runners.*
118import org.reactivestreams.*
129import org.reactivestreams.tck.*
10+ import org.testng.*
11+ import org.testng.annotations.*
1312
14- @RunWith(Parameterized ::class )
15- class ReactiveStreamTckTest (
16- private val dispatcher : Dispatcher
17- ) : PublisherVerification<Long>(TestEnvironment ()) {
1813
19- enum class Dispatcher (val dispatcher : CoroutineDispatcher ) {
20- DEFAULT (Dispatchers .Default ),
21- UNCONFINED (Dispatchers .Unconfined )
22- }
23-
24- private val scope = CoroutineScope (dispatcher.dispatcher)
25-
26- companion object {
27- @Parameterized.Parameters (name = " {0}" )
28- @JvmStatic
29- fun params (): Collection <Array <Any >> = Dispatcher .values().map { arrayOf<Any >(it) }
30- }
31-
32- override fun createPublisher (elements : Long ): Publisher <Long > =
33- scope.publish {
34- for (i in 1 .. elements) send(i)
35- }
36-
37- override fun createFailedPublisher (): Publisher <Long > =
38- scope.publish {
39- throw TestException ()
40- }
41-
42- @Before
43- override fun setUp () {
44- super .setUp()
45- }
46-
47- @Test
48- override fun required_spec306_afterSubscriptionIsCancelledRequestMustBeNops () {
49- super .required_spec306_afterSubscriptionIsCancelledRequestMustBeNops()
50- }
51-
52- @Test
53- override fun required_spec303_mustNotAllowUnboundedRecursion () {
54- super .required_spec303_mustNotAllowUnboundedRecursion()
55- }
56-
57- @Test
58- override fun required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled () {
59- super .required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled()
60- }
61-
62- @Test
63- override fun required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe () {
64- super .required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe()
65- }
66-
67- @Test
68- override fun required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe () {
69- super .required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe()
70- }
71-
72- @Test
73- override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber () {
74- // This test fails on default dispatcher because it retains a reference to the last task
75- // in the structure of its GlobalQueue
76- // So we skip it with the default dispatcher.
77- // todo: remove it when CoroutinesScheduler is improved
78- if (dispatcher == Dispatcher .DEFAULT ) return
79- super .required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
80- }
81-
82- @Test
83- override fun required_validate_boundedDepthOfOnNextAndRequestRecursion () {
84- super .required_validate_boundedDepthOfOnNextAndRequestRecursion()
85- }
86-
87- @Test
88- override fun required_spec317_mustSupportAPendingElementCountUpToLongMaxValue () {
89- super .required_spec317_mustSupportAPendingElementCountUpToLongMaxValue()
90- }
91-
92- @Test
93- override fun required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue () {
94- super .required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()
95- }
96-
97- @Test
98- override fun required_validate_maxElementsFromPublisher () {
99- super .required_validate_maxElementsFromPublisher()
100- }
101-
102- @Test
103- @Ignore // This OPTIONAL requirement is not implemented, which is fine
104- override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete () {
105- super .optional_spec105_emptyStreamMustTerminateBySignallingOnComplete()
106- }
107-
108- @Test
109- override fun required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates () {
110- super .required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates()
111- }
112-
113- @Test
114- override fun optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals () {
115- super .optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals()
116- }
117-
118- @Test
119- override fun required_spec102_maySignalLessThanRequestedAndTerminateSubscription () {
120- super .required_spec102_maySignalLessThanRequestedAndTerminateSubscription()
121- }
122-
123- @Test
124- override fun required_createPublisher3MustProduceAStreamOfExactly3Elements () {
125- super .required_createPublisher3MustProduceAStreamOfExactly3Elements()
126- }
14+ class ReactiveStreamTckTest {
12715
128- @Test
129- override fun optional_spec111_maySupportMultiSubscribe () {
130- super .optional_spec111_maySupportMultiSubscribe( )
16+ @Factory(dataProvider = " dispatchers " )
17+ fun createTests ( dispatcher : Dispatcher ): Array < Any > {
18+ return arrayOf( ReactiveStreamTckTestSuite (dispatcher) )
13119 }
13220
133- @Test
134- override fun stochastic_spec103_mustSignalOnMethodsSequentially () {
135- super .stochastic_spec103_mustSignalOnMethodsSequentially()
136- }
21+ @DataProvider(name = " dispatchers" )
22+ public fun dispatchers (): Array <Array <Any >> = Dispatcher .values().map { arrayOf<Any >(it) }.toTypedArray()
13723
138- @Test
139- override fun required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops () {
140- super .required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops()
141- }
14224
143- @Test
144- override fun required_createPublisher1MustProduceAStreamOfExactly1Element () {
145- super .required_createPublisher1MustProduceAStreamOfExactly1Element()
146- }
25+ public class ReactiveStreamTckTestSuite (
26+ private val dispatcher : Dispatcher
27+ ) : PublisherVerification<Long>(TestEnvironment ()) {
14728
148- @Test
149- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne () {
150- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne()
151- }
29+ private val scope = CoroutineScope (dispatcher.dispatcher + NonCancellable )
15230
153- @Test
154- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront () {
155- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront( )
156- }
31+ override fun createPublisher ( elements : Long ): Publisher < Long > =
32+ scope.publish {
33+ for (i in 1 .. elements) send(i )
34+ }
15735
158- @Test
159- override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException () {
160- super .required_spec309_requestNegativeNumberMustSignalIllegalArgumentException ()
161- }
36+ override fun createFailedPublisher (): Publisher < Long > =
37+ scope.publish {
38+ throw TestException ()
39+ }
16240
163- @Test
164- override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling () {
165- super .required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling()
166- }
167-
168- @Test
169- override fun required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue () {
170- super .required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue()
171- }
172-
173- @Test
174- override fun optional_spec104_mustSignalOnErrorWhenFails () {
175- super .optional_spec104_mustSignalOnErrorWhenFails()
176- }
177-
178- @Test
179- override fun required_spec309_requestZeroMustSignalIllegalArgumentException () {
180- super .required_spec309_requestZeroMustSignalIllegalArgumentException()
181- }
182-
183- @Test
184- override fun optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage () {
185- super .optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage()
186- }
187-
188- @Test
189- override fun required_spec109_subscribeThrowNPEOnNullSubscriber () {
190- super .required_spec109_subscribeThrowNPEOnNullSubscriber()
191- }
192-
193- @Test
194- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected () {
195- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected()
196- }
41+ @Test
42+ public override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber () {
43+ // This test fails on default dispatcher because it retains a reference to the last task
44+ // in the structure of its GlobalQueue
45+ // So we skip it with the default dispatcher.
46+ // todo: remove it when CoroutinesScheduler is improved
47+ if (dispatcher == Dispatcher .DEFAULT ) return
48+ super .required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
49+ }
19750
198- @Test
199- override fun required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements () {
200- super .required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements( )
201- }
51+ @Test
52+ public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete () {
53+ throw SkipException ( " Skipped " )
54+ }
20255
203- @Test
204- override fun required_spec109_mustIssueOnSubscribeForNonNullSubscriber () {
205- super .required_spec109_mustIssueOnSubscribeForNonNullSubscriber()
56+ class TestException : Exception ()
20657 }
58+ }
20759
208- class TestException : Exception ()
209- }
60+ enum class Dispatcher (val dispatcher : CoroutineDispatcher ) {
61+ DEFAULT (Dispatchers .Default ),
62+ UNCONFINED (Dispatchers .Unconfined )
63+ }
0 commit comments