Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,9 @@ class BufferedInputStream::Impl : public BufferedBase {

// Resize internal read buffer. Note that the internal buffer-size
// should not be larger than the raw_read_bound_.
// It might change the buffer_size_, but will not change buffer states
// buffer_pos_ and bytes_buffered_.
// It might change the buffer_size_, and may reset buffer_pos_ to 0
// when bytes_buffered_ == 0 to reuse the beginning of the buffer.
// bytes_buffered_ will not be changed.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
Expand All @@ -297,12 +298,14 @@ class BufferedInputStream::Impl : public BufferedBase {
new_buffer_size, ", buffer_pos: ", buffer_pos_,
", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_);
}
bool need_reset_buffer_pos = false;
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// Special case: we can override data in the current buffer because it does not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rewrite comment in SetBufferSize? Since buffer_pos_ would be rewritten now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed that comment, thanks, fixed

// contain any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
need_reset_buffer_pos = true;
} else {
// We should keep the current buffer because it contains data that
// can be read.
Expand All @@ -311,7 +314,11 @@ class BufferedInputStream::Impl : public BufferedBase {
buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
}
}
return ResizeBuffer(new_buffer_size);
auto status = ResizeBuffer(new_buffer_size);
if (status.ok() && need_reset_buffer_pos) {
buffer_pos_ = 0;
}
return status;
}

Result<std::string_view> Peek(int64_t nbytes) {
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,39 @@ TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) {
ASSERT_EQ(0, buffered_->bytes_buffered());
}

TEST_F(TestBufferedInputStream, PeekAfterExhaustingBuffer) {
// GH-48311: When bytes_buffered_ == 0 and raw_read_bound_ >= 0,
// SetBufferSize should reset buffer_pos_ to 0 and reuse the beginning of the buffer
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/25);

// Fill the buffer
ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(10));
EXPECT_EQ(view, kExample1.substr(0, 10));
ASSERT_EQ(10, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());

// Read all buffered bytes to exhaust the buffer (bytes_buffered_ == 0),
// at this point buffer_pos_ is non-zero
ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(10));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(0, 10));
ASSERT_EQ(0, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());

// Peek should trigger SetBufferSize with bytes_buffered_ == 0,
// which should reset buffer_pos_ to 0 and reuse the beginning of the buffer,
// so resulting size of the buffer should be 15 instead of 25
ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(15));
EXPECT_EQ(view, kExample1.substr(10, 15));
ASSERT_EQ(15, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());

// Do read just in case
ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(15));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(10, 15));
ASSERT_EQ(0, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down
Loading