Skip to content

Commit 7eab160

Browse files
committed
Revert "core: Report marshaller error for uncompressed size too large back to the client "
This reverts commit a40d549. A user has noticed it caused test failures. cl/828098798 ``` java.lang.AssertionError: Failed executing read operation at io.grpc.internal.CompositeReadableBuffer.execute(CompositeReadableBuffer.java:328) at io.grpc.internal.CompositeReadableBuffer.executeNoThrow(CompositeReadableBuffer.java:336) at io.grpc.internal.CompositeReadableBuffer.readBytes(CompositeReadableBuffer.java:151) at io.grpc.internal.ReadableBuffers$BufferInputStream.read(ReadableBuffers.java:377) at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:205) at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:133) at io.grpc.MethodDescriptor.parseRequest(MethodDescriptor.java:307) at io.grpc.ServerInterceptors$2$2.onMessage(ServerInterceptors.java:324) ```
1 parent a76ab79 commit 7eab160

File tree

4 files changed

+8
-80
lines changed

4 files changed

+8
-80
lines changed

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -327,20 +327,16 @@ private void messagesAvailableInternal(final MessageProducer producer) {
327327
return;
328328
}
329329

330+
InputStream message;
330331
try {
331-
InputStream message;
332332
while ((message = producer.next()) != null) {
333-
ReqT parsedMessage;
334-
try (InputStream ignored = message) {
335-
parsedMessage = call.method.parseRequest(message);
336-
} catch (StatusRuntimeException e) {
333+
try {
334+
listener.onMessage(call.method.parseRequest(message));
335+
} catch (Throwable t) {
337336
GrpcUtil.closeQuietly(message);
338-
GrpcUtil.closeQuietly(producer);
339-
call.cancelled = true;
340-
call.close(e.getStatus(), new Metadata());
341-
return;
337+
throw t;
342338
}
343-
listener.onMessage(parsedMessage);
339+
message.close();
344340
}
345341
} catch (Throwable t) {
346342
GrpcUtil.closeQuietly(producer);

core/src/test/java/io/grpc/internal/ServerCallImplTest.java

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@
4848
import io.grpc.SecurityLevel;
4949
import io.grpc.ServerCall;
5050
import io.grpc.Status;
51-
import io.grpc.StatusRuntimeException;
5251
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
5352
import io.perfmark.PerfMark;
5453
import java.io.ByteArrayInputStream;
55-
import java.io.IOException;
5654
import java.io.InputStream;
5755
import java.io.InputStreamReader;
5856
import org.junit.Before;
@@ -71,8 +69,6 @@ public class ServerCallImplTest {
7169

7270
@Mock private ServerStream stream;
7371
@Mock private ServerCall.Listener<Long> callListener;
74-
@Mock private StreamListener.MessageProducer messageProducer;
75-
@Mock private InputStream message;
7672

7773
private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
7874
private ServerCallImpl<Long, Long> call;
@@ -497,43 +493,6 @@ public void streamListener_unexpectedRuntimeException() {
497493
assertThat(e).hasMessageThat().isEqualTo("unexpected exception");
498494
}
499495

500-
@Test
501-
public void streamListener_statusRuntimeException() throws IOException {
502-
MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder()
503-
.setType(MethodType.UNARY)
504-
.setFullMethodName("service/method")
505-
.setRequestMarshaller(new LongMarshaller() {
506-
@Override
507-
public Long parse(InputStream stream) {
508-
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED
509-
.withDescription("Decompressed gRPC message exceeds maximum size"));
510-
}
511-
})
512-
.setResponseMarshaller(new LongMarshaller())
513-
.build();
514-
515-
call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context,
516-
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
517-
serverCallTracer, PerfMark.createTag());
518-
519-
ServerStreamListenerImpl<Long> streamListener =
520-
new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context);
521-
522-
when(messageProducer.next()).thenReturn(message, (InputStream) null);
523-
streamListener.messagesAvailable(messageProducer);
524-
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
525-
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
526-
527-
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
528-
Status status = statusCaptor.getValue();
529-
assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode());
530-
assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription());
531-
532-
streamListener.halfClosed();
533-
verify(callListener, never()).onHalfClose();
534-
verify(callListener, never()).onMessage(any());
535-
}
536-
537496
private static class LongMarshaller implements Marshaller<Long> {
538497
@Override
539498
public InputStream stream(Long value) {

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ private void assertPayload(Payload expected, Payload actual) {
20242024
}
20252025
}
20262026

2027-
protected static void assertCodeEquals(Status.Code expected, Status actual) {
2027+
private static void assertCodeEquals(Status.Code expected, Status actual) {
20282028
assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected);
20292029
}
20302030

interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.grpc.testing.integration;
1818

1919
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertThrows;
2120
import static org.junit.Assert.assertTrue;
2221

2322
import com.google.protobuf.ByteString;
@@ -38,8 +37,6 @@
3837
import io.grpc.ServerCall.Listener;
3938
import io.grpc.ServerCallHandler;
4039
import io.grpc.ServerInterceptor;
41-
import io.grpc.Status.Code;
42-
import io.grpc.StatusRuntimeException;
4340
import io.grpc.internal.GrpcUtil;
4441
import io.grpc.netty.InternalNettyChannelBuilder;
4542
import io.grpc.netty.InternalNettyServerBuilder;
@@ -56,9 +53,7 @@
5653
import java.io.OutputStream;
5754
import org.junit.Before;
5855
import org.junit.BeforeClass;
59-
import org.junit.Rule;
6056
import org.junit.Test;
61-
import org.junit.rules.TestName;
6257
import org.junit.runner.RunWith;
6358
import org.junit.runners.JUnit4;
6459

@@ -89,16 +84,10 @@ public static void registerCompressors() {
8984
compressors.register(Codec.Identity.NONE);
9085
}
9186

92-
@Rule
93-
public final TestName currentTest = new TestName();
94-
9587
@Override
9688
protected ServerBuilder<?> getServerBuilder() {
9789
NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
98-
.maxInboundMessageSize(
99-
DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName())
100-
? 1000
101-
: AbstractInteropTest.MAX_MESSAGE_SIZE)
90+
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
10291
.compressorRegistry(compressors)
10392
.decompressorRegistry(decompressors)
10493
.intercept(new ServerInterceptor() {
@@ -137,22 +126,6 @@ public void compresses() {
137126
assertTrue(FZIPPER.anyWritten);
138127
}
139128

140-
private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME =
141-
"decompressedMessageTooLong";
142-
143-
@Test
144-
public void decompressedMessageTooLong() {
145-
assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName());
146-
final SimpleRequest bigRequest = SimpleRequest.newBuilder()
147-
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000])))
148-
.build();
149-
StatusRuntimeException e = assertThrows(StatusRuntimeException.class,
150-
() -> blockingStub.withCompression("gzip").unaryCall(bigRequest));
151-
assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus());
152-
assertEquals("Decompressed gRPC message exceeds maximum size 1000",
153-
e.getStatus().getDescription());
154-
}
155-
156129
@Override
157130
protected NettyChannelBuilder createChannelBuilder() {
158131
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())

0 commit comments

Comments
 (0)