2929import java .util .concurrent .BlockingQueue ;
3030import java .util .concurrent .TimeUnit ;
3131import java .util .concurrent .TimeoutException ;
32+ import java .util .concurrent .atomic .AtomicReference ;
3233import java .util .logging .Level ;
3334import java .util .logging .Logger ;
3435
@@ -69,7 +70,7 @@ public final class BlockingClientCall<ReqT, RespT> {
6970 private final ThreadSafeThreadlessExecutor executor ;
7071
7172 private boolean writeClosed ;
72- private volatile Status closedStatus ; // null if not closed
73+ private AtomicReference < CloseState > closeState = new AtomicReference <>();
7374
7475 BlockingClientCall (ClientCall <ReqT , RespT > call , ThreadSafeThreadlessExecutor executor ) {
7576 this .call = call ;
@@ -120,22 +121,22 @@ private RespT read(boolean waitForever, long endNanoTime)
120121 logger .finer ("Client Blocking read had value: " + bufferedValue );
121122 }
122123
123- Status currentClosedStatus ;
124+ CloseState currentCloseState ;
124125 if (bufferedValue != null ) {
125126 call .request (1 );
126127 return bufferedValue ;
127- } else if ((currentClosedStatus = closedStatus ) == null ) {
128+ } else if ((currentCloseState = closeState . get () ) == null ) {
128129 throw new IllegalStateException (
129130 "The message disappeared... are you reading from multiple threads?" );
130- } else if (!currentClosedStatus .isOk ()) {
131- throw currentClosedStatus . asException ();
131+ } else if (!currentCloseState . status .isOk ()) {
132+ throw currentCloseState . status . asException (currentCloseState . trailers );
132133 } else {
133134 return null ;
134135 }
135136 }
136137
137138 boolean skipWaitingForRead () {
138- return closedStatus != null || !buffer .isEmpty ();
139+ return closeState . get () != null || !buffer .isEmpty ();
139140 }
140141
141142 /**
@@ -148,11 +149,11 @@ boolean skipWaitingForRead() {
148149 * @throws StatusException If the stream was closed in an error state
149150 */
150151 public boolean hasNext () throws InterruptedException , StatusException {
151- executor .waitAndDrain ((x ) -> !x .buffer .isEmpty () || x .closedStatus != null , this );
152+ executor .waitAndDrain ((x ) -> !x .buffer .isEmpty () || x .closeState . get () != null , this );
152153
153- Status currentClosedStatus = closedStatus ;
154- if (currentClosedStatus != null && !currentClosedStatus .isOk ()) {
155- throw currentClosedStatus . asException ();
154+ CloseState currentCloseState = closeState . get () ;
155+ if (currentCloseState != null && !currentCloseState . status .isOk ()) {
156+ throw currentCloseState . status . asException (currentCloseState . trailers );
156157 }
157158
158159 return !buffer .isEmpty ();
@@ -221,17 +222,16 @@ private boolean write(boolean waitForever, ReqT request, long endNanoTime)
221222 }
222223
223224 Predicate <BlockingClientCall <ReqT , RespT >> predicate =
224- (x ) -> x .call .isReady () || x .closedStatus != null ;
225+ (x ) -> x .call .isReady () || x .closeState . get () != null ;
225226 executor .waitAndDrainWithTimeout (waitForever , endNanoTime , predicate , this );
226- Status savedClosedStatus = closedStatus ;
227- if (savedClosedStatus == null ) {
227+ CloseState savedCloseState = closeState . get () ;
228+ if (savedCloseState == null || savedCloseState . status == null ) {
228229 call .sendMessage (request );
229230 return true ;
230- } else if (savedClosedStatus .isOk ()) {
231+ } else if (savedCloseState . status .isOk ()) {
231232 return false ;
232233 } else {
233- // Propagate any errors returned from the server
234- throw savedClosedStatus .asException ();
234+ throw savedCloseState .status .asException (savedCloseState .trailers );
235235 }
236236 }
237237
@@ -274,7 +274,8 @@ public void halfClose() {
274274 @ VisibleForTesting
275275 Status getClosedStatus () {
276276 drainQuietly ();
277- return closedStatus ;
277+ CloseState state = closeState .get ();
278+ return (state == null ) ? null : state .status ;
278279 }
279280
280281 /**
@@ -317,7 +318,7 @@ boolean isWriteReady() {
317318 * @return True if writes haven't been closed and the server hasn't closed the stream
318319 */
319320 private boolean isWriteLegal () {
320- return !writeClosed && closedStatus == null ;
321+ return !writeClosed && closeState . get () == null ;
321322 }
322323
323324 ClientCall .Listener <RespT > getListener () {
@@ -335,15 +336,25 @@ private void drainQuietly() {
335336 private final class QueuingListener extends ClientCall .Listener <RespT > {
336337 @ Override
337338 public void onMessage (RespT value ) {
338- Preconditions .checkState (closedStatus == null , "ClientCall already closed" );
339+ Preconditions .checkState (closeState . get () == null , "ClientCall already closed" );
339340 buffer .add (value );
340341 }
341342
342343 @ Override
343344 public void onClose (Status status , Metadata trailers ) {
344- Preconditions .checkState (closedStatus == null , "ClientCall already closed" );
345- closedStatus = status ;
345+ CloseState newCloseState = new CloseState (status , trailers );
346+ boolean wasSet = closeState .compareAndSet (null , newCloseState );
347+ Preconditions .checkState (wasSet , "ClientCall already closed" );
346348 }
347349 }
348350
351+ private static final class CloseState {
352+ final Status status ;
353+ final Metadata trailers ;
354+
355+ CloseState (Status status , Metadata trailers ) {
356+ this .status = Preconditions .checkNotNull (status , "status" );
357+ this .trailers = trailers ;
358+ }
359+ }
349360}
0 commit comments