Skip to content

Commit 3405de7

Browse files
committed
FIX: Fix ts_out decoding with DBNv1 upgrade
1 parent 8998a7d commit 3405de7

File tree

9 files changed

+118
-19
lines changed

9 files changed

+118
-19
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,18 @@
44

55
### Enhancements
66
- Added new publisher values for consolidated DBEQ.MAX
7+
- Added constructor to `WithTsOut` that updates `length` to the correct value to account
8+
for the extra 8 bytes
79

810
### Breaking changes
911
- Changed default `upgrade_policy` to `Upgrade` so by default the primary record types
1012
can always be used
1113
- Renamed `dummy` field in `ImbalanceMsg` and `StatMsg` to `reserved`
1214

1315
### Bug fixes
16+
- Fixed handling of `ts_out` when decoding DBNv1 and upgrading to version 2
17+
- Fixed missing logic to upgrade `ErrorMsgV1` and `SystemMsgV1` when decoding DBN with
18+
`VersionUpgradePolicy::Upgrade`
1419
- Added missing `StatType::Vwap` variant used in the ICE datasets
1520
- Added missing `ToString` and `operator<<` handling for `StatType::ClosePrice` and
1621
`StatType::NetChange`

include/databento/constants.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ static constexpr auto kUndefTimestamp =
2424
static constexpr auto kDbnVersion = 2;
2525
// The length of fixed-length symbol strings.
2626
static constexpr auto kSymbolCstrLen = 71;
27+
// The multiplier for converting the `length` field in `RecordHeader` to bytes.
28+
static constexpr std::size_t kRecordHeaderLengthMultiplier = 4;
2729

2830
// This is not necessarily a comprehensive list of available datasets. Please
2931
// use `Historical.MetadataListDatasets` to retrieve an up-to-date list.

