Skip to content

Commit 6f94f5d

Browse files
committed
ADD: Add client support for is_last
1 parent 4827509 commit 6f94f5d

File tree

6 files changed

+32
-18
lines changed

6 files changed

+32
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ upgrading the input to DBNv3.
1212
method
1313
- Metadata will now always be encoded with a length divisible by 8 bytes for better
1414
alignment
15+
- Added `is_last` field to live subscription requests which will be used to improve the
16+
handling of split subscription requests
1517

1618
### Breaking changes
1719
- Release of DBN version 3

src/live_blocking.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ void LiveBlocking::Subscribe(std::string_view sub_msg,
128128
chunked_sub_msg << sub_msg << "|symbols="
129129
<< JoinSymbolStrings(kMethodName, symbols_it,
130130
symbols_it + chunk_size)
131-
<< "|snapshot=" << use_snapshot << '\n';
131+
<< "|snapshot=" << use_snapshot
132+
<< "|is_last=" << (distance_from_end <= kSymbolMaxChunkSize)
133+
<< '\n';
132134
client_.WriteAll(chunked_sub_msg.str());
133135

134136
symbols_it += chunk_size;

tests/include/mock/mock_lsg_server.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ class MockLsgServer {
5151
void Accept();
5252
void Authenticate();
5353
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
54-
SType stype);
54+
SType stype, bool is_last);
5555
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
56-
SType stype, const std::string& start);
56+
SType stype, const std::string& start, bool is_last);
5757
void SubscribeWithSnapshot(const std::vector<std::string>& symbols,
58-
Schema schema, SType stype);
58+
Schema schema, SType stype, bool is_last);
5959
void Start();
6060
std::size_t Send(const std::string& msg);
6161
::ssize_t UncheckedSend(const std::string& msg);

tests/src/live_blocking_tests.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ TEST_F(LiveBlockingTests, TestSubscribe) {
9494
[&kSymbols, kSchema, kSType](mock::MockLsgServer& self) {
9595
self.Accept();
9696
self.Authenticate();
97-
self.Subscribe(kSymbols, kSchema, kSType);
97+
self.Subscribe(kSymbols, kSchema, kSType, true);
9898
}};
9999

100100
LiveBlocking target = builder_.SetDataset(kDataset)
@@ -122,7 +122,8 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingUnixNanos) {
122122
const auto chunk_size =
123123
std::min(static_cast<std::size_t>(500), kSymbolCount - i);
124124
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
125-
self.Subscribe(symbols_chunk, kSchema, kSType);
125+
self.Subscribe(symbols_chunk, kSchema, kSType,
126+
i + chunk_size == kSymbolCount);
126127
i += chunk_size;
127128
}
128129
}};
@@ -149,7 +150,7 @@ TEST_F(LiveBlockingTests, TestSubscriptionUnixNanos0) {
149150
self.Accept();
150151
self.Authenticate();
151152
std::size_t i{};
152-
self.Subscribe(kSymbols, kSchema, kSType, "0");
153+
self.Subscribe(kSymbols, kSchema, kSType, "0", true);
153154
}};
154155

155156
LiveBlocking target = builder_.SetDataset(kDataset)
@@ -179,7 +180,8 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) {
179180
const auto chunk_size =
180181
std::min(static_cast<std::size_t>(500), kSymbolCount - i);
181182
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
182-
self.Subscribe(symbols_chunk, kSchema, kSType, kStart);
183+
self.Subscribe(symbols_chunk, kSchema, kSType, kStart,
184+
i + chunk_size == kSymbolCount);
183185
i += chunk_size;
184186
}
185187
}};
@@ -212,7 +214,8 @@ TEST_F(LiveBlockingTests, TestSubscribeSnapshot) {
212214
const auto chunk_size =
213215
std::min(static_cast<std::size_t>(500), kSymbolCount - i);
214216
const std::vector<std::string> symbols_chunk(chunk_size, kSymbol);
215-
self.SubscribeWithSnapshot(symbols_chunk, kSchema, kSType);
217+
self.SubscribeWithSnapshot(symbols_chunk, kSchema, kSType,
218+
i + chunk_size == kSymbolCount);
216219
i += chunk_size;
217220
}
218221
}};
@@ -501,7 +504,8 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) {
501504
&should_close_cv, &should_close_mutex](mock::MockLsgServer& self) {
502505
self.Accept();
503506
self.Authenticate();
504-
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0");
507+
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0",
508+
true);
505509
self.Start();
506510
self.SendRecord(kRec);
507511
{
@@ -518,7 +522,7 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) {
518522
// Wait for reconnect
519523
self.Accept();
520524
self.Authenticate();
521-
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol);
525+
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, true);
522526
self.Start();
523527
self.SendRecord(kRec);
524528
});

