@@ -59,8 +59,42 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
5959
6060void LiveBlocking::Subscribe (const std::vector<std::string>& symbols,
6161 Schema schema, SType stype_in, UnixNanos start) {
62+ std::ostringstream sub_msg;
63+ sub_msg << " schema=" << ToString (schema)
64+ << " |stype_in=" << ToString (stype_in);
65+ if (start.time_since_epoch ().count ()) {
66+ sub_msg << " |start=" << start.time_since_epoch ().count ();
67+ }
68+ Subscribe (sub_msg.str (), symbols, false );
69+ }
70+
71+ void LiveBlocking::Subscribe (const std::vector<std::string>& symbols,
72+ Schema schema, SType stype_in,
73+ const std::string& start) {
74+ std::ostringstream sub_msg;
75+ sub_msg << " schema=" << ToString (schema)
76+ << " |stype_in=" << ToString (stype_in);
77+ if (!start.empty ()) {
78+ sub_msg << " |start=" << start;
79+ }
80+ Subscribe (sub_msg.str (), symbols, false );
81+ }
82+
83+ void LiveBlocking::Subscribe (const std::vector<std::string>& symbols,
84+ Schema schema, SType stype_in, bool use_snapshot) {
85+ std::ostringstream sub_msg;
86+ sub_msg << " schema=" << ToString (schema)
87+ << " |stype_in=" << ToString (stype_in);
88+
89+ Subscribe (sub_msg.str (), symbols, use_snapshot);
90+ }
91+
92+ void LiveBlocking::Subscribe (const std::string& sub_msg,
93+ const std::vector<std::string>& symbols,
94+ bool use_snapshot) {
6295 static constexpr auto kMethodName = " Live::Subscribe" ;
6396 constexpr std::ptrdiff_t kSymbolMaxChunkSize = 128 ;
97+
6498 if (symbols.empty ()) {
6599 throw InvalidArgumentError{kMethodName , " symbols" ,
66100 " must contain at least one symbol" };
@@ -70,36 +104,17 @@ void LiveBlocking::Subscribe(const std::vector<std::string>& symbols,
70104 const auto chunk_size =
71105 std::min (kSymbolMaxChunkSize , std::distance (symbols_it, symbols.end ()));
72106
73- std::ostringstream sub_msg;
74- sub_msg << " schema=" << ToString (schema)
75- << " |stype_in=" << ToString (stype_in) << " |symbols="
76- << JoinSymbolStrings (kMethodName , symbols_it,
77- symbols_it + chunk_size);
78- if (start.time_since_epoch ().count ()) {
79- sub_msg << " |start=" << start.time_since_epoch ().count ();
80- }
81- sub_msg << ' \n ' ;
82- client_.WriteAll (sub_msg.str ());
107+ std::ostringstream chunked_sub_msg;
108+ chunked_sub_msg << sub_msg << " |symbols="
109+ << JoinSymbolStrings (kMethodName , symbols_it,
110+ symbols_it + chunk_size)
111+ << " |snapshot=" << use_snapshot << ' \n ' ;
112+ client_.WriteAll (chunked_sub_msg.str ());
83113
84114 symbols_it += chunk_size;
85115 }
86116}
87117
88- void LiveBlocking::Subscribe (const std::vector<std::string>& symbols,
89- Schema schema, SType stype_in,
90- const std::string& start) {
91- std::ostringstream sub_msg;
92- sub_msg << " schema=" << ToString (schema) << " |stype_in=" << ToString (stype_in)
93- << " |symbols="
94- << JoinSymbolStrings (" LiveBlocking::Subscribe" , symbols);
95- if (!start.empty ()) {
96- sub_msg << " |start=" << start;
97- }
98- sub_msg << ' \n ' ;
99-
100- client_.WriteAll (sub_msg.str ());
101- }
102-
103118databento::Metadata LiveBlocking::Start () {
104119 constexpr auto kMetadataPreludeSize = 8 ;
105120 client_.WriteAll (" start_session\n " );
0 commit comments