include/databento/dbn_decoder.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class DbnDecoder {
3333
// given version and upgrade policy. If an upgrade is applied,
3434
// compat_buffer is modified.
3535
static Record DecodeRecordCompat(
36-
std::uint8_t version, VersionUpgradePolicy upgrade_policy,
36+
std::uint8_t version, VersionUpgradePolicy upgrade_policy, bool ts_out,
3737
std::array<std::uint8_t, kMaxRecordLen>* compat_buffer, Record rec);
3838

3939
// Should be called exactly once.
@@ -64,6 +64,7 @@ class DbnDecoder {
6464

6565
std::uint8_t version_{};
6666
VersionUpgradePolicy upgrade_policy_;
67+
bool ts_out_{};
6768
std::unique_ptr<IReadable> input_;
6869
std::vector<std::uint8_t> read_buffer_;
6970
std::size_t buffer_idx_{};

include/databento/record.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
#include "databento/enums.hpp"
1414
#include "databento/flag_set.hpp" // FlagSet
1515
#include "databento/publishers.hpp" // Publisher
16+
#include "databento/with_ts_out.hpp"
1617

1718
namespace databento {
1819
// Common data for all Databento Records.
1920
struct RecordHeader {
20-
static constexpr std::size_t kLengthMultiplier = 4;
21+
static constexpr std::size_t kLengthMultiplier =
22+
kRecordHeaderLengthMultiplier;
2123

2224
// The length of the message in 32-bit words.
2325
std::uint8_t length;
@@ -537,5 +539,6 @@ std::ostream& operator<<(std::ostream& stream,
537539
const SymbolMappingMsg& symbol_mapping_msg);
538540

539541
// The length in bytes of the largest record type.
540-
static constexpr std::size_t kMaxRecordLen = sizeof(InstrumentDefMsg);
542+
static constexpr std::size_t kMaxRecordLen =
543+
sizeof(WithTsOut<InstrumentDefMsg>);
541544
} // namespace databento

include/databento/with_ts_out.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3-
#include "databento/datetime.hpp" // UnixNanos
4-
#include "databento/enums.hpp"
3+
#include "databento/constants.hpp" // kRecordHeaderLengthMultiplier
4+
#include "databento/datetime.hpp" // UnixNanos
5+
#include "databento/enums.hpp" // RType
56

67
namespace databento {
78
// Record wrapper to read records with their live gateway send
@@ -10,6 +11,11 @@ template <typename R>
1011
struct WithTsOut {
1112
static bool HasRType(RType rtype) { return R::HasRType(rtype); }
1213

14+
constexpr WithTsOut(R r, UnixNanos ts) : rec{r}, ts_out{ts} {
15+
// Adjust length for `ts_out`
16+
this->rec.hd.length = sizeof(*this) / kRecordHeaderLengthMultiplier;
17+
}
18+
1319
// The base record.
1420
R rec;
1521
// The end timestamp from the Databento live gateway.

src/dbn_decoder.cpp

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "databento/enums.hpp"
1414
#include "databento/exceptions.hpp"
1515
#include "databento/record.hpp"
16+
#include "databento/with_ts_out.hpp"
1617

1718
using databento::DbnDecoder;
1819

@@ -188,23 +189,47 @@ databento::Metadata DbnDecoder::DecodeMetadata() {
188189
read_buffer_.resize(version_and_size.second);
189190
input_->ReadExact(read_buffer_.data(), read_buffer_.size());
190191
buffer_idx_ = read_buffer_.size();
191-
return DbnDecoder::DecodeMetadataFields(version_, read_buffer_);
192+
auto metadata = DbnDecoder::DecodeMetadataFields(version_, read_buffer_);
193+
ts_out_ = metadata.ts_out;
194+
return metadata;
192195
}
193196

197+
namespace {
198+
template <typename T, typename U>
199+
databento::Record UpgradeRecord(
200+
bool ts_out,
201+
std::array<std::uint8_t, databento::kMaxRecordLen>* compat_buffer,
202+
databento::Record rec) {
203+
if (ts_out) {
204+
const auto orig = rec.Get<databento::WithTsOut<T>>();
205+
const databento::WithTsOut<U> v2{orig.rec.ToV2(), orig.ts_out};
206+
const auto v2_ptr = reinterpret_cast<const std::uint8_t*>(&v2);
207+
std::copy(v2_ptr, v2_ptr + v2.rec.hd.Size(), compat_buffer->data());
208+
} else {
209+
const auto v2 = rec.Get<T>().ToV2();
210+
const auto v2_ptr = reinterpret_cast<const std::uint8_t*>(&v2);
211+
std::copy(v2_ptr, v2_ptr + v2.hd.Size(), compat_buffer->data());
212+
}
213+
return databento::Record{
214+
reinterpret_cast<databento::RecordHeader*>(compat_buffer->data())};
215+
}
216+
} // namespace
217+
194218
databento::Record DbnDecoder::DecodeRecordCompat(
195-
std::uint8_t version, VersionUpgradePolicy upgrade_policy,
219+
std::uint8_t version, VersionUpgradePolicy upgrade_policy, bool ts_out,
196220
std::array<std::uint8_t, kMaxRecordLen>* compat_buffer, Record rec) {
197221
if (version == 1 && upgrade_policy == VersionUpgradePolicy::Upgrade) {
198222
if (rec.RType() == RType::InstrumentDef) {
199-
auto v2 = rec.Get<InstrumentDefMsgV1>().ToV2();
200-
auto v2_ptr = reinterpret_cast<std::uint8_t*>(&v2);
201-
std::copy(v2_ptr, v2_ptr + v2.hd.Size(), compat_buffer->data());
202-
return Record{reinterpret_cast<RecordHeader*>(compat_buffer->data())};
223+
return UpgradeRecord<InstrumentDefMsgV1, InstrumentDefMsgV2>(
224+
ts_out, compat_buffer, rec);
203225
} else if (rec.RType() == RType::SymbolMapping) {
204-
auto v2 = rec.Get<SymbolMappingMsgV1>().ToV2();
205-
auto v2_ptr = reinterpret_cast<std::uint8_t*>(&v2);
206-
std::copy(v2_ptr, v2_ptr + v2.hd.Size(), compat_buffer->data());
207-
return Record{reinterpret_cast<RecordHeader*>(compat_buffer->data())};
226+
return UpgradeRecord<SymbolMappingMsgV1, SymbolMappingMsgV2>(
227+
ts_out, compat_buffer, rec);
228+
} else if (rec.RType() == RType::Error) {
229+
return UpgradeRecord<ErrorMsgV1, ErrorMsgV2>(ts_out, compat_buffer, rec);
230+
} else if (rec.RType() == RType::System) {
231+
return UpgradeRecord<SystemMsgV1, SystemMsgV2>(ts_out, compat_buffer,
232+
rec);
208233
}
209234
}
210235
return rec;
@@ -228,7 +253,7 @@ const databento::Record* DbnDecoder::DecodeRecord() {
228253
current_record_ = Record{BufferRecordHeader()};
229254
buffer_idx_ += current_record_.Size();
230255
current_record_ = DbnDecoder::DecodeRecordCompat(
231-
version_, upgrade_policy_, &compat_buffer_, current_record_);
256+
version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_);
232257
return &current_record_;
233258
}
234259

src/live_blocking.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ const databento::Record* LiveBlocking::NextRecord(
143143
}
144144
current_record_ = Record{BufferRecordHeader()};
145145
buffer_idx_ += current_record_.Size();
146-
current_record_ = DbnDecoder::DecodeRecordCompat(
147-
version_, upgrade_policy_, &compat_buffer_, current_record_);
146+
current_record_ =
147+
DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, send_ts_out_,
148+
&compat_buffer_, current_record_);
148149
return &current_record_;
149150
}
150151

test/src/dbn_decoder_tests.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <gtest/gtest.h>
22

3+
#include <chrono>
34
#include <cstddef>
45
#include <cstdint>
56
#include <cstring>
@@ -9,6 +10,7 @@
910

1011
#include "databento/compat.hpp"
1112
#include "databento/constants.hpp"
13+
#include "databento/datetime.hpp"
1214
#include "databento/dbn.hpp"
1315
#include "databento/dbn_decoder.hpp"
1416
#include "databento/detail/file_stream.hpp"
@@ -18,6 +20,7 @@
1820
#include "databento/exceptions.hpp"
1921
#include "databento/ireadable.hpp"
2022
#include "databento/record.hpp"
23+
#include "databento/with_ts_out.hpp"
2124

2225
namespace databento {
2326
namespace test {
@@ -138,6 +141,59 @@ TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) {
138141
AssertDefEq<InstrumentDefMsgV2>(ch_record2, f_record2);
139142
}
140143

144+
TEST_F(DbnDecoderTests, TestUpgradeSymbolMappingWithTsOut) {
145+
SymbolMappingMsgV1 sym_map{
146+
{sizeof(SymbolMappingMsgV1) / RecordHeader::kLengthMultiplier,
147+
RType::SymbolMapping, 0, 1, UnixNanos{std::chrono::nanoseconds{2}}},
148+
{"ES.c.0"},
149+
{"ESH4"},
150+
{},
151+
{},
152+
{}};
153+
WithTsOut<SymbolMappingMsgV1> orig{
154+
sym_map, UnixNanos{std::chrono::system_clock::now()}};
155+
std::array<std::uint8_t, kMaxRecordLen> compat_buffer{};
156+
const auto res =
157+
DbnDecoder::DecodeRecordCompat(1, VersionUpgradePolicy::Upgrade, true,
158+
&compat_buffer, Record{&orig.rec.hd});
159+
const auto& upgraded = res.Get<WithTsOut<SymbolMappingMsgV2>>();
160+
ASSERT_EQ(orig.ts_out, upgraded.ts_out);
161+
ASSERT_STREQ(orig.rec.STypeInSymbol(), upgraded.rec.STypeInSymbol());
162+
ASSERT_STREQ(orig.rec.STypeOutSymbol(), upgraded.rec.STypeOutSymbol());
163+
// `length` properly set
164+
ASSERT_EQ(upgraded.rec.hd.Size(), sizeof(upgraded));
165+
// used compat buffer
166+
ASSERT_EQ(reinterpret_cast<const std::uint8_t*>(&upgraded),
167+
compat_buffer.data());
168+
}
169+
170+
TEST_F(DbnDecoderTests, TestUpgradeMbp1WithTsOut) {
171+
WithTsOut<Mbp1Msg> orig{
172+
Mbp1Msg{{sizeof(Mbp1Msg) / RecordHeader::kLengthMultiplier,
173+
RType::Mbp1,
174+
{},
175+
{},
176+
{}},
177+
1'250'000'000,
178+
{},
179+
{},
180+
Side::Ask,
181+
{},
182+
{},
183+
{},
184+
{},
185+
{},
186+
{}},
187+
{std::chrono::system_clock::now()}};
188+
std::array<std::uint8_t, kMaxRecordLen> compat_buffer{};
189+
const auto res =
190+
DbnDecoder::DecodeRecordCompat(1, VersionUpgradePolicy::Upgrade, true,
191+
&compat_buffer, Record{&orig.rec.hd});
192+
const auto& upgraded = res.Get<WithTsOut<Mbp1Msg>>();
193+
// compat buffer unused and pointer unchanged
194+
ASSERT_EQ(&orig, &upgraded);
195+
}
196+
141197
class DbnDecoderSchemaTests
142198
: public DbnDecoderTests,
143199
public testing::WithParamInterface<std::pair<const char*, std::uint8_t>> {

test/src/live_blocking_tests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ TEST_F(LiveBlockingTests, TestNextRecordWithTsOut) {
267267
constexpr auto kRecCount = 5;
268268
constexpr auto kTsOut = true;
269269
constexpr WithTsOut<TradeMsg> kRec{
270-
{DummyHeader<WithTsOut<TradeMsg>>(RType::Mbp0),
270+
{DummyHeader<TradeMsg>(RType::Mbp0),
271271
1,
272272
2,
273273
Action::Add,

0 commit comments

Comments
 (0)