From cd7ac2684fb2def5eb6ec6231724073ddd1773a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 21 Jul 2025 00:29:36 +0200 Subject: [PATCH 01/12] created a basic affine_on-transforming scheduler --- examples/CMakeLists.txt | 1 + examples/tls-scheduler.cpp | 139 ++++++++++++++++++++++++ include/beman/task/detail/affine_on.hpp | 40 ++++--- 3 files changed, 164 insertions(+), 16 deletions(-) create mode 100644 examples/tls-scheduler.cpp diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index ecae2c5..ba10af6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception set(ALL_EXAMPLES + tls-scheduler customize issue-symmetric-transfer affinity diff --git a/examples/tls-scheduler.cpp b/examples/tls-scheduler.cpp new file mode 100644 index 0000000..db4ac98 --- /dev/null +++ b/examples/tls-scheduler.cpp @@ -0,0 +1,139 @@ +// examples/hello.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include + +namespace ex = beman::execution; + +struct tls_domain { + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch, ::beman::execution::receiver Rcvr> + struct affine_state { + struct receiver { + using receiver_concept = ::beman::execution::receiver_t; + affine_state* st; + auto set_value() && noexcept -> void { this->st->complete(); } + template + auto set_error(E&& e) && noexcept -> void { + ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); + } + auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } + }; + using operation_state_concept = ::beman::execution::operation_state_t; + using state_t = decltype(::beman::execution::connect( + ::beman::execution::affine_on(::std::declval(), ::std::declval()), ::std::declval())); + + std::remove_cvref_t rcvr; + state_t state; + + template <::beman::execution::sender S, ::beman::execution::scheduler SC, ::beman::execution::receiver R> + affine_state(S&& s, SC&& sc, R&& r) + : rcvr(::std::forward(r)), + state(::beman::execution::connect( + ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), receiver{this})) {} + auto start() & noexcept { + std::cout << "affine_state::start\n"; + ::beman::execution::start(this->state); + } + auto complete() { + std::cout << "affine_state::complete\n"; + ::beman::execution::set_value(::std::move(this->rcvr)); + } + }; + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch> + struct affine_sender { + using sender_concept = ::beman::execution::sender_t; + + std::remove_cvref_t sndr; + std::remove_cvref_t sch; + + template + auto get_completion_signatures(const Env& env) const noexcept { + return ::beman::execution::get_completion_signatures(this->sndr, env); + } + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) && { + return affine_state(std::move(sndr), std::move(sch), std::forward(rcvr)); + } + }; + template <::beman::execution::sender Sndr, typename... Env> + requires std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, + ::beman::execution::affine_on_t> + auto transform_sender(Sndr&& s, const Env&... env) const noexcept { + auto [tag, sch, sndr] = s; + return affine_sender{::std::forward_like(sndr), + ::std::forward_like(sch)}; + } +}; + +template <::beman::execution::scheduler Scheduler> +struct tls_scheduler { + using scheduler_concept = ::beman::execution::scheduler_t; + struct env { + Scheduler sched; + template + auto query(const ::beman::execution::get_completion_scheduler_t&) const noexcept -> tls_scheduler { + return {this->sched}; + } + }; + template <::beman::execution::receiver Receiver> + struct state { + using operation_state_concept = ::beman::execution::operation_state_t; + using upstream_state_t = decltype(::beman::execution::connect( + ::beman::execution::schedule(std::declval()), std::declval>())); + + upstream_state_t upstream; + template <::beman::execution::sender Sndr, ::beman::execution::receiver Rcvr> + state(Sndr&& sndr, Rcvr&& rcvr) + : upstream(::beman::execution::connect(std::forward(sndr), std::forward(rcvr))) {} + auto start() & noexcept -> void { ::beman::execution::start(this->upstream); } + }; + struct tls_sender { + using sender_concept = ::beman::execution::sender_t; + using sender_type = decltype(::beman::execution::schedule(std::declval())); + + Scheduler sched; + + template + auto get_completion_signatures(const Env&... env) const noexcept { + return ::beman::execution::get_completion_signatures(::beman::execution::schedule(Scheduler(this->sched)), + env...); + } + auto get_env() const noexcept -> env { return {this->sched}; } + + template <::beman::execution::receiver Receiver> + auto connect(Receiver&& rcvr) && -> state { + return state(::beman::execution::schedule(this->sched), std::forward(rcvr)); + } + }; + + Scheduler sched; + template + requires(!std::same_as>) + tls_scheduler(Sch&& sch) : sched(sch) {} + + auto schedule() -> tls_sender { return {this->sched}; } + + auto query(const ::beman::execution::get_domain_t&) const noexcept -> tls_domain { return {}; } + + auto operator==(const tls_scheduler&) const -> bool = default; +}; +static_assert(::beman::execution::sender::tls_sender>); +static_assert(::beman::execution::sender_in::tls_sender, + ::beman::execution::empty_env>); +static_assert(::beman::execution::scheduler>); + +// ---------------------------------------------------------------------------- + +struct tls_env { + using scheduler_type = tls_scheduler<::beman::execution::task_scheduler>; +}; + +int main() { + ex::sync_wait([]() -> ex::task { + std::cout << "before co_await\n"; + co_await (ex::just() | ex::then([] { std::cout << "co_await'ed then\n"; })); + std::cout << "after co_await\n"; + }()); +} diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 843a43d..950b7d3 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -19,12 +19,13 @@ struct affine_on_t { auto operator()(Sender&& sndr, Scheduler&& scheduler) const { using result_t = sender<::std::remove_cvref_t, ::std::remove_cvref_t>; static_assert(::beman::execution::sender); - return result_t{*this, ::std::forward(sndr), ::std::forward(scheduler)}; + return result_t(::std::forward(scheduler), ::std::forward(sndr)); } }; template <::beman::execution::sender Sender, ::beman::execution::scheduler Scheduler> -struct affine_on_t::sender { +struct affine_on_t::sender + : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Scheduler, Sender> { using sender_concept = ::beman::execution::sender_t; static constexpr bool elide_schedule = ::std::same_as<::beman::task::detail::inline_scheduler, Scheduler>; @@ -32,44 +33,51 @@ struct affine_on_t::sender { auto get_completion_signatures(const Env& env) const& noexcept { if constexpr (elide_schedule) { return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->upstream)), env); + ::std::remove_cvref_t(::std::move(this->template get<2>())), env); } else { return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(this->upstream, this->scheduler), env); + ::beman::execution::continues_on(this->template get<2>(), this->template get<1>()), env); } } template auto get_completion_signatures(const Env& env) && noexcept { if constexpr (elide_schedule) { return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->upstream)), env); + ::std::remove_cvref_t(::std::move(this->template get<2>())), env); } else { return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(::std::move(this->upstream), ::std::move(this->scheduler)), env); + ::beman::execution::continues_on(::std::move(this->template get<2>()), + ::std::move(this->template get<1>())), + env); } } - affine_on_t tag{}; - Sender upstream; - Scheduler scheduler; + template + sender(Sch&& sch, S&& s) + : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Scheduler, Sender>{ + {{::beman::task::detail::affine_on_t{}}, + {Scheduler(::std::forward(sch))}, + {Sender(::std::forward(s))}}} {} template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) const& { if constexpr (elide_schedule) { - return ::beman::execution::connect(this->upstream, ::std::forward(receiver)); + return ::beman::execution::connect(this->template get<2>(), ::std::forward(receiver)); } else { - return ::beman::execution::connect(::beman::execution::continues_on(this->upstream, this->scheduler), - ::std::forward(receiver)); + return ::beman::execution::connect( + ::beman::execution::continues_on(this->template get<2>(), this->template get<1>()), + ::std::forward(receiver)); } } template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) && { if constexpr (elide_schedule) { - return ::beman::execution::connect(::std::move(this->upstream), ::std::forward(receiver)); + return ::beman::execution::connect(::std::move(this->template get<2>()), + ::std::forward(receiver)); } else { - return ::beman::execution::connect( - ::beman::execution::continues_on(::std::move(this->upstream), ::std::move(this->scheduler)), - ::std::forward(receiver)); + return ::beman::execution::connect(::beman::execution::continues_on(::std::move(this->template get<2>()), + ::std::move(this->template get<1>())), + ::std::forward(receiver)); } } }; From 13880830047c4edd554a6b4add96d7eaf31f4e50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 22 Jul 2025 00:54:08 +0200 Subject: [PATCH 02/12] added an example using the tls_scheduler --- examples/demo-tls_scheduler.hpp | 158 +++++++++++++++++++++ examples/tls-scheduler.cpp | 155 +++++--------------- include/beman/task/detail/promise_env.hpp | 6 +- include/beman/task/detail/promise_type.hpp | 8 +- 4 files changed, 204 insertions(+), 123 deletions(-) create mode 100644 examples/demo-tls_scheduler.hpp diff --git a/examples/demo-tls_scheduler.hpp b/examples/demo-tls_scheduler.hpp new file mode 100644 index 0000000..133161d --- /dev/null +++ b/examples/demo-tls_scheduler.hpp @@ -0,0 +1,158 @@ +// examples/demo-tls_scheduler.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_TLS_SCHEDULER +#define INCLUDED_EXAMPLES_DEMO_TLS_SCHEDULER + +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace demo { +struct tls_domain { + template <::beman::execution::receiver Rcvr, typename Data> + struct affine_state_base { + std::remove_cvref_t rcvr; + Data data{}; + + auto complete() { + std::cout << "affine_state::complete\n"; + this->data.restore(); + ::beman::execution::set_value(::std::move(this->rcvr)); + } + }; + + template <::beman::execution::receiver Rcvr, typename Data> + struct affine_receiver { + using receiver_concept = ::beman::execution::receiver_t; + struct env_t { + affine_state_base* st; + //template + // requires requires(affine_state_base* self, Q&& q, A&&... a) { std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); } + //auto query(Q&& q, A&&... a) const noexcept { std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); } + }; + affine_state_base* st; + auto set_value() && noexcept -> void { this->st->complete(); } + template + auto set_error(E&& e) && noexcept -> void { + ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); + } + auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } + + auto get_env() const noexcept -> env_t { return { this->st }; } + }; + + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch, ::beman::execution::receiver Rcvr> + struct affine_state: affine_state_base { + using operation_state_concept = ::beman::execution::operation_state_t; + using env_t = decltype(::beman::execution::get_env(std::declval())); + using base_t = affine_state_base; + using data_t = typename Sch::type; + using state_t = decltype(::beman::execution::connect( + ::beman::execution::affine_on(::std::declval(), ::std::declval()), ::std::declval>())); + + state_t state; + + template <::beman::execution::sender S, ::beman::execution::scheduler SC, ::beman::execution::receiver R> + affine_state(S&& s, SC&& sc, R&& r) + : base_t{::std::forward(r)}, + state(::beman::execution::connect( + ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), affine_receiver{this})) {} + auto start() & noexcept { + std::cout << "affine_state::start\n"; + this->data.save(); + ::beman::execution::start(this->state); + } + }; + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch> + struct affine_sender { + using sender_concept = ::beman::execution::sender_t; + + std::remove_cvref_t sndr; + std::remove_cvref_t sch; + + template + auto get_completion_signatures(const Env& env) const noexcept { + return ::beman::execution::get_completion_signatures(this->sndr, env); + } + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) && { + return affine_state(std::move(sndr), std::move(sch), std::forward(rcvr)); + } + }; + template <::beman::execution::sender Sndr, typename... Env> + requires std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, + ::beman::execution::affine_on_t> + auto transform_sender(Sndr&& s, const Env&... env) const noexcept { + auto [tag, sch, sndr] = s; + return affine_sender{::beman::execution::detail::forward_like(sndr), + ::beman::execution::detail::forward_like(sch)}; + } +}; + +template +struct tls_scheduler { + using scheduler_concept = ::beman::execution::scheduler_t; + using type = Data; + struct env { + Scheduler sched; + template + auto query(const ::beman::execution::get_completion_scheduler_t&) const noexcept -> tls_scheduler { + return {this->sched}; + } + }; + template <::beman::execution::receiver Receiver> + struct state { + using operation_state_concept = ::beman::execution::operation_state_t; + using upstream_state_t = decltype(::beman::execution::connect( + ::beman::execution::schedule(std::declval()), std::declval>())); + + upstream_state_t upstream; + + template <::beman::execution::sender Sndr, ::beman::execution::receiver Rcvr> + state(Sndr&& sndr, Rcvr&& rcvr) + : upstream(::beman::execution::connect(std::forward(sndr), std::forward(rcvr))) {} + auto start() & noexcept -> void { ::beman::execution::start(this->upstream); } + }; + struct tls_sender { + using sender_concept = ::beman::execution::sender_t; + using sender_type = decltype(::beman::execution::schedule(std::declval())); + + Scheduler sched; + + template + auto get_completion_signatures(const Env&... env) const noexcept { + return ::beman::execution::get_completion_signatures(::beman::execution::schedule(Scheduler(this->sched)), + env...); + } + auto get_env() const noexcept -> env { return {this->sched}; } + + template <::beman::execution::receiver Receiver> + auto connect(Receiver&& rcvr) && -> state { + return state(::beman::execution::schedule(this->sched), std::forward(rcvr)); + } + }; + + Scheduler sched; + template + requires(!std::same_as>) + tls_scheduler(Sch&& sch) : sched(sch) {} + tls_scheduler(tls_scheduler const&) = default; + tls_scheduler(tls_scheduler &&) = default; + tls_scheduler& operator=(tls_scheduler const&) = default; + tls_scheduler& operator=(tls_scheduler &&) = default; + + auto schedule() -> tls_sender { return {this->sched}; } + + auto query(const ::beman::execution::get_domain_t&) const noexcept -> tls_domain { return {}; } + + auto operator==(const tls_scheduler&) const -> bool = default; +}; + +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/tls-scheduler.cpp b/examples/tls-scheduler.cpp index db4ac98..195488b 100644 --- a/examples/tls-scheduler.cpp +++ b/examples/tls-scheduler.cpp @@ -1,139 +1,58 @@ // examples/hello.cpp -*-C++-*- // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +#include "demo-tls_scheduler.hpp" #include #include #include +#include namespace ex = beman::execution; -struct tls_domain { - template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch, ::beman::execution::receiver Rcvr> - struct affine_state { - struct receiver { - using receiver_concept = ::beman::execution::receiver_t; - affine_state* st; - auto set_value() && noexcept -> void { this->st->complete(); } - template - auto set_error(E&& e) && noexcept -> void { - ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); - } - auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } - }; - using operation_state_concept = ::beman::execution::operation_state_t; - using state_t = decltype(::beman::execution::connect( - ::beman::execution::affine_on(::std::declval(), ::std::declval()), ::std::declval())); - - std::remove_cvref_t rcvr; - state_t state; - - template <::beman::execution::sender S, ::beman::execution::scheduler SC, ::beman::execution::receiver R> - affine_state(S&& s, SC&& sc, R&& r) - : rcvr(::std::forward(r)), - state(::beman::execution::connect( - ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), receiver{this})) {} - auto start() & noexcept { - std::cout << "affine_state::start\n"; - ::beman::execution::start(this->state); - } - auto complete() { - std::cout << "affine_state::complete\n"; - ::beman::execution::set_value(::std::move(this->rcvr)); - } - }; - template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch> - struct affine_sender { - using sender_concept = ::beman::execution::sender_t; +// ---------------------------------------------------------------------------- - std::remove_cvref_t sndr; - std::remove_cvref_t sch; +class tls_data { + static thread_local std::string value; - template - auto get_completion_signatures(const Env& env) const noexcept { - return ::beman::execution::get_completion_signatures(this->sndr, env); - } - template <::beman::execution::receiver Rcvr> - auto connect(Rcvr&& rcvr) && { - return affine_state(std::move(sndr), std::move(sch), std::forward(rcvr)); - } - }; - template <::beman::execution::sender Sndr, typename... Env> - requires std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, - ::beman::execution::affine_on_t> - auto transform_sender(Sndr&& s, const Env&... env) const noexcept { - auto [tag, sch, sndr] = s; - return affine_sender{::std::forward_like(sndr), - ::std::forward_like(sch)}; - } +public: + static std::string get() { return tls_data::value; } + static void set(std::string_view v) { tls_data::value = v; } }; -template <::beman::execution::scheduler Scheduler> -struct tls_scheduler { - using scheduler_concept = ::beman::execution::scheduler_t; - struct env { - Scheduler sched; - template - auto query(const ::beman::execution::get_completion_scheduler_t&) const noexcept -> tls_scheduler { - return {this->sched}; - } - }; - template <::beman::execution::receiver Receiver> - struct state { - using operation_state_concept = ::beman::execution::operation_state_t; - using upstream_state_t = decltype(::beman::execution::connect( - ::beman::execution::schedule(std::declval()), std::declval>())); - - upstream_state_t upstream; - template <::beman::execution::sender Sndr, ::beman::execution::receiver Rcvr> - state(Sndr&& sndr, Rcvr&& rcvr) - : upstream(::beman::execution::connect(std::forward(sndr), std::forward(rcvr))) {} - auto start() & noexcept -> void { ::beman::execution::start(this->upstream); } - }; - struct tls_sender { - using sender_concept = ::beman::execution::sender_t; - using sender_type = decltype(::beman::execution::schedule(std::declval())); - - Scheduler sched; - - template - auto get_completion_signatures(const Env&... env) const noexcept { - return ::beman::execution::get_completion_signatures(::beman::execution::schedule(Scheduler(this->sched)), - env...); - } - auto get_env() const noexcept -> env { return {this->sched}; } - - template <::beman::execution::receiver Receiver> - auto connect(Receiver&& rcvr) && -> state { - return state(::beman::execution::schedule(this->sched), std::forward(rcvr)); - } - }; - - Scheduler sched; - template - requires(!std::same_as>) - tls_scheduler(Sch&& sch) : sched(sch) {} +thread_local std::string tls_data::value{}; - auto schedule() -> tls_sender { return {this->sched}; } - - auto query(const ::beman::execution::get_domain_t&) const noexcept -> tls_domain { return {}; } - - auto operator==(const tls_scheduler&) const -> bool = default; +struct tls_save { + std::optional value; + void save() { this->value = tls_data::get(); } + void restore() { tls_data::set(*this->value); } }; -static_assert(::beman::execution::sender::tls_sender>); -static_assert(::beman::execution::sender_in::tls_sender, - ::beman::execution::empty_env>); -static_assert(::beman::execution::scheduler>); - -// ---------------------------------------------------------------------------- struct tls_env { - using scheduler_type = tls_scheduler<::beman::execution::task_scheduler>; + using scheduler_type = demo::tls_scheduler; + //using scheduler_type = ex::inline_scheduler; }; +ex::task run_timer(std::string name) { + tls_data::set(name); + + for (int i = 0; i != 3; ++i) { + co_await ex::just(); + std::cout << "data=" << tls_data::get() << "\n"; + } +} + int main() { - ex::sync_wait([]() -> ex::task { - std::cout << "before co_await\n"; - co_await (ex::just() | ex::then([] { std::cout << "co_await'ed then\n"; })); - std::cout << "after co_await\n"; - }()); + ex::counting_scope scope; + ex::sync_wait([](auto& scope)->ex::task { + auto scheduler = co_await ex::read_env(ex::get_scheduler); + +#if 1 + ex::spawn(ex::write_env(run_timer("timer 1") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); + ex::spawn(ex::write_env(run_timer("timer 2") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); +#endif + co_return; + }(scope)); + ex::sync_wait(scope.join()); } diff --git a/include/beman/task/detail/promise_env.hpp b/include/beman/task/detail/promise_env.hpp index 33c3ee4..b02561f 100644 --- a/include/beman/task/detail/promise_env.hpp +++ b/include/beman/task/detail/promise_env.hpp @@ -14,13 +14,13 @@ template struct promise_env { const Promise* promise; - auto query(::beman::execution::get_allocator_t) const noexcept -> typename Promise::allocator_type { + auto query(::beman::execution::get_allocator_t const&) const noexcept -> typename Promise::allocator_type { return this->promise->get_allocator(); } - auto query(::beman::execution::get_scheduler_t) const noexcept -> typename Promise::scheduler_type { + auto query(::beman::execution::get_scheduler_t const&) const noexcept -> typename Promise::scheduler_type { return this->promise->get_scheduler(); } - auto query(::beman::execution::get_stop_token_t) const noexcept -> typename Promise::stop_token_type { + auto query(::beman::execution::get_stop_token_t const&) const noexcept -> typename Promise::stop_token_type { return this->promise->get_stop_token(); } diff --git a/include/beman/task/detail/promise_type.hpp b/include/beman/task/detail/promise_type.hpp index 084b7df..e9de242 100644 --- a/include/beman/task/detail/promise_type.hpp +++ b/include/beman/task/detail/promise_type.hpp @@ -74,10 +74,14 @@ class promise_type auto await_transform(Sender&& sender) noexcept { if constexpr (requires { ::std::forward(sender).as_awaitable(*this); - typename ::std::remove_cvref_t::task_concept; + //typename ::std::remove_cvref_t::task_concept; }) { return ::std::forward(sender).as_awaitable(*this); - } else { + } + else if constexpr (::std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, ::beman::execution::read_env_t>) { + return ::beman::execution::as_awaitable(::std::forward(sender), *this); + } + else { return ::beman::execution::as_awaitable( ::beman::task::affine_on(::std::forward(sender), this->get_scheduler()), *this); } From dca9c55365fbffdacbad569a25295f70ad536d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 29 Jul 2025 22:18:20 +0100 Subject: [PATCH 03/12] started to add some documentation of examples --- README.md | 2 +- docs/examples.md | 76 ++++++++++++++++- examples/affinity.cpp | 16 ++-- examples/c++now-affinity.cpp | 119 ++------------------------- examples/customize.cpp | 1 - examples/demo-thread_loop.hpp | 33 ++++++++ examples/demo-thread_pool.hpp | 105 ----------------------- examples/environment.cpp | 14 ++-- examples/issue-start-reschedules.cpp | 18 ++-- examples/tls-scheduler.cpp | 2 - 10 files changed, 138 insertions(+), 248 deletions(-) create mode 100644 examples/demo-thread_loop.hpp delete mode 100644 examples/demo-thread_pool.hpp diff --git a/README.md b/README.md index 4479eaa..680fbe6 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ int main() { } ``` -Full runnable examples can be found in `examples/` (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)). +Full runnable examples can be found in [`examples/`](`./example`) (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)). For some explanation see [`./docs/examples.md`](./docs/examples.md). ## Help Welcome diff --git a/docs/examples.md b/docs/examples.md index 785ef7b..58a0e10 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -2,7 +2,60 @@ ## Code used to prepare Dietmar's C++Now 2025 presentation -- [`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz) +
+ +[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz): demo scheduler affinity + + +The example program +[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) +[![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz) uses +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) to demonstrate +the behavior of _scheduler affinity_: the idea is that scheduler +affinity causes the coroutine to resume on the same scheduler as +the one the coroutine was started on. The program implements three +coroutines which have most of their behavior in common: + +1. Each coroutine is executed from `main` using sync_wait(_fun_(loop.get_scheduler())). +2. Each coroutine prints the id of the thread it is executing on prior to any `co_await` and after all `co_await` expression. +3. Each coroutine `co_await`s the result of `scheduler(sched) | then([]{ ... })` where the function passed to `then` just prints the thread id. +4. `work2` additionally changes the coroutines scheduler to be an `inline_scheduler` and later restores the original scehduler using `change_coroutine_scheduler`. +5. While `work1` and `work2` use the default scheduler (`task_scheduler` a +type-erased scheduler which gets initialized from the receiver's environment's `get_scheduler`), `work3` sets the coroutine's scheduler up to be `inline_scheduler`, effectively causing the coroutine to resume wherever +the `co_await`'s expression resumed. + +The output of the program is someting like the below: + +``` +before id=0x1fd635f00 +then id =0x16b64b000 +after id =0x1fd635f00 + +before id=0x1fd635f00 +then id =0x16b64b000 +after1 id=0x16b64b000 +then id =0x16b64b000 +after2 id=0x1fd635f00 + +before id=0x1fd635f00 +then id =0x16b64b000 +after id =0x16b64b000 +``` + +It shows that: + +1. The thread on which the `then`'s function is executed is always the +same and different from the thread each of the coroutines started on. +2. For `work1` the `co_await` resumes on the same thread as the one the +coroutine was started on. +3. For `work2` the first `co_await` after `schedule(sched)` resumes on +the thread used by `sched`. After restoring the original scheduler the +`co_await` resumes on the original thread. +4. For `work3` the `co_await` resumes on the thread used by `sched` as +the `inline_scheduler` doesn't do any actual scheduling. + +
+ - [`c++now-allocator.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-allocator.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/719v7en6a) - [`c++now-basic.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-basic.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/7Pn5TEhfK) - [`c++now-cancel.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-cancel.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/vx4PqYvE6) @@ -12,3 +65,24 @@ - [`c++now-return.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-return.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/f5YE5W4Ta) - [`c++now-stop_token.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-stop_token.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/TxYe3jEs7) - [`c++now-with_error.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-with_error.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/6oqox6zf8) + +## Tools Used By The Examples + +
+ +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) is a `run_loop` whose `run()` is called from a `std::thread`. + + +Technically [`demo::thread_loop`](../examples/demo-thread_loop.hpp) +is a class `public`ly derived from `execution::run_loop` which is +also owning a `std::thread`. The `std::thread` is constructed with +a function object calling `run()` on the +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) object. +Destroying the object calls `finish()` and then `join()`s the +`std::thread`: the destructor will block until the `execution::run_loop`'s +`run()` returns. + +The important bit is that work executed on the +[`demo::thread_loop`](../examples/demo-thread_loop.hpp)'s `scheduler` +will be executed on a corresponding `std::thread`. +
diff --git a/examples/affinity.cpp b/examples/affinity.cpp index 0475e48..77742d7 100644 --- a/examples/affinity.cpp +++ b/examples/affinity.cpp @@ -3,7 +3,7 @@ #include #include -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" #include #include @@ -30,11 +30,11 @@ struct non_affine { int main() { std::cout << std::unitbuf; - demo::thread_pool pool; + demo::thread_loop loop; ex::sync_wait(ex::just() | ex::then([]() noexcept { std::cout << "main:" << fmt_id << "\n"; })); - ex::sync_wait(ex::schedule(pool.get_scheduler()) | - ex::then([]() noexcept { std::cout << "pool:" << fmt_id << "\n"; })); - ex::sync_wait(ex::schedule(ex::task_scheduler(pool.get_scheduler())) | + ex::sync_wait(ex::schedule(loop.get_scheduler()) | + ex::then([]() noexcept { std::cout << "loop:" << fmt_id << "\n"; })); + ex::sync_wait(ex::schedule(ex::task_scheduler(loop.get_scheduler())) | ex::then([]() noexcept { std::cout << "any: " << fmt_id << "\n"; })); ex::sync_wait([]() -> ex::task { std::cout << "coro:" << fmt_id << "\n"; @@ -45,19 +45,19 @@ int main() { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool)); + }(loop)); std::cout << "not scheduler affine:\n"; ex::sync_wait([](auto& pl) -> ex::task { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool)); + }(loop)); std::cout << "use inline_scheduler:\n"; ex::sync_wait(ex::starts_on(ex::inline_scheduler{}, [](auto& pl) -> ex::task { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool))); + }(loop))); } diff --git a/examples/c++now-affinity.cpp b/examples/c++now-affinity.cpp index 0d35431..642fde8 100644 --- a/examples/c++now-affinity.cpp +++ b/examples/c++now-affinity.cpp @@ -4,6 +4,7 @@ #include #include #include +#include "demo-thread_loop.hpp" #include #include #include @@ -13,117 +14,7 @@ namespace ex = beman::execution; -// ---------------------------------------------------------------------------- - namespace { -class thread_context { - struct base { - base* next{}; - - virtual void do_run() = 0; - base() = default; - base(const base&) = delete; - base(base&&) = delete; - base& operator=(const base&) = delete; - base& operator=(base&&) = delete; - virtual ~base() = default; - }; - - ex::stop_source source; - std::mutex mutex; - std::condition_variable condition; - std::thread thread{[this] { this->run(this->source.get_token()); }}; - base* queue{}; - - void run(auto token) { - while (true) { - base* next(std::invoke([&]() -> base* { - std::unique_lock cerberus(this->mutex); - this->condition.wait(cerberus, - [this, &token] { return token.stop_requested() || nullptr != this->queue; }); - if (nullptr == this->queue) { - return nullptr; - } - base* n = std::exchange(queue, queue->next); - return n; - })); - if (next) { - next->do_run(); - } else { - break; - } - } - } - - public: - thread_context() = default; - thread_context(const thread_context&) = delete; - thread_context(thread_context&&) = delete; - ~thread_context() { - this->source.request_stop(); - this->condition.notify_one(); - this->thread.join(); - } - thread_context& operator=(const thread_context&) = delete; - thread_context& operator=(thread_context&&) = delete; - - void enqueue(base* work) noexcept { - { - std::lock_guard cerberus(this->mutex); - work->next = this->queue; - queue = work; - } - this->condition.notify_one(); - } - - template - struct state final : base { - using operation_state_concept = ex::operation_state_t; - Receiver receiver; - thread_context* context; - - template - state(thread_context* ctxt, R&& r) : receiver(std::forward(r)), context(ctxt) {} - void start() noexcept { - static_assert(ex::operation_state); - this->context->enqueue(this); - } - void do_run() override { ex::set_value(std::move(this->receiver)); } - }; - - struct scheduler; - struct env { - thread_context* context; - template - scheduler query(const ex::get_completion_scheduler_t&) const noexcept { - return {this->context}; - } - }; - struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; - thread_context* context; - template - auto connect(Receiver&& r) { - return state>(this->context, std::forward(r)); - } - env get_env() const noexcept { return {this->context}; } - }; - static_assert(ex::sender); - // static_assert(ex::sender_in); - - struct scheduler { - using scheduler_concept = ex::scheduler_t; - thread_context* context; - - sender schedule() noexcept { return {this->context}; } - bool operator==(const scheduler&) const = default; - }; - static_assert(ex::scheduler); - scheduler get_scheduler() { return {this}; } -}; - -// ---------------------------------------------------------------------------- ex::task<> work1(auto sched) { std::cout << "before id=" << std::this_thread::get_id() << "\n"; @@ -155,10 +46,10 @@ ex::task work3(auto sched) { int main() { try { - thread_context context; - ex::sync_wait(work1(context.get_scheduler())); - ex::sync_wait(work2(context.get_scheduler())); - ex::sync_wait(work3(context.get_scheduler())); + demo::thread_loop loop; + ex::sync_wait(work1(loop.get_scheduler())); + ex::sync_wait(work2(loop.get_scheduler())); + ex::sync_wait(work3(loop.get_scheduler())); } catch (...) { std::cout << "ERROR: unexpected exception\n"; } diff --git a/examples/customize.cpp b/examples/customize.cpp index b9d9778..0a5e163 100644 --- a/examples/customize.cpp +++ b/examples/customize.cpp @@ -3,7 +3,6 @@ #include #include -#include "demo-thread_pool.hpp" #include #include #include diff --git a/examples/demo-thread_loop.hpp b/examples/demo-thread_loop.hpp new file mode 100644 index 0000000..ce5017a --- /dev/null +++ b/examples/demo-thread_loop.hpp @@ -0,0 +1,33 @@ +// examples/demo-thread_loop.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_THREAD_LOOP +#define INCLUDED_EXAMPLES_DEMO_THREAD_LOOP + +// ---------------------------------------------------------------------------- + +#include +#include +#include +#include +#include +#include + +namespace demo { + +class thread_loop: public ::beman::execution::run_loop { +private: + std::thread thread{std::bind(&thread_loop::run, this)}; + +public: + ~thread_loop() { + this->finish(); + this->thread.join(); + } +}; + +} // namespace demo + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/demo-thread_pool.hpp b/examples/demo-thread_pool.hpp deleted file mode 100644 index 898cdc0..0000000 --- a/examples/demo-thread_pool.hpp +++ /dev/null @@ -1,105 +0,0 @@ -// examples/demo-thread_pool.hpp -*-C++-*- -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - -#ifndef INCLUDED_EXAMPLES_DEMO_THREAD_POOL -#define INCLUDED_EXAMPLES_DEMO_THREAD_POOL - -// ---------------------------------------------------------------------------- - -#include -#include -#include -#include -#include - -namespace demo { -struct thread_pool { - - struct node { - node* next; - virtual void run() = 0; - - protected: - ~node() = default; - }; - - std::mutex mutex; - std::condition_variable condition; - node* stack{}; - bool stopped{false}; - std::thread driver{[this] { - while (std::optional n = [this] { - std::unique_lock cerberus(mutex); - condition.wait(cerberus, [this] { return stopped || stack; }); - return this->stack ? std::optional(std::exchange(this->stack, this->stack->next)) - : std::optional(); - }()) { - (*n)->run(); - } - }}; - - thread_pool() = default; - ~thread_pool() { - this->stop(); - this->driver.join(); - } - void stop() { - { - std::lock_guard cerberus(this->mutex); - stopped = true; - } - this->condition.notify_one(); - } - - struct scheduler { - using scheduler_concept = beman::execution::scheduler_t; - struct env { - thread_pool* pool; - - template - scheduler query(const beman::execution::get_completion_scheduler_t&) const noexcept { - return {this->pool}; - } - }; - template - struct state final : thread_pool::node { - using operation_state_concept = beman::execution::operation_state_t; - std::remove_cvref_t receiver; - thread_pool* pool; - - template - state(R&& r, thread_pool* p) : node{}, receiver(std::forward(r)), pool(p) {} - void start() & noexcept { - { - std::lock_guard cerberus(this->pool->mutex); - this->next = std::exchange(this->pool->stack, this); - } - this->pool->condition.notify_one(); - } - void run() override { beman::execution::set_value(std::move(this->receiver)); } - }; - struct sender { - using sender_concept = beman::execution::sender_t; - using completion_signatures = beman::execution::completion_signatures; - thread_pool* pool; - template - state connect(Receiver&& receiver) { - return state(std::forward(receiver), pool); - } - - env get_env() const noexcept { return {this->pool}; } - }; - thread_pool* pool; - sender schedule() { return {this->pool}; } - bool operator==(const scheduler&) const = default; - }; - scheduler get_scheduler() { return {this}; } -}; - -static_assert(beman::execution::scheduler); - -} // namespace demo - -// ---------------------------------------------------------------------------- - -#endif diff --git a/examples/environment.cpp b/examples/environment.cpp index 006e08b..8cccafc 100644 --- a/examples/environment.cpp +++ b/examples/environment.cpp @@ -11,7 +11,7 @@ #include #include #include "demo-scope.hpp" -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" namespace ex = beman::execution; namespace net = beman::net; @@ -135,20 +135,20 @@ const std::string text("####"); [[maybe_unused]] const std::string white("\x1b[37m" + text + "\x1b[0m:"); int main() { - demo::thread_pool pool1; - demo::thread_pool pool2; + demo::thread_loop loop1; + demo::thread_loop loop2; net::io_context context; demo::scope scope; environment::set("main"); - ex::sync_wait(ex::schedule(pool1.get_scheduler()) | ex::then([] { environment::set("thread1"); }) | + ex::sync_wait(ex::schedule(loop1.get_scheduler()) | ex::then([] { environment::set("thread1"); }) | ex::then([] { std::cout << print_env << "\n"; })); std::cout << print_env << "\n"; - spawn(env_scheduler(magenta, pool1.get_scheduler()), scope, run(context.get_scheduler(), 100ms)); - spawn(env_scheduler(green, pool1.get_scheduler()), scope, run(context.get_scheduler(), 150ms)); - spawn(env_scheduler(blue, pool1.get_scheduler()), scope, run(context.get_scheduler(), 250ms)); + spawn(env_scheduler(magenta, loop1.get_scheduler()), scope, run(context.get_scheduler(), 100ms)); + spawn(env_scheduler(green, loop1.get_scheduler()), scope, run(context.get_scheduler(), 150ms)); + spawn(env_scheduler(blue, loop1.get_scheduler()), scope, run(context.get_scheduler(), 250ms)); while (!scope.empty()) { context.run(); diff --git a/examples/issue-start-reschedules.cpp b/examples/issue-start-reschedules.cpp index 750a948..f960d41 100644 --- a/examples/issue-start-reschedules.cpp +++ b/examples/issue-start-reschedules.cpp @@ -3,7 +3,7 @@ #include #include -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" #include #include @@ -16,15 +16,15 @@ ex::task<> test(auto sched) { } int main() { - demo::thread_pool pool1; - demo::thread_pool pool2; + demo::thread_loop loop1; + demo::thread_loop loop2; std::cout << "main =" << std::this_thread::get_id() << "\n"; - ex::sync_wait(ex::schedule(pool1.get_scheduler()) | - ex::then([] { std::cout << "pool1=" << std::this_thread::get_id() << "\n"; })); - ex::sync_wait(ex::schedule(pool2.get_scheduler()) | - ex::then([] { std::cout << "pool2=" << std::this_thread::get_id() << "\n"; })); + ex::sync_wait(ex::schedule(loop1.get_scheduler()) | + ex::then([] { std::cout << "loop1=" << std::this_thread::get_id() << "\n"; })); + ex::sync_wait(ex::schedule(loop2.get_scheduler()) | + ex::then([] { std::cout << "loop2=" << std::this_thread::get_id() << "\n"; })); std::cout << "--- use 1 ---\n"; - ex::sync_wait(test(pool2.get_scheduler())); + ex::sync_wait(test(loop2.get_scheduler())); std::cout << "--- use 2 ---\n"; - ex::sync_wait(ex::starts_on(pool1.get_scheduler(), test(pool2.get_scheduler()))); + ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); } diff --git a/examples/tls-scheduler.cpp b/examples/tls-scheduler.cpp index 195488b..b921cd6 100644 --- a/examples/tls-scheduler.cpp +++ b/examples/tls-scheduler.cpp @@ -46,12 +46,10 @@ int main() { ex::sync_wait([](auto& scope)->ex::task { auto scheduler = co_await ex::read_env(ex::get_scheduler); -#if 1 ex::spawn(ex::write_env(run_timer("timer 1") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); ex::spawn(ex::write_env(run_timer("timer 2") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); -#endif co_return; }(scope)); ex::sync_wait(scope.join()); From d327f487842022fe7bcec97576c555e2d763be24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 31 Jul 2025 19:10:34 +0100 Subject: [PATCH 04/12] clang-format --- docs/examples.md | 2 +- examples/demo-thread_loop.hpp | 6 ++-- examples/demo-tls_scheduler.hpp | 40 ++++++++++++---------- examples/tls-scheduler.cpp | 22 +++++++----- include/beman/task/detail/promise_env.hpp | 6 ++-- include/beman/task/detail/promise_type.hpp | 9 +++-- 6 files changed, 46 insertions(+), 39 deletions(-) diff --git a/docs/examples.md b/docs/examples.md index 58a0e10..ec7787e 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -5,7 +5,7 @@
[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz): demo scheduler affinity - + The example program [`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) diff --git a/examples/demo-thread_loop.hpp b/examples/demo-thread_loop.hpp index ce5017a..5194dd3 100644 --- a/examples/demo-thread_loop.hpp +++ b/examples/demo-thread_loop.hpp @@ -15,11 +15,11 @@ namespace demo { -class thread_loop: public ::beman::execution::run_loop { -private: +class thread_loop : public ::beman::execution::run_loop { + private: std::thread thread{std::bind(&thread_loop::run, this)}; -public: + public: ~thread_loop() { this->finish(); this->thread.join(); diff --git a/examples/demo-tls_scheduler.hpp b/examples/demo-tls_scheduler.hpp index 133161d..a3e7a67 100644 --- a/examples/demo-tls_scheduler.hpp +++ b/examples/demo-tls_scheduler.hpp @@ -29,37 +29,41 @@ struct tls_domain { using receiver_concept = ::beman::execution::receiver_t; struct env_t { affine_state_base* st; - //template - // requires requires(affine_state_base* self, Q&& q, A&&... a) { std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); } - //auto query(Q&& q, A&&... a) const noexcept { std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); } + // template + // requires requires(affine_state_base* self, Q&& q, A&&... a) { + // std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); } + // auto query(Q&& q, A&&... a) const noexcept { + // std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); } }; affine_state_base* st; - auto set_value() && noexcept -> void { this->st->complete(); } + auto set_value() && noexcept -> void { this->st->complete(); } template auto set_error(E&& e) && noexcept -> void { ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); } auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } - auto get_env() const noexcept -> env_t { return { this->st }; } + auto get_env() const noexcept -> env_t { return {this->st}; } }; template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch, ::beman::execution::receiver Rcvr> - struct affine_state: affine_state_base { + struct affine_state : affine_state_base { using operation_state_concept = ::beman::execution::operation_state_t; - using env_t = decltype(::beman::execution::get_env(std::declval())); - using base_t = affine_state_base; - using data_t = typename Sch::type; + using env_t = decltype(::beman::execution::get_env(std::declval())); + using base_t = affine_state_base; + using data_t = typename Sch::type; using state_t = decltype(::beman::execution::connect( - ::beman::execution::affine_on(::std::declval(), ::std::declval()), ::std::declval>())); + ::beman::execution::affine_on(::std::declval(), ::std::declval()), + ::std::declval>())); - state_t state; + state_t state; template <::beman::execution::sender S, ::beman::execution::scheduler SC, ::beman::execution::receiver R> affine_state(S&& s, SC&& sc, R&& r) : base_t{::std::forward(r)}, state(::beman::execution::connect( - ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), affine_receiver{this})) {} + ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), + affine_receiver{this})) {} auto start() & noexcept { std::cout << "affine_state::start\n"; this->data.save(); @@ -95,7 +99,7 @@ struct tls_domain { template struct tls_scheduler { using scheduler_concept = ::beman::execution::scheduler_t; - using type = Data; + using type = Data; struct env { Scheduler sched; template @@ -139,10 +143,10 @@ struct tls_scheduler { template requires(!std::same_as>) tls_scheduler(Sch&& sch) : sched(sch) {} - tls_scheduler(tls_scheduler const&) = default; - tls_scheduler(tls_scheduler &&) = default; - tls_scheduler& operator=(tls_scheduler const&) = default; - tls_scheduler& operator=(tls_scheduler &&) = default; + tls_scheduler(const tls_scheduler&) = default; + tls_scheduler(tls_scheduler&&) = default; + tls_scheduler& operator=(const tls_scheduler&) = default; + tls_scheduler& operator=(tls_scheduler&&) = default; auto schedule() -> tls_sender { return {this->sched}; } @@ -151,7 +155,7 @@ struct tls_scheduler { auto operator==(const tls_scheduler&) const -> bool = default; }; -} +} // namespace demo // ---------------------------------------------------------------------------- diff --git a/examples/tls-scheduler.cpp b/examples/tls-scheduler.cpp index b921cd6..eb8f5a2 100644 --- a/examples/tls-scheduler.cpp +++ b/examples/tls-scheduler.cpp @@ -14,7 +14,7 @@ namespace ex = beman::execution; class tls_data { static thread_local std::string value; -public: + public: static std::string get() { return tls_data::value; } static void set(std::string_view v) { tls_data::value = v; } }; @@ -23,13 +23,13 @@ thread_local std::string tls_data::value{}; struct tls_save { std::optional value; - void save() { this->value = tls_data::get(); } - void restore() { tls_data::set(*this->value); } + void save() { this->value = tls_data::get(); } + void restore() { tls_data::set(*this->value); } }; struct tls_env { using scheduler_type = demo::tls_scheduler; - //using scheduler_type = ex::inline_scheduler; + // using scheduler_type = ex::inline_scheduler; }; ex::task run_timer(std::string name) { @@ -43,13 +43,17 @@ ex::task run_timer(std::string name) { int main() { ex::counting_scope scope; - ex::sync_wait([](auto& scope)->ex::task { + ex::sync_wait([](auto& scope) -> ex::task { auto scheduler = co_await ex::read_env(ex::get_scheduler); - ex::spawn(ex::write_env(run_timer("timer 1") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), - ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); - ex::spawn(ex::write_env(run_timer("timer 2") | ex::upon_error([](auto&&) noexcept {}) | ex::then([]() noexcept { std::cout << "loop done\n"; }), - ex::detail::make_env(ex::get_scheduler, scheduler)), scope.get_token()); + ex::spawn(ex::write_env(run_timer("timer 1") | ex::upon_error([](auto&&) noexcept {}) | + ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), + scope.get_token()); + ex::spawn(ex::write_env(run_timer("timer 2") | ex::upon_error([](auto&&) noexcept {}) | + ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), + scope.get_token()); co_return; }(scope)); ex::sync_wait(scope.join()); diff --git a/include/beman/task/detail/promise_env.hpp b/include/beman/task/detail/promise_env.hpp index b02561f..7b7a4ae 100644 --- a/include/beman/task/detail/promise_env.hpp +++ b/include/beman/task/detail/promise_env.hpp @@ -14,13 +14,13 @@ template struct promise_env { const Promise* promise; - auto query(::beman::execution::get_allocator_t const&) const noexcept -> typename Promise::allocator_type { + auto query(const ::beman::execution::get_allocator_t&) const noexcept -> typename Promise::allocator_type { return this->promise->get_allocator(); } - auto query(::beman::execution::get_scheduler_t const&) const noexcept -> typename Promise::scheduler_type { + auto query(const ::beman::execution::get_scheduler_t&) const noexcept -> typename Promise::scheduler_type { return this->promise->get_scheduler(); } - auto query(::beman::execution::get_stop_token_t const&) const noexcept -> typename Promise::stop_token_type { + auto query(const ::beman::execution::get_stop_token_t&) const noexcept -> typename Promise::stop_token_type { return this->promise->get_stop_token(); } diff --git a/include/beman/task/detail/promise_type.hpp b/include/beman/task/detail/promise_type.hpp index e9de242..309619f 100644 --- a/include/beman/task/detail/promise_type.hpp +++ b/include/beman/task/detail/promise_type.hpp @@ -74,14 +74,13 @@ class promise_type auto await_transform(Sender&& sender) noexcept { if constexpr (requires { ::std::forward(sender).as_awaitable(*this); - //typename ::std::remove_cvref_t::task_concept; + // typename ::std::remove_cvref_t::task_concept; }) { return ::std::forward(sender).as_awaitable(*this); - } - else if constexpr (::std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, ::beman::execution::read_env_t>) { + } else if constexpr (::std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, + ::beman::execution::read_env_t>) { return ::beman::execution::as_awaitable(::std::forward(sender), *this); - } - else { + } else { return ::beman::execution::as_awaitable( ::beman::task::affine_on(::std::forward(sender), this->get_scheduler()), *this); } From 3cd353d6ee9e58892aa1c2c2dded585ed41187c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 31 Jul 2025 22:28:08 +0100 Subject: [PATCH 05/12] some clean-up for tls_scheduler and updated execution reference --- CMakeLists.txt | 2 +- examples/demo-tls_scheduler.hpp | 25 +++++++++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6930b1c..02a315e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR /execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG fa6d441 + GIT_TAG 330be95 ) FetchContent_MakeAvailable(execution) FetchContent_Declare( diff --git a/examples/demo-tls_scheduler.hpp b/examples/demo-tls_scheduler.hpp index a3e7a67..363980b 100644 --- a/examples/demo-tls_scheduler.hpp +++ b/examples/demo-tls_scheduler.hpp @@ -29,19 +29,22 @@ struct tls_domain { using receiver_concept = ::beman::execution::receiver_t; struct env_t { affine_state_base* st; - // template - // requires requires(affine_state_base* self, Q&& q, A&&... a) { - // std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); } - // auto query(Q&& q, A&&... a) const noexcept { - // std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); } + template + requires requires(affine_state_base* self, Q&& q, A&&... a) { + std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); + } + auto query(Q&& q, A&&... a) const noexcept { + std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); + } }; affine_state_base* st; - auto set_value() && noexcept -> void { this->st->complete(); } + template auto set_error(E&& e) && noexcept -> void { ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); } auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } + auto set_value() && noexcept -> void { this->st->complete(); } auto get_env() const noexcept -> env_t { return {this->st}; } }; @@ -64,7 +67,7 @@ struct tls_domain { state(::beman::execution::connect( ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), affine_receiver{this})) {} - auto start() & noexcept { + auto start() & noexcept -> void { std::cout << "affine_state::start\n"; this->data.save(); ::beman::execution::start(this->state); @@ -82,6 +85,10 @@ struct tls_domain { return ::beman::execution::get_completion_signatures(this->sndr, env); } template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) const& { + return affine_state(sndr, sch, std::forward(rcvr)); + } + template <::beman::execution::receiver Rcvr> auto connect(Rcvr&& rcvr) && { return affine_state(std::move(sndr), std::move(sch), std::forward(rcvr)); } @@ -134,7 +141,7 @@ struct tls_scheduler { auto get_env() const noexcept -> env { return {this->sched}; } template <::beman::execution::receiver Receiver> - auto connect(Receiver&& rcvr) && -> state { + auto connect(Receiver&& rcvr) -> state { return state(::beman::execution::schedule(this->sched), std::forward(rcvr)); } }; @@ -154,6 +161,8 @@ struct tls_scheduler { auto operator==(const tls_scheduler&) const -> bool = default; }; +static_assert(::beman::execution::scheduler< + tls_scheduler().get_scheduler())>>); } // namespace demo From 97f9cb4c84bb3848a5460a944a0655a40a6942b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 31 Jul 2025 22:55:15 +0100 Subject: [PATCH 06/12] fixed -Werror issue --- examples/demo-tls_scheduler.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/demo-tls_scheduler.hpp b/examples/demo-tls_scheduler.hpp index 363980b..eecbcea 100644 --- a/examples/demo-tls_scheduler.hpp +++ b/examples/demo-tls_scheduler.hpp @@ -96,7 +96,7 @@ struct tls_domain { template <::beman::execution::sender Sndr, typename... Env> requires std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, ::beman::execution::affine_on_t> - auto transform_sender(Sndr&& s, const Env&... env) const noexcept { + auto transform_sender(Sndr&& s, const Env&...) const noexcept { auto [tag, sch, sndr] = s; return affine_sender{::beman::execution::detail::forward_like(sndr), ::beman::execution::detail::forward_like(sch)}; From 91bf3d36688479505d649b71f97165d3137c961d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Fri, 1 Aug 2025 21:17:08 +0100 Subject: [PATCH 07/12] release the coroutine early --- include/beman/task/detail/state.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/beman/task/detail/state.hpp b/include/beman/task/detail/state.hpp index daaf9b7..4eccef2 100644 --- a/include/beman/task/detail/state.hpp +++ b/include/beman/task/detail/state.hpp @@ -72,6 +72,7 @@ struct state : ::beman::task::detail::state_base, ::beman::task::detail::s auto start() & noexcept -> void { this->handle.start(this).resume(); } std::coroutine_handle<> do_complete() override { + this->handle.release(); this->result_complete(::std::move(this->receiver)); return std::noop_coroutine(); } From 7c17bfbf393aaf9caf6bd4d40a67d74123b2a8fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 3 Aug 2025 21:07:43 +0100 Subject: [PATCH 08/12] trying to work around MSVC++ ASAN vs. coroutine issue --- .github/workflows/ci_tests.yml | 2 +- include/beman/task/detail/state.hpp | 2 +- tests/beman/task/CMakeLists.txt | 6 ++++-- tests/beman/task/msvc-asan-issue.test.cpp | 24 +++++++++++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) create mode 100644 tests/beman/task/msvc-asan-issue.test.cpp diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index c6b707b..13093bc 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -113,7 +113,7 @@ jobs: { "cxxversions": ["c++23"], "tests": [ { "stdlibs": ["stl"], - "tests": ["Debug.Default", "Release.Default", "Release.MaxSan"] + "tests": ["Debug.Default", "Release.Default"] } ] }, diff --git a/include/beman/task/detail/state.hpp b/include/beman/task/detail/state.hpp index 4eccef2..df2b315 100644 --- a/include/beman/task/detail/state.hpp +++ b/include/beman/task/detail/state.hpp @@ -72,7 +72,7 @@ struct state : ::beman::task::detail::state_base, ::beman::task::detail::s auto start() & noexcept -> void { this->handle.start(this).resume(); } std::coroutine_handle<> do_complete() override { - this->handle.release(); + this->handle.reset(); this->result_complete(::std::move(this->receiver)); return std::noop_coroutine(); } diff --git a/tests/beman/task/CMakeLists.txt b/tests/beman/task/CMakeLists.txt index b807fd8..872d5fa 100644 --- a/tests/beman/task/CMakeLists.txt +++ b/tests/beman/task/CMakeLists.txt @@ -3,11 +3,10 @@ list( APPEND task_tests - single_thread_context + msvc-asan-issue affine_on allocator_of allocator_support - task_scheduler completion error_types_of final_awaiter @@ -20,10 +19,13 @@ list( promise_type result_type scheduler_of + single_thread_context state_base sub_visit task + task_scheduler with_error + ) foreach(test ${task_tests}) diff --git a/tests/beman/task/msvc-asan-issue.test.cpp b/tests/beman/task/msvc-asan-issue.test.cpp new file mode 100644 index 0000000..0414de4 --- /dev/null +++ b/tests/beman/task/msvc-asan-issue.test.cpp @@ -0,0 +1,24 @@ +#include + +struct task { + struct final_awaiter { + static constexpr bool await_ready() noexcept { return false; } + static auto await_suspend(std::coroutine_handle<> h) noexcept { + h.destroy(); + return std::noop_coroutine(); + } + static constexpr void await_resume() noexcept {} + }; + struct promise_type { + constexpr std::suspend_always initial_suspend() noexcept { return {}; } + constexpr final_awaiter final_suspend() noexcept { return {}; } + void unhandled_exception() {} + task get_return_object() { return {std::coroutine_handle::from_promise(*this)}; } + void return_void() {} + }; + std::coroutine_handle<> h; +}; + +int main() { + []() -> task { co_return; }().h.resume(); +} From ec6d06216790ad61e7cbf12275ed9e0dcf972fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 3 Aug 2025 21:12:31 +0100 Subject: [PATCH 09/12] fix issues moaned about by CI --- .github/workflows/ci_tests.yml | 2 +- tests/beman/task/CMakeLists.txt | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 13093bc..59dfc82 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -113,7 +113,7 @@ jobs: { "cxxversions": ["c++23"], "tests": [ { "stdlibs": ["stl"], - "tests": ["Debug.Default", "Release.Default"] + "tests": ["Release.Default"] } ] }, diff --git a/tests/beman/task/CMakeLists.txt b/tests/beman/task/CMakeLists.txt index 872d5fa..1d69bbe 100644 --- a/tests/beman/task/CMakeLists.txt +++ b/tests/beman/task/CMakeLists.txt @@ -25,7 +25,6 @@ list( task task_scheduler with_error - ) foreach(test ${task_tests}) From f269b0839bdfed81a49eccd0ad081eeadf3383e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 3 Aug 2025 21:18:19 +0100 Subject: [PATCH 10/12] remove address sanitizer for MSVC++ --- .github/workflows/ci_tests.yml | 2 +- infra/cmake/msvc-toolchain.cmake | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 59dfc82..c6b707b 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -113,7 +113,7 @@ jobs: { "cxxversions": ["c++23"], "tests": [ { "stdlibs": ["stl"], - "tests": ["Release.Default"] + "tests": ["Debug.Default", "Release.Default", "Release.MaxSan"] } ] }, diff --git a/infra/cmake/msvc-toolchain.cmake b/infra/cmake/msvc-toolchain.cmake index c2fffa7..569dedd 100644 --- a/infra/cmake/msvc-toolchain.cmake +++ b/infra/cmake/msvc-toolchain.cmake @@ -23,6 +23,7 @@ set(CMAKE_CXX_COMPILER cl) if(BEMAN_BUILDSYS_SANITIZER STREQUAL "MaxSan") # /Zi flag (add debug symbol) is needed when using address sanitizer # See C5072: https://learn.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-c5072 + # set(SANITIZER_FLAGS /Zi") set(SANITIZER_FLAGS "/fsanitize=address /Zi") endif() From 46ba59df2ddb54c40243ff76edb55173b0731539 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 3 Aug 2025 21:26:33 +0100 Subject: [PATCH 11/12] fix incorrect location of comment to disable address sanitizer --- infra/cmake/msvc-toolchain.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infra/cmake/msvc-toolchain.cmake b/infra/cmake/msvc-toolchain.cmake index 569dedd..5150c28 100644 --- a/infra/cmake/msvc-toolchain.cmake +++ b/infra/cmake/msvc-toolchain.cmake @@ -23,8 +23,8 @@ set(CMAKE_CXX_COMPILER cl) if(BEMAN_BUILDSYS_SANITIZER STREQUAL "MaxSan") # /Zi flag (add debug symbol) is needed when using address sanitizer # See C5072: https://learn.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-c5072 - # set(SANITIZER_FLAGS /Zi") - set(SANITIZER_FLAGS "/fsanitize=address /Zi") + set(SANITIZER_FLAGS /Zi") + # set(SANITIZER_FLAGS "/fsanitize=address /Zi") endif() set(CMAKE_CXX_FLAGS_DEBUG_INIT "/EHsc /permissive- ${SANITIZER_FLAGS}") From 1ff8ad5e0c17411aad72bf013ddceca2e23f233c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 3 Aug 2025 21:30:10 +0100 Subject: [PATCH 12/12] added a missing quote --- infra/cmake/msvc-toolchain.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/cmake/msvc-toolchain.cmake b/infra/cmake/msvc-toolchain.cmake index 5150c28..9b8f1b8 100644 --- a/infra/cmake/msvc-toolchain.cmake +++ b/infra/cmake/msvc-toolchain.cmake @@ -23,7 +23,7 @@ set(CMAKE_CXX_COMPILER cl) if(BEMAN_BUILDSYS_SANITIZER STREQUAL "MaxSan") # /Zi flag (add debug symbol) is needed when using address sanitizer # See C5072: https://learn.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-c5072 - set(SANITIZER_FLAGS /Zi") + set(SANITIZER_FLAGS "/Zi") # set(SANITIZER_FLAGS "/fsanitize=address /Zi") endif()