From d9f11bdc277fe18c75d50a769341417c21e22393 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 14 Jan 2026 11:49:02 -0800 Subject: [PATCH] feat: implement cancellation for read and write socket operations - Add do_cancel() implementation for read_op using CancelIoEx - Add do_cancel() implementation for write_op using CancelIoEx - Add win_socket_impl reference to read_op and write_op structs - Initialize read and write operations with socket impl in constructor --- CMakeLists.txt | 8 +- include/boost/corosio/detail/scheduler.hpp | 5 +- src/corosio/src/detail/posix_op.hpp | 7 +- src/corosio/src/detail/posix_scheduler.cpp | 6 +- src/corosio/src/detail/posix_scheduler.hpp | 7 +- src/corosio/src/detail/scheduler_op.hpp | 227 ++++++++++++++++++ src/corosio/src/detail/win_iocp_scheduler.cpp | 20 +- src/corosio/src/detail/win_iocp_scheduler.hpp | 6 +- src/corosio/src/detail/win_iocp_sockets.hpp | 14 +- src/corosio/src/detail/win_overlapped_op.hpp | 7 +- src/corosio/src/socket.cpp | 10 +- test/unit/CMakeLists.txt | 4 +- 12 files changed, 275 insertions(+), 46 deletions(-) create mode 100644 src/corosio/src/detail/scheduler_op.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 44cafa0..26c17c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -64,6 +64,11 @@ foreach (BOOST_COROSIO_DEPENDENCY ${BOOST_COROSIO_DEPENDENCIES}) endif () endforeach () +# Include asio and filesystem which are needed by capy's tests +if (BOOST_COROSIO_BUILD_TESTS) + list(APPEND BOOST_COROSIO_INCLUDE_LIBRARIES asio filesystem) +endif () + # Complete dependency list set(BOOST_INCLUDE_LIBRARIES ${BOOST_COROSIO_INCLUDE_LIBRARIES}) set(BOOST_EXCLUDE_LIBRARIES corosio) @@ -141,8 +146,7 @@ function(boost_corosio_setup_properties target) target_compile_features(${target} PUBLIC cxx_std_20) target_include_directories(${target} PUBLIC "${PROJECT_SOURCE_DIR}/include") target_include_directories(${target} PRIVATE - "${PROJECT_SOURCE_DIR}/src/corosio" - "${PROJECT_SOURCE_DIR}/src/corosio/src") + "${PROJECT_SOURCE_DIR}/src/corosio") target_link_libraries(${target} PUBLIC ${BOOST_COROSIO_DEPENDENCIES} diff --git a/include/boost/corosio/detail/scheduler.hpp b/include/boost/corosio/detail/scheduler.hpp index b510a7b..6f8d896 100644 --- a/include/boost/corosio/detail/scheduler.hpp +++ b/include/boost/corosio/detail/scheduler.hpp @@ -12,7 +12,6 @@ #include #include -#include #include @@ -20,11 +19,13 @@ namespace boost { namespace corosio { namespace detail { +class scheduler_op; + struct scheduler { virtual ~scheduler() = default; virtual void post(capy::any_coro) const = 0; - virtual void post(capy::execution_context::handler*) const = 0; + virtual void post(scheduler_op*) const = 0; virtual void on_work_started() noexcept = 0; virtual void on_work_finished() noexcept = 0; virtual bool running_in_this_thread() const noexcept = 0; diff --git a/src/corosio/src/detail/posix_op.hpp b/src/corosio/src/detail/posix_op.hpp index 74ec380..32ff748 100644 --- a/src/corosio/src/detail/posix_op.hpp +++ b/src/corosio/src/detail/posix_op.hpp @@ -16,9 +16,10 @@ #include #include #include -#include #include +#include "src/detail/scheduler_op.hpp" + #include #include @@ -42,7 +43,7 @@ namespace detail { It stores the coroutine handle, dispatcher, and result pointers needed to complete an async operation. */ -struct posix_op : capy::execution_context::handler +struct posix_op : scheduler_op { struct canceller { @@ -136,7 +137,7 @@ struct posix_op : capy::execution_context::handler }; inline posix_op* -get_posix_op(capy::execution_context::handler* h) noexcept +get_posix_op(scheduler_op* h) noexcept { return static_cast(h->data()); } diff --git a/src/corosio/src/detail/posix_scheduler.cpp b/src/corosio/src/detail/posix_scheduler.cpp index 12d5e7d..fd10a8f 100644 --- a/src/corosio/src/detail/posix_scheduler.cpp +++ b/src/corosio/src/detail/posix_scheduler.cpp @@ -135,7 +135,7 @@ posix_scheduler:: post(capy::any_coro h) const { struct post_handler final - : capy::execution_context::handler + : scheduler_op { capy::any_coro h_; @@ -172,7 +172,7 @@ post(capy::any_coro h) const void posix_scheduler:: -post(capy::execution_context::handler* h) const +post(scheduler_op* h) const { outstanding_work_.fetch_add(1, std::memory_order_relaxed); @@ -411,7 +411,7 @@ do_one(long timeout_us) return 0; // First check if there are handlers in the queue - capy::execution_context::handler* h = nullptr; + scheduler_op* h = nullptr; { std::lock_guard lock(mutex_); h = completed_ops_.pop(); diff --git a/src/corosio/src/detail/posix_scheduler.hpp b/src/corosio/src/detail/posix_scheduler.hpp index a14e379..388f9f0 100644 --- a/src/corosio/src/detail/posix_scheduler.hpp +++ b/src/corosio/src/detail/posix_scheduler.hpp @@ -13,7 +13,8 @@ #include #include #include -#include + +#include "src/detail/scheduler_op.hpp" #include #include @@ -25,8 +26,6 @@ namespace boost { namespace corosio { namespace detail { -using op_queue = capy::intrusive_queue; - // Forward declaration struct posix_op; @@ -68,7 +67,7 @@ class posix_scheduler void shutdown() override; void post(capy::any_coro h) const override; - void post(capy::execution_context::handler* h) const override; + void post(scheduler_op* h) const override; void on_work_started() noexcept override; void on_work_finished() noexcept override; bool running_in_this_thread() const noexcept override; diff --git a/src/corosio/src/detail/scheduler_op.hpp b/src/corosio/src/detail/scheduler_op.hpp new file mode 100644 index 0000000..801a409 --- /dev/null +++ b/src/corosio/src/detail/scheduler_op.hpp @@ -0,0 +1,227 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_SCHEDULER_OP_HPP +#define BOOST_COROSIO_DETAIL_SCHEDULER_OP_HPP + +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +/** Abstract base class for completion handlers. + + Handlers are continuations that execute after an asynchronous + operation completes. They can be queued for deferred invocation, + allowing callbacks and coroutine resumptions to be posted to an + executor. + + Handlers should execute quickly - typically just initiating + another I/O operation or suspending on a foreign task. Heavy + computation should be avoided in handlers to prevent blocking + the event loop. + + Handlers may be heap-allocated or may be data members of an + enclosing object. The allocation strategy is determined by the + creator of the handler. + + @par Ownership Contract + + Callers must invoke exactly ONE of `operator()` or `destroy()`, + never both: + + @li `operator()` - Invokes the handler. The handler is + responsible for its own cleanup (typically `delete this` + for heap-allocated handlers). The caller must not call + `destroy()` after invoking this. + + @li `destroy()` - Destroys an uninvoked handler. This is + called when a queued handler must be discarded without + execution, such as during shutdown or exception cleanup. + For heap-allocated handlers, this typically calls + `delete this`. + + @par Exception Safety + + Implementations of `operator()` must perform cleanup before + any operation that might throw. This ensures that if the handler + throws, the exception propagates cleanly to the caller of + `run()` without leaking resources. Typical pattern: + + @code + void operator()() override + { + auto h = h_; + delete this; // cleanup FIRST + h.resume(); // then resume (may throw) + } + @endcode + + This "delete-before-invoke" pattern also enables memory + recycling - the handler's memory can be reused immediately + by subsequent allocations. + + @note Callers must never delete handlers directly with `delete`; + use `operator()` for normal invocation or `destroy()` for cleanup. + + @note Heap-allocated handlers are typically allocated with + custom allocators to minimize allocation overhead in + high-frequency async operations. + + @note Some handlers (such as those owned by containers like + `std::unique_ptr` or embedded as data members) are not meant to + be destroyed and should implement both functions as no-ops + (for `operator()`, invoke the continuation but don't delete). + + @see scheduler_op_queue +*/ +class scheduler_op : public capy::intrusive_queue::node +{ +public: + virtual void operator()() = 0; + virtual void destroy() = 0; + + /** Returns the user-defined data pointer. + + Derived classes may set this to store auxiliary data + such as a pointer to the most-derived object. + + @par Postconditions + @li Initially returns `nullptr` for newly constructed handlers. + @li Returns the current value of `data_` if modified by a derived class. + + @return The user-defined data pointer, or `nullptr` if not set. + */ + void* data() const noexcept + { + return data_; + } + +protected: + ~scheduler_op() = default; + + void* data_ = nullptr; +}; + +//------------------------------------------------------------------------------ + +using op_queue = capy::intrusive_queue; + +//------------------------------------------------------------------------------ + +/** An intrusive FIFO queue of scheduler_ops. + + This queue stores scheduler_ops using an intrusive linked list, + avoiding additional allocations for queue nodes. Scheduler_ops + are popped in the order they were pushed (first-in, first-out). + + The destructor calls `destroy()` on any remaining scheduler_ops. + + @note This is not thread-safe. External synchronization is + required for concurrent access. + + @see scheduler_op +*/ +class scheduler_op_queue +{ + op_queue q_; + +public: + /** Default constructor. + + Creates an empty queue. + + @post `empty() == true` + */ + scheduler_op_queue() = default; + + /** Move constructor. + + Takes ownership of all scheduler_ops from `other`, + leaving `other` empty. + + @param other The queue to move from. + + @post `other.empty() == true` + */ + scheduler_op_queue(scheduler_op_queue&& other) noexcept + : q_(std::move(other.q_)) + { + } + + scheduler_op_queue(scheduler_op_queue const&) = delete; + scheduler_op_queue& operator=(scheduler_op_queue const&) = delete; + scheduler_op_queue& operator=(scheduler_op_queue&&) = delete; + + /** Destructor. + + Calls `destroy()` on any remaining scheduler_ops in the queue. + */ + ~scheduler_op_queue() + { + while(auto* h = q_.pop()) + h->destroy(); + } + + /** Return true if the queue is empty. + + @return `true` if the queue contains no scheduler_ops. + */ + bool + empty() const noexcept + { + return q_.empty(); + } + + /** Add a scheduler_op to the back of the queue. + + @param h Pointer to the scheduler_op to add. + + @pre `h` is not null and not already in a queue. + */ + void + push(scheduler_op* h) noexcept + { + q_.push(h); + } + + /** Splice all scheduler_ops from another queue to the back. + + All scheduler_ops from `other` are moved to the back of this + queue. After this call, `other` is empty. + + @param other The queue to splice from. + + @post `other.empty() == true` + */ + void + push(scheduler_op_queue& other) noexcept + { + q_.splice(other.q_); + } + + /** Remove and return the front scheduler_op. + + @return Pointer to the front scheduler_op, or `nullptr` + if the queue is empty. + */ + scheduler_op* + pop() noexcept + { + return q_.pop(); + } +}; + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/corosio/src/detail/win_iocp_scheduler.cpp b/src/corosio/src/detail/win_iocp_scheduler.cpp index a936715..4ef3ac8 100644 --- a/src/corosio/src/detail/win_iocp_scheduler.cpp +++ b/src/corosio/src/detail/win_iocp_scheduler.cpp @@ -25,17 +25,17 @@ In corosio, we post BOTH types directly to the completion port: - OVERLAPPED* (overlapped_op) for I/O operations - - capy::execution_context::handler* for posted handlers/coroutines + - scheduler_op* for posted handlers/coroutines Discrimination is done via the completion key: - - handler_key (1): the LPOVERLAPPED is actually a handler* + - handler_key (1): the LPOVERLAPPED is actually a scheduler_op* - overlapped_key (2): the LPOVERLAPPED is an overlapped_op* - The op_queue (intrusive_list) holds MIXED elements: + The op_queue (intrusive_list) holds MIXED elements: - Plain handlers (coro_work, etc.) - - overlapped_op (which derives from handler) + - overlapped_op (which derives from scheduler_op) - Use get_overlapped_op(handler*) to safely check if a handler is an + Use get_overlapped_op(scheduler_op*) to safely check if a scheduler_op is an overlapped_op (returns nullptr if not). All code that processes op_queue must be mindful of this mixed content. */ @@ -150,7 +150,7 @@ shutdown() if (key == handler_key) { // Posted handlers (coro_work, etc.) - reinterpret_cast(overlapped)->destroy(); + reinterpret_cast(overlapped)->destroy(); } else if (key == overlapped_key) { @@ -169,7 +169,7 @@ win_iocp_scheduler:: post(capy::any_coro h) const { struct post_handler final - : capy::execution_context::handler + : scheduler_op { capy::any_coro h_; long ready_ = 1; // always ready for immediate dispatch @@ -210,7 +210,7 @@ post(capy::any_coro h) const void win_iocp_scheduler:: -post(capy::execution_context::handler* h) const +post(scheduler_op* h) const { // Mark ready if this is an overlapped_op (safe to dispatch immediately) if (auto* op = get_overlapped_op(h)) @@ -453,9 +453,9 @@ do_one(unsigned long timeout_ms) { if (key == handler_key) { - // handler* + // scheduler_op* work_guard g{this}; - (*reinterpret_cast(overlapped))(); + (*reinterpret_cast(overlapped))(); return 1; } else if (key == overlapped_key) diff --git a/src/corosio/src/detail/win_iocp_scheduler.hpp b/src/corosio/src/detail/win_iocp_scheduler.hpp index cc7d829..6f2adaf 100644 --- a/src/corosio/src/detail/win_iocp_scheduler.hpp +++ b/src/corosio/src/detail/win_iocp_scheduler.hpp @@ -13,9 +13,9 @@ #include #include #include -#include #include +#include "src/detail/scheduler_op.hpp" #include "src/detail/win_mutex.hpp" #include @@ -33,8 +33,6 @@ constexpr std::uintptr_t shutdown_key = 0; constexpr std::uintptr_t handler_key = 1; constexpr std::uintptr_t overlapped_key = 2; -using op_queue = capy::intrusive_queue; - // Forward declaration struct overlapped_op; @@ -54,7 +52,7 @@ class win_iocp_scheduler void shutdown() override; void post(capy::any_coro h) const override; - void post(capy::execution_context::handler* h) const override; + void post(scheduler_op* h) const override; void on_work_started() noexcept override; void on_work_finished() noexcept override; bool running_in_this_thread() const noexcept override; diff --git a/src/corosio/src/detail/win_iocp_sockets.hpp b/src/corosio/src/detail/win_iocp_sockets.hpp index c1776b3..ac4531d 100644 --- a/src/corosio/src/detail/win_iocp_sockets.hpp +++ b/src/corosio/src/detail/win_iocp_sockets.hpp @@ -108,6 +108,12 @@ class win_socket_impl { friend class win_iocp_sockets; + win_iocp_sockets& svc_; + connect_op conn_; + read_op rd_; + write_op wr_; + SOCKET socket_ = INVALID_SOCKET; + public: explicit win_socket_impl(win_iocp_sockets& svc) noexcept; @@ -141,14 +147,6 @@ class win_socket_impl void cancel() noexcept; void close_socket() noexcept; void set_socket(SOCKET s) noexcept { socket_ = s; } - - connect_op conn_; - read_op rd_; - write_op wr_; - -private: - win_iocp_sockets& svc_; - SOCKET socket_ = INVALID_SOCKET; }; //------------------------------------------------------------------------------ diff --git a/src/corosio/src/detail/win_overlapped_op.hpp b/src/corosio/src/detail/win_overlapped_op.hpp index 1b4c9cb..0f335f0 100644 --- a/src/corosio/src/detail/win_overlapped_op.hpp +++ b/src/corosio/src/detail/win_overlapped_op.hpp @@ -15,9 +15,10 @@ #include #include #include -#include #include +#include "src/detail/scheduler_op.hpp" + #include #include #include @@ -31,7 +32,7 @@ namespace detail { struct overlapped_op : OVERLAPPED - , capy::execution_context::handler + , scheduler_op { struct canceller { @@ -134,7 +135,7 @@ struct overlapped_op }; inline overlapped_op* -get_overlapped_op(capy::execution_context::handler* h) noexcept +get_overlapped_op(scheduler_op* h) noexcept { return static_cast(h->data()); } diff --git a/src/corosio/src/socket.cpp b/src/corosio/src/socket.cpp index b52c032..584f688 100644 --- a/src/corosio/src/socket.cpp +++ b/src/corosio/src/socket.cpp @@ -8,21 +8,20 @@ // #include +#include #ifdef _WIN32 -#include "detail/win_iocp_sockets.hpp" +#include "src/detail/win_iocp_sockets.hpp" #else -#include "detail/posix_sockets.hpp" +#include "src/detail/posix_sockets.hpp" #endif -#include - #include namespace boost { namespace corosio { -namespace { +namespace { #ifdef _WIN32 using socket_service = detail::win_iocp_sockets; using socket_impl_type = detail::win_socket_impl; @@ -30,7 +29,6 @@ using socket_impl_type = detail::win_socket_impl; using socket_service = detail::posix_sockets; using socket_impl_type = detail::posix_socket_impl; #endif - } // namespace socket:: diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 2c55084..c126a0e 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -7,7 +7,9 @@ # Official repository: https://github.com/cppalliance/corosio # -add_subdirectory(../../../url/extra/test_suite test_suite) +if(NOT TARGET boost_url_test_suite) + add_subdirectory(../../../url/extra/test_suite test_suite) +endif() file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp) list(APPEND PFILES