Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ using internal::checked_cast;
class ExtensionType;

namespace ipc {

using internal::kArrowMagicBytes;

namespace feather {

namespace {
Expand Down Expand Up @@ -787,8 +790,8 @@ Result<std::shared_ptr<Reader>> Reader::Open(
// IPC Read options are ignored for ReaderV1
RETURN_NOT_OK(result->Open(source));
return result;
} else if (memcmp(buffer->data(), internal::kArrowMagicBytes,
strlen(internal::kArrowMagicBytes)) == 0) {
} else if (std::string_view(buffer->data_as<char>(), kArrowMagicBytes.size()) ==
kArrowMagicBytes) {
std::shared_ptr<ReaderV2> result = std::make_shared<ReaderV2>();
RETURN_NOT_OK(result->Open(source, options));
return result;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cstring>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -78,7 +79,7 @@ flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version);
// Whether the type has a validity bitmap in the given IPC version
bool HasValidityBitmap(Type::type type_id, MetadataVersion version);

static constexpr const char* kArrowMagicBytes = "ARROW1";
constexpr const std::string_view kArrowMagicBytes = "ARROW1";

struct FieldMetadata {
int64_t length;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3045,7 +3045,7 @@ void GetReadRecordBatchReadRanges(

auto read_ranges = tracked->get_read_ranges();

const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
const int32_t magic_size = static_cast<int>(ipc::internal::kArrowMagicBytes.size());
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
auto footer_length_offset = buffer->size() - file_end_size;
Expand Down
100 changes: 85 additions & 15 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cstring>
#include <memory>
#include <numeric>
#include <optional>
#include <string>
#include <type_traits>
#include <unordered_map>
Expand Down Expand Up @@ -1864,34 +1865,36 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}

Future<> ReadFooterAsync(arrow::internal::Executor* executor) {
const int32_t magic_size = static_cast<int>(strlen(kArrowMagicBytes));
constexpr int32_t kMagicSize = static_cast<int>(kArrowMagicBytes.size());

if (footer_offset_ <= magic_size * 2 + 4) {
if (footer_offset_ <= kMagicSize * 2 + 4) {
return Status::Invalid("File is too small: ", footer_offset_);
}

int file_end_size = static_cast<int>(magic_size + sizeof(int32_t));
int file_end_size = static_cast<int>(kMagicSize + sizeof(int32_t));
auto self = std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size, file_end_size);
if (executor) read_magic = executor->Transfer(std::move(read_magic));
return read_magic
.Then([=](const std::shared_ptr<Buffer>& buffer)
-> Future<std::shared_ptr<Buffer>> {
const int64_t expected_footer_size = magic_size + sizeof(int32_t);
const int64_t expected_footer_size = kMagicSize + sizeof(int32_t);
if (buffer->size() < expected_footer_size) {
return Status::Invalid("Unable to read ", expected_footer_size,
"from end of file");
}

if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
const auto magic_start = buffer->data() + sizeof(int32_t);
if (std::string_view(reinterpret_cast<const char*>(magic_start), kMagicSize) !=
kArrowMagicBytes) {
return Status::Invalid("Not an Arrow file");
}

int32_t footer_length = bit_util::FromLittleEndian(
*reinterpret_cast<const int32_t*>(buffer->data()));

if (footer_length <= 0 ||
footer_length > self->footer_offset_ - magic_size * 2 - 4) {
footer_length > self->footer_offset_ - kMagicSize * 2 - 4) {
return Status::Invalid("File is smaller than indicated metadata size");
}

Expand Down Expand Up @@ -2689,6 +2692,28 @@ Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
return Status::OK();
}

// Check that two fuzz-read results carry equal record batches.
//
// The entries match when neither has a batch, or when both have one and the
// batches compare equal with NaN values treated as equal to each other.
// Only the `batch` member of each entry is inspected.
//
// Returns Status::OK() on a match, Status::Invalid otherwise.
Status CompareFuzzBatches(const RecordBatchWithMetadata& left,
                          const RecordBatchWithMetadata& right) {
  const bool left_present = left.batch != nullptr;
  const bool right_present = right.batch != nullptr;
  // One side has a batch and the other does not.
  if (left_present != right_present) {
    return Status::Invalid("Batches unequal");
  }
  // Both sides have a batch: compare contents.
  if (left_present &&
      !left.batch->Equals(*right.batch, EqualOptions{}.nans_equal(true))) {
    return Status::Invalid("Batches unequal");
  }
  return Status::OK();
}

// Compare two sequences of fuzz-read results element by element.
//
// Returns Status::Invalid if the sequences differ in length; otherwise returns
// the first non-OK status produced by the pairwise comparison, or
// Status::OK() when every pair matches.
Status CompareFuzzBatches(const std::vector<RecordBatchWithMetadata>& left,
                          const std::vector<RecordBatchWithMetadata>& right) {
  if (left.size() != right.size()) {
    return Status::Invalid("Not the same number of batches");
  }
  // Walk both vectors in lockstep; sizes are known to be equal here.
  auto rhs = right.begin();
  for (const auto& lhs : left) {
    RETURN_NOT_OK(CompareFuzzBatches(lhs, *rhs));
    ++rhs;
  }
  return Status::OK();
}

IpcReadOptions FuzzingOptions() {
IpcReadOptions options;
options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
Expand Down Expand Up @@ -2723,7 +2748,29 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {

Status final_status;

auto do_read = [&](bool pre_buffer) {
// Differential fuzzing helper: re-read the same buffer through the IPC stream
// reader so its output can be compared with the IPC file reader's output.
auto do_stream_read = [&]() -> Result<std::vector<RecordBatchWithMetadata>> {
  io::BufferReader stream_source(buffer);
  // Step over the leading magic bytes (rounded up to a multiple of 8) so the
  // stream reader starts after them.
  const auto prefix_size = bit_util::RoundUpToMultipleOf8(kArrowMagicBytes.length());
  RETURN_NOT_OK(stream_source.Advance(prefix_size));
  ARROW_ASSIGN_OR_RAISE(auto stream_reader, RecordBatchStreamReader::Open(
                                                &stream_source, FuzzingOptions()));

  std::vector<RecordBatchWithMetadata> batches;
  for (;;) {
    ARROW_ASSIGN_OR_RAISE(auto next, stream_reader->ReadNext());
    // An entry with neither a batch nor custom metadata marks end of stream.
    const bool at_end = !next.batch && !next.custom_metadata;
    if (at_end) {
      break;
    }
    batches.push_back(next);
  }
  return batches;
};

auto do_file_read =
[&](bool pre_buffer) -> Result<std::vector<RecordBatchWithMetadata>> {
io::BufferReader buffer_reader(buffer);
ARROW_ASSIGN_OR_RAISE(auto batch_reader,
RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
Expand All @@ -2733,20 +2780,43 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
}

const int n_batches = batch_reader->num_record_batches();
std::vector<RecordBatchWithMetadata> batches;
// Delay error return until the end, as we want to access all record batches
Status st;
for (int i = 0; i < n_batches; ++i) {
RecordBatchWithMetadata batch;
auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
final_status &= st;
if (!st.ok()) {
continue;
}
final_status &= ValidateFuzzBatch(batch);
st &= batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
st &= ValidateFuzzBatch(batch);
batches.push_back(batch);
}
return Status::OK();
RETURN_NOT_OK(st);
return batches;
};

// Lazily-initialized if the IPC reader succeeds
std::optional<Result<std::vector<RecordBatchWithMetadata>>> maybe_stream_batches;

for (const bool pre_buffer : {false, true}) {
final_status &= do_read(pre_buffer);
auto maybe_file_batches = do_file_read(pre_buffer);
final_status &= maybe_file_batches.status();
if (maybe_file_batches.ok()) {
// IPC file read successful: differential fuzzing with IPC stream reader,
// if possible.
// NOTE: some valid IPC files may not be readable as IPC streams,
// for example because of excess spacing between IPC messages.
// A regular IPC file writer would not produce them, but fuzzing might.
if (!maybe_stream_batches.has_value()) {
maybe_stream_batches = do_stream_read();
final_status &= maybe_stream_batches->status();
}
if (maybe_stream_batches->ok()) {
// XXX: in some rare cases, an IPC file might read unequal to the enclosed
// IPC stream, for example if the footer skips some batches or orders the
// batches differently. We should revisit this if the fuzzer generates such
// files.
ARROW_CHECK_OK(CompareFuzzBatches(*maybe_file_batches, **maybe_stream_batches));
}
}
}

return final_status;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
RETURN_NOT_OK(UpdatePosition());

// It is only necessary to align to 8-byte boundary at the start of the file
RETURN_NOT_OK(Write(kArrowMagicBytes, strlen(kArrowMagicBytes)));
RETURN_NOT_OK(Write(kArrowMagicBytes.data(), kArrowMagicBytes.size()));
RETURN_NOT_OK(Align());

return Status::OK();
Expand Down Expand Up @@ -1521,7 +1521,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
RETURN_NOT_OK(Write(&footer_length, sizeof(int32_t)));

// Write magic bytes to end file
return Write(kArrowMagicBytes, strlen(kArrowMagicBytes));
return Write(kArrowMagicBytes.data(), kArrowMagicBytes.size());
}

protected:
Expand Down
Loading