2626
2727abstract class AbstractSubscription <TResult > implements Subscription {
2828 private static final Logger LOGGER = Loggers .getLogger ("client" );
29+ private static final Object NULL_PLACEHOLDER = new Object ();
2930 private final Observer <? super TResult > observer ;
3031
3132 /* protected by `this` */
@@ -36,7 +37,7 @@ abstract class AbstractSubscription<TResult> implements Subscription {
3637 private boolean isTerminated = false ;
3738 /* protected by `this` */
3839
39- private final ConcurrentLinkedQueue <TResult > resultsQueue = new ConcurrentLinkedQueue <TResult >();
40+ private final ConcurrentLinkedQueue <Object > resultsQueue = new ConcurrentLinkedQueue <Object >();
4041
4142 AbstractSubscription (final Observer <? super TResult > observer ) {
4243 this .observer = observer ;
@@ -109,14 +110,18 @@ synchronized long getRequested() {
109110 }
110111
111112 void addToQueue (@ Nullable final TResult result ) {
112- if (result != null ) {
113+ if (result == null ) {
114+ resultsQueue .add (NULL_PLACEHOLDER );
115+ } else {
113116 resultsQueue .add (result );
114117 }
115118 }
116119
117120 void addToQueue (@ Nullable final List <TResult > results ) {
118121 if (results != null ) {
119- resultsQueue .addAll (results );
122+ for (TResult cur : results ) {
123+ addToQueue (cur );
124+ }
120125 }
121126 }
122127
@@ -134,8 +139,11 @@ void onError(final Throwable t) {
134139 }
135140 }
136141
137- void onNext (final TResult next ) {
142+ private void onNext (@ Nullable final TResult next ) {
138143 if (!isTerminated ()) {
144+ if (next == null ) {
145+ throw new NullPointerException ();
146+ }
139147 try {
140148 observer .onNext (next );
141149 } catch (Throwable t ) {
@@ -145,7 +153,7 @@ void onNext(final TResult next) {
145153 }
146154 }
147155
148- void onComplete () {
156+ private void onComplete () {
149157 if (terminalAction ()) {
150158 postTerminate ();
151159 try {
@@ -173,6 +181,7 @@ void tryProcessResultsQueue() {
173181 }
174182 }
175183
184+ @ SuppressWarnings ("unchecked" )
176185 private void processResultsQueue () {
177186 boolean mustProcess = false ;
178187
@@ -207,11 +216,11 @@ private void processResultsQueue() {
207216 processedCount = 0 ;
208217
209218 while (localWanted > 0 ) {
210- TResult item = resultsQueue .poll ();
219+ Object item = resultsQueue .poll ();
211220 if (item == null ) {
212221 break ;
213222 } else {
214- onNext (item );
223+ onNext (item == NULL_PLACEHOLDER ? null : ( TResult ) item );
215224 localWanted -= 1 ;
216225 processedCount += 1 ;
217226 }
0 commit comments