Skip to content

Commit 0b589ed

Browse files
committed
ADD: Add configurable heartbeat support to clients
1 parent 03b0c6d commit 0b589ed

File tree

11 files changed

+204
-109
lines changed

11 files changed

+204
-109
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33
## 0.19.0 - TBD
44

55
### Enhancements
6+
- Added configurable `heartbeat_interval` parameter for live clients that determines the
7+
timeout before heartbeat `SystemMsg` records will be sent. It can be configured via
8+
the `SetHeartbeatInterval` method of the `LiveBuilder`
9+
- Added `SetAddress` method to `LiveBuilder` for configuring a custom gateway address
10+
without using the constructor directly
611
- Added new `UncrossingPrice` `StatType` variant
712
- Added new publisher values for `XNAS.BASIC`
813

914
### Breaking changes
15+
- Added `heartbeat_interval` parameter to the `Live` constructors
1016
- Removed `start_date` and `end_date` fields from `DatasetRange` struct
1117
in favor of `start` and `end`
1218
- Removed live `Subscribe` method overloads with `use_snapshot`

include/databento/live.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <chrono>
34
#include <string>
45

56
#include "databento/enums.hpp" // VersionUpgradePolicy
@@ -29,6 +30,10 @@ class LiveBuilder {
2930
LiveBuilder& SetUpgradePolicy(VersionUpgradePolicy upgrade_policy);
3031
// Sets the receiver of the logs to be used by the client.
3132
LiveBuilder& SetLogReceiver(ILogReceiver* log_receiver);
33+
// Overrides the heartbeat interval.
34+
LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval);
35+
// Overrides the gateway and port. This is an advanced method.
36+
LiveBuilder& SetAddress(std::string gateway, std::uint16_t port);
3237
// Attempts to construct an instance of a blocking live client or throws an
3338
// exception.
3439
LiveBlocking BuildBlocking();
@@ -40,9 +45,12 @@ class LiveBuilder {
4045
void Validate();
4146

4247
ILogReceiver* log_receiver_{};
48+
std::string gateway_{};
49+
std::uint16_t port_{};
4350
std::string key_;
4451
std::string dataset_;
4552
bool send_ts_out_{false};
4653
VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::Upgrade};
54+
std::chrono::seconds heartbeat_interval_{};
4755
};
4856
} // namespace databento

include/databento/live_blocking.hpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <chrono> // milliseconds
55
#include <cstdint>
66
#include <string>
7+
#include <utility> // pair
78
#include <vector>
89

910
#include "databento/datetime.hpp" // UnixNanos
@@ -22,10 +23,12 @@ class ILogReceiver;
2223
class LiveBlocking {
2324
public:
2425
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
25-
bool send_ts_out, VersionUpgradePolicy upgrade_policy);
26+
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
27+
std::chrono::seconds heartbeat_interval);
2628
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
2729
std::string gateway, std::uint16_t port, bool send_ts_out,
28-
VersionUpgradePolicy upgrade_policy);
30+
VersionUpgradePolicy upgrade_policy,
31+
std::chrono::seconds heartbeat_interval);
2932
/*
3033
* Getters
3134
*/
@@ -36,6 +39,11 @@ class LiveBlocking {
3639
std::uint16_t Port() const { return port_; }
3740
bool SendTsOut() const { return send_ts_out_; }
3841
VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; }
42+
// The the first member of the pair will be true, when the heartbeat interval
43+
// was overridden.
44+
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const {
45+
return {heartbeat_interval_.count() > 0, heartbeat_interval_};
46+
}
3947

