Skip to content

Commit 6cd9ad9

Browse files
committed
MOD: Use POST for TimeseriesGetRange methods
1 parent 158d28c commit 6cd9ad9

File tree

8 files changed

+221
-154
lines changed

8 files changed

+221
-154
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
### Enhancements
77
- Added `range_by_schema` field to `DatasetRange` struct
8+
- Changed historical `TimeseriesGetRange` and `TimeseriesGetRangeToFile` methods to use
9+
a `POST` request to allow for requesting supported maximum of 2000 symbols
810
- Added logging around `Historical::BatchDownload`
911
- Changed the following Venue, Publisher, and Dataset descriptions:
1012
- "ICE Futures Europe (Financials)" renamed to "ICE Europe Financials"

include/databento/detail/http_client.hpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,23 @@ class HttpClient {
2424
nlohmann::json GetJson(const std::string& path,
2525
const httplib::Params& params);
2626
nlohmann::json PostJson(const std::string& path,
27-
const httplib::Params& params);
27+
const httplib::Params& form_params);
2828
void GetRawStream(const std::string& path, const httplib::Params& params,
2929
const httplib::ContentReceiver& callback);
30+
void PostRawStream(const std::string& path,
31+
const httplib::Params& form_params,
32+
const httplib::ContentReceiver& callback);
3033

3134
private:
35+
static bool IsErrorStatus(int status_code);
36+
static httplib::ResponseHandler MakeStreamResponseHandler(int& out_status);
37+
static void CheckStatusAndStreamRes(const std::string& path, int status_code,
38+
std::string&& err_body,
39+
const httplib::Result& res);
40+
3241
nlohmann::json CheckAndParseResponse(const std::string& path,
3342
httplib::Result&& res) const;
3443
void CheckWarnings(const httplib::Response& response) const;
35-
static bool IsErrorStatus(int status_code);
3644

3745
static const httplib::Headers kHeaders;
3846

