From 8c3d57d645235293564d80d1bdefcab55fe665ed Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Sat, 8 Nov 2025 13:56:50 -0600 Subject: [PATCH 1/6] Fixed order matching and changed to MPSC architecture. --- main.cpp | 174 +++++++++++++++--------------- src/OrderBook.cpp | 197 ++++++++++++++++++++-------------- tests/test_lockfree_queue.cpp | 52 ++++++--- util/Logger.cpp | 9 +- 4 files changed, 237 insertions(+), 195 deletions(-) diff --git a/main.cpp b/main.cpp index 7df6c8f..c6d1b64 100644 --- a/main.cpp +++ b/main.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -12,115 +13,110 @@ #include "util/Logger.h" extern int matches; - +std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); template T randval(T min, T max) { - if constexpr (std::is_integral::value) { - return ((T)rand() % (max - min + 1)) + min; - } else if constexpr (std::is_floating_point::value) { - return min + static_cast(rand()) / (static_cast(RAND_MAX) / (max - min)); - } + if constexpr (std::is_integral::value) { + return ((T)rand() % (max - min + 1)) + min; + } else if constexpr (std::is_floating_point::value) { + return min + static_cast(rand()) / (static_cast(RAND_MAX) / (max - min)); + } } Side randomSide() { - return randval(0, 1) == 0 ? BUY : SELL; + return randval(0, 1) == 0 ? BUY : SELL; } void signal_handler(int signum) { - if (signum == SIGINT) { - // Handle the Ctrl+C interrupt - // For example, clean up resources, save data, and exit gracefully - // Logger::getInstance().info("Ctrl+C detected! Exiting gracefully..."); - // Logger::getInstance().info("Found " + std::to_string(matches) + " matches."); - std::cout << "Ctrl+C detected! Exiting gracefully..." << std::endl; - std::cout << "Found " << matches << " matches." << std::endl; - exit(signum); // Terminate the program with the signal code - } + if (signum == SIGINT) { + // Handle the Ctrl+C interrupt + // For example, clean up resources, save data, and exit gracefully + // Logger::getInstance().info("Ctrl+C detected! Exiting gracefully..."); + // Logger::getInstance().info("Found " + std::to_string(matches) + " matches."); + std::cout << "Ctrl+C detected! Exiting gracefully..." << std::endl; + std::cout << "Found " << matches << " matches." << std::endl; + std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Total runtime: " << duration << " seconds." << std::endl; + exit(signum); // Terminate the program with the signal code + } } void addMarketOrders(OrderBook& ob, size_t num_orders) { - for (size_t i = 0; i < num_orders; i++) { - Order order(Order::createMarketOrder( - randomSide(), - randval(100, 110))); - ob.add_order(order); - } + for (size_t i = 0; i < num_orders; i++) { + Order order(Order::createMarketOrder( + randomSide(), + randval(100, 110))); + ob.add_order(order); + } } int main(int argc, char const* argv[]) { - Logger& logger = Logger::getInstance(); - logger.setLogFile("hft_sim.log"); - logger.setLogLevel(LogLevel::INFO); - logger.enableConsole(false); - logger.enableFile(true); - logger.info("Simulation started"); - // Seed the random number generator + Logger& logger = Logger::getInstance(); + logger.setLogFile("rnd.log"); + logger.setLogLevel(LogLevel::INFO); + logger.enableConsole(false); + logger.enableFile(true); + logger.info("Simulation started"); + // Seed the random number generator + srand(time(nullptr)); - OrderBook lob; // Create limit order book - LockFreeQueue order_queue; - // Create a dedicated thread to manage logging - // std::thread logger_thread(&Logger::run, &logger); - // logger_thread.detach(); // Detach the thread to run independently + std::random_device rd; + std::mt19937 gen(rd()); + + std::uniform_int_distribution<> dis(0, 1); + + OrderBook lob; // Create limit order book + LockFreeQueue order_queue; - // View orders (Expectation is they should be sorted, per std::map implementation) - auto bids = lob.getBids(); - auto asks = lob.getAsks(); + signal(SIGINT, signal_handler); + // Simulate the market + const size_t orders_per_producer = 100; + const size_t num_producers = 4; - signal(SIGINT, signal_handler); - // Simulate the market - const size_t orders_per_producer = 10; - const size_t num_producers = 2; - const size_t num_consumers = 2; + std::vector producers; + std::atomic done{false}; + producers.reserve(num_producers); - std::vector producers; - std::vector consumers; - producers.reserve(num_producers); - consumers.reserve(num_consumers); + for (size_t i = 0; i < num_producers; ++i) { + producers.emplace_back( + [](LockFreeQueue& p_queue, uint32_t num_orders) { + for (size_t j = 0; j < num_orders; ++j) { + Order order( + randomSide(), // Side (BUY/SELL) + (Type)(randval(0, 1)), // Market/Limit orders + randval(50, 54), // Price [50, 54] + randval(100, 110)); // Order size + p_queue.push_back(order); + } + }, + std::ref(order_queue), + orders_per_producer); + } - for (size_t i = 0; i < num_producers; ++i) { - producers.emplace_back( - [](LockFreeQueue& p_queue, uint32_t num_orders) { - for (size_t j = 0; j < num_orders; ++j) { - Order order( - randomSide(), // Side (BUY/SELL) - (Type)(randval(0, 1)), // Market/Limit orders - randval(50, 54), // Price [50, 54] - randval(100, 110)); // Order size - p_queue.push_back(order); - } - }, - std::ref(order_queue), - orders_per_producer); - } - for (size_t i = 0; i < num_consumers; ++i) { - consumers.emplace_back( - [](LockFreeQueue& o_queue, OrderBook& lob) { - while (true) { - std::optional order = o_queue.pop(); - if (order.has_value()) { - lob.add_order(*order); - } else { - std::this_thread::yield(); // Yield if no orders are available - } - } - }, - std::ref(order_queue), - std::ref(lob)); - } + for (auto& producer : producers) { + if (producer.joinable()) { + producer.join(); + } + } - // while (1) { - // Order order( - // randomSide(), // Side (BUY/SELL) - // (Type)(randval(0, 1)), // Market/Limit orders - // randval(50, 54), // Price [50, 54] - // randval(100, 110)); // Order size - // lob.add_order(order); - // if (lob.getBids().size() > 2 and lob.getAsks().size() > 2) { - // // puts("Matching orders..."); - // lob.match_orders(); - // } - // } + done.store(true, std::memory_order_release); - return 0; + std::thread consumer_thread([&]() { + while (true) { + std::optional order = order_queue.pop(); + if (order.has_value()) { + lob.add_order(*order); + } else { + if (done.load(std::memory_order_acquire)) { + break; + } + std::this_thread::yield(); + } + } + }); + consumer_thread.join(); + + return 0; } diff --git a/src/OrderBook.cpp b/src/OrderBook.cpp index e371e2c..ac4d0e2 100644 --- a/src/OrderBook.cpp +++ b/src/OrderBook.cpp @@ -3,7 +3,7 @@ #include #include #include -#include "Logger.h" +// #include "Logger.h" int matches = 0; @@ -11,14 +11,83 @@ int getMatches() { return matches; } +/** + * Adds an order to the order book. + * Market orders are matched immediately. + * @param order Order to be added to the order book + */ +void OrderBook::add_order(Order& order) { + if (order.getType() == MARKET) { + // Handle market orders immediately + + logger.debug(std::format("Adding market order {} to order book", getSideName(order.getSide()))); + logger.debug(std::format("Order price: {}", order.getPrice().value())); + match_market_order(order); + return; + } + // assert that order has a price (limit orders need a value) + // assert(order.getPrice().has_value() && "Limit orders must have a price"); + if (order.getSide() == BUY) { + // Store the location of the order in the bids map + bids[order.getPrice().value()].push_back(order); + // bidsDeque[order.getPrice().value()].push_back(order); + if (!bids[order.getPrice().value()].empty()) { + _order_locations[order.getId()] = std::prev(bids[order.getPrice().value()].end()); + // std::string string = std::format("Added BUY order ID {} at price {:.2f} size {}", order.getId(), order.getPrice().value(), order.getSize()); + // logger.debug(string); + } + } else if (order.getSide() == SELL) { + asks[order.getPrice().value()].push_back(order); + // asksDeque[order.getPrice().value()].push_back(order); + if (!asks[order.getPrice().value()].empty()) { + _order_locations[order.getId()] = std::prev(asks[order.getPrice().value()].end()); + // std::string string = std::format("Added SELL order ID {} at price {:.2f} size {}", order.getId(), order.getPrice().value(), order.getSize()); + // logger.debug(string); + } + } +} + +bool OrderBook::cancel_order(Order& order) { + // TODO: Complete this method (orders need to be removed from the order book) + auto it = _order_locations.find(order.getId()); + if (it == _order_locations.end()) { + // Order not found + // std::cout << "Order with ID " << order.getId() << " not found in order book." << std::endl; + return false; + } + // std::cout << it->first << " => " << getSideName(it->second->getSide()) << " " << it->second->getPrice().value() << std::endl; + + auto side = it->second->getSide(); + auto price = it->second->getPrice().value(); + + if (side == BUY) { + bids[price].erase(it->second); // Remove the order from the list at this price + if (bids[price].empty()) { + bids.erase(price); // Remove the price level if no orders left + } + } + else if (side == SELL) { + asks[price].erase(it->second); // Remove the order from the list at this price; + if (asks[price].empty()) { + asks.erase(price); + } + } + _order_locations.erase(it); + // std::cout << "Order with ID " << order.getId() << " cancelled successfully." << std::endl; + return true; +} + void OrderBook::executeTrade(Order& bid, Order& ask, uint32_t fill_qty) { auto latency = std::chrono::duration_cast( std::chrono::steady_clock::now() - bid.getTimestamp()).count(); - Logger& logger = Logger::getInstance(); // Log the trade execution if (ask.getPrice().has_value()) { - std::string trade_info = std::format("Trade executed: {} units at price {:.2f} {:} (Latency: {}µs)", - fill_qty, ask.getPrice().value(), 4, latency); + std::string trade_info = std::format(" (ASK) Trade executed: {} units at price {:.2f} (Latency: {}µs)", + fill_qty, ask.getPrice().value(), latency); + logger.info(trade_info); + } else if (bid.getPrice().has_value()) { + std::string trade_info = std::format(" (BID) Trade executed: {} units at price {:.2f} (Latency: {}µs)", + fill_qty, bid.getPrice().value(), latency); logger.info(trade_info); } @@ -26,51 +95,65 @@ void OrderBook::executeTrade(Order& bid, Order& ask, uint32_t fill_qty) { ask.setSize(ask.getSize() - fill_qty); } + void OrderBook::match_market_order(Order& order) { + if (bids.empty() || asks.empty()) { + logger.debug("Other side of book is empty, cannot match market order"); + return; + } if (order.getSide() == BUY) { - bool asksEmpty = asks.empty() ? true : false; - if(asksEmpty) - return; - // Iterator for asks (loop through price levels with this) auto askIter = asks.begin(); - while (order.getSize() > 0 && !askIter->second.empty()) { + while (order.getSize() > 0 && askIter != asks.end()) { // Loop through orders at given price orders using this iterator auto& asksAtPrice = askIter->second; + if (asksAtPrice.empty()) { + // Move to the next price level if no orders at this price + logger.debug("No asks at this price level for market buy order"); + ++askIter; + continue; + } auto& ask = asksAtPrice.front(); - // printf("Matching order! [LIMIT ORDER] %d x %d [MARKET]\n", ask.getSize(), order.getSize()); + uint32_t fill_qty = std::min(ask.getSize(), order.getSize()); uint32_t rem_asks = ask.getSize() - fill_qty; executeTrade(order, ask, fill_qty); matches++; + logger.debug("Matched orders in order book"); + logger.debug("Checking for empty orders after match"); if (rem_asks == 0) { asksAtPrice.pop_front(); } if(asksAtPrice.empty()) { // Removes the iterator for the asks at this given price - asks.erase(askIter); + askIter = asks.erase(askIter); } } } else { - // bool emptyBids = bids.empty() ? true : false; - - if (bids.empty()) - return; auto bidIter = bids.rbegin(); - while (order.getSize() > 0 && !bidIter->second.empty()) { + while (order.getSize() > 0 && bidIter != bids.rend()) { // Loop through orders at given price orders using this iterator auto& bidsAtPrice = bidIter->second; + if (bidsAtPrice.empty()) { + // Move to the next price level if no orders at this price + logger.debug("No bids at this price level for market sell order"); + ++bidIter; + continue; + } auto& bid = bidsAtPrice.front(); - // printf("Matching order! [LIMIT ORDER] %d x %d [MARKET]\n", bid.getSize(), order.getSize()); uint32_t fill_qty = std::min(bid.getSize(), order.getSize()); uint32_t rem_bids = bid.getSize() - fill_qty; - executeTrade(order, bid, fill_qty); + executeTrade(bid, order, fill_qty); matches++; + logger.debug("Matched orders in order book"); + logger.debug("Checking for empty orders after match"); + // Size exhausted for this bid order (fully filled) if (rem_bids == 0) { + logger.debug("Removing bid order after market sell"); bidsAtPrice.pop_front(); } @@ -78,80 +161,28 @@ void OrderBook::match_market_order(Order& order) { // Removes the iterator for the bids at this given price // Use the base() method to convert reverse_iterator to regular iterator // This allows us to safely erase the element - bids.erase(std::next(bidIter).base()); + logger.debug("Removing bid price level after market sell"); + bidIter = std::reverse_iterator(bids.erase(std::next(bidIter).base())); } } } } -/** - * Adds an order to the order book. - * Market orders are matched immediately. - * @param order Order to be added to the order book - */ -void OrderBook::add_order(Order& order) { - if (order.getType() == MARKET) { - // Handle market orders immediately - match_market_order(order); - return; - } - // assert that order has a price (limit orders need a value) - if (order.getSide() == BUY) { - // Store the location of the order in the bids map - bids[order.getPrice().value()].push_back(order); - // bidsDeque[order.getPrice().value()].push_back(order); - _order_locations[order.getId()] = std::prev(bids[order.getPrice().value()].end()); - - } else if (order.getSide() == SELL) { - asks[order.getPrice().value()].push_back(order); - // asksDeque[order.getPrice().value()].push_back(order); - _order_locations[order.getId()] = std::prev(asks[order.getPrice().value()].end()); - } -} - -bool OrderBook::cancel_order(Order& order) { - // TODO: Complete this method (orders need to be removed from the order book) - auto it = _order_locations.find(order.getId()); - if (it == _order_locations.end()) { - // Order not found - // std::cout << "Order with ID " << order.getId() << " not found in order book." << std::endl; - return false; - } - // std::cout << it->first << " => " << getSideName(it->second->getSide()) << " " << it->second->getPrice().value() << std::endl; - - auto side = it->second->getSide(); - auto price = it->second->getPrice().value(); - - if (side == BUY) { - bids[price].erase(it->second); // Remove the order from the list at this price - if (bids[price].empty()) { - bids.erase(price); // Remove the price level if no orders left - } - } - else if (side == SELL) { - asks[price].erase(it->second); // Remove the order from the list at this price; - if (asks[price].empty()) { - asks.erase(price); - } - } - _order_locations.erase(it); - // std::cout << "Order with ID " << order.getId() << " cancelled successfully." << std::endl; - return true; -} - void OrderBook::match_orders() { + auto bidIter = bids.begin(); + auto askIter = asks.begin(); + logger.debug("Matching orders in order book..."); while (!bids.empty() && !asks.empty()) { - auto bidIter = bids.begin(); - auto askIter = asks.begin(); - // If the bid or ask is empty, remove it from the book // Safely handle iterator invalidation - if (bidIter->second.empty()) { + if (bidIter != bids.end()) { bidIter = bids.erase(bidIter); + logger.debug("Removing empty bid price level in matching orders"); continue; } - if (askIter->second.empty()) { + if (askIter != asks.end()) { + logger.debug("Removing empty ask price level in matching orders"); askIter = asks.erase(askIter); continue; } @@ -170,16 +201,20 @@ void OrderBook::match_orders() { uint32_t rem_asks = askOrder.getSize() - trade_quantity; executeTrade(bidOrder, askOrder, trade_quantity); - + logger.debug("Matched orders in order book"); + logger.debug("Checking for empty orders after match"); if (rem_bids == 0) { bidIter->second.pop_front(); // Remove the order from the order locations map + logger.debug("Removing bid order after match"); _order_locations.erase(bidOrder.getId()); } - if (rem_asks == 0) + if (rem_asks == 0) { askIter->second.pop_front(); // Remove the order from the order locations map + logger.debug("Removing ask order after match"); _order_locations.erase(askOrder.getId()); + } // Safely handle iterator invalidation if (bidIter->second.empty()) diff --git a/tests/test_lockfree_queue.cpp b/tests/test_lockfree_queue.cpp index 26a5581..3cf0b93 100644 --- a/tests/test_lockfree_queue.cpp +++ b/tests/test_lockfree_queue.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "LockFreeQueue.h" // Suite: LockFreeQueueTest_Basic @@ -16,8 +17,7 @@ TEST(LockFreeQueueTest_Basic, PushPopSingle) { int result = 42; // This will fail (TDD) until pop is implemented. auto out = q.pop(); - EXPECT_TRUE(out); - EXPECT_EQ(result, value); + ASSERT_EQ(out.value(), value); } TEST(LockFreeQueueTest_Basic, PushMultiplePopOrder) { @@ -36,7 +36,7 @@ TEST(LockFreeQueueTest_Basic, PushMultiplePopOrder) { TEST(LockFreeQueueTest_Empty, PopEmptyReturnsFalse) { LockFreeQueue q; auto result = q.pop(); - EXPECT_FALSE(result.has_value()); + ASSERT_FALSE(result.has_value()); } // Suite: LockFreeQueueTest_Concurrency @@ -52,7 +52,6 @@ TEST(LockFreeQueueTest_Concurrency, ConcurrentPushPop) { }; auto consumer = [&q, &pop_count, N]() { - for (int i = 0; i < N; ++i) { auto val = q.pop(); while (!val.has_value()) { @@ -60,14 +59,17 @@ TEST(LockFreeQueueTest_Concurrency, ConcurrentPushPop) { std::this_thread::sleep_for(std::chrono::microseconds(50)); } ++pop_count; + EXPECT_EQ(val.value(), i); + std::cout << "Popped value: " << val.value() << std::endl; } }; - std::thread t1(producer), t2(consumer); - t1.join(); - t2.join(); + // Spin up a single producer and consumer thread + std::thread t1(producer), t2(consumer); + t1.join(); + t2.join(); - EXPECT_EQ(pop_count, N); + ASSERT_EQ(pop_count, N); } // Benchmark: Single-threaded push and pop @@ -76,7 +78,7 @@ TEST(LockFreeQueueBenchmark, SingleThreaded) { const int N = 1000000; auto start = std::chrono::high_resolution_clock::now(); for (int i = 0; i < N; ++i) q.push_back(i); - // int val; + for (int i = 0; i < N; ++i) q.pop(); auto end = std::chrono::high_resolution_clock::now(); double ms = std::chrono::duration(end - start).count(); @@ -84,24 +86,40 @@ TEST(LockFreeQueueBenchmark, SingleThreaded) { << (N * 2 / ms * 1000) << " ops/sec" << std::endl; } -// Benchmark: Producer-consumer (2 threads) +// Benchmark: Producer-consumer with multiple producers TEST(LockFreeQueueBenchmark, ProducerConsumer) { + std::locale::global(std::locale("en_US.UTF-8")); LockFreeQueue q; - const int N = 1000000; + const int N = 100'000'000; + std::atomic producers_done{false}; auto start = std::chrono::high_resolution_clock::now(); - std::thread producer([&q, N]() { - for (int i = 0; i < N; ++i) q.push_back(i); - }); - std::thread consumer([&q, N]() { + std::vector producers; + producers.reserve(20); + std::cout << "Starting " << producers.capacity() << " producer threads..." << std::endl; + producers.emplace_back( + [](LockFreeQueue &q, int N) { + for (int i = 0; i < N; ++i) + q.push_back(i); + }, + std::ref(q), N); + + for (auto& producer : producers) { + if (producer.joinable()) { + producer.join(); + } + } + + producers_done.store(true, std::memory_order_release); + + std::thread consumer([&]() { for (int i = 0; i < N; ++i) { auto val = q.pop(); while (!val) std::this_thread::yield(); } }); - producer.join(); consumer.join(); auto end = std::chrono::high_resolution_clock::now(); double ms = std::chrono::duration(end - start).count(); - std::cout << "[ProducerConsumer] " << N << " push+pop in " << ms << " ms, " + std::cout << "[ProducerConsumer] " << std::format("{:L}", N) << " push+pop in " << ms << " ms, " << (N * 2 / ms * 1000) << " ops/sec" << std::endl; } diff --git a/util/Logger.cpp b/util/Logger.cpp index d1a1b60..6454898 100644 --- a/util/Logger.cpp +++ b/util/Logger.cpp @@ -23,8 +23,8 @@ Logger::~Logger() { } void Logger::setLogFile(const std::string& filename) { - std::lock_guard lock(mutex_); log_file_ = filename; + std::cout << "Log file set to: " << log_file_ << std::endl; if (file_stream_.is_open()) { file_stream_.close(); } @@ -34,12 +34,10 @@ void Logger::setLogFile(const std::string& filename) { } void Logger::enableConsole(bool enable) { - std::lock_guard lock(mutex_); console_enabled_ = enable; } void Logger::enableFile(bool enable) { - std::lock_guard lock(mutex_); file_enabled_ = enable; if (file_enabled_ && !file_stream_.is_open()) { file_stream_.open(log_file_, std::ios::app); @@ -49,7 +47,6 @@ void Logger::enableFile(bool enable) { } void Logger::setLogLevel(LogLevel level) { - std::lock_guard lock(mutex_); min_level_ = level; } @@ -72,7 +69,6 @@ void Logger::error(const std::string& msg) { } void Logger::logImpl(LogLevel level, const std::string& msg) { - std::lock_guard lock(mutex_); // Timestamp auto now = std::chrono::system_clock::now(); auto in_time_t = std::chrono::system_clock::to_time_t(now); @@ -80,9 +76,6 @@ void Logger::logImpl(LogLevel level, const std::string& msg) { ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S"); // Log format: [LEVEL][timestamp] message std::string log_line = "[" + levelToString(level) + "][" + ss.str() + "] " + msg + "\n"; - - // buffer[num_logged] = log_line; - // num_logged = (num_logged + 1) % buffer.size(); if (console_enabled_) { std::cout << log_line; } From 9139454d42d388f3c6f28ef9910fb2df5c55cd67 Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Sat, 8 Nov 2025 13:57:07 -0600 Subject: [PATCH 2/6] Updated .gitignore and CMakeLists --- .gitignore | 2 ++ CMakeLists.txt | 14 +++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 050f2f5..2a9275e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ +# Dotfiles and system files .idea/ .vscode/ +.cache/ .DS_Store build/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 3033af2..d271726 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,10 @@ cmake_minimum_required(VERSION 3.14) # Set the C++ standard +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexperimental-library") +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexperimental-library") project(hft_sim) @@ -21,6 +22,7 @@ FetchContent_Declare( GIT_SHALLOW TRUE ) FetchContent_MakeAvailable(googletest) +include(GoogleTest) # Net library set(NET_SOURCES @@ -89,14 +91,15 @@ set(TEST_SOURCES tests/test_udp_transport.cpp ) -# Function to add a Google Test executable (commented out for now) +# Function to add a Google Test executable function(add_gtest_test TEST_NAME TEST_SOURCE) add_executable(${TEST_NAME} ${TEST_SOURCE} src/OrderBook.cpp src/Order.cpp util/Logger.cpp) - set_target_properties(${TEST_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${TEST_OUTPUT_DIR}) + set_target_properties(${TEST_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${TEST_OUTPUT_DIR}) target_include_directories(${TEST_NAME} PRIVATE - ${CMAKE_SOURCE_DIR}/include - ${CMAKE_SOURCE_DIR}/util) + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/util) target_link_libraries(${TEST_NAME} PRIVATE gtest gtest_main hsnet) + gtest_discover_tests(${TEST_NAME} PRIVATE) add_test(NAME ${TEST_NAME} COMMAND ${TEST_OUTPUT_DIR}/${TEST_NAME}) message("Writing test executable: ${TEST_NAME} with source: ${TEST_SOURCE} to ${TEST_OUTPUT_DIR}") endfunction() @@ -105,6 +108,7 @@ add_gtest_test(test_full_orders tests/test_full_orders.cpp) add_gtest_test(test_cancel_order tests/test_cancel_order.cpp) add_gtest_test(test_crc32c tests/test_crc32c.cpp) add_gtest_test(test_reordering_buffer tests/test_reordering_buffer.cpp) +add_gtest_test(test_lockfree_queue tests/test_lockfree_queue.cpp) #foreach(TEST_SRC ${TEST_SOURCES}) From 1c7099e7d75ede240e0d57dbdf5d1f7e02863dc6 Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Thu, 1 Jan 2026 18:09:51 -0600 Subject: [PATCH 3/6] Implemented basic memory pool for future use. --- CMakeLists.txt | 2 ++ include/MemoryPool.h | 69 ++++++++++++++++++++++++++++++++++++++ tests/test_memory_pool.cpp | 26 ++++++++++++++ 3 files changed, 97 insertions(+) mode change 100644 => 100755 CMakeLists.txt create mode 100644 include/MemoryPool.h create mode 100644 tests/test_memory_pool.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt old mode 100644 new mode 100755 index d271726..424d4fb --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,6 +89,7 @@ set(TEST_SOURCES tests/test_crc32c.cpp tests/test_reordering_buffer.cpp tests/test_udp_transport.cpp + tests/test_memory_pool.cpp ) # Function to add a Google Test executable @@ -109,6 +110,7 @@ add_gtest_test(test_cancel_order tests/test_cancel_order.cpp) add_gtest_test(test_crc32c tests/test_crc32c.cpp) add_gtest_test(test_reordering_buffer tests/test_reordering_buffer.cpp) add_gtest_test(test_lockfree_queue tests/test_lockfree_queue.cpp) +add_gtest_test(test_memory_pool tests/test_memory_pool.cpp) #foreach(TEST_SRC ${TEST_SOURCES}) diff --git a/include/MemoryPool.h b/include/MemoryPool.h new file mode 100644 index 0000000..f32bac3 --- /dev/null +++ b/include/MemoryPool.h @@ -0,0 +1,69 @@ +#ifndef MEMORY_POOL_H +#define MEMORY_POOL_H +#include +#include +#include +#include + +template +class MemoryPool { +public: + explicit MemoryPool(size_t pool_size = 1024): pool_size_(pool_size) { + pool_ = std::make_unique(pool_size_); + init_free_list(); + }; + + ~MemoryPool() {}; + + template + T* allocate(Args&&... args) { + if(!head) + return nullptr; + Slot* curr = head; + head = head->next; + return new (&curr->data) T(std::forward(args)...); // Pool exhausted + }; + + /** + * Marks a pooled object slot as free so it can be reused. + * @param obj Pointer previously obtained from this pool's allocate(). Must point + * into the memory block returned by pool_.get(). + * See also: allocate() + */ + void deallocate(T* obj) { + + if(!obj) + return; + obj->~T(); + + Slot* curr = reinterpret_cast(obj); + // Set the current slot to the top of the free list + curr->next = head; + head = curr; + }; + +private: + // Storage + union Slot { + T data; + Slot* next; + }; + std::unique_ptr pool_; + // free list + Slot* head; + // In-use buffer + bool* in_use_; + size_t pool_size_; + + void init_free_list() { + std::cout << "Initializing free list..." << std::endl; + for (size_t i = 0; i < pool_size_ - 1; ++i) { + pool_[i].next = &pool_[i + 1]; + } + pool_[pool_size_ - 1].next = nullptr; + head = &pool_[0]; + } +}; + + +#endif \ No newline at end of file diff --git a/tests/test_memory_pool.cpp b/tests/test_memory_pool.cpp new file mode 100644 index 0000000..e2c7baf --- /dev/null +++ b/tests/test_memory_pool.cpp @@ -0,0 +1,26 @@ +#include +#include "MemoryPool.h" + +TEST(MemoryPoolTest, AcquireAndRelease) { + const size_t POOL_SIZE = 10; + MemoryPool intPool(POOL_SIZE); + + // Allocate all objects + int* nums[POOL_SIZE]; + for (size_t i = 0; i < POOL_SIZE; ++i) { + nums[i] = intPool.allocate(); + } + + // // Pool should be exhausted now + int* val = intPool.allocate(); + ASSERT_EQ(intPool.allocate(), nullptr); + + // // Release one object + intPool.deallocate(nums[5]); + + // // Now we should be able to acquire one more object + int* newObj = intPool.allocate(); + ASSERT_NE(newObj, nullptr); + *newObj = 42; + ASSERT_EQ(*newObj, 42); +} \ No newline at end of file From 8fe2fbe4b7d1c9e9a9dace514f9da8fcd908972e Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Fri, 2 Jan 2026 00:39:29 -0600 Subject: [PATCH 4/6] Implemented CRC32 hardware option. --- .gitignore | 0 CMakeLists.txt | 9 +++++++++ include/net/Crc32c.h | 0 src/net/Crc32c.cpp | 26 +++++++++++++++++++------- tests/test_crc32c.cpp | 0 5 files changed, 28 insertions(+), 7 deletions(-) mode change 100644 => 100755 .gitignore mode change 100644 => 100755 include/net/Crc32c.h mode change 100644 => 100755 src/net/Crc32c.cpp mode change 100644 => 100755 tests/test_crc32c.cpp diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/CMakeLists.txt b/CMakeLists.txt index 424d4fb..d9531ac 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,15 @@ set(NET_SOURCES add_library(hsnet STATIC ${NET_SOURCES}) target_include_directories(hsnet PUBLIC ${CMAKE_SOURCE_DIR}/include/net) +# Enable ARM CRC intrinsics on Apple Silicon (M1/M2) for hsnet only +if(APPLE AND CMAKE_SYSTEM_PROCESSOR MATCHES "^(arm|aarch64)") + message(STATUS "Enabling ARM CRC intrinsics for hsnet on Apple Silicon") + # Request an ARM architecture that supports CRC instructions + target_compile_options(hsnet PRIVATE -march=armv8-a+crc) + target_compile_definitions(hsnet PRIVATE __ARM_FEATURE_CRC32=1) +endif() + + # Add the source files for the main executable set(SOURCES main.cpp diff --git a/include/net/Crc32c.h b/include/net/Crc32c.h old mode 100644 new mode 100755 diff --git a/src/net/Crc32c.cpp b/src/net/Crc32c.cpp old mode 100644 new mode 100755 index 0aeb523..771c682 --- a/src/net/Crc32c.cpp +++ b/src/net/Crc32c.cpp @@ -1,7 +1,12 @@ #include "net/Crc32c.h" #include +#include #include +#include +#ifdef __ARM_FEATURE_CRC32 +# include +#endif namespace hsnet { @@ -78,21 +83,28 @@ static constexpr uint32_t CRC32C_TABLE[256] = { // Software CRC32C implementation using lookup table uint32_t crc32c_sw(const uint8_t* data, size_t len) noexcept { - uint32_t crc = 0xFFFFFFFF; + uint32_t crc = 0xFFFFFFFFu; for (size_t i = 0; i < len; ++i) { crc = (crc >> 8) ^ CRC32C_TABLE[(crc ^ data[i]) & 0xFF]; } - return crc ^ 0xFFFFFFFF; + return crc ^ 0xFFFFFFFFu; } -// Hardware CRC32C implementation (stub for now) +// Hardware CRC32C implementation via the Castagnoli polynomial uint32_t crc32c_hw(const uint8_t* data, size_t len) noexcept { - // TODO: Implement with SSE4.2 (Intel) or ARMv8 CRC instructions - // For now, fall back to software implementation - return crc32c_sw(data, len); + #ifdef __ARM_FEATURE_CRC32 + uint32_t crc = 0xFFFFFFFFu; + for (size_t i = 0; i < len; ++i) { + crc = __crc32cb(crc, data[i]); + } + return crc ^ 0xFFFFFFFFu; + #else + // Fallback to software if unavailable + return crc32c_sw(data, len); + #endif } -// Main CRC32C function - currently always uses software +// Main CRC32C function (Incomplete) uint32_t crc32c(const uint8_t* data, size_t len) noexcept { // TODO: Add runtime CPU feature detection and use hardware when available return crc32c_sw(data, len); diff --git a/tests/test_crc32c.cpp b/tests/test_crc32c.cpp old mode 100644 new mode 100755 From 8d075c40b14f6da37116c0a115404283ad7a46ef Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Fri, 2 Jan 2026 00:41:35 -0600 Subject: [PATCH 5/6] Updated README --- README.md | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) mode change 100644 => 100755 README.md diff --git a/README.md b/README.md old mode 100644 new mode 100755 index ac387ba..2179deb --- a/README.md +++ b/README.md @@ -10,12 +10,39 @@ The simulator allows users to test various trading strategies, analyze market da - Order cancellation - Feed publishing and subscription - Trade execution simulation +- Lock-free multi-producer single-consumer (MPSC) queue for high-performance message passing +- Basic logging functionality -## Things that can be improved +## Things to work on / TODOs - Performance optimizations (order matching and execution) - Price representation as floats (should be a "Money" type) +- More sophisticated order types (e.g., stop-loss, iceberg orders) +- Enhanced logging and monitoring capabilities + +# Potential future features +- Integration with real market data feeds +- Support for additional asset classes (e.g., options, futures) +- Advanced analytics and reporting tools +- + +# Getting Started + +## Prerequisites +- C++17 or later +- CMake 3.20 or later +- A C++ compiler that supports C++17 (e.g., GCC, Clang, MSVC) +## Testing +To run all tests, use the following command after building the project: + +```bash +cd build/ +ctest +``` + ## References +While I was working on the lock-free MPSC queue, I found the following resources helpful: - [Lock-Free Queue Implementation](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f17/implementing_lock_free.pdf) - - [Lock-Free MPMC](https://www.linuxjournal.com/content/lock-free-multi-producer-multi-consumer-queue-ring-buffer) \ No newline at end of file + - [Lock-Free MPMC](https://www.linuxjournal.com/content/lock-free-multi-producer-multi-consumer-queue-ring-buffer) + From d53560e863b5f817de1d56fb1a00d4cc7e4745d5 Mon Sep 17 00:00:00 2001 From: ThePyLord Date: Sun, 4 Jan 2026 01:27:47 -0600 Subject: [PATCH 6/6] ReorderingBuffer refactor, simplified the API. --- include/net/ReorderingBuffer.h | 47 ++++++++++++++++++-------------- src/net/ReorderingBuffer.cpp | 13 +++++---- tests/test_reordering_buffer.cpp | 43 +++++++++++++++-------------- 3 files changed, 55 insertions(+), 48 deletions(-) mode change 100644 => 100755 include/net/ReorderingBuffer.h mode change 100644 => 100755 src/net/ReorderingBuffer.cpp mode change 100644 => 100755 tests/test_reordering_buffer.cpp diff --git a/include/net/ReorderingBuffer.h b/include/net/ReorderingBuffer.h old mode 100644 new mode 100755 index 7f527f0..709dac3 --- a/include/net/ReorderingBuffer.h +++ b/include/net/ReorderingBuffer.h @@ -6,53 +6,58 @@ #include namespace hsnet { - +using data_t = std::vector; // Simple reordering buffer that ensures in-sequence delivery class ReorderingBuffer { public: explicit ReorderingBuffer(size_t max_size = 1024); - + // Add a packet with sequence number, returns true if it was added - bool add(uint64_t sequence, std::vector data, uint32_t stream_id = 0); - + bool add(uint64_t sequence, data_t data); + // Get next in-sequence packet, returns empty if none available - std::optional, uint32_t>> get_next(); - + std::optional get_next(); + // Check if we have any packets ready for delivery bool has_ready() const; - + // Get the next expected sequence number - uint64_t next_expected() const { return next_seq_; } - + uint64_t next_expected() const { + return next_seq_; + } + // Get statistics - size_t size() const { return count_; } - size_t max_size() const { return max_size_; } - + size_t size() const { + return count_; + } + size_t max_size() const { + return max_size_; + } + // Clear the buffer (useful for reset scenarios) void clear(); private: struct Packet { uint64_t sequence; - std::vector data; - uint32_t stream_id; + data_t data; bool valid; - - Packet() : sequence(0), stream_id(0), valid(false) {} - Packet(uint64_t seq, std::vector d, uint32_t sid) : sequence(seq), data(std::move(d)), stream_id(sid), valid(true) {} + + Packet() : sequence(0), valid(false) {} + Packet(uint64_t seq, data_t d) : sequence(seq), data(std::move(d)), valid(true) {} }; - + std::vector buffer_; size_t max_size_; uint64_t next_seq_; size_t head_; size_t count_; - + // Helper to find packet in buffer size_t find_packet(uint64_t sequence) const; - + // Helper to advance head pointer void advance_head(); }; -} // namespace hsnet \ No newline at end of file +} // namespace hsnet \ No newline at end of file diff --git a/src/net/ReorderingBuffer.cpp b/src/net/ReorderingBuffer.cpp old mode 100644 new mode 100755 index 521ec99..e7745e7 --- a/src/net/ReorderingBuffer.cpp +++ b/src/net/ReorderingBuffer.cpp @@ -9,7 +9,7 @@ ReorderingBuffer::ReorderingBuffer(size_t max_size) buffer_.resize(max_size); } -bool ReorderingBuffer::add(uint64_t sequence, std::vector data, uint32_t stream_id) { +bool ReorderingBuffer::add(uint64_t sequence, std::vector data) { // If sequence is too old, ignore it if (sequence < next_seq_) { return false; @@ -34,12 +34,13 @@ bool ReorderingBuffer::add(uint64_t sequence, std::vector data, uint32_ if (!buffer_[pos].valid) { count_++; } - buffer_[pos] = Packet(sequence, std::move(data), stream_id); + buffer_[pos] = Packet(sequence, std::move(data)); return true; } -std::optional, uint32_t>> ReorderingBuffer::get_next() { +std::optional ReorderingBuffer::get_next() { +// std::optional, uint32_t>> ReorderingBuffer::get_next() { if (!has_ready()) { return std::nullopt; } @@ -51,19 +52,19 @@ std::optional, uint32_t>> ReorderingBuffer::get_n // Extract data and stream_id auto data = std::move(packet.data); - auto stream_id = packet.stream_id; + // auto stream_id = packet.stream_id; // Mark slot as invalid packet.valid = false; packet.sequence = 0; - packet.stream_id = 0; + // packet.stream_id = 0; // Advance sequence number and head pointer next_seq_++; advance_head(); count_--; - return std::make_pair(std::move(data), stream_id); + return std::move(data); } bool ReorderingBuffer::has_ready() const { diff --git a/tests/test_reordering_buffer.cpp b/tests/test_reordering_buffer.cpp old mode 100644 new mode 100755 index 2388004..a83d803 --- a/tests/test_reordering_buffer.cpp +++ b/tests/test_reordering_buffer.cpp @@ -14,33 +14,34 @@ TEST(ReorderingBuffer, BasicOperations) { TEST(ReorderingBuffer, InSequenceDelivery) { hsnet::ReorderingBuffer buffer(4); - + // Add packets in sequence - std::vector data1 = {1, 2, 3}; - std::vector data2 = {4, 5, 6}; - EXPECT_TRUE(buffer.add(0, data1, 0)); - EXPECT_TRUE(buffer.add(1, data2, 1)); + hsnet::data_t data1 = {1, 2, 3}; + hsnet::data_t data2 = {4, 5, 6}; + std::cout << "Running in sequence delivery test" << std::endl; + ASSERT_TRUE(buffer.add(0, data1)); + ASSERT_TRUE(buffer.add(1, data2)); - EXPECT_EQ(buffer.size(), 2); - EXPECT_TRUE(buffer.has_ready()); + ASSERT_EQ(buffer.size(), 2); + ASSERT_TRUE(buffer.has_ready()); // Get first packet auto result1 = buffer.get_next(); - EXPECT_TRUE(result1.has_value()); + ASSERT_TRUE(result1.has_value()); auto d1 = *result1; - EXPECT_EQ(d1.first, data1); - EXPECT_EQ(buffer.next_expected(), 1); + ASSERT_EQ(d1, data1); + ASSERT_EQ(buffer.next_expected(), 1); // Get second packet auto result2 = buffer.get_next(); - EXPECT_TRUE(result2.has_value()); + ASSERT_TRUE(result2.has_value()); d1 = *result2; - EXPECT_EQ(d1.first, data2); - EXPECT_EQ(buffer.next_expected(), 2); + ASSERT_EQ(d1, data2); + ASSERT_EQ(buffer.next_expected(), 2); - EXPECT_EQ(buffer.size(), 0); - EXPECT_FALSE(buffer.has_ready()); + ASSERT_EQ(buffer.size(), 0); + ASSERT_FALSE(buffer.has_ready()); } TEST(ReorderingBuffer, OutOfOrderDelivery) { @@ -52,8 +53,8 @@ TEST(ReorderingBuffer, OutOfOrderDelivery) { // Add packets out of order: 1, 3, 2 EXPECT_TRUE(buffer.add(0, data1)); - EXPECT_TRUE(buffer.add(2, data3, 2)); - EXPECT_TRUE(buffer.add(1, data2, 3)); + EXPECT_TRUE(buffer.add(2, data3)); + EXPECT_TRUE(buffer.add(1, data2)); EXPECT_EQ(buffer.size(), 3); EXPECT_TRUE(buffer.has_ready()); @@ -62,17 +63,17 @@ TEST(ReorderingBuffer, OutOfOrderDelivery) { auto result1 = buffer.get_next(); EXPECT_TRUE(result1.has_value()); auto p = *result1; - EXPECT_EQ(p.first, data1); + EXPECT_EQ(p, data1); auto result2 = buffer.get_next(); EXPECT_TRUE(result2.has_value()); p = *result2; - EXPECT_EQ(p.first, data2); + EXPECT_EQ(p, data2); auto result3 = buffer.get_next(); EXPECT_TRUE(result3.has_value()); p = *result3; - EXPECT_EQ(p.first, data3); + EXPECT_EQ(p, data3); EXPECT_EQ(buffer.size(), 0); EXPECT_FALSE(buffer.has_ready()); @@ -134,7 +135,7 @@ TEST(ReorderingBuffer, GapsInSequence) { // Should only be able to get packet 0 auto result = buffer.get_next(); EXPECT_TRUE(result.has_value()); - EXPECT_EQ(result->first, data1); + EXPECT_EQ(result, data1); // No more packets should be ready EXPECT_FALSE(buffer.has_ready());