From 30f59484c4553d1ce0977221d74193567c06e084 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 15 Jan 2026 15:42:52 -0800 Subject: [PATCH] refactor: implement service-based strand with pooled impls - Add strand_service with fixed pool of 211 strand_impl objects - Replace shared_ptr with raw pointer to pooled impl - Add strand_queue with coroutine frame recycling via free list - Add dispatch(), post(), defer() methods and work tracking - Add comprehensive unit tests for strand and strand_queue --- bench/bench.cpp | 2 +- include/boost/capy.hpp | 2 +- .../boost/capy/ex/detail/strand_service.hpp | 78 +++ include/boost/capy/ex/strand.hpp | 317 +++++++++++ include/boost/capy/strand.hpp | 182 ------ src/ex/detail/strand_queue.hpp | 276 +++++++++ src/ex/detail/strand_service.cpp | 102 ++++ src/ex/detail/strand_service.hpp | 58 ++ test/unit/ex/strand.cpp | 522 ++++++++++++++++++ test/unit/ex/strand_queue.cpp | 345 ++++++++++++ 10 files changed, 1700 insertions(+), 184 deletions(-) create mode 100644 include/boost/capy/ex/detail/strand_service.hpp create mode 100644 include/boost/capy/ex/strand.hpp delete mode 100644 include/boost/capy/strand.hpp create mode 100644 src/ex/detail/strand_queue.hpp create mode 100644 src/ex/detail/strand_service.cpp create mode 100644 src/ex/detail/strand_service.hpp create mode 100644 test/unit/ex/strand.cpp create mode 100644 test/unit/ex/strand_queue.cpp diff --git a/bench/bench.cpp b/bench/bench.cpp index 23b9170..e676632 100644 --- a/bench/bench.cpp +++ b/bench/bench.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index 07f66ba..e892b50 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -60,7 +60,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/include/boost/capy/ex/detail/strand_service.hpp b/include/boost/capy/ex/detail/strand_service.hpp new file mode 100644 index 0000000..8b3b20d --- /dev/null +++ b/include/boost/capy/ex/detail/strand_service.hpp @@ -0,0 +1,78 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_EX_DETAIL_STRAND_SERVICE_HPP +#define BOOST_CAPY_EX_DETAIL_STRAND_SERVICE_HPP + +#include +#include + +#include + +namespace boost { +namespace capy { +namespace detail { + +// Forward declaration - full definition in src/ +struct strand_impl; + +//---------------------------------------------------------- + +/** Service that manages pooled strand implementations. + + This service maintains a fixed pool of strand_impl objects. + When a strand is constructed, it obtains a pointer to one + of these pooled implementations based on a hash. + + @par Thread Safety + The service operations are thread-safe. +*/ +class BOOST_CAPY_DECL strand_service + : public execution_context::service +{ + class impl; + impl* impl_; + +public: + /** Construct the strand service. + + @param ctx The owning execution context. + */ + explicit + strand_service(execution_context& ctx); + + /** Destructor. + */ + ~strand_service(); + + /** Return a pointer to a pooled implementation. + + Uses a hash to select an implementation from the pool. + The salt is incremented after each call to distribute + strands across the pool. + + @return Pointer to a strand_impl from the pool. + */ + strand_impl* + get_implementation(); + +protected: + /** Shut down the service. + + Called when the owning execution context shuts down. + */ + void + shutdown() override; +}; + +} // namespace detail +} // namespace capy +} // namespace boost + +#endif diff --git a/include/boost/capy/ex/strand.hpp b/include/boost/capy/ex/strand.hpp new file mode 100644 index 0000000..6f43caa --- /dev/null +++ b/include/boost/capy/ex/strand.hpp @@ -0,0 +1,317 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_EX_STRAND_HPP +#define BOOST_CAPY_EX_STRAND_HPP + +#include +#include +#include + +namespace boost { +namespace capy { + +namespace detail { + +/** Push a coroutine to the strand queue. + + @param impl The strand implementation. + @param h The coroutine handle to enqueue. + @param should_run Set to true if the caller should run the batch. +*/ +BOOST_CAPY_DECL +void +strand_enqueue( + strand_impl& impl, + any_coro h, + bool& should_run); + +/** Dispatch all pending coroutines. + + Resumes all coroutines in the pending queue. After this + function returns, the queue will be empty. + + @param impl The strand implementation. +*/ +BOOST_CAPY_DECL +void +strand_dispatch_pending(strand_impl& impl); + +/** Release the strand lock. + + Sets locked_ to false, allowing another caller to acquire + the strand. Must be called after dispatching is complete. + + @param impl The strand implementation. +*/ +BOOST_CAPY_DECL +void +strand_unlock(strand_impl& impl); + +/** Check if the strand is currently executing. + + @param impl The strand implementation. + @return true if a coroutine is running in the strand. +*/ +BOOST_CAPY_DECL +bool +strand_running_in_this_thread(strand_impl& impl) noexcept; + +} // namespace detail + +//---------------------------------------------------------- + +/** Provides serialized coroutine execution for any executor type. + + A strand wraps an inner executor and ensures that coroutines + dispatched through it never run concurrently. At most one + coroutine executes at a time within a strand, even when the + underlying executor runs on multiple threads. + + Strands are lightweight handles that can be copied freely. + Copies share the same internal serialization state, so + coroutines dispatched through any copy are serialized with + respect to all other copies. + + @par Invariant + Coroutines resumed through a strand shall not run concurrently. + + @par Implementation + The strand uses a service-based architecture with a fixed pool + of 211 implementation objects. New strands hash to select an + impl from the pool. Strands that hash to the same index share + serialization, which is harmless (just extra serialization) + and rare with 211 buckets. + + @par Executor Concept + This class satisfies the `executor` concept, providing: + - `context()` - Returns the underlying execution context + - `on_work_started()` / `on_work_finished()` - Work tracking + - `dispatch(h)` - May run immediately if strand is idle + - `post(h)` - Always queues for later execution + - `defer(h)` - Same as post (continuation hint) + + @par Thread Safety + Distinct objects: Safe. + Shared objects: Safe. + + @par Example + @code + thread_pool pool(4); + auto strand = make_strand(pool.get_executor()); + + // These coroutines will never run concurrently + strand.post(coro1); + strand.post(coro2); + strand.post(coro3); + @endcode + + @tparam Executor The type of the underlying executor. Must + satisfy the `executor` concept. + + @see make_strand, executor +*/ +template +class strand +{ + Executor ex_; + detail::strand_impl* impl_; + +public: + /** The type of the underlying executor. + */ + using inner_executor_type = Executor; + + /** Construct a strand for the specified executor. + + Obtains a strand implementation from the service associated + with the executor's context. The implementation is selected + from a fixed pool using a hash function. + + @param ex The inner executor to wrap. Coroutines will + ultimately be dispatched through this executor. + */ + explicit + strand(Executor ex) + : ex_(std::move(ex)) + , impl_(ex_.context() + .template use_service() + .get_implementation()) + { + } + + /** Copy constructor. + + Creates a strand that shares serialization state with + the original. Coroutines dispatched through either strand + will be serialized with respect to each other. + */ + strand(strand const&) = default; + + /** Move constructor. + */ + strand(strand&&) = default; + + /** Copy assignment operator. + */ + strand& operator=(strand const&) = default; + + /** Move assignment operator. + */ + strand& operator=(strand&&) = default; + + /** Return the underlying executor. + + @return A const reference to the inner executor. + */ + Executor const& + get_inner_executor() const noexcept + { + return ex_; + } + + /** Return the underlying execution context. + + @return A reference to the execution context associated + with the inner executor. + */ + auto& + context() const noexcept + { + return ex_.context(); + } + + /** Notify that work has started. + + Delegates to the inner executor's `on_work_started()`. + This is a no-op for most executor types. + */ + void + on_work_started() const noexcept + { + ex_.on_work_started(); + } + + /** Notify that work has finished. + + Delegates to the inner executor's `on_work_finished()`. + This is a no-op for most executor types. + */ + void + on_work_finished() const noexcept + { + ex_.on_work_finished(); + } + + /** Determine whether the strand is running in the current thread. + + @return true if a coroutine is currently executing within + this strand's serialization context. + + @note This is an approximation based on the strand's lock + state rather than true thread-local tracking. + */ + bool + running_in_this_thread() const noexcept + { + return detail::strand_running_in_this_thread(*impl_); + } + + /** Compare two strands for equality. + + Two strands are equal if they share the same internal + serialization state. Equal strands serialize coroutines + with respect to each other. + + @param other The strand to compare against. + @return true if both strands share the same implementation. + */ + bool + operator==(strand const& other) const noexcept + { + return impl_ == other.impl_; + } + + /** Dispatch a coroutine through the strand. + + If no coroutine is currently running in the strand, the + coroutine is executed immediately along with any other + pending coroutines. Otherwise, it is queued for later + execution when the current holder releases the strand. + + @param h The coroutine handle to dispatch. + @return A coroutine handle for symmetric transfer. Returns + `noop_coroutine()` if the work was queued. + */ + any_coro + dispatch(any_coro h) const + { + bool should_run = false; + detail::strand_enqueue(*impl_, h, should_run); + if(should_run) + { + detail::strand_dispatch_pending(*impl_); + detail::strand_unlock(*impl_); + } + return std::noop_coroutine(); + } + + /** Post a coroutine to the strand. + + The coroutine is queued for execution. If this is the first + work item queued (strand was idle), all pending coroutines + are dispatched immediately on the current thread. + + @param h The coroutine handle to post. + */ + void + post(any_coro h) const + { + bool should_run = false; + detail::strand_enqueue(*impl_, h, should_run); + if(should_run) + { + detail::strand_dispatch_pending(*impl_); + detail::strand_unlock(*impl_); + } + } + + /** Defer a coroutine to the strand. + + Equivalent to `post()`. The defer hint indicates that the + coroutine is a continuation of the current execution context, + but strands treat this the same as post. + + @param h The coroutine handle to defer. + */ + void + defer(any_coro h) const + { + post(h); + } + + /** Dispatch a coroutine through the strand. + + This operator provides a dispatcher-style interface for + use with symmetric transfer. Equivalent to `dispatch()`. + + @param h The coroutine handle to dispatch. + @return A coroutine handle for symmetric transfer. + */ + any_coro + operator()(any_coro h) const + { + return dispatch(h); + } +}; + +} // namespace capy +} // namespace boost + +#endif diff --git a/include/boost/capy/strand.hpp b/include/boost/capy/strand.hpp deleted file mode 100644 index d6916dc..0000000 --- a/include/boost/capy/strand.hpp +++ /dev/null @@ -1,182 +0,0 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/capy -// - -#ifndef BOOST_CAPY_STRAND_HPP -#define BOOST_CAPY_STRAND_HPP - -#include -#include -#include - -#include -#include - -namespace boost { -namespace capy { - -namespace detail { - -/** Node type for strand's intrusive queue. -*/ -struct strand_node : intrusive_queue::node -{ - any_coro h_; - - explicit strand_node(any_coro h) noexcept - : h_(h) - { - } -}; - -/** Shared state for strand serialization. -*/ -struct strand_impl -{ - std::mutex mutex_; - intrusive_queue pending_; - bool locked_ = false; -}; - -} // namespace detail - -/** Provides serialized coroutine dispatch for any executor type. - - A strand wraps an inner executor and ensures that coroutines - dispatched through it are serialized - at most one runs at a time. - This is a simplified implementation for benchmarking purposes. - - The strand is copyable; all copies share the same serialization - state via a shared pointer. - - @tparam Executor The type of the underlying executor. -*/ -template -class strand -{ - Executor ex_; - std::shared_ptr impl_; - -public: - /** The type of the underlying executor. - */ - using inner_executor_type = Executor; - - /** Construct a strand for the specified executor. - - @param ex The inner executor to wrap. - */ - explicit strand(Executor ex) - : ex_(std::move(ex)) - , impl_(std::make_shared()) - { - } - - /** Obtain the underlying executor. - */ - Executor const& - get_inner_executor() const noexcept - { - return ex_; - } - - /** Obtain the underlying execution context. - */ - decltype(auto) - context() const noexcept - { - return ex_.context(); - } - - /** Determine whether the strand is running in the current thread. - - @return @c true if the strand is currently locked. - */ - bool - running_in_this_thread() const noexcept - { - std::lock_guard lock(impl_->mutex_); - return impl_->locked_; - } - - /** Compare two strands for equality. - - Two strands are equal if they share the same internal state. - */ - bool - operator==(strand const& other) const noexcept - { - return impl_ == other.impl_; - } - - /** Dispatch a coroutine through the strand. - - This function serializes coroutine execution. If no other - coroutine is currently running on the strand, the coroutine - is dispatched immediately through the inner executor. - Otherwise, it is queued for later execution. - - @param h The coroutine handle to dispatch. - - @return A coroutine handle for symmetric transfer. - */ - any_coro - operator()(any_coro h) const - { - auto* node = new detail::strand_node(h); - - std::lock_guard lock(impl_->mutex_); - impl_->pending_.push(node); - - if(impl_->locked_) - return std::noop_coroutine(); - - impl_->locked_ = true; - return run_pending(); - } - -private: - /** Run pending coroutines. - - Must be called with mutex held and locked_ == true. - */ - any_coro - run_pending() const - { - auto* node = impl_->pending_.pop(); - if(!node) - { - impl_->locked_ = false; - return std::noop_coroutine(); - } - - any_coro h = node->h_; - delete node; - - // Dispatch through inner executor - return ex_(h); - } -}; - -/** Create a strand for an executor. - - @param ex An executor. - - @returns A strand constructed with the specified executor. -*/ -template -strand -make_strand(Executor const& ex) -{ - return strand(ex); -} - -} // namespace capy -} // namespace boost - -#endif diff --git a/src/ex/detail/strand_queue.hpp b/src/ex/detail/strand_queue.hpp new file mode 100644 index 0000000..9714259 --- /dev/null +++ b/src/ex/detail/strand_queue.hpp @@ -0,0 +1,276 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP +#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP + +#include + +#include +#include +#include + +namespace boost { +namespace capy { +namespace detail { + +class strand_queue; + +//---------------------------------------------------------- + +// Metadata stored before the coroutine frame +struct frame_prefix +{ + frame_prefix* next; + strand_queue* queue; + std::size_t alloc_size; +}; + +//---------------------------------------------------------- + +/** Wrapper coroutine for strand queue dispatch operations. + + This coroutine wraps a target coroutine handle and resumes + it when dispatched. The wrapper ensures control returns to + the dispatch loop after the target suspends or completes. + + The promise contains an intrusive list node for queue + storage and supports a custom allocator that recycles + coroutine frames via a free list. +*/ +struct strand_op +{ + struct promise_type + { + promise_type* next = nullptr; + + void* + operator new( + std::size_t size, + strand_queue& q, + std::coroutine_handle); + + void + operator delete(void* p, std::size_t); + + strand_op + get_return_object() noexcept + { + return {std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always + initial_suspend() noexcept + { + return {}; + } + + std::suspend_always + final_suspend() noexcept + { + return {}; + } + + void + return_void() noexcept + { + } + + void + unhandled_exception() + { + std::terminate(); + } + }; + + std::coroutine_handle h_; +}; + +//---------------------------------------------------------- + +/** Single-threaded dispatch queue for coroutine handles. + + This queue stores coroutine handles and resumes them + sequentially when dispatch() is called. Each pushed + handle is wrapped in a strand_op coroutine that ensures + control returns to the dispatch loop after the target + suspends or completes. + + The queue uses an intrusive singly-linked list through + the promise type to avoid separate node allocations. + A free list recycles wrapper coroutine frames to reduce + allocation overhead during repeated push/dispatch cycles. + + @par Thread Safety + This class is not thread-safe. All operations must be + called from a single thread. +*/ +class strand_queue +{ + using promise_type = strand_op::promise_type; + + promise_type* head_ = nullptr; + promise_type* tail_ = nullptr; + frame_prefix* free_list_ = nullptr; + + friend struct strand_op::promise_type; + + static + strand_op + make_strand_op( + strand_queue& q, + std::coroutine_handle target) + { + target.resume(); + co_return; + } + +public: + strand_queue() = default; + + strand_queue(strand_queue const&) = delete; + strand_queue& operator=(strand_queue const&) = delete; + + /** Destructor. + + Destroys any pending wrappers without resuming them, + then frees all memory in the free list. + */ + ~strand_queue() + { + // Destroy pending wrappers + while(head_) + { + promise_type* p = head_; + head_ = p->next; + + auto h = std::coroutine_handle::from_promise(*p); + h.destroy(); + } + + // Free the free list memory + while(free_list_) + { + frame_prefix* prefix = free_list_; + free_list_ = prefix->next; + ::operator delete(prefix, prefix->alloc_size); + } + } + + /** Returns true if there are no pending operations. + */ + bool + empty() const noexcept + { + return head_ == nullptr; + } + + /** Push a coroutine handle to the queue. + + Creates a wrapper coroutine and appends it to the + queue. The wrapper will resume the target handle + when dispatch() processes it. + + @param h The coroutine handle to dispatch. + */ + void + push(std::coroutine_handle h) + { + strand_op op = make_strand_op(*this, h); + + promise_type* p = &op.h_.promise(); + p->next = nullptr; + + if(tail_) + tail_->next = p; + else + head_ = p; + tail_ = p; + } + + /** Resume all queued coroutines in sequence. + + Processes each wrapper in FIFO order, resuming its + target coroutine. After each target suspends or + completes, the wrapper is destroyed and its frame + is added to the free list for reuse. + + Coroutines resumed during dispatch may push new + handles, which will also be processed in the same + dispatch call. + */ + void + dispatch() + { + while(head_) + { + promise_type* p = head_; + head_ = p->next; + if(!head_) + tail_ = nullptr; + + auto h = std::coroutine_handle::from_promise(*p); + h.resume(); + h.destroy(); + } + } +}; + +//---------------------------------------------------------- + +inline +void* +strand_op::promise_type::operator new( + std::size_t size, + strand_queue& q, + std::coroutine_handle) +{ + // Total size includes prefix + std::size_t alloc_size = size + sizeof(frame_prefix); + void* raw; + + // Try to reuse from free list + if(q.free_list_) + { + frame_prefix* prefix = q.free_list_; + q.free_list_ = prefix->next; + raw = prefix; + } + else + { + raw = ::operator new(alloc_size); + } + + // Initialize prefix + frame_prefix* prefix = static_cast(raw); + prefix->next = nullptr; + prefix->queue = &q; + prefix->alloc_size = alloc_size; + + // Return pointer AFTER the prefix (this is where coroutine frame goes) + return prefix + 1; +} + +inline +void +strand_op::promise_type::operator delete(void* p, std::size_t) +{ + // Calculate back to get the prefix + frame_prefix* prefix = static_cast(p) - 1; + + // Add to free list + prefix->next = prefix->queue->free_list_; + prefix->queue->free_list_ = prefix; +} + +} // namespace detail +} // namespace capy +} // namespace boost + +#endif diff --git a/src/ex/detail/strand_service.cpp b/src/ex/detail/strand_service.cpp new file mode 100644 index 0000000..7976e8a --- /dev/null +++ b/src/ex/detail/strand_service.cpp @@ -0,0 +1,102 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include "strand_service.hpp" +#include + +namespace boost { +namespace capy { +namespace detail { + +strand_service:: +strand_service(execution_context& ctx) + : service() + , impl_(new impl) +{ + (void)ctx; +} + +strand_service:: +~strand_service() +{ + delete impl_; +} + +strand_impl* +strand_service:: +get_implementation() +{ + std::lock_guard lock(impl_->mutex_); + + // Hash the salt to select an impl from the pool + std::size_t index = impl_->salt_++; + index = index % impl::num_impls; + + return &impl_->impls_[index]; +} + +void +strand_service:: +shutdown() +{ + // Clear pending operations from all impls + for(std::size_t i = 0; i < impl::num_impls; ++i) + { + std::lock_guard lock(impl_->impls_[i].mutex_); + // Mark as locked to prevent new work + impl_->impls_[i].locked_ = true; + } +} + +//---------------------------------------------------------- + +void +strand_enqueue( + strand_impl& impl, + any_coro h, + bool& should_run) +{ + std::lock_guard lock(impl.mutex_); + + impl.pending_.push(h); + + if(!impl.locked_) + { + impl.locked_ = true; + should_run = true; + } + else + { + should_run = false; + } +} + +void +strand_dispatch_pending(strand_impl& impl) +{ + impl.pending_.dispatch(); +} + +void +strand_unlock(strand_impl& impl) +{ + std::lock_guard lock(impl.mutex_); + impl.locked_ = false; +} + +bool +strand_running_in_this_thread(strand_impl& impl) noexcept +{ + std::lock_guard lock(impl.mutex_); + return impl.locked_; +} + +} // namespace detail +} // namespace capy +} // namespace boost diff --git a/src/ex/detail/strand_service.hpp b/src/ex/detail/strand_service.hpp new file mode 100644 index 0000000..b63bce1 --- /dev/null +++ b/src/ex/detail/strand_service.hpp @@ -0,0 +1,58 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_SERVICE_HPP +#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_SERVICE_HPP + +// Private header - implementation details + +#include "strand_queue.hpp" +#include + +#include + +namespace boost { +namespace capy { +namespace detail { + +//---------------------------------------------------------- + +/** Implementation state for a strand. + + Each strand_impl provides serialization for coroutines + dispatched through strands that share it. +*/ +struct strand_impl +{ + std::mutex mutex_; + strand_queue pending_; + bool locked_ = false; +}; + +//---------------------------------------------------------- + +/** Internal implementation of strand_service. + + Holds the fixed pool of strand_impl objects. +*/ +class strand_service::impl +{ +public: + static constexpr std::size_t num_impls = 211; + + strand_impl impls_[num_impls]; + std::size_t salt_ = 0; + std::mutex mutex_; +}; + +} // namespace detail +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp new file mode 100644 index 0000000..8970b3c --- /dev/null +++ b/test/unit/ex/strand.cpp @@ -0,0 +1,522 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +// Test that header file is self-contained. +#include + +#include +#include +#include + +#include "test_suite.hpp" + +#include +#include +#include +#include + +namespace boost { +namespace capy { + +namespace { + +// Verify executor concept at compile time +static_assert(executor>, + "strand must satisfy executor concept"); + +// Helper to wait for a condition with timeout +template +bool wait_for(Pred pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(5000)) +{ + auto start = std::chrono::steady_clock::now(); + while(!pred()) + { + if(std::chrono::steady_clock::now() - start > timeout) + return false; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return true; +} + +// Simple test coroutine that increments a counter +struct counter_coro +{ + struct promise_type + { + std::atomic* counter; + + counter_coro + get_return_object() noexcept + { + return counter_coro{std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always + initial_suspend() noexcept + { + return {}; + } + + std::suspend_never + final_suspend() noexcept + { + return {}; + } + + void + return_void() noexcept + { + } + + void + unhandled_exception() + { + std::terminate(); + } + }; + + std::coroutine_handle h_; + + ~counter_coro() + { + if(h_) + h_.destroy(); + } + + counter_coro(counter_coro&& other) noexcept + : h_(other.h_) + { + other.h_ = nullptr; + } + + counter_coro& operator=(counter_coro&& other) noexcept + { + if(h_) + h_.destroy(); + h_ = other.h_; + other.h_ = nullptr; + return *this; + } + + std::coroutine_handle + handle() const noexcept + { + return h_; + } + + void + release() noexcept + { + h_ = nullptr; + } + +private: + explicit counter_coro(std::coroutine_handle h) + : h_(h) + { + } +}; + +// Creates a coroutine that increments counter +inline counter_coro +make_counter_coro(std::atomic& counter) +{ + return [](std::atomic* counter) -> counter_coro { + ++(*counter); + co_return; + }(&counter); +} + +// Coroutine that records order of execution +struct order_coro +{ + struct promise_type + { + std::vector* log; + std::mutex* log_mutex; + int id; + + order_coro + get_return_object() noexcept + { + return order_coro{std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always + initial_suspend() noexcept + { + return {}; + } + + std::suspend_never + final_suspend() noexcept + { + return {}; + } + + void + return_void() noexcept + { + } + + void + unhandled_exception() + { + std::terminate(); + } + }; + + std::coroutine_handle h_; + + ~order_coro() + { + if(h_) + h_.destroy(); + } + + order_coro(order_coro&& other) noexcept + : h_(other.h_) + { + other.h_ = nullptr; + } + + order_coro& operator=(order_coro&& other) noexcept + { + if(h_) + h_.destroy(); + h_ = other.h_; + other.h_ = nullptr; + return *this; + } + + std::coroutine_handle + handle() const noexcept + { + return h_; + } + + void + release() noexcept + { + h_ = nullptr; + } + +private: + explicit order_coro(std::coroutine_handle h) + : h_(h) + { + } +}; + +// Creates a coroutine that logs its id to a vector +inline order_coro +make_order_coro(std::vector& log, std::mutex& log_mutex, int id) +{ + return [](std::vector* log, std::mutex* log_mutex, int id) -> order_coro { + std::lock_guard lock(*log_mutex); + log->push_back(id); + co_return; + }(&log, &log_mutex, id); +} + +} // namespace + +struct strand_test +{ + void + testConstruct() + { + // Construct from executor + { + thread_pool pool(1); + strand s(pool.get_executor()); + (void)s; + } + + // Using implicit guide + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + (void)s; + } + } + + void + testCopy() + { + thread_pool pool(1); + auto s1 = strand(pool.get_executor()); + + // Copy construction + auto s2 = s1; + + // Copies should be equal (share same impl) + BOOST_TEST(s1 == s2); + + // Copy assignment + auto s3 = strand(pool.get_executor()); + s3 = s1; + BOOST_TEST(s1 == s3); + } + + void + testMove() + { + thread_pool pool(1); + auto s1 = strand(pool.get_executor()); + + // Move construction + auto s2 = std::move(s1); + (void)s2; + + // Move assignment + auto s3 = strand(pool.get_executor()); + auto s4 = strand(pool.get_executor()); + s4 = std::move(s3); + (void)s4; + } + + void + testGetInnerExecutor() + { + thread_pool pool(1); + auto ex = pool.get_executor(); + strand s(ex); + + // get_inner_executor returns the wrapped executor + BOOST_TEST(s.get_inner_executor() == ex); + } + + void + testContext() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + // context() returns the pool + BOOST_TEST_EQ(&s.context(), &pool); + } + + void + testWorkTracking() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + // Work tracking should not throw + s.on_work_started(); + s.on_work_started(); + s.on_work_finished(); + s.on_work_finished(); + } + + void + testEquality() + { + thread_pool pool(1); + + auto s1 = strand(pool.get_executor()); + auto s2 = s1; // Copy shares impl + + // Copies are equal + BOOST_TEST(s1 == s2); + + // Different strands may or may not be equal + // (depends on hash collision) + auto s3 = strand(pool.get_executor()); + auto s4 = strand(pool.get_executor()); + // We don't test s3 == s4 since it's hash-dependent + (void)s3; + (void)s4; + } + + void + testDispatch() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + + auto coro = make_counter_coro(counter); + auto result = s.dispatch(coro.handle()); + coro.release(); + + // dispatch returns noop_coroutine (work was queued/run) + BOOST_TEST(result == std::noop_coroutine()); + + // Wait for work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + BOOST_TEST_EQ(counter.load(), 1); + } + + void + testPost() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + + auto coro = make_counter_coro(counter); + s.post(coro.handle()); + coro.release(); + + // Wait for work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + BOOST_TEST_EQ(counter.load(), 1); + } + + void + testDefer() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + + auto coro = make_counter_coro(counter); + s.defer(coro.handle()); + coro.release(); + + // Wait for work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + BOOST_TEST_EQ(counter.load(), 1); + } + + void + testOperatorCall() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + + auto coro = make_counter_coro(counter); + auto result = s(coro.handle()); + coro.release(); + + // operator() returns noop_coroutine + BOOST_TEST(result == std::noop_coroutine()); + + // Wait for work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + } + + void + testMultipleWork() + { + thread_pool pool(2); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + constexpr int N = 100; + + std::vector coros; + coros.reserve(N); + + for(int i = 0; i < N; ++i) + { + coros.push_back(make_counter_coro(counter)); + s.post(coros.back().handle()); + coros.back().release(); + } + + // Wait for all work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= N; })); + BOOST_TEST_EQ(counter.load(), N); + } + + void + testConcurrentPost() + { + thread_pool pool(4); + auto s = strand(pool.get_executor()); + + std::atomic counter{0}; + constexpr int num_threads = 4; + constexpr int per_thread = 25; + + std::vector threads; + threads.reserve(num_threads); + + for(int i = 0; i < num_threads; ++i) + { + threads.emplace_back([&s, &counter]{ + for(int j = 0; j < per_thread; ++j) + { + auto coro = make_counter_coro(counter); + s.post(coro.handle()); + coro.release(); + } + }); + } + + for(auto& t : threads) + t.join(); + + // Wait for all work to complete + BOOST_TEST(wait_for([&]{ return counter.load() >= num_threads * per_thread; })); + BOOST_TEST_EQ(counter.load(), num_threads * per_thread); + } + + void + testServiceCreation() + { + // Strand should create strand_service on first use + thread_pool pool(1); + + // Initially no strand_service + BOOST_TEST(!pool.has_service()); + + // Creating a strand should create the service + auto s = strand(pool.get_executor()); + BOOST_TEST(pool.has_service()); + } + + void + testRunningInThisThread() + { + thread_pool pool(1); + auto s = strand(pool.get_executor()); + + // Initially not running + // Note: This is an approximation based on lock state + bool running = s.running_in_this_thread(); + (void)running; // Value depends on timing + } + + void + run() + { + testConstruct(); + testCopy(); + testMove(); + testGetInnerExecutor(); + testContext(); + testWorkTracking(); + testEquality(); + testDispatch(); + testPost(); + testDefer(); + testOperatorCall(); + testMultipleWork(); + testConcurrentPost(); + testServiceCreation(); + testRunningInThisThread(); + } +}; + +TEST_SUITE( + strand_test, + "boost.capy.strand"); + +} // capy +} // boost diff --git a/test/unit/ex/strand_queue.cpp b/test/unit/ex/strand_queue.cpp new file mode 100644 index 0000000..ec66198 --- /dev/null +++ b/test/unit/ex/strand_queue.cpp @@ -0,0 +1,345 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test the strand_queue implementation detail. +// This header is in src/ as it's not part of the public API. +#include "../../../src/ex/detail/strand_queue.hpp" + +#include "test_suite.hpp" + +#include +#include + +namespace boost { +namespace capy { +namespace detail { + +// Simple test coroutine that records when it runs +struct test_coro +{ + struct promise_type + { + int* counter; + std::vector* log; + int id; + + test_coro + get_return_object() noexcept + { + return test_coro{std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always + initial_suspend() noexcept + { + return {}; + } + + std::suspend_never + final_suspend() noexcept + { + return {}; + } + + void + return_void() noexcept + { + } + + void + unhandled_exception() + { + std::terminate(); + } + }; + + std::coroutine_handle h_; + + ~test_coro() + { + if(h_) + h_.destroy(); + } + + test_coro(test_coro&& other) noexcept + : h_(other.h_) + { + other.h_ = nullptr; + } + + test_coro& operator=(test_coro&& other) noexcept + { + if(h_) + h_.destroy(); + h_ = other.h_; + other.h_ = nullptr; + return *this; + } + + std::coroutine_handle + handle() const noexcept + { + return h_; + } + + void + release() noexcept + { + h_ = nullptr; + } + +private: + explicit test_coro(std::coroutine_handle h) + : h_(h) + { + } +}; + +// Creates a coroutine that increments counter and logs its id +inline test_coro +make_test_coro(int& counter, std::vector& log, int id) +{ + // Access promise to store our tracking data + auto result = []() -> test_coro { co_return; }(); + result.h_.promise().counter = &counter; + result.h_.promise().log = &log; + result.h_.promise().id = id; + + // Return a new coroutine that does the actual work + return [](int* counter, std::vector* log, int id) -> test_coro { + ++(*counter); + log->push_back(id); + co_return; + }(&counter, &log, id); +} + +// Coroutine that pushes another coroutine during dispatch +inline test_coro +make_pusher_coro( + strand_queue& q, + int& counter, + std::vector& log, + int id, + std::coroutine_handle to_push) +{ + return [](strand_queue* q, int* counter, std::vector* log, int id, + std::coroutine_handle to_push) -> test_coro { + ++(*counter); + log->push_back(id); + if(to_push) + q->push(to_push); + co_return; + }(&q, &counter, &log, id, to_push); +} + +struct strand_queue_test +{ + void + testEmpty() + { + // Default constructed queue is empty + strand_queue q; + BOOST_TEST(q.empty()); + } + + void + testSinglePushDispatch() + { + // Push and dispatch single coroutine + strand_queue q; + int counter = 0; + std::vector log; + + auto coro = make_test_coro(counter, log, 1); + q.push(coro.handle()); + coro.release(); + + BOOST_TEST(!q.empty()); + q.dispatch(); + BOOST_TEST(q.empty()); + BOOST_TEST_EQ(counter, 1); + BOOST_TEST_EQ(log.size(), 1u); + BOOST_TEST_EQ(log[0], 1); + } + + void + testFIFOOrder() + { + // Multiple coroutines dispatch in FIFO order + strand_queue q; + int counter = 0; + std::vector log; + + auto c1 = make_test_coro(counter, log, 1); + auto c2 = make_test_coro(counter, log, 2); + auto c3 = make_test_coro(counter, log, 3); + + q.push(c1.handle()); + q.push(c2.handle()); + q.push(c3.handle()); + c1.release(); + c2.release(); + c3.release(); + + BOOST_TEST(!q.empty()); + q.dispatch(); + BOOST_TEST(q.empty()); + + BOOST_TEST_EQ(counter, 3); + BOOST_TEST_EQ(log.size(), 3u); + BOOST_TEST_EQ(log[0], 1); + BOOST_TEST_EQ(log[1], 2); + BOOST_TEST_EQ(log[2], 3); + } + + void + testPushDuringDispatch() + { + // Coroutine can push new work during dispatch + strand_queue q; + int counter = 0; + std::vector log; + + // c2 will be pushed by c1 during dispatch + auto c2 = make_test_coro(counter, log, 2); + auto c1 = make_pusher_coro(q, counter, log, 1, c2.handle()); + c2.release(); + + q.push(c1.handle()); + c1.release(); + + q.dispatch(); + BOOST_TEST(q.empty()); + + // Both should have run, c1 first then c2 + BOOST_TEST_EQ(counter, 2); + BOOST_TEST_EQ(log.size(), 2u); + BOOST_TEST_EQ(log[0], 1); + BOOST_TEST_EQ(log[1], 2); + } + + void + testDispatchOnEmpty() + { + // Dispatch on empty queue is a no-op + strand_queue q; + BOOST_TEST(q.empty()); + q.dispatch(); // Should not crash + BOOST_TEST(q.empty()); + } + + void + testMultipleDispatchCycles() + { + // Multiple push/dispatch cycles reuse free list + strand_queue q; + int counter = 0; + std::vector log; + + // First cycle + { + auto c1 = make_test_coro(counter, log, 1); + auto c2 = make_test_coro(counter, log, 2); + q.push(c1.handle()); + q.push(c2.handle()); + c1.release(); + c2.release(); + q.dispatch(); + } + + BOOST_TEST_EQ(counter, 2); + BOOST_TEST(q.empty()); + + // Second cycle - should reuse frames from free list + { + auto c3 = make_test_coro(counter, log, 3); + auto c4 = make_test_coro(counter, log, 4); + q.push(c3.handle()); + q.push(c4.handle()); + c3.release(); + c4.release(); + q.dispatch(); + } + + BOOST_TEST_EQ(counter, 4); + BOOST_TEST(q.empty()); + BOOST_TEST_EQ(log.size(), 4u); + } + + void + testDestructorCleansPending() + { + // Destructor cleans up pending work without resuming + int counter = 0; + std::vector log; + + { + strand_queue q; + auto c1 = make_test_coro(counter, log, 1); + auto c2 = make_test_coro(counter, log, 2); + q.push(c1.handle()); + q.push(c2.handle()); + // Don't release - let test_coro destructors clean up + // since dispatch() is never called + // q destroyed here without dispatch + } + + // Coroutines should not have run + BOOST_TEST_EQ(counter, 0); + BOOST_TEST(log.empty()); + } + + void + testManyOperations() + { + // Stress test with many operations + strand_queue q; + int counter = 0; + std::vector log; + constexpr int N = 100; + + std::vector coros; + coros.reserve(N); + + for(int i = 0; i < N; ++i) + { + coros.push_back(make_test_coro(counter, log, i)); + q.push(coros.back().handle()); + coros.back().release(); + } + + q.dispatch(); + + BOOST_TEST_EQ(counter, N); + BOOST_TEST_EQ(log.size(), static_cast(N)); + for(int i = 0; i < N; ++i) + BOOST_TEST_EQ(log[i], i); + } + + void + run() + { + testEmpty(); + testSinglePushDispatch(); + testFIFOOrder(); + testPushDuringDispatch(); + testDispatchOnEmpty(); + testMultipleDispatchCycles(); + testDestructorCleansPending(); + testManyOperations(); + } +}; + +TEST_SUITE( + strand_queue_test, + "boost.capy.ex.detail.strand_queue"); + +} // namespace detail +} // namespace capy +} // namespace boost