Skip to content

Commit 66a80bf

Browse files
committed
Refactor infrastructure and improve trading pair support
Key changes: 1. Infrastructure improvements: - Reordered session member in OrderBookInfraContext for better destruction order - Replaced jthread with boost::asio::post for IO context execution - Added proper session stopping in StopAsync() - Enhanced WebSocket session cleanup with promises/futures for proper shutdown 2. Trading pair enhancements: - Expanded supported trading pairs (added ETHBTC, BNBUSDT, SOLBNB, XRPUSDT, ADAUSDT, ADAXRP etc.) - Updated converters and mappings for new trading pairs - Added validation for trading pair in snapshot processing 3. Performance and stability: - Added latency measurement for Binance depth updates - Reduced verbose logging in JSON parsers - Improved error handling for missing trading pairs - Added proper cancellation support in WebSocket sessions 4. New features: - Added base TradeSubscriptionBuilder for spot and futures - Extended Bybit support with linear tickers channel - Improved memory pool constants for funding rate events 5. Bug fixes: - Fixed trading pair initialization in OrderBookEventInterface - Ensured proper trading pair setting in snapshot listeners - Added missing trading pair checks in sync logic The changes improve stability, add support for more trading pairs, enhance performance monitoring, and provide better resource cleanup during shutdown.
1 parent 5ffc4f8 commit 66a80bf

File tree

50 files changed

+1810
-195
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1810
-195
lines changed

aoe/aoe.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
#include "aoe/binance/parser/json/ws/execution_response/i_execution_event_parser.h"
4444
#include "aoe/binance/parser/json/ws/order_response/i_order_event_parser.h"
4545
#include "aoe/binance/parser/json/ws/order_response/order_event_parser.h"
46+
#include "aoe/binance/parser/json/ws/trade_response/i_trade_event_parser.h"
47+
#include "aoe/binance/parser/json/ws/trade_response/trade_event_parser.h"
4648
#include "aoe/binance/request/amend_order/i_request.h"
4749
#include "aoe/binance/request/amend_order/request.h"
4850
#include "aoe/binance/request/cancel_order/i_request.h"
@@ -86,6 +88,10 @@
8688
#include "aoe/bybit/credentials_loader/credentials_loader.h"
8789
#include "aoe/bybit/enum_printer/enum_printer.h"
8890
#include "aoe/bybit/enums/enums.h"
91+
#include "aoe/bybit/event/funding_rate_event/funding_rate_event.h"
92+
#include "aoe/bybit/event/funding_rate_event/i_funding_rate_event.h"
93+
#include "aoe/bybit/events_acceptor/funding_rate_event/acceptor.h"
94+
#include "aoe/bybit/events_acceptor/funding_rate_event/i_acceptor.h"
8995
#include "aoe/bybit/execution_event/i_types.h"
9096
#include "aoe/bybit/execution_event/types.h"
9197
#include "aoe/bybit/execution_watcher/execution_watcher.h"
@@ -119,6 +125,8 @@
119125
#include "aoe/bybit/parser/json/ws/order_book_response/parser.h"
120126
#include "aoe/bybit/parser/json/ws/order_response/i_order_event_parser.h"
121127
#include "aoe/bybit/parser/json/ws/order_response/order_event_parser.h"
128+
#include "aoe/bybit/parser/json/ws/ticker_response/funding_rate/i_parser.h"
129+
#include "aoe/bybit/parser/json/ws/ticker_response/funding_rate/parser.h"
122130
#include "aoe/bybit/ping_manager/for_private_channel/ping_manager.h"
123131
#include "aoe/bybit/request/amend_order/i_request.h"
124132
#include "aoe/bybit/request/amend_order/request.h"
@@ -138,10 +146,13 @@
138146
#include "aoe/bybit/response_queue_listener/json/ws/order_book_response/listener.h"
139147
#include "aoe/bybit/response_queue_listener/json/ws/order_response/listener_default.h"
140148
#include "aoe/bybit/response_queue_listener/json/ws/order_response/listener.h"
149+
#include "aoe/bybit/response_queue_listener/json/ws/tickers_response/funding_rate/c_listener.h"
150+
#include "aoe/bybit/response_queue_listener/json/ws/tickers_response/funding_rate/listener.h"
141151
#include "aoe/bybit/session_setup/web_socket/private/session_setup.h"
142152
#include "aoe/bybit/session/web_socket/i_web_socket.h"
143153
#include "aoe/bybit/session/web_socket/web_socket.h"
144154
#include "aoe/bybit/subscription_builder/subscription_builder.h"
155+
#include "aoe/bybit/subscription_builder/tickers/subscription_builder.h"
145156
#include "aoe/credentials/api_key/api_key.h"
146157
#include "aoe/credentials/api_key/i_api_key.h"
147158
#include "aoe/credentials/i_credentials.h"

