diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index f6c6f342a09..6aceaa7f448 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -57,6 +57,9 @@ using internal::checked_cast; class ExtensionType; namespace ipc { + +using internal::kArrowMagicBytes; + namespace feather { namespace { @@ -787,8 +790,8 @@ Result> Reader::Open( // IPC Read options are ignored for ReaderV1 RETURN_NOT_OK(result->Open(source)); return result; - } else if (memcmp(buffer->data(), internal::kArrowMagicBytes, - strlen(internal::kArrowMagicBytes)) == 0) { + } else if (std::string_view(buffer->data_as(), kArrowMagicBytes.size()) == + kArrowMagicBytes) { std::shared_ptr result = std::make_shared(); RETURN_NOT_OK(result->Open(source, options)); return result; diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 914ce3efe69..2a9574d84a1 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -78,7 +79,7 @@ flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version); // Whether the type has a validity bitmap in the given IPC version bool HasValidityBitmap(Type::type type_id, MetadataVersion version); -static constexpr const char* kArrowMagicBytes = "ARROW1"; +constexpr const std::string_view kArrowMagicBytes = "ARROW1"; struct FieldMetadata { int64_t length; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 86cd0e06ab0..15cf0258b2e 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -3045,7 +3045,7 @@ void GetReadRecordBatchReadRanges( auto read_ranges = tracked->get_read_ranges(); - const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes)); + const int32_t magic_size = static_cast(ipc::internal::kArrowMagicBytes.size()); // read magic and footer length IO auto file_end_size = magic_size + sizeof(int32_t); auto footer_length_offset = buffer->size() - file_end_size; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 908a223a57d..991d238240f 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -1864,26 +1865,28 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } Future<> ReadFooterAsync(arrow::internal::Executor* executor) { - const int32_t magic_size = static_cast(strlen(kArrowMagicBytes)); + constexpr int32_t kMagicSize = static_cast(kArrowMagicBytes.size()); - if (footer_offset_ <= magic_size * 2 + 4) { + if (footer_offset_ <= kMagicSize * 2 + 4) { return Status::Invalid("File is too small: ", footer_offset_); } - int file_end_size = static_cast(magic_size + sizeof(int32_t)); + int file_end_size = static_cast(kMagicSize + sizeof(int32_t)); auto self = std::dynamic_pointer_cast(shared_from_this()); auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size); if (executor) read_magic = executor->Transfer(std::move(read_magic)); return read_magic .Then([=](const std::shared_ptr& buffer) -> Future> { - const int64_t expected_footer_size = magic_size + sizeof(int32_t); + const int64_t expected_footer_size = kMagicSize + sizeof(int32_t); if (buffer->size() < expected_footer_size) { return Status::Invalid("Unable to read ", expected_footer_size, "from end of file"); } - if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { + const auto magic_start = buffer->data() + sizeof(int32_t); + if (std::string_view(reinterpret_cast(magic_start), kMagicSize) != + kArrowMagicBytes) { return Status::Invalid("Not an Arrow file"); } @@ -1891,7 +1894,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { *reinterpret_cast(buffer->data())); if (footer_length <= 0 || - footer_length > self->footer_offset_ - magic_size * 2 - 4) { + footer_length > self->footer_offset_ - kMagicSize * 2 - 4) { return Status::Invalid("File is smaller than indicated metadata size"); } @@ -2689,6 +2692,28 @@ Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) { return Status::OK(); } +Status CompareFuzzBatches(const RecordBatchWithMetadata& left, + const RecordBatchWithMetadata& right) { + bool ok = true; + if ((left.batch != nullptr) != (right.batch != nullptr)) { + ok = false; + } else if (left.batch) { + ok &= left.batch->Equals(*right.batch, EqualOptions{}.nans_equal(true)); + } + return ok ? Status::OK() : Status::Invalid("Batches unequal"); +} + +Status CompareFuzzBatches(const std::vector& left, + const std::vector& right) { + if (left.size() != right.size()) { + return Status::Invalid("Not the same number of batches"); + } + for (size_t i = 0; i < left.size(); ++i) { + RETURN_NOT_OK(CompareFuzzBatches(left[i], right[i])); + } + return Status::OK(); +} + IpcReadOptions FuzzingOptions() { IpcReadOptions options; options.memory_pool = ::arrow::internal::fuzzing_memory_pool(); @@ -2723,7 +2748,29 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) { Status final_status; - auto do_read = [&](bool pre_buffer) { + // Try to read the IPC file as a stream to compare the results (differential fuzzing) + auto do_stream_read = [&]() -> Result> { + io::BufferReader buffer_reader(buffer); + // Skip magic bytes at the beginning + RETURN_NOT_OK( + buffer_reader.Advance(bit_util::RoundUpToMultipleOf8(kArrowMagicBytes.length()))); + ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchStreamReader::Open( + &buffer_reader, FuzzingOptions())); + + std::vector batches; + while (true) { + ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext()); + if (!batch.batch && !batch.custom_metadata) { + // EOS + break; + } + batches.push_back(batch); + } + return batches; + }; + + auto do_file_read = + [&](bool pre_buffer) -> Result> { io::BufferReader buffer_reader(buffer); ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions())); @@ -2733,20 +2780,43 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) { } const int n_batches = batch_reader->num_record_batches(); + std::vector batches; + // Delay error return until the end, as we want to access all record batches + Status st; for (int i = 0; i < n_batches; ++i) { RecordBatchWithMetadata batch; - auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch); - final_status &= st; - if (!st.ok()) { - continue; - } - final_status &= ValidateFuzzBatch(batch); + st &= batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch); + st &= ValidateFuzzBatch(batch); + batches.push_back(batch); } - return Status::OK(); + RETURN_NOT_OK(st); + return batches; }; + // Lazily-initialized if the IPC reader succeeds + std::optional>> maybe_stream_batches; + for (const bool pre_buffer : {false, true}) { - final_status &= do_read(pre_buffer); + auto maybe_file_batches = do_file_read(pre_buffer); + final_status &= maybe_file_batches.status(); + if (maybe_file_batches.ok()) { + // IPC file read successful: differential fuzzing with IPC stream reader, + // if possible. + // NOTE: some valid IPC files may not be readable as IPC streams, + // for example because of excess spacing between IPC messages. + // A regular IPC file writer would not produce them, but fuzzing might. + if (!maybe_stream_batches.has_value()) { + maybe_stream_batches = do_stream_read(); + final_status &= maybe_stream_batches->status(); + } + if (maybe_stream_batches->ok()) { + // XXX: in some rare cases, an IPC file might read unequal to the enclosed + // IPC stream, for example if the footer skips some batches or orders the + // batches differently. We should revisit this if the fuzzer generates such + // files. + ARROW_CHECK_OK(CompareFuzzBatches(*maybe_file_batches, **maybe_stream_batches)); + } + } } return final_status; diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index cba484af158..09a9aef8975 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -1493,7 +1493,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo RETURN_NOT_OK(UpdatePosition()); // It is only necessary to align to 8-byte boundary at the start of the file - RETURN_NOT_OK(Write(kArrowMagicBytes, strlen(kArrowMagicBytes))); + RETURN_NOT_OK(Write(kArrowMagicBytes.data(), kArrowMagicBytes.size())); RETURN_NOT_OK(Align()); return Status::OK(); @@ -1521,7 +1521,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo RETURN_NOT_OK(Write(&footer_length, sizeof(int32_t))); // Write magic bytes to end file - return Write(kArrowMagicBytes, strlen(kArrowMagicBytes)); + return Write(kArrowMagicBytes.data(), kArrowMagicBytes.size()); } protected: