Skip to content

Commit 4e7123f

Browse files
committed
FIX: Fix live subscription in C++ client
1 parent 6ce35cb commit 4e7123f

File tree

9 files changed

+74
-35
lines changed

9 files changed

+74
-35
lines changed

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
# Changelog
22

3-
## 0.19.0 - TBD
3+
## 0.18.2 - TBD
44

55
### Enhancements
66
- Added new `UncrossingPrice` `StatType` variant
77
- Added new publisher values for `XNAS.BASIC`
88

99
### Bug fixes
10-
- Fix descriptions for `FINN` and `FINY` publishers.
10+
- Fixed overloading of live `Subscribe` methods
11+
- Fixed live subscribing with default-constructed `UnixNanos`
12+
- Fixed descriptions for `FINN` and `FINY` publishers.
1113

1214
## 0.18.1 - 2024-05-22
1315

include/databento/live_blocking.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ class LiveBlocking {
5050
SType stype_in, UnixNanos start);
5151
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
5252
SType stype_in, const std::string& start);
53-
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
54-
SType stype_in, bool use_snapshot);
53+
void SubscribeWithSnapshot(const std::vector<std::string>& symbols,
54+
Schema schema, SType stype_in);
5555
// Notifies the gateway to start sending messages for all subscriptions.
5656
//
5757
// This method should only be called once per instance.

include/databento/live_threaded.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class LiveThreaded {
6565
SType stype_in, UnixNanos start);
6666
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
6767
SType stype_in, const std::string& start);
68-
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
69-
SType stype_in, bool use_snapshot);
68+
void SubscribeWithSnapshot(const std::vector<std::string>& symbols,
69+
Schema schema, SType stype_in);
7070
// Notifies the gateway to start sending messages for all subscriptions.
7171
// `metadata_callback` will be called exactly once, before any calls to
7272
// `record_callback`. `record_callback` will be called for records from all

src/live_blocking.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,14 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
5454

5555
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
5656
Schema schema, SType stype_in) {
57-
Subscribe(symbols, schema, stype_in, UnixNanos{});
57+
Subscribe(symbols, schema, stype_in, std::string{""});
5858
}
5959

6060
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6161
Schema schema, SType stype_in, UnixNanos start) {
6262
std::ostringstream sub_msg;
63-
sub_msg << "schema=" << ToString(schema)
64-
<< "|stype_in=" << ToString(stype_in);
65-
if (start.time_since_epoch().count()) {
66-
sub_msg << "|start=" << start.time_since_epoch().count();
67-
}
63+
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
64+
<< "|start=" << start.time_since_epoch().count();
6865
Subscribe(sub_msg.str(), symbols, false);
6966
}
7067

@@ -80,13 +77,13 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
8077
Subscribe(sub_msg.str(), symbols, false);
8178
}
8279

83-
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
84-
Schema schema, SType stype_in, bool use_snapshot) {
80+
void LiveBlocking::SubscribeWithSnapshot(
81+
const std::vector<std::string>& symbols, Schema schema, SType stype_in) {
8582
std::ostringstream sub_msg;
8683
sub_msg << "schema=" << ToString(schema)
8784
<< "|stype_in=" << ToString(stype_in);
8885

89-
Subscribe(sub_msg.str(), symbols, use_snapshot);
86+
Subscribe(sub_msg.str(), symbols, true);
9087
}
9188

9289
void LiveBlocking::Subscribe(const std::string& sub_msg,

src/live_threaded.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
102102
impl_->blocking.Subscribe(symbols, schema, stype_in, start);
103103
}
104104

105-
void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
106-
Schema schema, SType stype_in, bool use_snapshot) {
107-
impl_->blocking.Subscribe(symbols, schema, stype_in, use_snapshot);
105+
void LiveThreaded::SubscribeWithSnapshot(
106+
const std::vector<std::string>& symbols, Schema schema, SType stype_in) {
107+
impl_->blocking.SubscribeWithSnapshot(symbols, schema, stype_in);
108108
}
109109

110110
void LiveThreaded::Start(RecordCallback callback) {

test/include/mock/mock_lsg_server.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ class MockLsgServer {
3434
void Accept();
3535
void Authenticate();
3636
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
37-
SType stype, bool use_snapshot = false);
37+
SType stype);
3838
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
39-
SType stype, const std::string& start,
40-
bool use_snapshot = false);
39+
SType stype, const std::string& start);
40+
void SubscribeWithSnapshot(const std::vector<std::string>& symbols,
41+
Schema schema, SType stype);
4142
void Start();
4243
std::size_t Send(const std::string& msg);
4344
::ssize_t UncheckedSend(const std::string& msg);

test/src/live_blocking_tests.cpp

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,33 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingUnixNanos) {
126126
target.Subscribe(kSymbols, kSchema, kSType);
127127
}
128128

129+
TEST_F(LiveBlockingTests, TestSubscriptionUnixNanos0) {
130+
constexpr auto kTsOut = false;
131+
constexpr auto kDataset = dataset::kXnasItch;
132+
const std::vector<std::string> kSymbols = {"TEST1", "TEST2"};
133+
const auto kSchema = Schema::Ohlcv1M;
134+
const auto kSType = SType::RawSymbol;
135+
const auto kStart = UnixNanos{};
136+
137+
const mock::MockLsgServer mock_server{
138+
kDataset, kTsOut,
139+
[&kSymbols, kSchema, kSType, kStart](mock::MockLsgServer& self) {
140+
self.Accept();
141+
self.Authenticate();
142+
std::size_t i{};
143+
self.Subscribe(kSymbols, kSchema, kSType, "0");
144+
}};
145+
146+
LiveBlocking target{logger_.get(),
147+
kKey,
148+
kDataset,
149+
kLocalhost,
150+
mock_server.Port(),
151+
kTsOut,
152+
VersionUpgradePolicy{}};
153+
target.Subscribe(kSymbols, kSchema, kSType, kStart);
154+
}
155+
129156
TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) {
130157
constexpr auto kTsOut = false;
131158
constexpr auto kDataset = dataset::kXnasItch;
@@ -182,7 +209,7 @@ TEST_F(LiveBlockingTests, TestSubscribeSnapshot) {
182209
const auto chunk_size =
183210
std::min(static_cast<std::size_t>(128), kSymbolCount - i);
184211
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
185-
self.Subscribe(symbols_chunk, kSchema, kSType, kUseSnapshot);
212+
self.SubscribeWithSnapshot(symbols_chunk, kSchema, kSType);
186213
i += chunk_size;
187214
}
188215
}};
@@ -195,13 +222,13 @@ TEST_F(LiveBlockingTests, TestSubscribeSnapshot) {
195222
kTsOut,
196223
VersionUpgradePolicy{}};
197224
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
198-
target.Subscribe(kSymbols, kSchema, kSType, kUseSnapshot);
225+
target.SubscribeWithSnapshot(kSymbols, kSchema, kSType);
199226
}
200227

201228
TEST_F(LiveBlockingTests, TestInvalidSubscription) {
202229
constexpr auto kTsOut = false;
203230
constexpr auto kDataset = dataset::kXnasItch;
204-
const std::vector<std::string> noSymbols{};
231+
const std::vector<std::string> kNoSymbols{};
205232
const auto kSchema = Schema::Ohlcv1M;
206233
const auto kSType = SType::RawSymbol;
207234

@@ -219,7 +246,7 @@ TEST_F(LiveBlockingTests, TestInvalidSubscription) {
219246
kTsOut,
220247
VersionUpgradePolicy{}};
221248

222-
ASSERT_THROW(target.Subscribe(noSymbols, kSchema, kSType),
249+
ASSERT_THROW(target.Subscribe(kNoSymbols, kSchema, kSType),
223250
databento::InvalidArgumentError);
224251
}
225252

test/src/live_threaded_tests.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
188188
kSType, kUseSnapshot](mock::MockLsgServer& self) {
189189
self.Accept();
190190
self.Authenticate();
191-
self.Subscribe(kAllSymbols, kSchema, kSType, kUseSnapshot);
191+
self.SubscribeWithSnapshot(kAllSymbols, kSchema, kSType);
192192
self.Start();
193193
{
194194
std::unique_lock<std::mutex> shutdown_lock{should_close_mutex};
@@ -237,7 +237,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
237237
}
238238
};
239239

240-
target.Subscribe(kAllSymbols, kSchema, kSType, kUseSnapshot);
240+
target.SubscribeWithSnapshot(kAllSymbols, kSchema, kSType);
241241
target.Start(metadata_cb, record_cb, exception_cb);
242242
target.BlockForStop();
243243
EXPECT_EQ(metadata_calls, 2);

test/src/mock_lsg_server.cpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,28 @@ void MockLsgServer::Authenticate() {
9191
}
9292

9393
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
94-
Schema schema, SType stype, bool use_snapshot) {
95-
Subscribe(symbols, schema, stype, {}, use_snapshot);
94+
Schema schema, SType stype) {
95+
Subscribe(symbols, schema, stype, "");
96+
}
97+
98+
void MockLsgServer::SubscribeWithSnapshot(
99+
const std::vector<std::string>& symbols, Schema schema, SType stype) {
100+
const auto received = Receive();
101+
EXPECT_NE(
102+
received.find("symbols=" +
103+
JoinSymbolStrings("MockLsgServer::Subscribe", symbols)),
104+
std::string::npos);
105+
EXPECT_NE(received.find(std::string{"schema="} + ToString(schema)),
106+
std::string::npos);
107+
EXPECT_NE(received.find(std::string{"stype_in="} + ToString(stype)),
108+
std::string::npos);
109+
EXPECT_EQ(received.find("start="), std::string::npos);
110+
EXPECT_NE(received.find("snapshot=1"), std::string::npos);
96111
}
97112

98113
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
99114
Schema schema, SType stype,
100-
const std::string& start, bool use_snapshot) {
115+
const std::string& start) {
101116
const auto received = Receive();
102117
EXPECT_NE(
103118
received.find("symbols=" +
@@ -108,14 +123,11 @@ void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
108123
EXPECT_NE(received.find(std::string{"stype_in="} + ToString(stype)),
109124
std::string::npos);
110125
if (start.empty()) {
111-
EXPECT_EQ(received.find(std::string{"start="}), std::string::npos);
126+
EXPECT_EQ(received.find("start="), std::string::npos);
112127
} else {
113128
EXPECT_NE(received.find(std::string{"start="} + start), std::string::npos);
114129
}
115-
116-
EXPECT_NE(
117-
received.find(std::string{"snapshot="} + std::to_string(use_snapshot)),
118-
std::string::npos);
130+
EXPECT_NE(received.find("snapshot=0"), std::string::npos);
119131
}
120132

121133
void MockLsgServer::Start() {

0 commit comments

Comments
 (0)