diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc index 0dae888ca0e..14a0fe4215e 100644 --- a/cpp/src/arrow/io/buffered.cc +++ b/cpp/src/arrow/io/buffered.cc @@ -285,8 +285,9 @@ class BufferedInputStream::Impl : public BufferedBase { // Resize internal read buffer. Note that the internal buffer-size // should not be larger than the raw_read_bound_. - // It might change the buffer_size_, but will not change buffer states - // buffer_pos_ and bytes_buffered_. + // It might change the buffer_size_, and may reset buffer_pos_ to 0 + // when raw_read_bound_ >= 0 and bytes_buffered_ == 0, to reuse the beginning + // of the buffer. bytes_buffered_ will not be changed. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); @@ -297,12 +298,14 @@ class BufferedInputStream::Impl : public BufferedBase { new_buffer_size, ", buffer_pos: ", buffer_pos_, ", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_); } + bool need_reset_buffer_pos = false; if (raw_read_bound_ >= 0) { // No need to reserve space for more than the total remaining number of bytes. if (bytes_buffered_ == 0) { - // Special case: we can not keep the current buffer because it does not + // Special case: we can overwrite data in the current buffer because it does not // contain any required data. new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); + need_reset_buffer_pos = true; } else { // We should keep the current buffer because it contains data that // can be read. 
@@ -311,7 +314,11 @@ class BufferedInputStream::Impl : public BufferedBase { buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); } } - return ResizeBuffer(new_buffer_size); + auto status = ResizeBuffer(new_buffer_size); + if (status.ok() && need_reset_buffer_pos) { + buffer_pos_ = 0; + } + return status; } Result Peek(int64_t nbytes) { diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index 1d4805f580c..efaec09dc7c 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -514,6 +514,39 @@ TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) { ASSERT_EQ(0, buffered_->bytes_buffered()); } +TEST_F(TestBufferedInputStream, PeekAfterExhaustingBuffer) { + // GH-48311: When bytes_buffered_ == 0 and raw_read_bound_ >= 0, + // SetBufferSize should reset buffer_pos_ to 0 and reuse the beginning of the buffer + MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/25); + + // Fill the buffer + ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(10)); + EXPECT_EQ(view, kExample1.substr(0, 10)); + ASSERT_EQ(10, buffered_->bytes_buffered()); + ASSERT_EQ(10, buffered_->buffer_size()); + + // Read all buffered bytes to exhaust the buffer (bytes_buffered_ == 0), + // at this point buffer_pos_ is non-zero + ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(10)); + EXPECT_EQ(std::string_view(*bytes), kExample1.substr(0, 10)); + ASSERT_EQ(0, buffered_->bytes_buffered()); + ASSERT_EQ(10, buffered_->buffer_size()); + + // Peek should trigger SetBufferSize with bytes_buffered_ == 0, + // which should reset buffer_pos_ to 0 and reuse the beginning of the buffer, + // so resulting size of the buffer should be 15 instead of 25 + ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(15)); + EXPECT_EQ(view, kExample1.substr(10, 15)); + ASSERT_EQ(15, buffered_->bytes_buffered()); + ASSERT_EQ(15, buffered_->buffer_size()); + + // Do read just in case + ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(15)); + 
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(10, 15)); + ASSERT_EQ(0, buffered_->bytes_buffered()); + ASSERT_EQ(15, buffered_->buffer_size()); +} + class TestBufferedInputStreamBound : public ::testing::Test { public: void SetUp() { CreateExample(/*bounded=*/true); }