Skip to content

Commit cd28657

Browse files
committed
MOD: Batch long subscriptions
1 parent 6cb67d3 commit cd28657

File tree

6 files changed

+85
-14
lines changed

6 files changed

+85
-14
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## 0.9.1 - TBD
4+
5+
#### Enhancements
6+
- Added constants for dataset codes for Databento Equity Basic and OPRA Pillar
7+
8+
#### Bug fixes
9+
- Batch live subscriptions to avoid hitting max message length
10+
311
## 0.9.0 - 2023-06-13
412

513
#### Enhancements

include/databento/constants.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ static constexpr auto kUndefOrderSize =
1818
// This is not necessarily a comprehensive list of available datasets. Please
1919
// use `Historical.MetadataListDatasets` to retrieve an up-to-date list.
2020
namespace dataset {
21+
// The dataset code for Databento Equity Basic.
22+
static constexpr auto kDbeqBasic = "DBEQ.BASIC";
23+
// The dataset code for CME Globex MDP 3.0.
2124
static constexpr auto kGlbxMdp3 = "GLBX.MDP3";
25+
// The dataset code for OPRA.PILLAR.
26+
static constexpr auto kOpraPillar = "OPRA.PILLAR";
27+
// The dataset code for Nasdaq TotalView ITCH.
2228
static constexpr auto kXnasItch = "XNAS.ITCH";
2329
} // namespace dataset
2430
} // namespace databento

include/databento/symbology.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ struct SymbologyResolution {
2727
// Converts a vector of symbols to a comma-delineated string for sending to
2828
// Databento's historical and live APIs.
2929
//
30-
// Throws InvalidArgumentError if symbols is empty.
30+
// Throws InvalidArgumentError if symbols is empty or the iterator range is
31+
// empty.
32+
std::string JoinSymbolStrings(
33+
const std::string& method_name,
34+
std::vector<std::string>::const_iterator symbols_begin,
35+
std::vector<std::string>::const_iterator symbols_end);
3136
std::string JoinSymbolStrings(const std::string& method_name,
3237
const std::vector<std::string>& symbols);
3338
std::string ToString(const StrMappingInterval& mapping_interval);

src/live_blocking.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
#include <openssl/sha.h> // SHA256, SHA256_DIGEST_LENGTH
44

55
#include <algorithm> // copy
6-
#include <cctype>
6+
#include <cctype> // tolower
7+
#include <cstddef> // ptrdiff_t
78
#include <cstdlib>
89
#include <ios> //hex, setfill, setw
910
#include <sstream>
@@ -53,16 +54,30 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
5354

5455
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
5556
Schema schema, SType stype_in, UnixNanos start) {
56-
std::ostringstream sub_msg;
57-
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
58-
<< "|symbols="
59-
<< JoinSymbolStrings("LiveBlocking::Subscribe", symbols);
60-
if (start.time_since_epoch().count()) {
61-
sub_msg << "|start=" << start.time_since_epoch().count();
57+
static constexpr auto kMethodName = "Live::Subscribe";
58+
constexpr std::ptrdiff_t kSymbolMaxChunkSize = 128;
59+
if (symbols.empty()) {
60+
throw InvalidArgumentError{kMethodName, "symbols",
61+
"must contain at least one symbol"};
6262
}
63-
sub_msg << '\n';
63+
auto symbols_it = symbols.begin();
64+
while (symbols_it != symbols.end()) {
65+
const auto chunk_size =
66+
std::min(kSymbolMaxChunkSize, std::distance(symbols_it, symbols.end()));
6467

65-
client_.WriteAll(sub_msg.str());
68+
std::ostringstream sub_msg;
69+
sub_msg << "schema=" << ToString(schema)
70+
<< "|stype_in=" << ToString(stype_in) << "|symbols="
71+
<< JoinSymbolStrings(kMethodName, symbols_it,
72+
symbols_it + chunk_size);
73+
if (start.time_since_epoch().count()) {
74+
sub_msg << "|start=" << start.time_since_epoch().count();
75+
}
76+
sub_msg << '\n';
77+
client_.WriteAll(sub_msg.str());
78+
79+
symbols_it += chunk_size;
80+
}
6681
}
6782

6883
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,

src/symbology.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,28 @@
77
#include "stream_op_helper.hpp" // StreamOpBuilder
88

99
namespace databento {
10-
std::string JoinSymbolStrings(const std::string& method_name,
11-
const std::vector<std::string>& symbols) {
12-
if (symbols.empty()) {
10+
std::string JoinSymbolStrings(
11+
const std::string& method_name,
12+
std::vector<std::string>::const_iterator symbols_begin,
13+
std::vector<std::string>::const_iterator symbols_end) {
14+
if (symbols_begin == symbols_end) {
1315
throw InvalidArgumentError{method_name, "symbols", "Cannot be empty"};
1416
}
15-
return std::accumulate(symbols.begin(), symbols.end(), std::string{},
17+
return std::accumulate(symbols_begin, symbols_end, std::string{},
1618
[](std::string acc, const std::string& sym) {
1719
return acc.empty() ? sym
1820
: std::move(acc) + ',' + sym;
1921
});
2022
}
2123

24+
std::string JoinSymbolStrings(const std::string& method_name,
25+
const std::vector<std::string>& symbols) {
26+
if (symbols.empty()) {
27+
throw InvalidArgumentError{method_name, "symbols", "Cannot be empty"};
28+
}
29+
return JoinSymbolStrings(method_name, symbols.begin(), symbols.end());
30+
}
31+
2232
std::string ToString(const StrMappingInterval& mapping_interval) {
2333
return MakeString(mapping_interval);
2434
}

test/src/live_blocking_tests.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,33 @@ TEST_F(LiveBlockingTests, TestSubscribe) {
8686
target.Subscribe(kSymbols, kSchema, kSType);
8787
}
8888

89+
TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
90+
constexpr auto kTsOut = false;
91+
constexpr auto kDataset = dataset::kXnasItch;
92+
constexpr auto kSymbol = "TEST";
93+
constexpr auto kSymbolCount = 1000;
94+
constexpr auto kSchema = Schema::Ohlcv1M;
95+
constexpr auto kSType = SType::RawSymbol;
96+
97+
const mock::MockLsgServer mock_server{
98+
kDataset, kTsOut, [](mock::MockLsgServer& self) {
99+
self.Accept();
100+
self.Authenticate();
101+
std::size_t i{};
102+
while (i < 1000) {
103+
const auto chunk_size = std::min(128UL, kSymbolCount - i);
104+
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
105+
self.Subscribe(symbols_chunk, kSchema, kSType);
106+
i += chunk_size;
107+
}
108+
}};
109+
110+
LiveBlocking target{logger_.get(), kKey, kDataset, "127.0.0.1",
111+
mock_server.Port(), kTsOut};
112+
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
113+
target.Subscribe(kSymbols, kSchema, kSType);
114+
}
115+
89116
TEST_F(LiveBlockingTests, TestNextRecord) {
90117
constexpr auto kTsOut = false;
91118
constexpr auto kRecCount = 12;

0 commit comments

Comments
 (0)