Skip to content

Commit 6be33bb

Browse files
authored
MINOR: Empty stream double check (#742)
## What's Changed In some cases InflaterInputStream can return a non zero on `available()` method call, while it is actually at EOS. A subsequent `read()` call would respond with -1 and eventually cause: <pre> Caused by: org.apache.arrow.flight.FlightRuntimeException: Failed to read message. at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:121) at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:161) at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:182) at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:489) at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:371) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:564) at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) at org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:118) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564) at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ... 3 common frames omitted Caused by: java.lang.RuntimeException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at org.apache.arrow.flight.ArrowMessage.frame(ArrowMessage.java:363) at org.apache.arrow.flight.ArrowMessage$ArrowMessageHolderMarshaller.parse(ArrowMessage.java:575) at org.apache.arrow.flight.ArrowMessage$ArrowMessageHolderMarshaller.parse(ArrowMessage.java:560) at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644) ... 5 common frames omitted Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:92) at com.google.protobuf.CodedInputStream.readRawVarint32(CodedInputStream.java:568) at org.apache.arrow.flight.ArrowMessage.readRawVarint32(ArrowMessage.java:369) at org.apache.arrow.flight.ArrowMessage.frame(ArrowMessage.java:290) ... 10 common frames omitted </pre>
1 parent b459647 commit 6be33bb

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,11 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
287287
ArrowBuf body = null;
288288
ArrowBuf appMetadata = null;
289289
while (stream.available() > 0) {
290-
int tag = readRawVarint32(stream);
290+
final int tagFirstByte = stream.read();
291+
if (tagFirstByte == -1) {
292+
break;
293+
}
294+
int tag = readRawVarint32(tagFirstByte, stream);
291295
switch (tag) {
292296
case DESCRIPTOR_TAG:
293297
{
@@ -366,6 +370,10 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
366370

367371
private static int readRawVarint32(InputStream is) throws IOException {
368372
int firstByte = is.read();
373+
return readRawVarint32(firstByte, is);
374+
}
375+
376+
private static int readRawVarint32(int firstByte, InputStream is) throws IOException {
369377
return CodedInputStream.readRawVarint32(firstByte, is);
370378
}
371379

0 commit comments

Comments
 (0)