tests/src/live_threaded_tests.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) {
199199
kSType, kUseSnapshot](mock::MockLsgServer& self) {
200200
self.Accept();
201201
self.Authenticate();
202-
self.Subscribe(kAllSymbols, kSchema, kSType, "0");
202+
self.Subscribe(kAllSymbols, kSchema, kSType, "0", true);
203203
self.Start();
204204
self.SendRecord(kRec);
205205
{
@@ -210,7 +210,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) {
210210
self.Close();
211211
self.Accept();
212212
self.Authenticate();
213-
self.Subscribe(kAllSymbols, kSchema, kSType);
213+
self.Subscribe(kAllSymbols, kSchema, kSType, true);
214214
self.Start();
215215
self.SendRecord(kRec);
216216
}};
@@ -290,7 +290,7 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) {
290290
self.Close();
291291
self.Accept();
292292
self.Authenticate();
293-
self.Subscribe(kSymbols, kSchema, kSType);
293+
self.Subscribe(kSymbols, kSchema, kSType, true);
294294
}};
295295
LiveThreaded target = builder_.SetLogReceiver(ILogReceiver::Default())
296296
.SetDataset(dataset::kXnasItch)

tests/src/mock_lsg_server.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <openssl/sha.h> // SHA256_DIGEST_LENGTH
1010

1111
#include <chrono>
12+
#include <cstddef>
1213

1314
#include "databento/compat.hpp"
1415
#include "databento/constants.hpp"
@@ -119,12 +120,13 @@ void MockLsgServer::Authenticate() {
119120
}
120121

121122
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
122-
Schema schema, SType stype) {
123-
Subscribe(symbols, schema, stype, "");
123+
Schema schema, SType stype, bool is_last) {
124+
Subscribe(symbols, schema, stype, "", is_last);
124125
}
125126

126127
void MockLsgServer::SubscribeWithSnapshot(
127-
const std::vector<std::string>& symbols, Schema schema, SType stype) {
128+
const std::vector<std::string>& symbols, Schema schema, SType stype,
129+
bool is_last) {
128130
const auto received = Receive();
129131
EXPECT_NE(
130132
received.find("symbols=" +
@@ -137,11 +139,13 @@ void MockLsgServer::SubscribeWithSnapshot(
137139
EXPECT_EQ(received.find("start="), std::string::npos);
138140
EXPECT_NE(received.find("id="), std::string::npos);
139141
EXPECT_NE(received.find("snapshot=1"), std::string::npos);
142+
EXPECT_NE(received.find(std::string{"is_last="} + std::to_string(is_last)),
143+
std::string::npos);
140144
}
141145

142146
void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
143147
Schema schema, SType stype,
144-
const std::string& start) {
148+
const std::string& start, bool is_last) {
145149
const auto received = Receive();
146150
EXPECT_NE(
147151
received.find("symbols=" +
@@ -158,6 +162,8 @@ void MockLsgServer::Subscribe(const std::vector<std::string>& symbols,
158162
}
159163
EXPECT_NE(received.find("id="), std::string::npos);
160164
EXPECT_NE(received.find("snapshot=0"), std::string::npos);
165+
EXPECT_NE(received.find(std::string{"is_last="} + std::to_string(is_last)),
166+
std::string::npos);
161167
}
162168

163169
void MockLsgServer::Start() {

0 commit comments

Comments
 (0)