Skip to content

Commit 4d3ce49

Browse files
ADD: Add snapshot subscription for public clients
1 parent dfe8e6c commit 4d3ce49

File tree

9 files changed

+101
-12
lines changed

9 files changed

+101
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 0.18.1 - TBD
44

5+
### Enhancements
6+
- Added live `Subscribe` function overload with `use_snapshot` parameter
7+
58
### Bug fixes
69
- Added missing symbol chunking for live `Subscribe` overloads with `const std::string&`
710
`start` parameter

include/databento/live_blocking.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ class LiveBlocking {
5050
SType stype_in, UnixNanos start);
5151
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
5252
SType stype_in, const std::string& start);
53+
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
54+
SType stype_in, bool use_snapshot);
5355
// Notifies the gateway to start sending messages for all subscriptions.
5456
//
5557
// This method should only be called once per instance.
@@ -79,7 +81,7 @@ class LiveBlocking {
7981
std::string EncodeAuthReq(const std::string& auth);
8082
std::uint64_t DecodeAuthResp();
8183
void Subscribe(const std::string& sub_msg,
82-
const std::vector<std::string>& symbols);
84+
const std::vector<std::string>& symbols, bool use_snapshot);
8385
detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout);
8486
RecordHeader* BufferRecordHeader();
8587

include/databento/live_threaded.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ class LiveThreaded {
6565
SType stype_in, UnixNanos start);
6666
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
6767
SType stype_in, const std::string& start);
68+
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
69+
SType stype_in, bool use_snapshot);
6870
// Notifies the gateway to start sending messages for all subscriptions.
6971
// `metadata_callback` will be called exactly once, before any calls to
7072
// `record_callback`. `record_callback` will be called for records from all

src/live_blocking.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6565
if (start.time_since_epoch().count()) {
6666
sub_msg << "|start=" << start.time_since_epoch().count();
6767
}
68-
Subscribe(sub_msg.str(), symbols);
68+
Subscribe(sub_msg.str(), symbols, false);
6969
}
7070

7171
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
@@ -77,11 +77,21 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
7777
if (!start.empty()) {
7878
sub_msg << "|start=" << start;
7979
}
80-
Subscribe(sub_msg.str(), symbols);
80+
Subscribe(sub_msg.str(), symbols, false);
81+
}
82+
83+
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
84+
Schema schema, SType stype_in, bool use_snapshot) {
85+
std::ostringstream sub_msg;
86+
sub_msg << "schema=" << ToString(schema)
87+
<< "|stype_in=" << ToString(stype_in);
88+
89+
Subscribe(sub_msg.str(), symbols, use_snapshot);
8190
}
8291

8392
void LiveBlocking::Subscribe(const std::string& sub_msg,
84-
const std::vector<std::string>& symbols) {
93+
const std::vector<std::string>& symbols,
94+
bool use_snapshot) {
8595
static constexpr auto kMethodName = "Live::Subscribe";
8696
constexpr std::ptrdiff_t kSymbolMaxChunkSize = 128;
8797

@@ -98,7 +108,7 @@ void LiveBlocking::Subscribe(const std::string& sub_msg,
98108
chunked_sub_msg << sub_msg << "|symbols="
99109
<< JoinSymbolStrings(kMethodName, symbols_it,
100110
symbols_it + chunk_size)
101-
<< '\n';
111+
<< "|snapshot=" << use_snapshot << '\n';
102112
client_.WriteAll(chunked_sub_msg.str());
103113

104114
symbols_it += chunk_size;

src/live_threaded.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
102102
impl_->blocking.Subscribe(symbols, schema, stype_in, start);
103103
}
104104

105+
void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
106+
Schema schema, SType stype_in, bool use_snapshot) {
107+
impl_->blocking.Subscribe(symbols, schema, stype_in, use_snapshot);
108+
}
109+
105110
void LiveThreaded::Start(RecordCallback callback) {
106111
Start({}, std::move(callback), {});
107112
}

test/include/mock/mock_lsg_server.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ class MockLsgServer {
3434
void Accept();
3535
void Authenticate();
3636
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
37-
SType stype);
37+
SType stype, bool use_snapshot = false);
3838
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
39-
SType stype, const std::string& start);
39+
SType stype, const std::string& start,
40+
bool use_snapshot = false);
4041
void Start();
4142
std::size_t Send(const std::string& msg);
4243
::ssize_t UncheckedSend(const std::string& msg);

test/src/live_blocking_tests.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,66 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) {
162162
target.Subscribe(kSymbols, kSchema, kSType, kStart);
163163
}
164164

