Skip to content

Commit 80f2436

Browse files
committed
FIX: Fix potential unaligned records
1 parent f97b560 commit 80f2436

File tree

10 files changed

+84
-60
lines changed

10 files changed

+84
-60
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 0.34.2 - TBD
4+
5+
#### Bug fixes
6+
- Fixed potential for unaligned records in live and historical streaming requests
7+
38
## 0.34.1 - 2025-04-29
49

510
### Enhancements

include/databento/dbn_decoder.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <string>
77

88
#include "databento/dbn.hpp"
9+
#include "databento/detail/buffer.hpp"
910
#include "databento/enums.hpp" // Upgrade Policy
1011
#include "databento/file_stream.hpp"
1112
#include "databento/ireadable.hpp"
@@ -54,16 +55,14 @@ class DbnDecoder {
5455
const std::byte* buffer_end);
5556
bool DetectCompression();
5657
std::size_t FillBuffer();
57-
std::size_t GetReadBufferSize() const;
5858
RecordHeader* BufferRecordHeader();
5959

6060
ILogReceiver* log_receiver_;
6161
std::uint8_t version_{};
6262
VersionUpgradePolicy upgrade_policy_;
6363
bool ts_out_{};
6464
std::unique_ptr<IReadable> input_;
65-
std::vector<std::byte> read_buffer_;
66-
std::size_t buffer_idx_{};
65+
detail::Buffer buffer_{};
6766
// Must be 8-byte aligned for records
6867
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
6968
Record current_record_{nullptr};

include/databento/detail/buffer.hpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <cstddef>
44
#include <memory>
5+
#include <new>
56

