Skip to content

Commit dfe8e6c

Browse files
committed
FIX: Fix symbol chunking with string start param
1 parent 0ccbf66 commit dfe8e6c

File tree

6 files changed

+88
-26
lines changed

6 files changed

+88
-26
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.18.1 - TBD
4+
5+
### Bug fixes
6+
- Added missing symbol chunking for live `Subscribe` overloads with `const std::string&`
7+
`start` parameter
8+
39
## 0.18.0 - 2024-05-14
410

511
### Breaking changes

include/databento/live_blocking.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class LiveBlocking {
7878
std::string GenerateCramReply(const std::string& challenge_key);
7979
std::string EncodeAuthReq(const std::string& auth);
8080
std::uint64_t DecodeAuthResp();
81+
void Subscribe(const std::string& sub_msg,
82+
const std::vector<std::string>& symbols);
8183
detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout);
8284
RecordHeader* BufferRecordHeader();
8385

src/live_blocking.cpp

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,32 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
5959

6060
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6161
Schema schema, SType stype_in, UnixNanos start) {
62+
std::ostringstream sub_msg;
63+
sub_msg << "schema=" << ToString(schema)
64+
<< "|stype_in=" << ToString(stype_in);
65+
if (start.time_since_epoch().count()) {
66+
sub_msg << "|start=" << start.time_since_epoch().count();
67+
}
68+
Subscribe(sub_msg.str(), symbols);
69+
}
70+
71+
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
72+
Schema schema, SType stype_in,
73+
const std::string& start) {
74+
std::ostringstream sub_msg;
75+
sub_msg << "schema=" << ToString(schema)
76+
<< "|stype_in=" << ToString(stype_in);
77+
if (!start.empty()) {
78+
sub_msg << "|start=" << start;
79+
}
80+
Subscribe(sub_msg.str(), symbols);
81+
}
82+
83+
void LiveBlocking::Subscribe(const std::string& sub_msg,
84+
const std::vector<std::string>& symbols) {
6285
static constexpr auto kMethodName = "Live::Subscribe";
6386
constexpr std::ptrdiff_t kSymbolMaxChunkSize = 128;
87+
6488
if (symbols.empty()) {
6589
throw InvalidArgumentError{kMethodName, "symbols",
6690
"must contain at least one symbol"};
@@ -70,36 +94,17 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
7094
const auto chunk_size =
7195
std::min(kSymbolMaxChunkSize, std::distance(symbols_it, symbols.end()));
7296

73-
std::ostringstream sub_msg;
74-
sub_msg << "schema=" << ToString(schema)
75-
<< "|stype_in=" << ToString(stype_in) << "|symbols="
76-
<< JoinSymbolStrings(kMethodName, symbols_it,
77-
symbols_it + chunk_size);
78-
if (start.time_since_epoch().count()) {
79-
sub_msg << "|start=" << start.time_since_epoch().count();
80-
}
81-
sub_msg << '\n';
82-
client_.WriteAll(sub_msg.str());
97+
std::ostringstream chunked_sub_msg;
98+
chunked_sub_msg << sub_msg << "|symbols="
99+
<< JoinSymbolStrings(kMethodName, symbols_it,
100+
symbols_it + chunk_size)
101+
<< '\n';
102+
client_.WriteAll(chunked_sub_msg.str());
83103

84104
symbols_it += chunk_size;
85105
}
86106
}
87107

88-
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
89-
Schema schema, SType stype_in,
90-
const std::string& start) {
91-
std::ostringstream sub_msg;
92-
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
93-
<< "|symbols="
94-
<< JoinSymbolStrings("LiveBlocking::Subscribe", symbols);
95-
if (!start.empty()) {
96-
sub_msg << "|start=" << start;
97-
}
98-
sub_msg << '\n';
99-
100-
client_.WriteAll(sub_msg.str());
101-
}
102-
103108
databento::Metadata LiveBlocking::Start() {
104109
constexpr auto kMetadataPreludeSize = 8;
105110
client_.WriteAll("start_session\n");

test/include/mock/mock_lsg_server.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class MockLsgServer {
3535
void Authenticate();
3636
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
3737
SType stype);
38+
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
39+
SType stype, const std::string& start);
3840
void Start();
3941
std::size_t Send(const std::string& msg);
4042
::ssize_t UncheckedSend(const std::string& msg);

test/src/live_blocking_tests.cpp

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ TEST_F(LiveBlockingTests, TestSubscribe) {
9292
target.Subscribe(kSymbols, kSchema, kSType);
9393
}
9494

95-
TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
95+
TEST_F(LiveBlockingTests, TestSubscriptionChunkingUnixNanos) {
9696
constexpr auto kTsOut = false;
9797
constexpr auto kDataset = dataset::kXnasItch;
9898
const auto kSymbol = "TEST";
@@ -126,6 +126,42 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
126126
target.Subscribe(kSymbols, kSchema, kSType);
127127
}
128128

129+
TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) {
130+
constexpr auto kTsOut = false;
131+
constexpr auto kDataset = dataset::kXnasItch;
132+
const auto kSymbol = "TEST";
133+
const std::size_t kSymbolCount = 1000;
134+
const auto kSchema = Schema::Ohlcv1M;
135+
const auto kSType = SType::RawSymbol;
136+
const auto kStart = "2020-01-01T00:00:00";
137+
138+
const mock::MockLsgServer mock_server{
139+
kDataset, kTsOut,
140+
[kSymbol, kSymbolCount, kSchema, kSType,
141+
kStart](mock::MockLsgServer& self) {
142+
self.Accept();
143+
self.Authenticate();
144+
std::size_t i{};
145+
while (i < 1000) {
146+
const auto chunk_size =
147+
std::min(static_cast<std::size_t>(128), kSymbolCount - i);
148+
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
149+
self.Subscribe(symbols_chunk, kSchema, kSType, kStart);
150+
i += chunk_size;
151+
}
152+
}};
153+
154+
LiveBlocking target{logger_.get(),
155+
kKey,
156+
kDataset,
157+
kLocalhost,
158+
mock_server.Port(),
159+
kTsOut,
160+
VersionUpgradePolicy{}};
161+
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
162+
target.Subscribe(kSymbols, kSchema, kSType, kStart);
163+
}
164+
129165
TEST_F(LiveBlockingTests, TestNextRecord) {
130166
constexpr auto kTsOut = false;
131167
const auto kRecCount = 12;

test/src/mock_lsg_server.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ void MockLsgServer::Authenticate() {
9292

9393
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
9494
Schema schema, SType stype) {
95+
Subscribe(symbols, schema, stype, {});
96+
}
97+
98+
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
99+
Schema schema, SType stype,
100+
const std::string& start) {
95101
const auto received = Receive();
96102
EXPECT_NE(
97103
received.find("symbols=" +
@@ -101,6 +107,11 @@ void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
101107
std::string::npos);
102108
EXPECT_NE(received.find(std::string{"stype_in="} + ToString(stype)),
103109
std::string::npos);
110+
if (start.empty()) {
111+
EXPECT_EQ(received.find(std::string{"start="}), std::string::npos);
112+
} else {
113+
EXPECT_NE(received.find(std::string{"start="} + start), std::string::npos);
114+
}
104115
}
105116

106117
void MockLsgServer::Start() {

0 commit comments

Comments
 (0)