diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63e3338..5eec73f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,7 @@ on: push: branches: [ "**" ] pull_request: - branches: [ master ] + branches: [ master, develop ] workflow_dispatch: jobs: diff --git a/CMakeLists.txt b/CMakeLists.txt index 4509c49..4f0452b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -130,12 +130,12 @@ file(GLOB_RECURSE BOOST_COROSIO_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio/*.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio.hpp") file(GLOB_RECURSE BOOST_COROSIO_SOURCES CONFIGURE_DEPENDS - "${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.hpp" - "${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.cpp") + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp") source_group("" FILES "include/boost/corosio.hpp") source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio" PREFIX "include" FILES ${BOOST_COROSIO_HEADERS}) -source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES}) +source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES}) function(boost_corosio_setup_properties target) target_compile_features(${target} PUBLIC cxx_std_20) diff --git a/include/boost/corosio/acceptor.hpp b/include/boost/corosio/acceptor.hpp index 648f07e..a6cb28a 100644 --- a/include/boost/corosio/acceptor.hpp +++ b/include/boost/corosio/acceptor.hpp @@ -267,17 +267,6 @@ class acceptor : public io_object BOOST_COROSIO_DECL void cancel(); - /** Return the execution context. - - @return Reference to the execution context that owns this acceptor. - */ - auto - context() const noexcept -> - capy::execution_context& - { - return *ctx_; - } - struct acceptor_impl : io_object_impl { virtual void accept( @@ -294,8 +283,6 @@ class acceptor : public io_object { return *static_cast(impl_); } - - capy::execution_context* ctx_; }; } // namespace corosio diff --git a/src/src/acceptor.cpp b/src/acceptor.cpp similarity index 88% rename from src/src/acceptor.cpp rename to src/acceptor.cpp index 3a630e7..2ee4f7e 100644 --- a/src/src/acceptor.cpp +++ b/src/acceptor.cpp @@ -10,7 +10,9 @@ #include #ifdef _WIN32 -#include "src/detail/win_iocp_sockets.hpp" +#include "detail/win_iocp_sockets.hpp" +#else +#include "detail/posix_sockets.hpp" #endif #include @@ -25,7 +27,8 @@ namespace { using acceptor_service = detail::win_iocp_sockets; using acceptor_impl_type = detail::win_acceptor_impl; #else -#error "Unsupported platform" +using acceptor_service = detail::posix_sockets; +using acceptor_impl_type = detail::posix_acceptor_impl; #endif } // namespace diff --git a/src/src/detail/endpoint_convert.hpp b/src/detail/endpoint_convert.hpp similarity index 100% rename from src/src/detail/endpoint_convert.hpp rename to src/detail/endpoint_convert.hpp diff --git a/src/src/detail/except.cpp b/src/detail/except.cpp similarity index 100% rename from src/src/detail/except.cpp rename to src/detail/except.cpp diff --git a/src/detail/posix_op.hpp b/src/detail/posix_op.hpp new file mode 100644 index 0000000..74ec380 --- /dev/null +++ b/src/detail/posix_op.hpp @@ -0,0 +1,303 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_POSIX_OP_HPP +#define BOOST_COROSIO_DETAIL_POSIX_OP_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +/** Base class for POSIX async operations. + + This class is analogous to overlapped_op on Windows. + It stores the coroutine handle, dispatcher, and result + pointers needed to complete an async operation. +*/ +struct posix_op : capy::execution_context::handler +{ + struct canceller + { + posix_op* op; + void operator()() const noexcept { op->request_cancel(); } + }; + + capy::any_coro h; + capy::any_dispatcher d; + system::error_code* ec_out = nullptr; + std::size_t* bytes_out = nullptr; + + int fd = -1; // Socket file descriptor + std::uint32_t events = 0; // Requested epoll events (EPOLLIN/EPOLLOUT) + int error = 0; // errno on completion + std::size_t bytes_transferred = 0; + + std::atomic cancelled{false}; + std::optional> stop_cb; + + posix_op() + { + data_ = this; + } + + void reset() noexcept + { + fd = -1; + events = 0; + error = 0; + bytes_transferred = 0; + cancelled.store(false, std::memory_order_relaxed); + } + + void operator()() override + { + stop_cb.reset(); + + if (ec_out) + { + if (cancelled.load(std::memory_order_acquire)) + *ec_out = make_error_code(system::errc::operation_canceled); + else if (error != 0) + *ec_out = system::error_code(error, system::system_category()); + else if (is_read_operation() && bytes_transferred == 0) + { + // EOF: 0 bytes transferred with no error indicates end of stream + *ec_out = make_error_code(capy::error::eof); + } + } + + if (bytes_out) + *bytes_out = bytes_transferred; + + d(h).resume(); + } + + // Returns true if this is a read operation (for EOF detection) + virtual bool is_read_operation() const noexcept { return false; } + + void destroy() override + { + stop_cb.reset(); + } + + void request_cancel() noexcept + { + cancelled.store(true, std::memory_order_release); + } + + void start(std::stop_token token) + { + cancelled.store(false, std::memory_order_release); + stop_cb.reset(); + + if (token.stop_possible()) + stop_cb.emplace(token, canceller{this}); + } + + void complete(int err, std::size_t bytes) noexcept + { + error = err; + bytes_transferred = bytes; + } + + /** Called when epoll signals the fd is ready. + Derived classes override this to perform the actual I/O. + Sets error and bytes_transferred appropriately. + */ + virtual void perform_io() noexcept {} +}; + +inline posix_op* +get_posix_op(capy::execution_context::handler* h) noexcept +{ + return static_cast(h->data()); +} + +//------------------------------------------------------------------------------ + +/** Connect operation state. */ +struct posix_connect_op : posix_op +{ + void perform_io() noexcept override + { + // For connect, check SO_ERROR to see if connection succeeded + int err = 0; + socklen_t len = sizeof(err); + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) + err = errno; + complete(err, 0); + } +}; + +//------------------------------------------------------------------------------ + +/** Read operation state with buffer descriptors. */ +struct posix_read_op : posix_op +{ + static constexpr std::size_t max_buffers = 16; + iovec iovecs[max_buffers]; + int iovec_count = 0; + + bool is_read_operation() const noexcept override { return true; } + + void reset() noexcept + { + posix_op::reset(); + iovec_count = 0; + } + + void perform_io() noexcept override + { + ssize_t n = ::readv(fd, iovecs, iovec_count); + if (n >= 0) + complete(0, static_cast(n)); + else + complete(errno, 0); + } +}; + +//------------------------------------------------------------------------------ + +/** Write operation state with buffer descriptors. */ +struct posix_write_op : posix_op +{ + static constexpr std::size_t max_buffers = 16; + iovec iovecs[max_buffers]; + int iovec_count = 0; + + void reset() noexcept + { + posix_op::reset(); + iovec_count = 0; + } + + void perform_io() noexcept override + { + ssize_t n = ::writev(fd, iovecs, iovec_count); + if (n >= 0) + complete(0, static_cast(n)); + else + complete(errno, 0); + } +}; + +//------------------------------------------------------------------------------ + +/** Accept operation state. */ +struct posix_accept_op : posix_op +{ + int accepted_fd = -1; + io_object::io_object_impl* peer_impl = nullptr; + io_object::io_object_impl** impl_out = nullptr; + + // Function to create peer impl - set by posix_sockets + using create_peer_fn = io_object::io_object_impl* (*)(void*, int); + create_peer_fn create_peer = nullptr; + void* service_ptr = nullptr; + + void reset() noexcept + { + posix_op::reset(); + accepted_fd = -1; + peer_impl = nullptr; + impl_out = nullptr; + // Don't reset create_peer and service_ptr - they're set once + } + + void perform_io() noexcept override + { + sockaddr_in addr{}; + socklen_t addrlen = sizeof(addr); + int new_fd = ::accept4(fd, reinterpret_cast(&addr), + &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); + + if (new_fd >= 0) + { + accepted_fd = new_fd; + if (create_peer && service_ptr) + peer_impl = create_peer(service_ptr, new_fd); + complete(0, 0); + } + else + { + complete(errno, 0); + } + } + + void operator()() override + { + stop_cb.reset(); + + bool success = (error == 0 && !cancelled.load(std::memory_order_acquire)); + + if (ec_out) + { + if (cancelled.load(std::memory_order_acquire)) + *ec_out = make_error_code(system::errc::operation_canceled); + else if (error != 0) + *ec_out = system::error_code(error, system::system_category()); + } + + if (success && accepted_fd >= 0 && peer_impl) + { + // Pass impl to awaitable for assignment to peer socket + if (impl_out) + *impl_out = peer_impl; + peer_impl = nullptr; + } + else + { + // Cleanup on failure + if (accepted_fd >= 0) + { + ::close(accepted_fd); + accepted_fd = -1; + } + + if (peer_impl) + { + peer_impl->release(); + peer_impl = nullptr; + } + + if (impl_out) + *impl_out = nullptr; + } + + d(h).resume(); + } +}; + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/detail/posix_resolver_service.hpp b/src/detail/posix_resolver_service.hpp new file mode 100644 index 0000000..0666f71 --- /dev/null +++ b/src/detail/posix_resolver_service.hpp @@ -0,0 +1,152 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_POSIX_RESOLVER_SERVICE_HPP +#define BOOST_COROSIO_DETAIL_POSIX_RESOLVER_SERVICE_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +class posix_resolver_service; +class posix_resolver_impl; + +//------------------------------------------------------------------------------ + +/** Resolver implementation stub for POSIX platforms. + + This is a placeholder implementation that allows compilation on + POSIX platforms. Operations throw std::logic_error indicating + the functionality is not yet implemented. + + @note Full POSIX resolver support is planned for a future release. +*/ +class posix_resolver_impl + : public resolver::resolver_impl + , public capy::intrusive_list::node +{ + friend class posix_resolver_service; + +public: + explicit posix_resolver_impl(posix_resolver_service& svc) noexcept + : svc_(svc) + { + } + + void release() override; + + void resolve( + std::coroutine_handle<>, + capy::any_dispatcher, + std::string_view /*host*/, + std::string_view /*service*/, + resolve_flags /*flags*/, + std::stop_token, + system::error_code*, + resolver_results*) override + { + throw std::logic_error("posix resolver resolve not implemented"); + } + + void cancel() noexcept { /* stub */ } + +private: + posix_resolver_service& svc_; +}; + +//------------------------------------------------------------------------------ + +/** POSIX resolver service stub. + + This service provides placeholder implementations for DNS + resolution on POSIX platforms. Operations throw std::logic_error. + + @note Full POSIX resolver support is planned for a future release. +*/ +class posix_resolver_service + : public capy::execution_context::service +{ +public: + using key_type = posix_resolver_service; + + /** Construct the resolver service. + + @param ctx Reference to the owning execution_context. + */ + explicit posix_resolver_service(capy::execution_context& /*ctx*/) + { + } + + /** Destroy the resolver service. */ + ~posix_resolver_service() + { + } + + posix_resolver_service(posix_resolver_service const&) = delete; + posix_resolver_service& operator=(posix_resolver_service const&) = delete; + + /** Shut down the service. */ + void shutdown() override + { + std::lock_guard lock(mutex_); + + // Release all resolvers + while (auto* impl = resolver_list_.pop_front()) + { + delete impl; + } + } + + /** Create a new resolver implementation. */ + posix_resolver_impl& create_impl() + { + std::lock_guard lock(mutex_); + auto* impl = new posix_resolver_impl(*this); + resolver_list_.push_back(impl); + return *impl; + } + + /** Destroy a resolver implementation. */ + void destroy_impl(posix_resolver_impl& impl) + { + std::lock_guard lock(mutex_); + resolver_list_.remove(&impl); + delete &impl; + } + +private: + std::mutex mutex_; + capy::intrusive_list resolver_list_; +}; + +//------------------------------------------------------------------------------ + +inline void +posix_resolver_impl:: +release() +{ + svc_.destroy_impl(*this); +} + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/detail/posix_scheduler.cpp b/src/detail/posix_scheduler.cpp new file mode 100644 index 0000000..fc293e2 --- /dev/null +++ b/src/detail/posix_scheduler.cpp @@ -0,0 +1,526 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef _WIN32 + +#include "detail/posix_scheduler.hpp" +#include "detail/posix_op.hpp" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +namespace { + +struct scheduler_context +{ + posix_scheduler const* key; + scheduler_context* next; +}; + +capy::thread_local_ptr context_stack; + +struct thread_context_guard +{ + scheduler_context frame_; + + explicit thread_context_guard( + posix_scheduler const* ctx) noexcept + : frame_{ctx, context_stack.get()} + { + context_stack.set(&frame_); + } + + ~thread_context_guard() noexcept + { + context_stack.set(frame_.next); + } +}; + +} // namespace + +posix_scheduler:: +posix_scheduler( + capy::execution_context&, + int) + : epoll_fd_(-1) + , event_fd_(-1) + , outstanding_work_(0) + , stopped_(false) + , shutdown_(false) +{ + // Create epoll instance + epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd_ < 0) + detail::throw_system_error( + system::error_code(errno, system::system_category()), + "epoll_create1"); + + // Create eventfd for waking the scheduler + event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (event_fd_ < 0) + { + int err = errno; + ::close(epoll_fd_); + detail::throw_system_error( + system::error_code(err, system::system_category()), + "eventfd"); + } + + // Register eventfd with epoll (data.ptr = nullptr signals wakeup event) + epoll_event ev{}; + ev.events = EPOLLIN; + ev.data.ptr = nullptr; + if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) + { + int err = errno; + ::close(event_fd_); + ::close(epoll_fd_); + detail::throw_system_error( + system::error_code(err, system::system_category()), + "epoll_ctl"); + } +} + +posix_scheduler:: +~posix_scheduler() +{ + if (event_fd_ >= 0) + ::close(event_fd_); + if (epoll_fd_ >= 0) + ::close(epoll_fd_); +} + +void +posix_scheduler:: +shutdown() +{ + std::unique_lock lock(mutex_); + shutdown_ = true; + + // Drain all completed operations without invoking handlers + while (auto* h = completed_ops_.pop()) + { + lock.unlock(); + h->destroy(); + lock.lock(); + } + + // Reset outstanding work count - any pending I/O operations + // will be cleaned up when their owning objects are destroyed + outstanding_work_.store(0, std::memory_order_release); +} + +void +posix_scheduler:: +post(capy::any_coro h) const +{ + struct post_handler + : capy::execution_context::handler + { + capy::any_coro h_; + + explicit + post_handler(capy::any_coro h) + : h_(h) + { + } + + void operator()() override + { + auto h = h_; + delete this; + h.resume(); + } + + void destroy() override + { + delete this; + } + }; + + auto* ph = new post_handler(h); + outstanding_work_.fetch_add(1, std::memory_order_relaxed); + + { + std::lock_guard lock(mutex_); + completed_ops_.push(ph); + } + wakeup(); +} + +void +posix_scheduler:: +post(capy::execution_context::handler* h) const +{ + outstanding_work_.fetch_add(1, std::memory_order_relaxed); + + { + std::lock_guard lock(mutex_); + completed_ops_.push(h); + } + wakeup(); +} + +void +posix_scheduler:: +on_work_started() noexcept +{ + outstanding_work_.fetch_add(1, std::memory_order_relaxed); +} + +void +posix_scheduler:: +on_work_finished() noexcept +{ + if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) + stop(); +} + +bool +posix_scheduler:: +running_in_this_thread() const noexcept +{ + for (auto* c = context_stack.get(); c != nullptr; c = c->next) + if (c->key == this) + return true; + return false; +} + +void +posix_scheduler:: +stop() +{ + bool expected = false; + if (stopped_.compare_exchange_strong(expected, true, + std::memory_order_release, std::memory_order_relaxed)) + { + wakeup(); + } +} + +bool +posix_scheduler:: +stopped() const noexcept +{ + return stopped_.load(std::memory_order_acquire); +} + +void +posix_scheduler:: +restart() +{ + stopped_.store(false, std::memory_order_release); +} + +std::size_t +posix_scheduler:: +run() +{ + if (stopped_.load(std::memory_order_acquire)) + return 0; + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + { + stop(); + return 0; + } + + thread_context_guard ctx(this); + + std::size_t n = 0; + while (do_one(-1)) + if (n != (std::numeric_limits::max)()) + ++n; + return n; +} + +std::size_t +posix_scheduler:: +run_one() +{ + if (stopped_.load(std::memory_order_acquire)) + return 0; + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + { + stop(); + return 0; + } + + thread_context_guard ctx(this); + return do_one(-1); +} + +std::size_t +posix_scheduler:: +wait_one(long usec) +{ + if (stopped_.load(std::memory_order_acquire)) + return 0; + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + { + stop(); + return 0; + } + + thread_context_guard ctx(this); + return do_one(usec); +} + +std::size_t +posix_scheduler:: +poll() +{ + if (stopped_.load(std::memory_order_acquire)) + return 0; + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + { + stop(); + return 0; + } + + thread_context_guard ctx(this); + + std::size_t n = 0; + while (do_one(0)) + if (n != (std::numeric_limits::max)()) + ++n; + return n; +} + +std::size_t +posix_scheduler:: +poll_one() +{ + if (stopped_.load(std::memory_order_acquire)) + return 0; + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + { + stop(); + return 0; + } + + thread_context_guard ctx(this); + return do_one(0); +} + +void +posix_scheduler:: +register_fd(int fd, posix_op* op, std::uint32_t events) const +{ + epoll_event ev{}; + ev.events = events; + ev.data.ptr = op; + if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) + { + detail::throw_system_error( + system::error_code(errno, system::system_category()), + "epoll_ctl ADD"); + } +} + +void +posix_scheduler:: +modify_fd(int fd, posix_op* op, std::uint32_t events) const +{ + epoll_event ev{}; + ev.events = events; + ev.data.ptr = op; + if (::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) < 0) + { + detail::throw_system_error( + system::error_code(errno, system::system_category()), + "epoll_ctl MOD"); + } +} + +void +posix_scheduler:: +unregister_fd(int fd) const +{ + // EPOLL_CTL_DEL ignores the event parameter (can be NULL on Linux 2.6.9+) + ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); +} + +void +posix_scheduler:: +work_started() const noexcept +{ + outstanding_work_.fetch_add(1, std::memory_order_relaxed); +} + +void +posix_scheduler:: +work_finished() const noexcept +{ + if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + const_cast(this)->stop(); + } +} + +void +posix_scheduler:: +wakeup() const +{ + // Write to eventfd to wake up epoll_wait + std::uint64_t val = 1; + ::write(event_fd_, &val, sizeof(val)); +} + +// RAII guard - work_finished called even if handler throws +struct work_guard +{ + posix_scheduler const* self; + ~work_guard() { self->work_finished(); } +}; + +std::size_t +posix_scheduler:: +do_one(long timeout_us) +{ + // Check stopped first + if (stopped_.load(std::memory_order_acquire)) + return 0; + + // First check if there are handlers in the queue + capy::execution_context::handler* h = nullptr; + { + std::lock_guard lock(mutex_); + h = completed_ops_.pop(); + } + + if (h) + { + // Execute handler outside the lock + work_guard g{this}; + (*h)(); + return 1; + } + + // Check if there's actually work to wait for + if (outstanding_work_.load(std::memory_order_acquire) == 0) + return 0; + + // Convert timeout from microseconds to milliseconds + int timeout_ms; + if (timeout_us < 0) + timeout_ms = -1; // Infinite wait + else if (timeout_us == 0) + timeout_ms = 0; // Non-blocking poll + else + timeout_ms = static_cast((timeout_us + 999) / 1000); + + // Wait for events + epoll_event events[64]; + int nfds = ::epoll_wait(epoll_fd_, events, 64, timeout_ms); + + if (nfds < 0) + { + if (errno == EINTR) + return 0; + detail::throw_system_error( + system::error_code(errno, system::system_category()), + "epoll_wait"); + } + + // Process epoll events + for (int i = 0; i < nfds; ++i) + { + if (events[i].data.ptr == nullptr) + { + // eventfd wakeup - drain it + std::uint64_t val; + ::read(event_fd_, &val, sizeof(val)); + continue; + } + + // I/O event - get the operation from data.ptr + auto* op = static_cast(events[i].data.ptr); + + // Unregister the fd from epoll (one-shot behavior) + unregister_fd(op->fd); + + // Check for errors + if (events[i].events & (EPOLLERR | EPOLLHUP)) + { + // Get socket error + int err = 0; + socklen_t len = sizeof(err); + if (::getsockopt(op->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) + err = errno; + if (err == 0) + err = EIO; // Generic I/O error + + op->complete(err, 0); + } + else + { + // Operation is ready - perform the actual I/O + op->perform_io(); + } + + // Post the operation to the handler queue + { + std::lock_guard lock(mutex_); + completed_ops_.push(op); + } + } + + // Check stopped again after epoll + if (stopped_.load(std::memory_order_acquire)) + return 0; + + // Check again for handlers after processing epoll events + { + std::lock_guard lock(mutex_); + h = completed_ops_.pop(); + } + + if (h) + { + work_guard g{this}; + (*h)(); + return 1; + } + + // If we processed only wakeup events (no I/O completions) and + // there's still outstanding work, continue waiting + if (nfds > 0 && outstanding_work_.load(std::memory_order_acquire) > 0) + { + // Recurse to wait again - this handles the case where we + // only processed eventfd wakeups with no actual completions + return do_one(timeout_us); + } + + return 0; +} + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/src/detail/posix_scheduler.hpp b/src/detail/posix_scheduler.hpp similarity index 54% rename from src/src/detail/posix_scheduler.hpp rename to src/detail/posix_scheduler.hpp index 469d072..a14e379 100644 --- a/src/src/detail/posix_scheduler.hpp +++ b/src/detail/posix_scheduler.hpp @@ -17,8 +17,8 @@ #include #include -#include #include +#include #include namespace boost { @@ -27,12 +27,18 @@ namespace detail { using op_queue = capy::intrusive_queue; -/** POSIX scheduler using condition variables. +// Forward declaration +struct posix_op; - This scheduler implements the scheduler interface using standard - C++ threading primitives (std::mutex, std::condition_variable). - It manages a queue of handlers and provides blocking/non-blocking - execution methods. +/** POSIX scheduler using epoll for I/O multiplexing. + + This scheduler implements the scheduler interface using Linux epoll + for efficient I/O event notification. It manages a queue of handlers + and provides blocking/non-blocking execution methods. + + The scheduler uses an eventfd to wake up epoll_wait when non-I/O + handlers are posted, enabling efficient integration of both + I/O completions and posted handlers. @par Thread Safety All public member functions are thread-safe. @@ -46,6 +52,8 @@ class posix_scheduler /** Construct the scheduler. + Creates an epoll instance and eventfd for event notification. + @param ctx Reference to the owning execution_context. @param concurrency_hint Hint for expected thread count (unused). */ @@ -73,11 +81,50 @@ class posix_scheduler std::size_t poll() override; std::size_t poll_one() override; + /** Return the epoll file descriptor. + + Used by socket services to register file descriptors + for I/O event notification. + + @return The epoll file descriptor. + */ + int epoll_fd() const noexcept { return epoll_fd_; } + + /** Register a file descriptor with epoll. + + @param fd The file descriptor to register. + @param op The operation associated with this fd. + @param events The epoll events to monitor (EPOLLIN, EPOLLOUT, etc.). + */ + void register_fd(int fd, posix_op* op, std::uint32_t events) const; + + /** Modify epoll registration for a file descriptor. + + @param fd The file descriptor to modify. + @param op The operation associated with this fd. + @param events The new epoll events to monitor. + */ + void modify_fd(int fd, posix_op* op, std::uint32_t events) const; + + /** Unregister a file descriptor from epoll. + + @param fd The file descriptor to unregister. + */ + void unregister_fd(int fd) const; + + /** For use by I/O operations to track pending work. */ + void work_started() const noexcept; + + /** For use by I/O operations to track completed work. */ + void work_finished() const noexcept; + private: std::size_t do_one(long timeout_us); + void wakeup() const; + int epoll_fd_; // epoll instance + int event_fd_; // for waking epoll_wait mutable std::mutex mutex_; - mutable std::condition_variable wakeup_; mutable op_queue completed_ops_; mutable std::atomic outstanding_work_; std::atomic stopped_; diff --git a/src/detail/posix_sockets.hpp b/src/detail/posix_sockets.hpp new file mode 100644 index 0000000..4a40e2b --- /dev/null +++ b/src/detail/posix_sockets.hpp @@ -0,0 +1,678 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_POSIX_SOCKETS_HPP +#define BOOST_COROSIO_DETAIL_POSIX_SOCKETS_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include "detail/posix_op.hpp" +#include "detail/posix_scheduler.hpp" +#include "detail/endpoint_convert.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +class posix_sockets; +class posix_socket_impl; +class posix_acceptor_impl; + +//------------------------------------------------------------------------------ + +/** Socket implementation for epoll-based I/O. + + This class contains the state for a single socket, including + the native socket handle and pending operations. +*/ +class posix_socket_impl + : public socket::socket_impl + , public capy::intrusive_list::node +{ + friend class posix_sockets; + +public: + explicit posix_socket_impl(posix_sockets& svc) noexcept; + + void release() override; + + void connect( + std::coroutine_handle<>, + capy::any_dispatcher, + endpoint, + std::stop_token, + system::error_code*) override; + + void read_some( + std::coroutine_handle<>, + capy::any_dispatcher, + any_bufref&, + std::stop_token, + system::error_code*, + std::size_t*) override; + + void write_some( + std::coroutine_handle<>, + capy::any_dispatcher, + any_bufref&, + std::stop_token, + system::error_code*, + std::size_t*) override; + + int native_handle() const noexcept { return fd_; } + bool is_open() const noexcept { return fd_ >= 0; } + void cancel() noexcept; + void close_socket() noexcept; + void set_socket(int fd) noexcept { fd_ = fd; } + + posix_connect_op conn_; + posix_read_op rd_; + posix_write_op wr_; + +private: + posix_sockets& svc_; + int fd_ = -1; +}; + +//------------------------------------------------------------------------------ + +/** Acceptor implementation for epoll-based I/O. + + This class contains the state for a listening socket. +*/ +class posix_acceptor_impl + : public acceptor::acceptor_impl + , public capy::intrusive_list::node +{ + friend class posix_sockets; + +public: + explicit posix_acceptor_impl(posix_sockets& svc) noexcept; + + void release() override; + + void accept( + std::coroutine_handle<>, + capy::any_dispatcher, + std::stop_token, + system::error_code*, + io_object::io_object_impl**) override; + + int native_handle() const noexcept { return fd_; } + bool is_open() const noexcept { return fd_ >= 0; } + void cancel() noexcept; + void close_socket() noexcept; + + posix_accept_op acc_; + +private: + posix_sockets& svc_; + int fd_ = -1; +}; + +//------------------------------------------------------------------------------ + +/** POSIX epoll socket management service. + + This service owns all socket implementations and coordinates their + lifecycle with the epoll-based scheduler. +*/ +class posix_sockets + : public capy::execution_context::service +{ +public: + using key_type = posix_sockets; + + /** Construct the socket service. + + @param ctx Reference to the owning execution_context. + */ + explicit posix_sockets(capy::execution_context& ctx); + + /** Destroy the socket service. */ + ~posix_sockets(); + + posix_sockets(posix_sockets const&) = delete; + posix_sockets& operator=(posix_sockets const&) = delete; + + /** Shut down the service. */ + void shutdown() override; + + /** Create a new socket implementation. */ + posix_socket_impl& create_impl(); + + /** Destroy a socket implementation. */ + void destroy_impl(posix_socket_impl& impl); + + /** Create and configure a socket. + + @param impl The socket implementation to initialize. + @return Error code, or success. + */ + system::error_code open_socket(posix_socket_impl& impl); + + /** Create a new acceptor implementation. */ + posix_acceptor_impl& create_acceptor_impl(); + + /** Destroy an acceptor implementation. */ + void destroy_acceptor_impl(posix_acceptor_impl& impl); + + /** Create, bind, and listen on an acceptor socket. + + @param impl The acceptor implementation to initialize. + @param ep The local endpoint to bind to. + @param backlog The listen backlog. + @return Error code, or success. + */ + system::error_code open_acceptor( + posix_acceptor_impl& impl, + endpoint ep, + int backlog); + + /** Return the scheduler. */ + posix_scheduler& scheduler() const noexcept { return sched_; } + + /** Post an operation for completion. */ + void post(posix_op* op); + + /** Notify scheduler of pending I/O work. */ + void work_started() noexcept; + + /** Notify scheduler that I/O work completed. */ + void work_finished() noexcept; + +private: + posix_scheduler& sched_; + std::mutex mutex_; + capy::intrusive_list socket_list_; + capy::intrusive_list acceptor_list_; +}; + +//------------------------------------------------------------------------------ +// posix_socket_impl implementation +//------------------------------------------------------------------------------ + +inline +posix_socket_impl:: +posix_socket_impl(posix_sockets& svc) noexcept + : svc_(svc) +{ +} + +inline void +posix_socket_impl:: +release() +{ + close_socket(); + svc_.destroy_impl(*this); +} + +inline void +posix_socket_impl:: +connect( + std::coroutine_handle<> h, + capy::any_dispatcher d, + endpoint ep, + std::stop_token token, + system::error_code* ec) +{ + auto& op = conn_; + op.reset(); + op.h = h; + op.d = d; + op.ec_out = ec; + op.fd = fd_; + op.start(token); + + // Initiate non-blocking connect + sockaddr_in addr = detail::to_sockaddr_in(ep); + int result = ::connect(fd_, reinterpret_cast(&addr), sizeof(addr)); + + if (result == 0) + { + // Immediate success (rare for TCP) + op.complete(0, 0); + svc_.post(&op); + return; + } + + if (errno == EINPROGRESS) + { + // Connection in progress - register for write-ready + svc_.work_started(); + svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET); + return; + } + + // Immediate error + op.complete(errno, 0); + svc_.post(&op); +} + +inline void +posix_socket_impl:: +read_some( + std::coroutine_handle<> h, + capy::any_dispatcher d, + any_bufref& param, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_out) +{ + auto& op = rd_; + op.reset(); + op.h = h; + op.d = d; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.fd = fd_; + op.start(token); + + // Fill iovecs from buffer sequence + capy::mutable_buffer bufs[posix_read_op::max_buffers]; + op.iovec_count = static_cast(param.copy_to(bufs, posix_read_op::max_buffers)); + for (int i = 0; i < op.iovec_count; ++i) + { + op.iovecs[i].iov_base = bufs[i].data(); + op.iovecs[i].iov_len = bufs[i].size(); + } + + // Try immediate read first + ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count); + + if (n > 0) + { + // Got data immediately + op.complete(0, static_cast(n)); + svc_.post(&op); + return; + } + + if (n == 0) + { + // EOF + op.complete(0, 0); + svc_.post(&op); + return; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // Would block - register for read-ready + svc_.work_started(); + svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET); + return; + } + + // Immediate error + op.complete(errno, 0); + svc_.post(&op); +} + +inline void +posix_socket_impl:: +write_some( + std::coroutine_handle<> h, + capy::any_dispatcher d, + any_bufref& param, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_out) +{ + auto& op = wr_; + op.reset(); + op.h = h; + op.d = d; + op.ec_out = ec; + op.bytes_out = bytes_out; + op.fd = fd_; + op.start(token); + + // Fill iovecs from buffer sequence + capy::mutable_buffer bufs[posix_write_op::max_buffers]; + op.iovec_count = static_cast(param.copy_to(bufs, posix_write_op::max_buffers)); + for (int i = 0; i < op.iovec_count; ++i) + { + op.iovecs[i].iov_base = bufs[i].data(); + op.iovecs[i].iov_len = bufs[i].size(); + } + + // Try immediate write first + ssize_t n = ::writev(fd_, op.iovecs, op.iovec_count); + + if (n > 0) + { + // Wrote data immediately + op.complete(0, static_cast(n)); + svc_.post(&op); + return; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // Would block - register for write-ready + svc_.work_started(); + svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET); + return; + } + + // Immediate error (including n == 0 which shouldn't happen for TCP) + op.complete(errno ? errno : EIO, 0); + svc_.post(&op); +} + +inline void +posix_socket_impl:: +cancel() noexcept +{ + conn_.request_cancel(); + rd_.request_cancel(); + wr_.request_cancel(); +} + +inline void +posix_socket_impl:: +close_socket() noexcept +{ + if (fd_ >= 0) + { + // Unregister from epoll before closing + svc_.scheduler().unregister_fd(fd_); + ::close(fd_); + fd_ = -1; + } +} + +//------------------------------------------------------------------------------ +// posix_acceptor_impl implementation +//------------------------------------------------------------------------------ + +inline +posix_acceptor_impl:: +posix_acceptor_impl(posix_sockets& svc) noexcept + : svc_(svc) +{ +} + +inline void +posix_acceptor_impl:: +release() +{ + close_socket(); + svc_.destroy_acceptor_impl(*this); +} + +inline void +posix_acceptor_impl:: +accept( + std::coroutine_handle<> h, + capy::any_dispatcher d, + std::stop_token token, + system::error_code* ec, + io_object::io_object_impl** impl_out) +{ + auto& op = acc_; + op.reset(); + op.h = h; + op.d = d; + op.ec_out = ec; + op.impl_out = impl_out; + op.fd = fd_; + op.start(token); + + // Set up callback for creating peer impl when accept completes via epoll + op.service_ptr = &svc_; + op.create_peer = [](void* svc_ptr, int new_fd) -> io_object::io_object_impl* { + auto& svc = *static_cast(svc_ptr); + auto& peer_impl = svc.create_impl(); + peer_impl.set_socket(new_fd); + return &peer_impl; + }; + + // Try immediate accept first + sockaddr_in addr{}; + socklen_t addrlen = sizeof(addr); + int accepted = ::accept4(fd_, reinterpret_cast(&addr), + &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); + + if (accepted >= 0) + { + // Got a connection immediately + auto& peer_impl = svc_.create_impl(); + peer_impl.set_socket(accepted); + op.accepted_fd = accepted; + op.peer_impl = &peer_impl; + op.complete(0, 0); + svc_.post(&op); + return; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // No pending connections - register for read-ready + svc_.work_started(); + svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET); + return; + } + + // Immediate error + op.complete(errno, 0); + svc_.post(&op); +} + +inline void +posix_acceptor_impl:: +cancel() noexcept +{ + acc_.request_cancel(); +} + +inline void +posix_acceptor_impl:: +close_socket() noexcept +{ + if (fd_ >= 0) + { + // Unregister from epoll before closing + svc_.scheduler().unregister_fd(fd_); + ::close(fd_); + fd_ = -1; + } +} + +//------------------------------------------------------------------------------ +// posix_sockets implementation +//------------------------------------------------------------------------------ + +inline +posix_sockets:: +posix_sockets(capy::execution_context& ctx) + : sched_(ctx.use_service()) +{ +} + +inline +posix_sockets:: +~posix_sockets() +{ +} + +inline void +posix_sockets:: +shutdown() +{ + std::lock_guard lock(mutex_); + + // Close all sockets + while (auto* impl = socket_list_.pop_front()) + { + impl->close_socket(); + delete impl; + } + + // Close all acceptors + while (auto* impl = acceptor_list_.pop_front()) + { + impl->close_socket(); + delete impl; + } +} + +inline posix_socket_impl& +posix_sockets:: +create_impl() +{ + auto* impl = new posix_socket_impl(*this); + + { + std::lock_guard lock(mutex_); + socket_list_.push_back(impl); + } + + return *impl; +} + +inline void +posix_sockets:: +destroy_impl(posix_socket_impl& impl) +{ + { + std::lock_guard lock(mutex_); + socket_list_.remove(&impl); + } + + delete &impl; +} + +inline system::error_code +posix_sockets:: +open_socket(posix_socket_impl& impl) +{ + impl.close_socket(); + + // Create non-blocking TCP socket + int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (fd < 0) + { + return system::error_code(errno, system::system_category()); + } + + impl.fd_ = fd; + return {}; +} + +inline posix_acceptor_impl& +posix_sockets:: +create_acceptor_impl() +{ + auto* impl = new posix_acceptor_impl(*this); + + { + std::lock_guard lock(mutex_); + acceptor_list_.push_back(impl); + } + + return *impl; +} + +inline void +posix_sockets:: +destroy_acceptor_impl(posix_acceptor_impl& impl) +{ + { + std::lock_guard lock(mutex_); + acceptor_list_.remove(&impl); + } + + delete &impl; +} + +inline system::error_code +posix_sockets:: +open_acceptor( + posix_acceptor_impl& impl, + endpoint ep, + int backlog) +{ + impl.close_socket(); + + // Create non-blocking TCP socket + int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (fd < 0) + { + return system::error_code(errno, system::system_category()); + } + + // Allow address reuse + int reuse = 1; + ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + + // Bind to endpoint + sockaddr_in addr = detail::to_sockaddr_in(ep); + if (::bind(fd, reinterpret_cast(&addr), sizeof(addr)) < 0) + { + int err = errno; + ::close(fd); + return system::error_code(err, system::system_category()); + } + + // Start listening + if (::listen(fd, backlog) < 0) + { + int err = errno; + ::close(fd); + return system::error_code(err, system::system_category()); + } + + impl.fd_ = fd; + return {}; +} + +inline void +posix_sockets:: +post(posix_op* op) +{ + sched_.post(op); +} + +inline void +posix_sockets:: +work_started() noexcept +{ + sched_.work_started(); +} + +inline void +posix_sockets:: +work_finished() noexcept +{ + sched_.work_finished(); +} + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/src/detail/win_iocp_resolver_service.cpp b/src/detail/win_iocp_resolver_service.cpp similarity index 98% rename from src/src/detail/win_iocp_resolver_service.cpp rename to src/detail/win_iocp_resolver_service.cpp index d16c98b..9752c30 100644 --- a/src/src/detail/win_iocp_resolver_service.cpp +++ b/src/detail/win_iocp_resolver_service.cpp @@ -9,9 +9,9 @@ #ifdef _WIN32 -#include "src/detail/win_iocp_resolver_service.hpp" -#include "src/detail/win_iocp_scheduler.hpp" -#include "src/detail/endpoint_convert.hpp" +#include "detail/win_iocp_resolver_service.hpp" +#include "detail/win_iocp_scheduler.hpp" +#include "detail/endpoint_convert.hpp" #include #include diff --git a/src/src/detail/win_iocp_resolver_service.hpp b/src/detail/win_iocp_resolver_service.hpp similarity index 96% rename from src/src/detail/win_iocp_resolver_service.hpp rename to src/detail/win_iocp_resolver_service.hpp index e487085..cdb4888 100644 --- a/src/src/detail/win_iocp_resolver_service.hpp +++ b/src/detail/win_iocp_resolver_service.hpp @@ -26,10 +26,10 @@ #include #include -#include "src/detail/windows.hpp" -#include "src/detail/win_overlapped_op.hpp" -#include "src/detail/win_mutex.hpp" -#include "src/detail/win_wsa_init.hpp" +#include "detail/windows.hpp" +#include "detail/win_overlapped_op.hpp" +#include "detail/win_mutex.hpp" +#include "detail/win_wsa_init.hpp" #include diff --git a/src/src/detail/win_iocp_scheduler.cpp b/src/detail/win_iocp_scheduler.cpp similarity index 99% rename from src/src/detail/win_iocp_scheduler.cpp rename to src/detail/win_iocp_scheduler.cpp index 5114c55..bccd1db 100644 --- a/src/src/detail/win_iocp_scheduler.cpp +++ b/src/detail/win_iocp_scheduler.cpp @@ -9,8 +9,8 @@ #ifdef _WIN32 -#include "src/detail/win_iocp_scheduler.hpp" -#include "src/detail/win_overlapped_op.hpp" +#include "detail/win_iocp_scheduler.hpp" +#include "detail/win_overlapped_op.hpp" #include #include diff --git a/src/src/detail/win_iocp_scheduler.hpp b/src/detail/win_iocp_scheduler.hpp similarity index 97% rename from src/src/detail/win_iocp_scheduler.hpp rename to src/detail/win_iocp_scheduler.hpp index cc7d829..f98af97 100644 --- a/src/src/detail/win_iocp_scheduler.hpp +++ b/src/detail/win_iocp_scheduler.hpp @@ -16,13 +16,13 @@ #include #include -#include "src/detail/win_mutex.hpp" +#include "detail/win_mutex.hpp" #include #include #include -#include "src/detail/windows.hpp" +#include "detail/windows.hpp" namespace boost { namespace corosio { diff --git a/src/src/detail/win_iocp_sockets.cpp b/src/detail/win_iocp_sockets.cpp similarity index 99% rename from src/src/detail/win_iocp_sockets.cpp rename to src/detail/win_iocp_sockets.cpp index 90f5c70..6d8e2cc 100644 --- a/src/src/detail/win_iocp_sockets.cpp +++ b/src/detail/win_iocp_sockets.cpp @@ -9,9 +9,9 @@ #ifdef _WIN32 -#include "src/detail/win_iocp_sockets.hpp" -#include "src/detail/win_iocp_scheduler.hpp" -#include "src/detail/endpoint_convert.hpp" +#include "detail/win_iocp_sockets.hpp" +#include "detail/win_iocp_scheduler.hpp" +#include "detail/endpoint_convert.hpp" namespace boost { namespace corosio { diff --git a/src/src/detail/win_iocp_sockets.hpp b/src/detail/win_iocp_sockets.hpp similarity index 98% rename from src/src/detail/win_iocp_sockets.hpp rename to src/detail/win_iocp_sockets.hpp index a1a6132..9182a1f 100644 --- a/src/src/detail/win_iocp_sockets.hpp +++ b/src/detail/win_iocp_sockets.hpp @@ -18,10 +18,10 @@ #include #include -#include "src/detail/windows.hpp" -#include "src/detail/win_overlapped_op.hpp" -#include "src/detail/win_mutex.hpp" -#include "src/detail/win_wsa_init.hpp" +#include "detail/windows.hpp" +#include "detail/win_overlapped_op.hpp" +#include "detail/win_mutex.hpp" +#include "detail/win_wsa_init.hpp" #include #include diff --git a/src/src/detail/win_mutex.hpp b/src/detail/win_mutex.hpp similarity index 97% rename from src/src/detail/win_mutex.hpp rename to src/detail/win_mutex.hpp index ef9e8bb..01dc7ae 100644 --- a/src/src/detail/win_mutex.hpp +++ b/src/detail/win_mutex.hpp @@ -12,7 +12,7 @@ #include -#include "src/detail/windows.hpp" +#include "detail/windows.hpp" namespace boost { namespace corosio { diff --git a/src/src/detail/win_overlapped_op.hpp b/src/detail/win_overlapped_op.hpp similarity index 99% rename from src/src/detail/win_overlapped_op.hpp rename to src/detail/win_overlapped_op.hpp index 3e4e57c..399e5b3 100644 --- a/src/src/detail/win_overlapped_op.hpp +++ b/src/detail/win_overlapped_op.hpp @@ -23,7 +23,7 @@ #include #include -#include "src/detail/windows.hpp" +#include "detail/windows.hpp" namespace boost { namespace corosio { diff --git a/src/src/detail/win_wsa_init.cpp b/src/detail/win_wsa_init.cpp similarity index 96% rename from src/src/detail/win_wsa_init.cpp rename to src/detail/win_wsa_init.cpp index 1bab657..7514704 100644 --- a/src/src/detail/win_wsa_init.cpp +++ b/src/detail/win_wsa_init.cpp @@ -9,7 +9,7 @@ #ifdef _WIN32 -#include "src/detail/win_wsa_init.hpp" +#include "detail/win_wsa_init.hpp" #include diff --git a/src/src/detail/win_wsa_init.hpp b/src/detail/win_wsa_init.hpp similarity index 96% rename from src/src/detail/win_wsa_init.hpp rename to src/detail/win_wsa_init.hpp index aa69b74..45e5ea4 100644 --- a/src/src/detail/win_wsa_init.hpp +++ b/src/detail/win_wsa_init.hpp @@ -12,7 +12,7 @@ #include -#include "src/detail/windows.hpp" +#include "detail/windows.hpp" namespace boost { namespace corosio { diff --git a/src/src/detail/windows.hpp b/src/detail/windows.hpp similarity index 100% rename from src/src/detail/windows.hpp rename to src/detail/windows.hpp diff --git a/src/src/io_context.cpp b/src/io_context.cpp similarity index 90% rename from src/src/io_context.cpp rename to src/io_context.cpp index b869068..ec31287 100644 --- a/src/src/io_context.cpp +++ b/src/io_context.cpp @@ -10,9 +10,9 @@ #include #ifdef _WIN32 -#include "src/detail/win_iocp_scheduler.hpp" +#include "detail/win_iocp_scheduler.hpp" #else -#include "src/detail/posix_scheduler.hpp" +#include "detail/posix_scheduler.hpp" #endif #include diff --git a/src/src/resolver.cpp b/src/resolver.cpp similarity index 82% rename from src/src/resolver.cpp rename to src/resolver.cpp index 07eaa92..ddf828a 100644 --- a/src/src/resolver.cpp +++ b/src/resolver.cpp @@ -10,7 +10,9 @@ #include #ifdef _WIN32 -#include "src/detail/win_iocp_resolver_service.hpp" +#include "detail/win_iocp_resolver_service.hpp" +#else +#include "detail/posix_resolver_service.hpp" #endif namespace boost { @@ -21,7 +23,8 @@ namespace { using resolver_service = detail::win_iocp_resolver_service; using resolver_impl_type = detail::win_resolver_impl; #else -#error "Unsupported platform" +using resolver_service = detail::posix_resolver_service; +using resolver_impl_type = detail::posix_resolver_impl; #endif } // namespace diff --git a/src/src/socket.cpp b/src/socket.cpp similarity index 88% rename from src/src/socket.cpp rename to src/socket.cpp index cb0872f..b52c032 100644 --- a/src/src/socket.cpp +++ b/src/socket.cpp @@ -10,7 +10,9 @@ #include #ifdef _WIN32 -#include "src/detail/win_iocp_sockets.hpp" +#include "detail/win_iocp_sockets.hpp" +#else +#include "detail/posix_sockets.hpp" #endif #include @@ -25,7 +27,8 @@ namespace { using socket_service = detail::win_iocp_sockets; using socket_impl_type = detail::win_socket_impl; #else -#error "Unsupported platform" +using socket_service = detail::posix_sockets; +using socket_impl_type = detail::posix_socket_impl; #endif } // namespace diff --git a/src/src/detail/posix_scheduler.cpp b/src/src/detail/posix_scheduler.cpp deleted file mode 100644 index d4a3af7..0000000 --- a/src/src/detail/posix_scheduler.cpp +++ /dev/null @@ -1,329 +0,0 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#ifndef _WIN32 - -#include "src/detail/posix_scheduler.hpp" - -#include - -#include - -namespace boost { -namespace corosio { -namespace detail { - -namespace { - -struct scheduler_context -{ - posix_scheduler const* key; - scheduler_context* next; -}; - -capy::thread_local_ptr context_stack; - -struct thread_context_guard -{ - scheduler_context frame_; - - explicit thread_context_guard( - posix_scheduler const* ctx) noexcept - : frame_{ctx, context_stack.get()} - { - context_stack.set(&frame_); - } - - ~thread_context_guard() noexcept - { - context_stack.set(frame_.next); - } -}; - -} // namespace - -posix_scheduler:: -posix_scheduler( - capy::execution_context&, - int) - : outstanding_work_(0) - , stopped_(false) - , shutdown_(false) -{ -} - -posix_scheduler:: -~posix_scheduler() -{ -} - -void -posix_scheduler:: -shutdown() -{ - std::unique_lock lock(mutex_); - shutdown_ = true; - - // Drain all outstanding operations without invoking handlers - while (outstanding_work_.load(std::memory_order_acquire) > 0) - { - while (auto* h = completed_ops_.pop()) - { - outstanding_work_.fetch_sub(1, std::memory_order_relaxed); - lock.unlock(); - h->destroy(); - lock.lock(); - } - - // If work count still positive but queue empty, - // wait briefly for more completions - if (outstanding_work_.load(std::memory_order_acquire) > 0 && - completed_ops_.empty()) - { - lock.unlock(); - std::this_thread::yield(); - lock.lock(); - } - } -} - -void -posix_scheduler:: -post(capy::any_coro h) const -{ - struct post_handler - : capy::execution_context::handler - { - capy::any_coro h_; - - explicit - post_handler(capy::any_coro h) - : h_(h) - { - } - - void operator()() override - { - auto h = h_; - delete this; - h.resume(); - } - - void destroy() override - { - delete this; - } - }; - - auto* ph = new post_handler(h); - outstanding_work_.fetch_add(1, std::memory_order_relaxed); - - { - std::lock_guard lock(mutex_); - completed_ops_.push(ph); - } - wakeup_.notify_one(); -} - -void -posix_scheduler:: -post(capy::execution_context::handler* h) const -{ - outstanding_work_.fetch_add(1, std::memory_order_relaxed); - - { - std::lock_guard lock(mutex_); - completed_ops_.push(h); - } - wakeup_.notify_one(); -} - -void -posix_scheduler:: -on_work_started() noexcept -{ - outstanding_work_.fetch_add(1, std::memory_order_relaxed); -} - -void -posix_scheduler:: -on_work_finished() noexcept -{ - if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) - stop(); -} - -bool -posix_scheduler:: -running_in_this_thread() const noexcept -{ - for (auto* c = context_stack.get(); c != nullptr; c = c->next) - if (c->key == this) - return true; - return false; -} - -void -posix_scheduler:: -stop() -{ - bool expected = false; - if (stopped_.compare_exchange_strong(expected, true, - std::memory_order_release, std::memory_order_relaxed)) - { - std::lock_guard lock(mutex_); - wakeup_.notify_all(); - } -} - -bool -posix_scheduler:: -stopped() const noexcept -{ - return stopped_.load(std::memory_order_acquire); -} - -void -posix_scheduler:: -restart() -{ - stopped_.store(false, std::memory_order_release); -} - -std::size_t -posix_scheduler:: -run() -{ - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - stop(); - return 0; - } - - thread_context_guard ctx(this); - - std::size_t n = 0; - while (do_one(-1)) - if (n != (std::numeric_limits::max)()) - ++n; - return n; -} - -std::size_t -posix_scheduler:: -run_one() -{ - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - stop(); - return 0; - } - - thread_context_guard ctx(this); - return do_one(-1); -} - -std::size_t -posix_scheduler:: -wait_one(long usec) -{ - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - stop(); - return 0; - } - - thread_context_guard ctx(this); - return do_one(usec); -} - -std::size_t -posix_scheduler:: -poll() -{ - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - stop(); - return 0; - } - - thread_context_guard ctx(this); - - std::size_t n = 0; - while (do_one(0)) - if (n != (std::numeric_limits::max)()) - ++n; - return n; -} - -std::size_t -posix_scheduler:: -poll_one() -{ - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - stop(); - return 0; - } - - thread_context_guard ctx(this); - return do_one(0); -} - -// RAII guard - work_finished called even if handler throws -struct work_guard -{ - posix_scheduler* self; - ~work_guard() { self->on_work_finished(); } -}; - -std::size_t -posix_scheduler:: -do_one(long timeout_us) -{ - std::unique_lock lock(mutex_); - - // Check for available work or wait - if (timeout_us < 0) - { - // Infinite wait - wakeup_.wait(lock, [this] { - return stopped_.load(std::memory_order_acquire) || - !completed_ops_.empty(); - }); - } - else if (timeout_us > 0) - { - // Timed wait - wakeup_.wait_for(lock, std::chrono::microseconds(timeout_us), [this] { - return stopped_.load(std::memory_order_acquire) || - !completed_ops_.empty(); - }); - } - // timeout_us == 0: poll, no wait - - if (stopped_.load(std::memory_order_acquire)) - return 0; - - auto* h = completed_ops_.pop(); - if (!h) - return 0; - - lock.unlock(); - - work_guard g{this}; - (*h)(); - return 1; -} - -} // namespace detail -} // namespace corosio -} // namespace boost - -#endif