aoe/binance/infrastructure/infrastructure.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ struct OrderBookInfraContext {
4646
best_ask_price_notifier;
4747

4848
std::unique_ptr<ListenerT> listener;
49-
std::unique_ptr<SessionRWType> session;
5049
std::unique_ptr<aoe::SubscriptionBuilderInterface>
5150
diff_subscription_builder;
51+
std::unique_ptr<SessionRWType> session; // it is last
5252

53-
std::jthread thread;
54-
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
55-
work_guard_{boost::asio::make_work_guard(ioc)};
53+
// std::jthread thread;
54+
// boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
55+
// work_guard_{boost::asio::make_work_guard(ioc)};
5656
};
5757

5858
template <typename Price, typename Qty,
@@ -116,8 +116,10 @@ class InfrastructureImpl : public InfrastructureInterface,
116116
context.listener =
117117
std::make_unique<ListenerT>(pool_, queue, *context.sync);
118118

119-
context.thread =
120-
std::jthread([ioc_ptr = &context.ioc]() { ioc_ptr->run(); });
119+
// context.thread =
120+
// std::jthread([ioc_ptr = &context.ioc]() { ioc_ptr->run(); });
121+
// boost::asio::post(pool_,
122+
// [ioc_ptr = &context.ioc]() { ioc_ptr->run(); });
121123

122124
context.session = std::make_unique<SessionRWType>(context.ioc, queue,
123125
*context.listener);
@@ -193,11 +195,22 @@ class InfrastructureImpl : public InfrastructureInterface,
193195
for (auto& [pair, context] : contexts_) {
194196
if (context) {
195197
context->session->StartAsync();
198+
// context->thread = std::jthread(
199+
// [ioc_ptr = &context->ioc]() { ioc_ptr->run(); });
200+
boost::asio::post(
201+
pool_, [ioc_ptr = &context->ioc]() { ioc_ptr->run(); });
196202
}
197203
}
198204
}
199205

200-
void StopAsync() override { logi("[BINANCE INFRASTRUCTURE] async stop"); }
206+
void StopAsync() override {
207+
logi("[BINANCE INFRASTRUCTURE] async stop");
208+
for (auto& [pair, context] : contexts_) {
209+
if (context) {
210+
context->session->StopAsync();
211+
}
212+
}
213+
}
201214

202215
private:
203216
boost::asio::thread_pool& pool_;

aoe/binance/order_book_event/i_order_book_event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class OrderBookEventInterface
4545

4646
protected:
4747
common::ExchangeId exchange_id_ = common::ExchangeId::kBinance;
48-
aos::TradingPair trading_pair_;
48+
aos::TradingPair trading_pair_ = aos::TradingPair::kCount;
4949
std::vector<aos::OrderBookLevelRaw<Price, Qty>> bids_;
5050
std::vector<aos::OrderBookLevelRaw<Price, Qty>> asks_;
5151
};