165+
TEST_F(LiveBlockingTests, TestSubscribeSnapshot) {
166+
constexpr auto kTsOut = false;
167+
constexpr auto kDataset = dataset::kXnasItch;
168+
const auto kSymbol = "TEST";
169+
const std::size_t kSymbolCount = 1000;
170+
const auto kSchema = Schema::Ohlcv1M;
171+
const auto kSType = SType::RawSymbol;
172+
const auto use_snapshot = true;
173+
174+
const mock::MockLsgServer mock_server{
175+
kDataset, kTsOut,
176+
[kSymbol, kSymbolCount, kSchema, kSType](mock::MockLsgServer& self) {
177+
self.Accept();
178+
self.Authenticate();
179+
std::size_t i{};
180+
while (i < 1000) {
181+
const auto chunk_size =
182+
std::min(static_cast<std::size_t>(128), kSymbolCount - i);
183+
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
184+
self.Subscribe(symbols_chunk, kSchema, kSType, use_snapshot);
185+
i += chunk_size;
186+
}
187+
}};
188+
189+
LiveBlocking target{logger_.get(),
190+
kKey,
191+
kDataset,
192+
kLocalhost,
193+
mock_server.Port(),
194+
kTsOut,
195+
VersionUpgradePolicy{}};
196+
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
197+
target.Subscribe(kSymbols, kSchema, kSType, use_snapshot);
198+
}
199+
200+
TEST_F(LiveBlockingTests, TestInvalidSubscription) {
201+
constexpr auto kTsOut = false;
202+
constexpr auto kDataset = dataset::kXnasItch;
203+
const std::vector<std::string> noSymbols{};
204+
const auto kSchema = Schema::Ohlcv1M;
205+
const auto kSType = SType::RawSymbol;
206+
207+
const mock::MockLsgServer mock_server{kDataset, kTsOut,
208+
[](mock::MockLsgServer& self) {
209+
self.Accept();
210+
self.Authenticate();
211+
}};
212+
213+
LiveBlocking target{logger_.get(),
214+
kKey,
215+
kDataset,
216+
kLocalhost,
217+
mock_server.Port(),
218+
kTsOut,
219+
VersionUpgradePolicy{}};
220+
221+
ASSERT_THROW(target.Subscribe(noSymbols, kSchema, kSType),
222+
databento::InvalidArgumentError);
223+
}
224+
165225
TEST_F(LiveBlockingTests, TestNextRecord) {
166226
constexpr auto kTsOut = false;
167227
const auto kRecCount = 12;

test/src/live_threaded_tests.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
178178
{},
179179
2};
180180

181+
const auto use_snapshot = true;
181182
bool should_close{};
182183
std::mutex should_close_mutex;
183184
std::condition_variable should_close_cv;
@@ -187,7 +188,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
187188
kSType](mock::MockLsgServer& self) {
188189
self.Accept();
189190
self.Authenticate();
190-
self.Subscribe(kAllSymbols, kSchema, kSType);
191+
self.Subscribe(kAllSymbols, kSchema, kSType, use_snapshot);
191192
self.Start();
192193
{
193194
std::unique_lock<std::mutex> shutdown_lock{should_close_mutex};
@@ -235,7 +236,8 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
235236
return LiveThreaded::ExceptionAction::Stop;
236237
}
237238
};
238-
target.Subscribe(kAllSymbols, kSchema, kSType);
239+
240+
target.Subscribe(kAllSymbols, kSchema, kSType, use_snapshot);
239241
target.Start(metadata_cb, record_cb, exception_cb);
240242
target.BlockForStop();
241243
EXPECT_EQ(metadata_calls, 2);

test/src/mock_lsg_server.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,13 @@ void MockLsgServer::Authenticate() {
9191
}
9292

9393
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
94-
Schema schema, SType stype) {
95-
Subscribe(symbols, schema, stype, {});
94+
Schema schema, SType stype, bool use_snapshot) {
95+
Subscribe(symbols, schema, stype, {}, use_snapshot);
9696
}
9797

9898
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
9999
Schema schema, SType stype,
100-
const std::string& start) {
100+
const std::string& start, bool use_snapshot) {
101101
const auto received = Receive();
102102
EXPECT_NE(
103103
received.find("symbols=" +
@@ -112,6 +112,10 @@ void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
112112
} else {
113113
EXPECT_NE(received.find(std::string{"start="} + start), std::string::npos);
114114
}
115+
116+
EXPECT_NE(
117+
received.find(std::string{"snapshot="} + std::to_string(use_snapshot)),
118+
std::string::npos);
115119
}
116120

117121
void MockLsgServer::Start() {

0 commit comments

Comments
 (0)