include/databento/historical.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,6 @@ class Historical {
230230
using HttplibParams = std::multimap<std::string, std::string>;
231231

232232
BatchJob BatchSubmitJob(const HttplibParams& params);
233-
void StreamToFile(const std::string& url_path, const HttplibParams& params,
234-
const std::filesystem::path& file_path);
235233
void DownloadFile(const std::string& url,
236234
const std::filesystem::path& output_path);
237235
std::vector<BatchJob> BatchListJobs(const HttplibParams& params);

src/detail/http_client.cpp

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,64 @@ void HttpClient::GetRawStream(const std::string& path,
5353
const std::string full_path = httplib::append_query_params(path, params);
5454
std::string err_body{};
5555
int err_status{};
56-
const httplib::Result res = client_.Get(
57-
full_path,
58-
[&err_status](const httplib::Response& resp) {
59-
if (HttpClient::IsErrorStatus(resp.status)) {
60-
err_status = resp.status;
61-
}
62-
return true;
63-
},
64-
[&callback, &err_body, &err_status](const char* data,
65-
std::size_t length) {
66-
// if an error response was received, read all content into err_status
67-
if (err_status > 0) {
68-
err_body.append(data, length);
69-
return true;
70-
}
71-
return callback(data, length);
72-
});
73-
if (err_status > 0) {
74-
throw HttpResponseError{path, err_status, std::move(err_body)};
56+
const httplib::Result res =
57+
client_.Get(full_path, MakeStreamResponseHandler(err_status),
58+
[&callback, &err_body, &err_status](const char* data,
59+
std::size_t length) {
60+
// if an error response was received, read all content into
61+
// err_body
62+
if (err_status > 0) {
63+
err_body.append(data, length);
64+
return true;
65+
}
66+
return callback(data, length);
67+
});
68+
CheckStatusAndStreamRes(path, err_status, std::move(err_body), res);
69+
}
70+
71+
void HttpClient::PostRawStream(const std::string& path,
72+
const httplib::Params& form_params,
73+
const httplib::ContentReceiver& callback) {
74+
std::string err_body{};
75+
int err_status{};
76+
httplib::Request req;
77+
req.method = "POST";
78+
req.set_header("Content-Type", "application/x-www-form-urlencoded");
79+
req.path = path;
80+
req.body = httplib::detail::params_to_query_str(form_params);
81+
req.response_handler = MakeStreamResponseHandler(err_status);
82+
req.content_receiver = [&callback, &err_body, &err_status](
83+
const char* data, std::size_t length,
84+
std::uint64_t, std::uint64_t) {
85+
// if an error response was received, read all content into
86+
// err_body
87+
if (err_status > 0) {
88+
err_body.append(data, length);
89+
return true;
90+
}
91+
return callback(data, length);
92+
};
93+
// NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection): dependency code
94+
const httplib::Result res = client_.send(req);
95+
CheckStatusAndStreamRes(path, err_status, std::move(err_body), res);
96+
}
97+
98+
httplib::ResponseHandler HttpClient::MakeStreamResponseHandler(
99+
int& out_status) {
100+
return [&out_status](const httplib::Response& resp) {
101+
if (HttpClient::IsErrorStatus(resp.status)) {
102+
out_status = resp.status;
103+
}
104+
return true;
105+
};
106+
}
107+
108+
void HttpClient::CheckStatusAndStreamRes(const std::string& path,
109+
int status_code,
110+
std::string&& err_body,
111+
const httplib::Result& res) {
112+
if (status_code > 0) {
113+
throw HttpResponseError{path, status_code, std::move(err_body)};
75114
}
76115
if (res.error() != httplib::Error::Success &&
77116
// canceled happens if `callback` returns false, which is based on the

src/historical.cpp

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -316,17 +316,6 @@ std::filesystem::path Historical::BatchDownload(
316316
return output_path;
317317
}
318318

319-
void Historical::StreamToFile(const std::string& url_path,
320-
const HttplibParams& params,
321-
const std::filesystem::path& file_path) {
322-
OutFileStream out_file{file_path};
323-
this->client_.GetRawStream(
324-
url_path, params, [&out_file](const char* data, std::size_t length) {
325-
out_file.WriteAll(reinterpret_cast<const std::byte*>(data), length);
326-
return true;
327-
});
328-
}
329-
330319
void Historical::DownloadFile(const std::string& url,
331320
const std::filesystem::path& output_path) {
332321
static const std::string kMethod = "Historical::DownloadFile";
@@ -352,7 +341,12 @@ void Historical::DownloadFile(const std::string& url,
352341
<< output_path;
353342
log_receiver_->Receive(LogLevel::Info, ss.str());
354343

355-
StreamToFile(path, {}, output_path);
344+
OutFileStream out_file{output_path};
345+
this->client_.GetRawStream(
346+
path, {}, [&out_file](const char* data, std::size_t length) {
347+
out_file.WriteAll(reinterpret_cast<const std::byte*>(data), length);
348+
return true;
349+
});
356350

357351
if (log_receiver_->ShouldLog(LogLevel::Debug)) {
358352
ss.str("");
@@ -880,7 +874,7 @@ void Historical::TimeseriesGetRange(const HttplibParams& params,
880874
detail::DbnBufferDecoder decoder{metadata_callback, record_callback};
881875

882876
bool early_exit = false;
883-
this->client_.GetRawStream(
877+
this->client_.PostRawStream(
884878
kTimeseriesGetRangePath, params,
885879
[&decoder, &early_exit](const char* data, std::size_t length) mutable {
886880
if (decoder.Process(data, length) == KeepGoing::Continue) {
@@ -959,7 +953,15 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile(
959953
}
960954
databento::DbnFileStore Historical::TimeseriesGetRangeToFile(
961955
const HttplibParams& params, const std::filesystem::path& file_path) {
962-
StreamToFile(kTimeseriesGetRangePath, params, file_path);
956+
{
957+
OutFileStream out_file{file_path};
958+
this->client_.PostRawStream(
959+
kTimeseriesGetRangePath, params,
960+
[&out_file](const char* data, std::size_t length) {
961+
out_file.WriteAll(reinterpret_cast<const std::byte*>(data), length);
962+
return true;
963+
});
964+
} // Flush out_file
963965
return DbnFileStore{log_receiver_, file_path,
964966
VersionUpgradePolicy::UpgradeToV3};
965967
}

tests/include/mock/mock_http_server.hpp

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
#include <cstddef>
77
#include <map>
8+
#include <memory>
89
#include <string>
910

11+
#include "databento/detail/buffer.hpp"
1012
#include "databento/detail/scoped_thread.hpp"
1113
#include "databento/record.hpp"
1214

@@ -23,7 +25,7 @@ class MockHttpServer {
2325
~MockHttpServer() { server_.stop(); }
2426

2527
int ListenOnThread();
26-
void MockBadRequest(const std::string& path, const nlohmann::json& json);
28+
void MockBadPostRequest(const std::string& path, const nlohmann::json& json);
2729
void MockGetJson(const std::string& path, const nlohmann::json& json);
2830
void MockGetJson(const std::string& path,
2931
const std::map<std::string, std::string>& params,
@@ -34,22 +36,31 @@ class MockHttpServer {
3436
void MockPostJson(const std::string& path,
3537
const std::map<std::string, std::string>& params,
3638
const nlohmann::json& json);
37-
void MockStreamDbn(const std::string& path,
38-
const std::map<std::string, std::string>& params,
39-
const std::string& dbn_path);
40-
void MockStreamDbn(const std::string& path,
41-
const std::map<std::string, std::string>& params,
42-
Record record, std::size_t count, std::size_t chunk_size);
43-
void MockStreamDbn(const std::string& path,
44-
const std::map<std::string, std::string>& params,
45-
Record record, std::size_t count, std::size_t extra_bytes,
46-
std::size_t chunk_size);
39+
void MockGetDbn(const std::string& path,
40+
const std::map<std::string, std::string>& params,
41+
const std::string& dbn_path);
42+
void MockPostDbn(const std::string& path,
43+
const std::map<std::string, std::string>& params,
44+
const std::string& dbn_path);
45+
void MockPostDbn(const std::string& path,
46+
const std::map<std::string, std::string>& params,
47+
Record record, std::size_t count, std::size_t chunk_size);
48+
void MockPostDbn(const std::string& path,
49+
const std::map<std::string, std::string>& params,
50+
Record record, std::size_t count, std::size_t extra_bytes,
51+
std::size_t chunk_size);
4752

4853
private:
54+
using SharedConstBuffer = std::shared_ptr<const detail::Buffer>;
55+
4956
static void CheckParams(const std::map<std::string, std::string>& params,
5057
const httplib::Request& req);
5158
static void CheckFormParams(const std::map<std::string, std::string>& params,
5259
const httplib::Request& req);
60+
static SharedConstBuffer EncodeToBuffer(const std::string& dbn_path);
61+
static httplib::Server::Handler MakeDbnStreamHandler(
62+
const std::map<std::string, std::string>& params,
63+
SharedConstBuffer&& buffer, std::size_t chunk_size);
5364

5465
httplib::Server server_{};
5566
const int port_{};

tests/src/historical_tests.cpp

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ TEST_F(HistoricalTests, TestBatchDownloadAll) {
241241
const TempFile temp_dbn_file{tmp_path_ / "job123/test.dbn"};
242242
mock_server_.MockGetJson("/v0/batch.list_files", {{"job_id", kJobId}},
243243
kListFilesResp);
244-
mock_server_.MockStreamDbn("/v0/job_id/test.dbn", {},
245-
TEST_DATA_DIR "/test_data.mbo.v3.dbn");
244+
mock_server_.MockGetDbn("/v0/job_id/test.dbn", {},
245+
TEST_DATA_DIR "/test_data.mbo.v3.dbn");
246246
mock_server_.MockGetJson("/v0/job_id/test_metadata.json", {{"key", "value"}});
247247
const auto port = mock_server_.ListenOnThread();
248248

@@ -640,17 +640,17 @@ TEST_F(HistoricalTests, TestSymbologyResolve) {
640640
}
641641

642642
TEST_F(HistoricalTests, TestTimeseriesGetRange_Basic) {
643-
mock_server_.MockStreamDbn("/v0/timeseries.get_range",
644-
{{"dataset", dataset::kGlbxMdp3},
645-
{"symbols", "ESH1"},
646-
{"schema", "mbo"},
647-
{"start", "1609160400000711344"},
648-
{"end", "1609160800000711344"},
649-
{"encoding", "dbn"},
650-
{"stype_in", "raw_symbol"},
651-
{"stype_out", "instrument_id"},
652-
{"limit", "2"}},
653-
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
643+
mock_server_.MockPostDbn("/v0/timeseries.get_range",
644+
{{"dataset", dataset::kGlbxMdp3},
645+
{"symbols", "ESH1"},
646+
{"schema", "mbo"},
647+
{"start", "1609160400000711344"},
648+
{"end", "1609160800000711344"},
649+
{"encoding", "dbn"},
650+
{"stype_in", "raw_symbol"},
651+
{"stype_out", "instrument_id"},
652+
{"limit", "2"}},
653+
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
654654
const auto port = mock_server_.ListenOnThread();
655655

656656
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -676,16 +676,16 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_Basic) {
676676
}
677677

678678
TEST_F(HistoricalTests, TestTimeseriesGetRange_NoMetadataCallback) {
679-
mock_server_.MockStreamDbn("/v0/timeseries.get_range",
680-
{{"dataset", dataset::kGlbxMdp3},
681-
{"start", "2022-10-21T13:30"},
682-
{"end", "2022-10-21T20:00"},
683-
{"symbols", "CYZ2"},
684-
{"schema", "tbbo"},
685-
{"encoding", "dbn"},
686-
{"stype_in", "raw_symbol"},
687-
{"stype_out", "instrument_id"}},
688-
TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst");
679+
mock_server_.MockPostDbn("/v0/timeseries.get_range",
680+
{{"dataset", dataset::kGlbxMdp3},
681+
{"start", "2022-10-21T13:30"},
682+
{"end", "2022-10-21T20:00"},
683+
{"symbols", "CYZ2"},
684+
{"schema", "tbbo"},
685+
{"encoding", "dbn"},
686+
{"stype_in", "raw_symbol"},
687+
{"stype_out", "instrument_id"}},
688+
TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst");
689689
const auto port = mock_server_.ListenOnThread();
690690

691691
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -704,7 +704,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_NoMetadataCallback) {
704704
TEST_F(HistoricalTests, TestTimeseriesGetRange_BadRequest) {
705705
const nlohmann::json resp{
706706
{"detail", "Authorization failed: illegal chars in username."}};
707-
mock_server_.MockBadRequest("/v0/timeseries.get_range", resp);
707+
mock_server_.MockBadPostRequest("/v0/timeseries.get_range", resp);
708708
const auto port = mock_server_.ListenOnThread();
709709

710710
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -729,8 +729,8 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_BadRequest) {
729729
}
730730

731731
TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) {
732-
mock_server_.MockStreamDbn("/v0/timeseries.get_range", {},
733-
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
732+
mock_server_.MockPostDbn("/v0/timeseries.get_range", {},
733+
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
734734
const auto port = mock_server_.ListenOnThread();
735735

736736
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -747,8 +747,8 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) {
747747
}
748748

749749
TEST_F(HistoricalTests, TestTimeseriesGetRange_Cancellation) {
750-
mock_server_.MockStreamDbn("/v0/timeseries.get_range", {},
751-
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
750+
mock_server_.MockPostDbn("/v0/timeseries.get_range", {},
751+
TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst");
752752
const auto port = mock_server_.ListenOnThread();
753753

754754
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -777,9 +777,9 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_LargeChunks) {
777777
10005,
778778
{}}};
779779
constexpr auto kRecordCount = 50'000;
780-
mock_server_.MockStreamDbn("/v0/timeseries.get_range",
781-
{{"dataset", ToString(Dataset::IfusImpact)}},
782-
Record{&mbp1.hd}, kRecordCount, 75'000);
780+
mock_server_.MockPostDbn("/v0/timeseries.get_range",
781+
{{"dataset", ToString(Dataset::IfusImpact)}},
782+
Record{&mbp1.hd}, kRecordCount, 75'000);
783783
const auto port = mock_server_.ListenOnThread();
784784

785785
databento::Historical target{&logger_, kApiKey, "localhost",
@@ -804,9 +804,9 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_UnreadBytes) {
804804
10005,
805805
{}}};
806806
constexpr auto kRecordCount = 1'000;
807-
mock_server_.MockStreamDbn("/v0/timeseries.get_range",
808-
{{"dataset", ToString(Dataset::IfusImpact)}},
809-
Record{&mbp1.hd}, kRecordCount, 20, 75'000);
807+
mock_server_.MockPostDbn("/v0/timeseries.get_range",
808+
{{"dataset", ToString(Dataset::IfusImpact)}},
809+
Record{&mbp1.hd}, kRecordCount, 20, 75'000);
810810
const auto port = mock_server_.ListenOnThread();
811811

812812
logger_ = mock::MockLogReceiver{[](auto count, LogLevel level,
@@ -830,16 +830,16 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_UnreadBytes) {
830830
}
831831

832832
TEST_F(HistoricalTests, TestTimeseriesGetRangeToFile) {
833-
mock_server_.MockStreamDbn("/v0/timeseries.get_range",
834-
{{"dataset", dataset::kGlbxMdp3},
835-
{"start", "2022-10-21T13:30"},
836-
{"end", "2022-10-21T20:00"},
837-
{"symbols", "CYZ2"},
838-
{"schema", "tbbo"},
839-
{"encoding", "dbn"},
840-
{"stype_in", "raw_symbol"},
841-
{"stype_out", "instrument_id"}},
842-
TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst");
833+
mock_server_.MockPostDbn("/v0/timeseries.get_range",
834+
{{"dataset", dataset::kGlbxMdp3},
835+
{"start", "2022-10-21T13:30"},
836+
{"end", "2022-10-21T20:00"},
837+
{"symbols", "CYZ2"},
838+
{"schema", "tbbo"},
839+
{"encoding", "dbn"},
840+
{"stype_in", "raw_symbol"},
841+
{"stype_out", "instrument_id"}},
842+
TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst");
843843
const auto port = mock_server_.ListenOnThread();
844844

845845
databento::Historical target{&logger_, kApiKey, "localhost",

0 commit comments

Comments
 (0)