77#include < iostream>
88#include < memory>
99#include < thread> // this_thread
10+ #include < variant>
1011
1112#include " databento/constants.hpp"
1213#include " databento/datetime.hpp"
1314#include " databento/dbn.hpp"
1415#include " databento/enums.hpp"
1516#include " databento/exceptions.hpp"
1617#include " databento/live.hpp"
18+ #include " databento/live_subscription.hpp"
1719#include " databento/live_threaded.hpp"
1820#include " databento/log.hpp"
1921#include " databento/record.hpp"
@@ -173,7 +175,7 @@ TEST_F(LiveThreadedTests, TestStop) {
173175 mock_server.reset ();
174176}
175177
176- TEST_F (LiveThreadedTests, TestExceptionCallbackAndReconnect ) {
178+ TEST_F (LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe ) {
177179 constexpr auto kSchema = Schema::Trades;
178180 constexpr auto kSType = SType::RawSymbol;
179181 constexpr TradeMsg kRec {DummyHeader<TradeMsg>(RType::Mbp0),
@@ -197,8 +199,9 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
197199 kSType , kUseSnapshot ](mock::MockLsgServer& self) {
198200 self.Accept ();
199201 self.Authenticate ();
200- self.SubscribeWithSnapshot (kAllSymbols , kSchema , kSType );
202+ self.Subscribe (kAllSymbols , kSchema , kSType , " 0 " );
201203 self.Start ();
204+ self.SendRecord (kRec );
202205 {
203206 std::unique_lock<std::mutex> shutdown_lock{should_close_mutex};
204207 should_close_cv.wait (shutdown_lock,
@@ -216,19 +219,22 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
216219 .SetAddress (kLocalhost , mock_server.Port ())
217220 .BuildThreaded ();
218221 std::atomic<std::int32_t > metadata_calls{};
219- const auto metadata_cb = [&metadata_calls, &should_close, &should_close_cv,
220- &should_close_mutex](Metadata&& metadata) {
222+ const auto metadata_cb = [&metadata_calls](Metadata&& metadata) {
221223 ++metadata_calls;
222224 EXPECT_TRUE (metadata.has_mixed_schema );
223- // close server
224- const std::lock_guard<std::mutex> _lock{should_close_mutex};
225- should_close = true ;
226- should_close_cv.notify_one ();
227225 };
228226 std::atomic<std::int32_t > record_calls{};
229- const auto record_cb = [&record_calls, kRec ](const Record& record) {
227+ const auto record_cb = [&record_calls, kRec , &should_close_mutex,
228+ &should_close,
229+ &should_close_cv](const Record& record) {
230230 ++record_calls;
231231 EXPECT_EQ (record.Get <TradeMsg>(), kRec );
232+ if (record_calls == 1 ) { // close server
233+ const std::lock_guard<std::mutex> _lock{should_close_mutex};
234+ should_close = true ;
235+ should_close_cv.notify_one ();
236+ return KeepGoing::Continue;
237+ }
232238 return KeepGoing::Stop;
233239 };
234240 std::atomic<std::int32_t > exception_calls{};
@@ -239,20 +245,25 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackAndReconnect) {
239245 EXPECT_NE (dynamic_cast <const databento::DbnResponseError*>(&exc), nullptr )
240246 << " Unexpected exception type" ;
241247 target.Reconnect ();
242- target.Subscribe (kAllSymbols , kSchema , kSType );
248+ target.Resubscribe ();
249+ EXPECT_EQ (target.Subscriptions ().size (), 1 );
250+ EXPECT_TRUE (std::holds_alternative<LiveSubscription::NoStart>(
251+ target.Subscriptions ()[0 ].start ));
243252 return LiveThreaded::ExceptionAction::Restart;
244253 } else {
245254 GTEST_NONFATAL_FAILURE_ (" Exception callback called more than expected" );
246255 return LiveThreaded::ExceptionAction::Stop;
247256 }
248257 };
249258
250- target.SubscribeWithSnapshot (kAllSymbols , kSchema , kSType );
259+ ASSERT_TRUE (target.Subscriptions ().empty ());
260+ target.Subscribe (kAllSymbols , kSchema , kSType , " 0" );
261+ ASSERT_EQ (target.Subscriptions ().size (), 1 );
251262 target.Start (metadata_cb, record_cb, exception_cb);
252263 target.BlockForStop ();
253264 EXPECT_EQ (metadata_calls, 2 );
254265 EXPECT_EQ (exception_calls, 1 );
255- EXPECT_EQ (record_calls, 1 );
266+ EXPECT_EQ (record_calls, 2 );
256267}
257268
258269TEST_F (LiveThreadedTests, TestDeadlockPrevention) {
0 commit comments