aoe/binance/order_book_sync/order_book_sync.h

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ class OrderBookSync
9292
}
9393
boost::asio::awaitable<void> RequestNewSnapshot(aos::TradingPair pair) {
9494
boost::asio::io_context ioc;
95+
// response from binance not provide trading pair.
96+
// i need set it in manual mode
97+
listener_.SetTradingPair(pair);
9598

9699
aoe::binance::impl::main_net::spot::RestSessionRW session(
97100
ioc, response_queue_, listener_);
@@ -129,6 +132,16 @@ class OrderBookSync
129132
SetLastDiffUpdateId(diff.FinalUpdateId());
130133
diffs_.pop();
131134
}
135+
// verified snapshot
136+
if (ptr->TradingPair() == aos::TradingPair::kCount) {
137+
loge(
138+
"[PARSER SNAPSHOT] trading pair is not init. current "
139+
"trading_pair={}",
140+
ptr->TradingPair());
141+
need_process_current_snapshot_ = false;
142+
need_make_snapshot_ = true;
143+
co_return;
144+
}
132145
need_process_current_snapshot_ = true;
133146
co_return;
134147
}
@@ -279,7 +292,9 @@ class OrderBookSync
279292
}
280293
boost::asio::awaitable<void> RequestNewSnapshot(aos::TradingPair pair) {
281294
boost::asio::io_context ioc;
282-
295+
// response from binance not provide trading pair.
296+
// i need set it in manual mode
297+
listener_.SetTradingPair(pair);
283298
aoe::binance::impl::main_net::futures::RestSessionRW session(
284299
ioc, response_queue_, listener_);
285300
aoe::binance::snapshot::impl::SnapshotRequestSender<MemoryPool> sender(
@@ -300,7 +315,8 @@ class OrderBookSync
300315
last_snapshot_update_id_ = ptr->UpdateId();
301316
last_snapshot_ = ptr;
302317
snapshot_request_in_flight_ = false;
303-
logd("Snapshot received, clearing in-flight flag");
318+
logd("Snapshot received for {}, clearing in-flight flag",
319+
ptr->TradingPair());
304320
// skip earlier diffs as soon as possible
305321
while (!diffs_.empty()) {
306322
auto ptr = diffs_.front();
@@ -317,6 +333,16 @@ class OrderBookSync
317333
SetLastDiffUpdateId(diff.FinalUpdateId());
318334
diffs_.pop();
319335
}
336+
// verified snapshot
337+
if (ptr->TradingPair() == aos::TradingPair::kCount) {
338+
loge(
339+
"[PARSER SNAPSHOT] trading pair is not init. current "
340+
"trading_pair={}",
341+
ptr->TradingPair());
342+
need_process_current_snapshot_ = false;
343+
need_make_snapshot_ = true;
344+
co_return;
345+
}
320346
need_process_current_snapshot_ = true;
321347
co_return;
322348
}

aoe/binance/parser/json/ws/diff_response/parser.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,23 @@ class DiffEventParserBase
4343
event_type != "depthUpdate") {
4444
return {false, nullptr};
4545
}
46+
// // NEED PARSE E
47+
// --- 🔽 Парсим поле "E" (event time)
48+
uint64_t event_time = 0;
49+
if (doc["E"].get_uint64().get(event_time) != simdjson::SUCCESS) {
50+
return {false, nullptr};
51+
}
52+
// --- 🔽 Получаем текущее время (в мс с эпохи)
53+
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
54+
std::chrono::system_clock::now().time_since_epoch())
55+
.count();
56+
57+
// --- 🔽 Вычисляем задержку
58+
int64_t delay_ms =
59+
static_cast<int64_t>(now) - static_cast<int64_t>(event_time);
60+
61+
// --- 🔽 Логгируем
62+
logi("[depthUpdate binance] Delay: {}", delay_ms);
4663

4764
EventPtr ptr = factory_event_();
4865

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
#include "aoe/binance/execution_event/i_types.h"
3+
#include "boost/intrusive_ptr.hpp"
4+
#include "simdjson.h" // NOLINT
5+
6+
namespace aoe {
7+
namespace binance {
8+
template <template <typename> typename MemoryPool, typename PositionT>
9+
class ExecutionEventParserInterface {
10+
public:
11+
virtual ~ExecutionEventParserInterface() = default;
12+
using EventPtr =
13+
boost::intrusive_ptr<ExecutionEventInterface<MemoryPool, PositionT>>;
14+
virtual std::pair<bool, EventPtr> ParseAndCreate(
15+
simdjson::ondemand::document& doc) = 0;
16+
};
17+
}; // namespace binance
18+
}; // namespace aoe

0 commit comments

Comments
 (0)