67
#include "databento/ireadable.hpp"
78
#include "databento/iwritable.hpp"
@@ -11,7 +12,7 @@ class Buffer : public IReadable, public IWritable {
1112
public:
1213
Buffer() : Buffer(64 * std::size_t{1 << 10}) {}
1314
explicit Buffer(std::size_t init_capacity)
14-
: buf_{std::make_unique<std::byte[]>(init_capacity)},
15+
: buf_{AlignedNew(init_capacity), AlignedDelete},
1516
end_{buf_.get() + init_capacity},
1617
read_pos_{buf_.get()},
1718
write_pos_{buf_.get()} {}
@@ -22,7 +23,9 @@ class Buffer : public IReadable, public IWritable {
2223
void WriteAll(const std::byte* data, std::size_t length) override;
2324

2425
std::byte*& WriteBegin() { return write_pos_; }
25-
std::byte* WriteEnd() const { return end_; }
26+
std::byte* WriteEnd() { return end_; }
27+
const std::byte* WriteBegin() const { return write_pos_; }
28+
const std::byte* WriteEnd() const { return end_; }
2629
std::size_t WriteCapacity() const {
2730
return static_cast<std::size_t>(end_ - write_pos_);
2831
}
@@ -32,7 +35,9 @@ class Buffer : public IReadable, public IWritable {
3235
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
3336

3437
std::byte*& ReadBegin() { return read_pos_; }
35-
std::byte* ReadEnd() const { return write_pos_; }
38+
std::byte* ReadEnd() { return write_pos_; }
39+
const std::byte* ReadBegin() const { return read_pos_; }
40+
const std::byte* ReadEnd() const { return write_pos_; }
3641
std::size_t ReadCapacity() const {
3742
return static_cast<std::size_t>(write_pos_ - read_pos_);
3843
}
@@ -48,7 +53,16 @@ class Buffer : public IReadable, public IWritable {
4853
void Shift();
4954

5055
private:
51-
std::unique_ptr<std::byte[]> buf_;
56+
static constexpr std::align_val_t kAlignment{8};
57+
58+
using UniqueBufPtr = std::unique_ptr<std::byte[], void (*)(std::byte*)>;
59+
60+
std::byte* AlignedNew(std::size_t capacity) {
61+
return new (kAlignment) std::byte[capacity];
62+
}
63+
static void AlignedDelete(std::byte* p) { operator delete[](p, kAlignment); }
64+
65+
UniqueBufPtr buf_;
5266
std::byte* end_;
5367
std::byte* read_pos_{};
5468
std::byte* write_pos_{};

include/databento/detail/zstd_stream.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <memory> // unique_ptr
77
#include <vector>
88

9+
#include "databento/detail/buffer.hpp"
910
#include "databento/ireadable.hpp"
1011
#include "databento/iwritable.hpp"
1112
#include "databento/log.hpp"
@@ -14,8 +15,7 @@ namespace databento::detail {
1415
class ZstdDecodeStream : public IReadable {
1516
public:
1617
explicit ZstdDecodeStream(std::unique_ptr<IReadable> input);
17-
ZstdDecodeStream(std::unique_ptr<IReadable> input,
18-
std::vector<std::byte>&& in_buffer);
18+
ZstdDecodeStream(std::unique_ptr<IReadable> input, detail::Buffer& in_buffer);
1919

2020
// Read exactly `length` bytes into `buffer`.
2121
void ReadExact(std::byte* buffer, std::size_t length) override;

src/dbn_constants.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ constexpr std::size_t kFixedMetadataLen = 100;
1313
constexpr std::size_t kDatasetCstrLen = 16;
1414
constexpr std::size_t kMetadataReservedLen = 53;
1515
constexpr std::size_t kMetadataReservedLenV1 = 47;
16-
constexpr std::size_t kBufferCapacity = 8UL * 1024;
1716
constexpr std::uint16_t kNullSchema = std::numeric_limits<std::uint16_t>::max();
1817
constexpr std::uint8_t kNullSType = std::numeric_limits<std::uint8_t>::max();
1918
constexpr std::uint64_t kNullRecordCount =

src/dbn_decoder.cpp

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "databento/compat.hpp"
1010
#include "databento/constants.hpp"
1111
#include "databento/datetime.hpp"
12+
#include "databento/detail/buffer.hpp"
1213
#include "databento/detail/zstd_stream.hpp"
1314
#include "databento/enums.hpp"
1415
#include "databento/exceptions.hpp"
@@ -76,16 +77,12 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver,
7677
: log_receiver_{log_receiver},
7778
upgrade_policy_{upgrade_policy},
7879
input_{std::move(input)} {
79-
read_buffer_.reserve(kBufferCapacity);
8080
if (DetectCompression()) {
81-
input_ = std::make_unique<detail::ZstdDecodeStream>(
82-
std::move(input_), std::move(read_buffer_));
83-
// Reinitialize buffer and get it into the same state as uncompressed input
84-
read_buffer_ = std::vector<std::byte>();
85-
read_buffer_.reserve(kBufferCapacity);
86-
read_buffer_.resize(kMagicSize);
87-
input_->ReadExact(read_buffer_.data(), kMagicSize);
88-
const auto* buf_ptr = read_buffer_.data();
81+
input_ =
82+
std::make_unique<detail::ZstdDecodeStream>(std::move(input_), buffer_);
83+
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
84+
buffer_.WriteBegin() += kMagicSize;
85+
const auto* buf_ptr = buffer_.ReadBegin();
8986
if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) {
9087
throw DbnResponseError{"Found Zstd input, but not DBN prefix"};
9188
}
@@ -181,16 +178,22 @@ databento::Metadata DbnDecoder::DecodeMetadataFields(
181178

182179
databento::Metadata DbnDecoder::DecodeMetadata() {
183180
// already read first 4 bytes detecting compression
184-
read_buffer_.resize(kMetadataPreludeSize);
185-
input_->ReadExact(&read_buffer_[4], 4);
181+
const auto read_size = kMetadataPreludeSize - kMagicSize;
182+
input_->ReadExact(buffer_.WriteBegin(), read_size);
183+
buffer_.WriteBegin() += read_size;
186184
const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize(
187-
read_buffer_.data(), kMetadataPreludeSize);
185+
buffer_.ReadBegin(), kMetadataPreludeSize);
186+
buffer_.ReadBegin() += kMetadataPreludeSize;
188187
version_ = version;
189-
read_buffer_.resize(size);
190-
input_->ReadExact(read_buffer_.data(), read_buffer_.size());
191-
buffer_idx_ = read_buffer_.size();
188+
buffer_.Reserve(size);
189+
input_->ReadExact(buffer_.WriteBegin(), size);
190+
buffer_.WriteBegin() += size;
192191
auto metadata = DbnDecoder::DecodeMetadataFields(
193-
version_, read_buffer_.data(), read_buffer_.data() + read_buffer_.size());
192+
version_, buffer_.ReadBegin(), buffer_.ReadEnd());
193+
buffer_.ReadBegin() += size;
194+
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
195+
// alignment
196+
buffer_.Shift();
194197
ts_out_ = metadata.ts_out;
195198
metadata.Upgrade(upgrade_policy_);
196199
return metadata;
@@ -239,60 +242,53 @@ databento::Record DbnDecoder::DecodeRecordCompat(
239242
// assumes DecodeMetadata has been called
240243
const databento::Record* DbnDecoder::DecodeRecord() {
241244
// need some unread unread_bytes
242-
if (GetReadBufferSize() == 0) {
245+
if (buffer_.ReadCapacity() == 0) {
243246
if (FillBuffer() == 0) {
244247
return nullptr;
245248
}
246249
}
247250
// check length
248-
while (GetReadBufferSize() < BufferRecordHeader()->Size()) {
251+
while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) {
249252
if (FillBuffer() == 0) {
250-
if (GetReadBufferSize() > 0) {
253+
if (buffer_.ReadCapacity() > 0) {
251254
log_receiver_->Receive(
252255
LogLevel::Warning,
253256
"Unexpected partial record remaining in stream: " +
254-
std::to_string(GetReadBufferSize()) + " bytes");
257+
std::to_string(buffer_.ReadCapacity()) + " bytes");
255258
}
256259
return nullptr;
257260
}
258261
}
259262
current_record_ = Record{BufferRecordHeader()};
260-
buffer_idx_ += current_record_.Size();
263+
buffer_.ReadBegin() += current_record_.Size();
261264
current_record_ = DbnDecoder::DecodeRecordCompat(
262265
version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_);
263266
return &current_record_;
264267
}
265268

266269
size_t DbnDecoder::FillBuffer() {
267-
// Shift data forward
268-
std::copy(read_buffer_.cbegin() + static_cast<std::ptrdiff_t>(buffer_idx_),
269-
read_buffer_.cend(), read_buffer_.begin());
270-
const auto unread_size = read_buffer_.size() - buffer_idx_;
271-
buffer_idx_ = 0;
272-
read_buffer_.resize(kBufferCapacity);
273-
const auto fill_size = input_->ReadSome(&read_buffer_[unread_size],
274-
kBufferCapacity - unread_size);
275-
read_buffer_.resize(unread_size + fill_size);
270+
if (buffer_.WriteCapacity() < kMaxRecordLen) {
271+
buffer_.Shift();
272+
}
273+
const auto fill_size =
274+
input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity());
275+
buffer_.WriteBegin() += fill_size;
276276
return fill_size;
277277
}
278278

279-
std::size_t DbnDecoder::GetReadBufferSize() const {
280-
return read_buffer_.size() - buffer_idx_;
281-
}
282-
283279
databento::RecordHeader* DbnDecoder::BufferRecordHeader() {
284-
return reinterpret_cast<RecordHeader*>(&read_buffer_[buffer_idx_]);
280+
return reinterpret_cast<RecordHeader*>(buffer_.ReadBegin());
285281
}
286282

287283
bool DbnDecoder::DetectCompression() {
288-
read_buffer_.resize(kMagicSize);
289-
input_->ReadExact(read_buffer_.data(), kMagicSize);
290-
const auto* read_buffer_it = read_buffer_.data();
291-
if (std::strncmp(Consume(read_buffer_it, 3), kDbnPrefix, 3) == 0) {
284+
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
285+
buffer_.WriteBegin() += kMagicSize;
286+
const auto* buffer_it = buffer_.ReadBegin();
287+
if (std::strncmp(Consume(buffer_it, 3), kDbnPrefix, 3) == 0) {
292288
return false;
293289
}
294-
read_buffer_it = read_buffer_.data();
295-
auto x = Consume<std::uint32_t>(read_buffer_it);
290+
buffer_it = buffer_.ReadBegin();
291+
auto x = Consume<std::uint32_t>(buffer_it);
296292
if (x == kZstdMagicNumber) {
297293
return true;
298294
}
@@ -302,12 +298,10 @@ bool DbnDecoder::DetectCompression() {
302298
if ((x & kZstdSkippableFrame) == kZstdSkippableFrame) {
303299
throw DbnResponseError{
304300
"Legacy DBZ encoding is not supported. Please use the dbn CLI tool "
305-
"to "
306-
"convert it to DBN."};
301+
"to convert it to DBN."};
307302
}
308303
throw DbnResponseError{
309-
"Couldn't detect input type. It doesn't appear to be Zstd or "
310-
"DBN."};
304+
"Couldn't detect input type. It doesn't appear to be Zstd or DBN."};
311305
}
312306

313307
std::string DbnDecoder::DecodeSymbol(std::size_t symbol_cstr_len,

src/detail/buffer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void Buffer::Reserve(std::size_t capacity) {
5454
if (capacity <= Capacity()) {
5555
return;
5656
}
57-
auto new_buf = std::make_unique<std::byte[]>(capacity);
57+
UniqueBufPtr new_buf{AlignedNew(capacity), AlignedDelete};
5858
const auto unread_bytes = ReadCapacity();
5959
std::copy(ReadBegin(), ReadEnd(), new_buf.get());
6060
buf_ = std::move(new_buf);

src/detail/dbn_buffer_decoder.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
3535
auto metadata = DbnDecoder::DecodeMetadataFields(
3636
input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd());
3737
dbn_buffer_.ReadBegin() += bytes_needed_;
38+
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
39+
// alignment
40+
dbn_buffer_.Shift();
3841
ts_out_ = metadata.ts_out;
3942
metadata.Upgrade(VersionUpgradePolicy::UpgradeToV2);
4043
if (metadata_callback_) {

src/detail/zstd_stream.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,28 @@
44
#include <sstream>
55
#include <utility> // move
66

7+
#include "databento/detail/buffer.hpp"
78
#include "databento/exceptions.hpp"
89
#include "databento/log.hpp"
910

1011
using databento::detail::ZstdDecodeStream;
1112

1213
ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr<IReadable> input)
13-
: ZstdDecodeStream{std::move(input), {}} {}
14+
: input_{std::move(input)},
15+
z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream},
16+
read_suggestion_{::ZSTD_initDStream(z_dstream_.get())},
17+
in_buffer_{},
18+
z_in_buffer_{in_buffer_.data(), 0, 0} {}
1419

1520
ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr<IReadable> input,
16-
std::vector<std::byte>&& in_buffer)
21+
detail::Buffer& in_buffer)
1722
: input_{std::move(input)},
1823
z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream},
1924
read_suggestion_{::ZSTD_initDStream(z_dstream_.get())},
20-
in_buffer_{std::move(in_buffer)},
21-
z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {}
25+
in_buffer_{in_buffer.ReadBegin(), in_buffer.ReadEnd()},
26+
z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {
27+
in_buffer.ReadBegin() += in_buffer.ReadCapacity();
28+
}
2229

2330
void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) {
2431
std::size_t size{};

src/live_blocking.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ databento::Metadata LiveBlocking::Start() {
148148
auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(),
149149
buffer_.ReadEnd());
150150
buffer_.ReadBegin() += size;
151+
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
152+
// alignment
153+
buffer_.Shift();
151154
version_ = metadata.version;
152155
metadata.Upgrade(upgrade_policy_);
153156
return metadata;

0 commit comments

Comments
 (0)