Skip to content

Commit d2b79f5

Browse files
committed
FIX: Fix zstd get_range buffer management
1 parent 6f94f5d commit d2b79f5

File tree

10 files changed

+87
-58
lines changed

10 files changed

+87
-58
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ upgrading the input to DBNv3.
3131
has been updated to match
3232
- The previous `StatMsg` has been moved to `v2::StatMsg` or `StatMsgV2`
3333

34+
### Bug fixes
35+
- Fixed "Zstd error decompressing: Operation made no progress over multiple calls, due
36+
to output buffer being full" error with `TimeseriesGetRange`
37+
- Fixed missing implementation of `HistoricalBuilder::SetLogReceiver`
38+
3439
## 0.35.1 - 2025-05-20
3540

3641
### Bug fixes

examples/historical/timeseries_get_range.cpp

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,15 @@
1-
#include <chrono>
2-
#include <ctime>
3-
#include <iomanip>
41
#include <iostream> // setw
5-
#ifdef _WIN32
6-
// _mkgmtime is equivalent to timegm
7-
#define timegm _mkgmtime
8-
#endif
92

103
#include "databento/constants.hpp"
11-
#include "databento/datetime.hpp"
124
#include "databento/enums.hpp"
135
#include "databento/historical.hpp"
146

15-
// Converts a date to Unix Epoch nanoseconds
16-
databento::UnixNanos DateToUnixNanos(int year, int month, int day) {
17-
std::tm time{};
18-
time.tm_year = year - 1900;
19-
// January is 0
20-
time.tm_mon = month - 1;
21-
time.tm_mday = day;
22-
return databento::UnixNanos{std::chrono::seconds{::timegm(&time)}};
23-
}
24-
257
int main() {
268
auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build();
27-
const databento::UnixNanos start = DateToUnixNanos(2022, 10, 3);
28-
const databento::UnixNanos end = DateToUnixNanos(2022, 10, 4);
299
const auto limit = 1000;
3010
client.TimeseriesGetRange(
31-
databento::dataset::kGlbxMdp3, {start, end}, {"ESZ2"},
11+
databento::dataset::kGlbxMdp3,
12+
databento::DateTimeRange<std::string>{"2022-10-03"}, {"ESZ2"},
3213
databento::Schema::Trades, databento::SType::RawSymbol,
3314
databento::SType::InstrumentId, limit,
3415
[](databento::Metadata&& metadata) { std::cout << metadata << '\n'; },

include/databento/detail/buffer.hpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <cstddef>
44
#include <memory>
55
#include <new>
6+
#include <ostream>
67

78
#include "databento/ireadable.hpp"
89
#include "databento/iwritable.hpp"
@@ -22,22 +23,32 @@ class Buffer : public IReadable, public IWritable {
2223
void WriteAll(const char* data, std::size_t length);
2324
void WriteAll(const std::byte* data, std::size_t length) override;
2425

25-
std::byte*& WriteBegin() { return write_pos_; }
26+
std::byte* WriteBegin() { return write_pos_; }
2627
std::byte* WriteEnd() { return end_; }
2728
const std::byte* WriteBegin() const { return write_pos_; }
2829
const std::byte* WriteEnd() const { return end_; }
30+
// Indicate how many bytes were written
31+
void Fill(std::size_t length) { write_pos_ += length; }
2932
std::size_t WriteCapacity() const {
3033
return static_cast<std::size_t>(end_ - write_pos_);
3134
}
3235

33-
/// Will throw if `length > ReadCapacity()`.
36+
// Will throw if `length > ReadCapacity()`.
3437
void ReadExact(std::byte* buffer, std::size_t length) override;
3538
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
3639

37-
std::byte*& ReadBegin() { return read_pos_; }
40+
std::byte* ReadBegin() { return read_pos_; }
3841
std::byte* ReadEnd() { return write_pos_; }
3942
const std::byte* ReadBegin() const { return read_pos_; }
4043
const std::byte* ReadEnd() const { return write_pos_; }
44+
// Indicate how many bytes were read
45+
void Consume(std::size_t length) {
46+
read_pos_ += length;
47+
if (static_cast<std::size_t>(read_pos_ - buf_.get()) > (Capacity() / 2)) {
48+
Shift();
49+
}
50+
}
51+
void ConsumeNoShift(std::size_t length) { read_pos_ += length; }
4152
std::size_t ReadCapacity() const {
4253
return static_cast<std::size_t>(write_pos_ - read_pos_);
4354
}
@@ -52,6 +63,8 @@ class Buffer : public IReadable, public IWritable {
5263
void Reserve(std::size_t capacity);
5364
void Shift();
5465

66+
friend std::ostream& operator<<(std::ostream& stream, const Buffer& buffer);
67+
5568
private:
5669
static constexpr std::align_val_t kAlignment{8};
5770

src/dbn_decoder.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver,
8282
input_ =
8383
std::make_unique<detail::ZstdDecodeStream>(std::move(input_), buffer_);
8484
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
85-
buffer_.WriteBegin() += kMagicSize;
85+
buffer_.Fill(kMagicSize);
8686
const auto* buf_ptr = buffer_.ReadBegin();
8787
if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) {
8888
throw DbnResponseError{"Found Zstd input, but not DBN prefix"};
@@ -181,17 +181,17 @@ databento::Metadata DbnDecoder::DecodeMetadata() {
181181
// already read first 4 bytes detecting compression
182182
const auto read_size = kMetadataPreludeSize - kMagicSize;
183183
input_->ReadExact(buffer_.WriteBegin(), read_size);
184-
buffer_.WriteBegin() += read_size;
184+
buffer_.Fill(read_size);
185185
const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize(
186186
buffer_.ReadBegin(), kMetadataPreludeSize);
187-
buffer_.ReadBegin() += kMetadataPreludeSize;
187+
buffer_.Consume(kMetadataPreludeSize);
188188
version_ = version;
189189
buffer_.Reserve(size);
190190
input_->ReadExact(buffer_.WriteBegin(), size);
191-
buffer_.WriteBegin() += size;
191+
buffer_.Fill(size);
192192
auto metadata = DbnDecoder::DecodeMetadataFields(
193193
version_, buffer_.ReadBegin(), buffer_.ReadEnd());
194-
buffer_.ReadBegin() += size;
194+
buffer_.Consume(size);
195195
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
196196
// alignment
197197
buffer_.Shift();
@@ -316,7 +316,7 @@ const databento::Record* DbnDecoder::DecodeRecord() {
316316
}
317317
}
318318
current_record_ = Record{BufferRecordHeader()};
319-
buffer_.ReadBegin() += current_record_.Size();
319+
buffer_.Consume(current_record_.Size());
320320
current_record_ = DbnDecoder::DecodeRecordCompat(
321321
version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_);
322322
return &current_record_;
@@ -328,7 +328,7 @@ size_t DbnDecoder::FillBuffer() {
328328
}
329329
const auto fill_size =
330330
input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity());
331-
buffer_.WriteBegin() += fill_size;
331+
buffer_.Fill(fill_size);
332332
return fill_size;
333333
}
334334

@@ -338,7 +338,7 @@ databento::RecordHeader* DbnDecoder::BufferRecordHeader() {
338338

339339
bool DbnDecoder::DetectCompression() {
340340
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
341-
buffer_.WriteBegin() += kMagicSize;
341+
buffer_.Fill(kMagicSize);
342342
const auto* buffer_it = buffer_.ReadBegin();
343343
if (std::strncmp(Consume(buffer_it, 3), kDbnPrefix, 3) == 0) {
344344
return false;

src/detail/buffer.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <sstream>
55

66
#include "databento/exceptions.hpp"
7+
#include "stream_op_helper.hpp"
78

89
using databento::detail::Buffer;
910

@@ -16,7 +17,7 @@ size_t Buffer::Write(const std::byte* data, std::size_t length) {
1617
}
1718
const auto write_size = std::min(WriteCapacity(), length);
1819
std::copy(data, data + write_size, WriteBegin());
19-
WriteBegin() += write_size;
20+
Fill(write_size);  // advance only by the bytes actually copied; `length` may exceed WriteCapacity()
2021
return write_size;
2122
}
2223

@@ -34,7 +35,7 @@ void Buffer::WriteAll(const std::byte* data, std::size_t length) {
3435
}
3536

3637
void Buffer::ReadExact(std::byte* buffer, std::size_t length) {
37-
if (length < ReadCapacity()) {
38+
if (length > ReadCapacity()) {
3839
std::ostringstream err_msg;
3940
err_msg << "Reached end of buffer without " << length << " bytes, only "
4041
<< ReadCapacity() << " bytes available";
@@ -46,7 +47,7 @@ void Buffer::ReadExact(std::byte* buffer, std::size_t length) {
4647
std::size_t Buffer::ReadSome(std::byte* buffer, std::size_t max_length) {
4748
const auto read_size = std::min(ReadCapacity(), max_length);
4849
std::copy(ReadBegin(), ReadBegin() + read_size, buffer);
49-
ReadBegin() += read_size;
50+
Consume(read_size);
5051
return read_size;
5152
}
5253

@@ -71,3 +72,18 @@ void Buffer::Shift() {
7172
read_pos_ = buf_.get();
7273
write_pos_ = read_pos_ + unread_bytes;
7374
}
75+
76+
std::ostream& databento::detail::operator<<(std::ostream& stream,
77+
const Buffer& buffer) {
78+
return StreamOpBuilder{stream}
79+
.SetTypeName("Buffer")
80+
.SetSpacer(" ")
81+
.Build()
82+
.AddField("buf_", buffer.buf_.get())
83+
.AddField("end_", buffer.end_)
84+
.AddField("read_pos_", buffer.read_pos_)  // label mirrors the member name, like the other fields
85+
.AddField("write_pos_", buffer.write_pos_)
86+
.AddField("ReadCapacity", buffer.ReadCapacity())
87+
.AddField("WriteCapacity", buffer.WriteCapacity())
88+
.Finish();
89+
}

src/detail/dbn_buffer_decoder.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
1313
zstd_buffer_->WriteAll(data, length);
1414
const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(),
1515
dbn_buffer_.WriteCapacity());
16-
dbn_buffer_.WriteBegin() += read_size;
16+
dbn_buffer_.Fill(read_size);
1717
if (read_size == 0) {
1818
return KeepGoing::Continue;
1919
}
@@ -25,7 +25,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
2525
std::tie(input_version_, bytes_needed_) =
2626
DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(),
2727
dbn_buffer_.ReadCapacity());
28-
dbn_buffer_.ReadBegin() += kMetadataPreludeSize;
28+
dbn_buffer_.Consume(kMetadataPreludeSize);
2929
dbn_buffer_.Reserve(bytes_needed_);
3030
state_ = DecoderState::Metadata;
3131
[[fallthrough]];
@@ -36,7 +36,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
3636
}
3737
auto metadata = DbnDecoder::DecodeMetadataFields(
3838
input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd());
39-
dbn_buffer_.ReadBegin() += bytes_needed_;
39+
dbn_buffer_.Consume(bytes_needed_);
4040
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
4141
// alignment
4242
dbn_buffer_.Shift();
@@ -61,7 +61,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
6161
if (record_callback_(record) == KeepGoing::Stop) {
6262
return KeepGoing::Stop;
6363
}
64-
dbn_buffer_.ReadBegin() += bytes_needed_;
64+
dbn_buffer_.Consume(bytes_needed_);
6565
}
6666
}
6767
}

src/detail/zstd_stream.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr<IReadable> input,
2424
read_suggestion_{::ZSTD_initDStream(z_dstream_.get())},
2525
in_buffer_{in_buffer.ReadBegin(), in_buffer.ReadEnd()},
2626
z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {
27-
in_buffer.ReadBegin() += in_buffer.ReadCapacity();
27+
in_buffer.Consume(in_buffer.ReadCapacity());
2828
}
2929

3030
void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) {

src/historical.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,12 @@ HistoricalBuilder& HistoricalBuilder::SetGateway(HistoricalGateway gateway) {
949949
return *this;
950950
}
951951

952+
HistoricalBuilder& HistoricalBuilder::SetLogReceiver(
953+
ILogReceiver* log_receiver) {
954+
log_receiver_ = log_receiver;
955+
return *this;
956+
}
957+
952958
Historical HistoricalBuilder::Build() {
953959
if (key_.empty()) {
954960
throw Exception{"'key' is unset"};

src/live_blocking.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,16 @@ void LiveBlocking::Subscribe(std::string_view sub_msg,
140140
databento::Metadata LiveBlocking::Start() {
141141
client_.WriteAll("start_session\n");
142142
client_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize);
143-
buffer_.WriteBegin() += kMetadataPreludeSize;
143+
buffer_.Fill(kMetadataPreludeSize);
144144
const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize(
145145
buffer_.ReadBegin(), kMetadataPreludeSize);
146-
buffer_.ReadBegin() += kMetadataPreludeSize;
146+
buffer_.Consume(kMetadataPreludeSize);
147147
buffer_.Reserve(size);
148148
client_.ReadExact(buffer_.WriteBegin(), size);
149-
buffer_.WriteBegin() += size;
149+
buffer_.Fill(size);
150150
auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(),
151151
buffer_.ReadEnd());
152-
buffer_.ReadBegin() += size;
152+
buffer_.Consume(size);
153153
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
154154
// alignment
155155
buffer_.Shift();
@@ -184,7 +184,7 @@ const databento::Record* LiveBlocking::NextRecord(
184184
}
185185
}
186186
current_record_ = Record{BufferRecordHeader()};
187-
buffer_.ReadBegin() += current_record_.Size();
187+
buffer_.Consume(current_record_.Size());
188188
current_record_ =
189189
DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, send_ts_out_,
190190
&compat_buffer_, current_record_);
@@ -223,7 +223,7 @@ std::string LiveBlocking::DecodeChallenge() {
223223
if (read_size == 0) {
224224
throw LiveApiError{"Gateway closed socket during authentication"};
225225
}
226-
buffer_.WriteBegin() += read_size;
226+
buffer_.Fill(read_size);
227227
// first line is version
228228
std::string response{reinterpret_cast<const char*>(buffer_.ReadBegin()),
229229
buffer_.ReadCapacity()};
@@ -243,9 +243,8 @@ std::string LiveBlocking::DecodeChallenge() {
243243
: response.find('\n', find_start);
244244
while (next_nl_pos == std::string::npos) {
245245
// read more
246-
buffer_.WriteBegin() +=
247-
client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity())
248-
.read_size;
246+
buffer_.Fill(client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity())
247+
.read_size);
249248
if (buffer_.ReadCapacity() == 0) {
250249
throw LiveApiError{"Gateway closed socket during authentication"};
251250
}
@@ -335,7 +334,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() {
335334
"Unexpected end of message received from server after replying to "
336335
"CRAM"};
337336
}
338-
buffer_.WriteBegin() += read_size;
337+
buffer_.Fill(read_size);
339338
newline_ptr = std::find(buffer_.ReadBegin(), buffer_.ReadEnd(),
340339
static_cast<std::byte>('\n'));
341340
} while (newline_ptr == buffer_.ReadEnd());
@@ -349,7 +348,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() {
349348
log_receiver_->Receive(LogLevel::Debug, log_ss.str());
350349
}
351350
// set in case Read call also read records. One beyond newline
352-
buffer_.ReadBegin() += response.length() + 1;
351+
buffer_.Consume(response.length() + 1);
353352

354353
std::size_t pos{};
355354
bool found_success{};
@@ -411,7 +410,7 @@ databento::detail::TcpClient::Result LiveBlocking::FillBuffer(
411410
buffer_.Shift();
412411
const auto read_res =
413412
client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout);
414-
buffer_.WriteBegin() += read_res.read_size;
413+
buffer_.Fill(read_res.read_size);
415414
return read_res;
416415
}
417416

