Skip to content

Commit 4f51ac1

Browse files
committed
ADD: Add subscription ID to clients
1 parent 578467c commit 4f51ac1

File tree

5 files changed

+42
-12
lines changed

5 files changed

+42
-12
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.33.0 - TBD
4+
5+
##### Enhancements
6+
- Added `id` field to `LiveSubscription` requests, which will be used for improved error
7+
messages
8+
39
## 0.32.1 - 2025-04-07
410

511
### Bug fixes

include/databento/live_blocking.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class LiveBlocking {
9696
std::string GenerateCramReply(const std::string& challenge_key);
9797
std::string EncodeAuthReq(const std::string& auth);
9898
std::uint64_t DecodeAuthResp();
99+
void IncrementSubCounter();
99100
void Subscribe(const std::string& sub_msg,
100101
const std::vector<std::string>& symbols, bool use_snapshot);
101102
detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout);
@@ -113,6 +114,7 @@ class LiveBlocking {
113114
VersionUpgradePolicy upgrade_policy_;
114115
std::chrono::seconds heartbeat_interval_;
115116
detail::TcpClient client_;
117+
std::uint32_t sub_counter_{};
116118
std::vector<LiveSubscription> subscriptions_;
117119
// Must be 8-byte aligned for records
118120
alignas(RecordHeader) std::array<char, kMaxStrLen> read_buffer_{};

include/databento/live_subscription.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <cstdint>
34
#include <string>
45
#include <variant>
56
#include <vector>
@@ -17,5 +18,6 @@ struct LiveSubscription {
1718
Schema schema;
1819
SType stype_in;
1920
Start start;
21+
std::uint32_t id{};
2022
};
2123
} // namespace databento

src/live_blocking.cpp

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <cstddef> // ptrdiff_t
99
#include <cstdlib>
1010
#include <ios> // hex, setfill, setw
11+
#include <limits>
1112
#include <sstream>
1213
#include <variant>
1314

@@ -66,42 +67,46 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6667

6768
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
6869
Schema schema, SType stype_in, UnixNanos start) {
70+
IncrementSubCounter();
6971
std::ostringstream sub_msg;
7072
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
71-
<< "|start=" << start.time_since_epoch().count();
73+
<< "|start=" << start.time_since_epoch().count()
74+
<< "|id=" << std::to_string(sub_counter_);
7275
Subscribe(sub_msg.str(), symbols, false);
7376
subscriptions_.emplace_back(
74-
LiveSubscription{symbols, schema, stype_in, start});
77+
LiveSubscription{symbols, schema, stype_in, start, sub_counter_});
7578
}
7679

7780
void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
7881
Schema schema, SType stype_in,
7982
const std::string& start) {
83+
IncrementSubCounter();
8084
std::ostringstream sub_msg;
81-
sub_msg << "schema=" << ToString(schema)
82-
<< "|stype_in=" << ToString(stype_in);
85+
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
86+
<< "|id=" << std::to_string(sub_counter_);
8387
if (!start.empty()) {
8488
sub_msg << "|start=" << start;
8589
}
8690
Subscribe(sub_msg.str(), symbols, false);
8791
if (start.empty()) {
88-
subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in,
89-
LiveSubscription::NoStart{}});
92+
subscriptions_.emplace_back(LiveSubscription{
93+
symbols, schema, stype_in, LiveSubscription::NoStart{}, sub_counter_});
9094
} else {
9195
subscriptions_.emplace_back(
92-
LiveSubscription{symbols, schema, stype_in, start});
96+
LiveSubscription{symbols, schema, stype_in, start, sub_counter_});
9397
}
9498
}
9599

96100
void LiveBlocking::SubscribeWithSnapshot(
97101
const std::vector<std::string>& symbols, Schema schema, SType stype_in) {
102+
IncrementSubCounter();
98103
std::ostringstream sub_msg;
99-
sub_msg << "schema=" << ToString(schema)
100-
<< "|stype_in=" << ToString(stype_in);
104+
sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in)
105+
<< "|id=" << std::to_string(sub_counter_);
101106

102107
Subscribe(sub_msg.str(), symbols, true);
103-
subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in,
104-
LiveSubscription::Snapshot{}});
108+
subscriptions_.emplace_back(LiveSubscription{
109+
symbols, schema, stype_in, LiveSubscription::Snapshot{}, sub_counter_});
105110
}
106111

107112
void LiveBlocking::Subscribe(const std::string& sub_msg,
@@ -182,6 +187,7 @@ void LiveBlocking::Stop() { client_.Close(); }
182187
void LiveBlocking::Reconnect() {
183188
log_receiver_->Receive(LogLevel::Info, "Reconnecting");
184189
client_ = detail::TcpClient{gateway_, port_};
190+
sub_counter_ = 0;
185191
session_id_ = this->Authenticate();
186192
}
187193

@@ -191,9 +197,11 @@ void LiveBlocking::Resubscribe() {
191197
std::holds_alternative<std::string>(subscription.start)) {
192198
subscription.start = LiveSubscription::NoStart{};
193199
}
200+
sub_counter_ = std::max(sub_counter_, subscription.id);
194201
std::ostringstream sub_msg;
195202
sub_msg << "schema=" << ToString(subscription.schema)
196-
<< "|stype_in=" << ToString(subscription.stype_in);
203+
<< "|stype_in=" << ToString(subscription.stype_in)
204+
<< "|id=" << std::to_string(sub_counter_);
197205
Subscribe(
198206
sub_msg.str(), subscription.symbols,
199207
std::holds_alternative<LiveSubscription::Snapshot>(subscription.start));
@@ -377,6 +385,16 @@ std::uint64_t LiveBlocking::DecodeAuthResp() {
377385
return session_id;
378386
}
379387

388+
void LiveBlocking::IncrementSubCounter() {
389+
if (sub_counter_ == std::numeric_limits<uint32_t>::max()) {
390+
log_receiver_->Receive(
391+
LogLevel::Warning,
392+
"[LiveBlocking::Subscribe] Exhausted all subscription IDs");
393+
} else {
394+
++sub_counter_;
395+
}
396+
}
397+
380398
databento::detail::TcpClient::Result LiveBlocking::FillBuffer(
381399
std::chrono::milliseconds timeout) {
382400
// Shift data forward

tests/src/mock_lsg_server.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ void MockLsgServer::SubscribeWithSnapshot(
136136
EXPECT_NE(received.find(std::string{"stype_in="} + ToString(stype)),
137137
std::string::npos);
138138
EXPECT_EQ(received.find("start="), std::string::npos);
139+
EXPECT_NE(received.find("id="), std::string::npos);
139140
EXPECT_NE(received.find("snapshot=1"), std::string::npos);
140141
}
141142

@@ -156,6 +157,7 @@ void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
156157
} else {
157158
EXPECT_NE(received.find(std::string{"start="} + start), std::string::npos);
158159
}
160+
EXPECT_NE(received.find("id="), std::string::npos);
159161
EXPECT_NE(received.find("snapshot=0"), std::string::npos);
160162
}
161163

0 commit comments

Comments
 (0)