4048
/*
4149
* Methods
@@ -95,6 +103,7 @@ class LiveBlocking {
95103
bool send_ts_out_;
96104
std::uint8_t version_{};
97105
VersionUpgradePolicy upgrade_policy_;
106+
std::chrono::seconds heartbeat_interval_;
98107
detail::TcpClient client_;
99108
// Must be 8-byte aligned for records
100109
alignas(RecordHeader) std::array<char, kMaxStrLen> read_buffer_{};

include/databento/live_threaded.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <functional> // function
55
#include <memory> // unique_ptr
66
#include <string>
7+
#include <utility> // pair
78
#include <vector>
89

910
#include "databento/datetime.hpp" // UnixNanos
@@ -31,10 +32,12 @@ class LiveThreaded {
3132
std::function<ExceptionAction(const std::exception&)>;
3233

3334
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
34-
bool send_ts_out, VersionUpgradePolicy upgrade_policy);
35+
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
36+
std::chrono::seconds heartbeat_interval);
3537
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
3638
std::string gateway, std::uint16_t port, bool send_ts_out,
37-
VersionUpgradePolicy upgrade_policy);
39+
VersionUpgradePolicy upgrade_policy,
40+
std::chrono::seconds heartbeat_interval);
3841
LiveThreaded(const LiveThreaded&) = delete;
3942
LiveThreaded& operator=(const LiveThreaded&) = delete;
4043
LiveThreaded(LiveThreaded&& other) noexcept;
@@ -51,6 +54,9 @@ class LiveThreaded {
5154
std::uint16_t Port() const;
5255
bool SendTsOut() const;
5356
VersionUpgradePolicy UpgradePolicy() const;
57+
// The the first member of the pair will be true, when the heartbeat interval
58+
// was overridden.
59+
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const;
5460

5561
/*
5662
* Methods

src/live.cpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "databento/live.hpp"
22

3+
#include <chrono>
34
#include <utility> // move
45

56
#include "databento/constants.hpp" // kApiKeyLength
@@ -51,16 +52,40 @@ LiveBuilder& LiveBuilder::SetLogReceiver(
5152
return *this;
5253
}
5354

55+
LiveBuilder& LiveBuilder::SetHeartbeatInterval(
56+
std::chrono::seconds heartbeat_interval) {
57+
heartbeat_interval_ = heartbeat_interval;
58+
return *this;
59+
}
60+
61+
LiveBuilder& LiveBuilder::SetAddress(std::string gateway, std::uint16_t port) {
62+
gateway_ = std::move(gateway);
63+
port_ = port;
64+
return *this;
65+
}
66+
5467
databento::LiveBlocking LiveBuilder::BuildBlocking() {
5568
Validate();
56-
return databento::LiveBlocking{log_receiver_, key_, dataset_, send_ts_out_,
57-
upgrade_policy_};
69+
if (gateway_.empty()) {
70+
return databento::LiveBlocking{log_receiver_, key_,
71+
dataset_, send_ts_out_,
72+
upgrade_policy_, heartbeat_interval_};
73+
}
74+
return databento::LiveBlocking{
75+
log_receiver_, key_, dataset_, gateway_,
76+
port_, send_ts_out_, upgrade_policy_, heartbeat_interval_};
5877
}
5978

6079
databento::LiveThreaded LiveBuilder::BuildThreaded() {
6180
Validate();
62-
return databento::LiveThreaded{log_receiver_, key_, dataset_, send_ts_out_,
63-
upgrade_policy_};
81+
if (gateway_.empty()) {
82+
return databento::LiveThreaded{log_receiver_, key_,
83+
dataset_, send_ts_out_,
84+
upgrade_policy_, heartbeat_interval_};
85+
}
86+
return databento::LiveThreaded{
87+
log_receiver_, key_, dataset_, gateway_,
88+
port_, send_ts_out_, upgrade_policy_, heartbeat_interval_};
6489
}
6590

6691
void LiveBuilder::Validate() {

src/live_blocking.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
#include <algorithm> // copy
66
#include <cctype> // tolower
7-
#include <cstddef> // ptrdiff_t
7+
#include <chrono>
8+
#include <cstddef> // ptrdiff_t
89
#include <cstdlib>
910
#include <ios> //hex, setfill, setw
1011
#include <sstream>
@@ -26,7 +27,8 @@ constexpr std::size_t kBucketIdLength = 5;
2627

2728
LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
2829
std::string dataset, bool send_ts_out,
29-
VersionUpgradePolicy upgrade_policy)
30+
VersionUpgradePolicy upgrade_policy,
31+
std::chrono::seconds heartbeat_interval)
3032

3133
: log_receiver_{log_receiver},
3234
key_{std::move(key)},
@@ -35,20 +37,23 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
3537
port_{13000},
3638
send_ts_out_{send_ts_out},
3739
upgrade_policy_{upgrade_policy},
40+
heartbeat_interval_{heartbeat_interval},
3841
client_{gateway_, port_},
3942
session_id_{this->Authenticate()} {}
4043

4144
LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
4245
std::string dataset, std::string gateway,
4346
std::uint16_t port, bool send_ts_out,
44-
VersionUpgradePolicy upgrade_policy)
47+
VersionUpgradePolicy upgrade_policy,
48+
std::chrono::seconds heartbeat_interval)
4549
: log_receiver_{log_receiver},
4650
key_{std::move(key)},
4751
dataset_{std::move(dataset)},
4852
gateway_{std::move(gateway)},
4953
port_{port},
5054
send_ts_out_{send_ts_out},
5155
upgrade_policy_{upgrade_policy},
56+
heartbeat_interval_{heartbeat_interval},
5257
client_{gateway_, port_},
5358
session_id_{this->Authenticate()} {}
5459

@@ -261,11 +266,14 @@ std::string LiveBlocking::GenerateCramReply(const std::string& challenge_key) {
261266
}
262267

263268
std::string LiveBlocking::EncodeAuthReq(const std::string& auth) {
264-
std::ostringstream reply_stream;
265-
reply_stream << "auth=" << auth << "|dataset=" << dataset_ << "|encoding=dbn|"
266-
<< "ts_out=" << send_ts_out_
267-
<< "|client=C++ " DATABENTO_VERSION "\n";
268-
return reply_stream.str();
269+
std::ostringstream req_stream;
270+
req_stream << "auth=" << auth << "|dataset=" << dataset_ << "|encoding=dbn|"
271+
<< "ts_out=" << send_ts_out_ << "|client=C++ " DATABENTO_VERSION;
272+
if (heartbeat_interval_.count() > 0) {
273+
req_stream << "|heartbeat_interval_s=" << heartbeat_interval_.count();
274+
}
275+
req_stream << '\n';
276+
return req_stream.str();
269277
}
270278

271279
std::uint64_t LiveBlocking::DecodeAuthResp() {

src/live_threaded.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,19 @@ LiveThreaded::~LiveThreaded() {
5757

5858
LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key,
5959
std::string dataset, bool send_ts_out,
60-
VersionUpgradePolicy upgrade_policy)
60+
VersionUpgradePolicy upgrade_policy,
61+
std::chrono::seconds heartbeat_interval)
6162
: impl_{new Impl{log_receiver, std::move(key), std::move(dataset),
62-
send_ts_out, upgrade_policy}} {}
63+
send_ts_out, upgrade_policy, heartbeat_interval}} {}
6364

6465
LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key,
6566
std::string dataset, std::string gateway,
6667
std::uint16_t port, bool send_ts_out,
67-
VersionUpgradePolicy upgrade_policy)
68+
VersionUpgradePolicy upgrade_policy,
69+
std::chrono::seconds heartbeat_interval)
6870
: impl_{new Impl{log_receiver, std::move(key), std::move(dataset),
69-
std::move(gateway), port, send_ts_out, upgrade_policy}} {}
71+
std::move(gateway), port, send_ts_out, upgrade_policy,
72+
heartbeat_interval}} {}
7073

7174
const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); }
7275

@@ -86,6 +89,10 @@ databento::VersionUpgradePolicy LiveThreaded::UpgradePolicy() const {
8689
return impl_->blocking.UpgradePolicy();
8790
}
8891

92+
std::pair<bool, std::chrono::seconds> LiveThreaded::HeartbeatInterval() const {
93+
return impl_->blocking.HeartbeatInterval();
94+
}
95+
8996
void LiveThreaded::Subscribe(const std::vector<std::string>& symbols,
9097
Schema schema, SType stype_in) {
9198
impl_->blocking.Subscribe(symbols, schema, stype_in);

test/include/mock/mock_lsg_server.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ using ssize_t = SSIZE_T;
1010
#include <sys/socket.h> // send
1111
#endif
1212

13+
#include <chrono>
1314
#include <condition_variable>
1415
#include <functional> // function
1516
#include <mutex>
@@ -28,6 +29,9 @@ class MockLsgServer {
2829
public:
2930
MockLsgServer(std::string dataset, bool ts_out,
3031
std::function<void(MockLsgServer&)> serve_fn);
32+
MockLsgServer(std::string dataset, bool ts_out,
33+
std::chrono::seconds heartbeat_interval,
34+
std::function<void(MockLsgServer&)> serve_fn);
3135

3236
std::uint16_t Port() const { return port_; }
3337

@@ -83,6 +87,7 @@ class MockLsgServer {
8387

8488
std::string dataset_;
8589
bool ts_out_;
90+
std::chrono::seconds heartbeat_interval_;
8691
std::uint16_t port_{};
8792
detail::ScopedFd socket_{};
8893
detail::ScopedFd conn_fd_{};

0 commit comments

Comments
 (0)