tests/src/buffer_tests.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ using namespace std::string_view_literals;
1010
namespace databento::detail::tests {
1111
TEST(BufferTests, TestWriteAllPastCapacity) {
1212
Buffer target{10};
13-
target.WriteBegin() += 4;
14-
target.ReadBegin() += 2;
13+
target.Fill(4);
14+
target.ConsumeNoShift(2);
1515
ASSERT_EQ(target.WriteCapacity(), 6);
1616
ASSERT_EQ(target.ReadCapacity(), 2);
1717
ASSERT_EQ(target.Capacity(), 10);
@@ -25,7 +25,7 @@ TEST(BufferTests, TestWriteAllPastCapacity) {
2525
TEST(BufferTests, TestWriteAllShift) {
2626
Buffer target{20};
2727
target.WriteAll("TestWriteAllShift", 17);
28-
target.ReadBegin() += 4;
28+
target.ConsumeNoShift(4);
2929
ASSERT_EQ(target.WriteCapacity(), 3);
3030
ASSERT_EQ(target.ReadCapacity(), 13);
3131
ASSERT_EQ(target.Capacity(), 20);
@@ -38,8 +38,8 @@ TEST(BufferTests, TestWriteAllShift) {
3838

3939
TEST(BufferTests, TestWriteRead) {
4040
Buffer target{10};
41-
target.WriteBegin() += 5;
42-
target.ReadBegin() += 5;
41+
target.Fill(5);
42+
target.ConsumeNoShift(5);
4343
const auto write_len = target.Write("BufferTests", 11);
4444
ASSERT_EQ(write_len, 10);
4545
std::array<std::byte, 10> read_buf{};
@@ -55,7 +55,16 @@ TEST(BufferTests, TestReserve) {
5555
ASSERT_EQ(target.ReadCapacity(), 0);
5656
ASSERT_EQ(target.Capacity(), 120);
5757
target.WriteAll("TestReserve", 11);
58-
target.ReadBegin() += 4;
58+
target.ConsumeNoShift(4);
5959
}
6060

61+
TEST(BufferTests, TestConsumeShift) {
62+
Buffer target{120};
63+
target.Fill(120);
64+
ASSERT_EQ(target.WriteCapacity(), 0);
65+
target.ConsumeNoShift(100);
66+
ASSERT_EQ(target.WriteCapacity(), 0);
67+
target.Consume(1);
68+
ASSERT_EQ(target.WriteCapacity(), 101);
69+
}
6170
} // namespace databento::detail::tests

0 commit comments

Comments
 (0)