diff --git a/CMakeLists.txt b/CMakeLists.txt index 6930b1c..02a315e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR /execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG fa6d441 + GIT_TAG 330be95 ) FetchContent_MakeAvailable(execution) FetchContent_Declare( diff --git a/README.md b/README.md index 4479eaa..680fbe6 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ int main() { } ``` -Full runnable examples can be found in `examples/` (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)). +Full runnable examples can be found in [`examples/`](`./example`) (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)). For some explanation see [`./docs/examples.md`](./docs/examples.md). ## Help Welcome diff --git a/docs/examples.md b/docs/examples.md index 785ef7b..ec7787e 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -2,7 +2,60 @@ ## Code used to prepare Dietmar's C++Now 2025 presentation -- [`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz) +
+ +[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz): demo scheduler affinity + + +The example program +[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) +[![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz) uses +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) to demonstrate +the behavior of _scheduler affinity_: the idea is that scheduler +affinity causes the coroutine to resume on the same scheduler as +the one the coroutine was started on. The program implements three +coroutines which have most of their behavior in common: + +1. Each coroutine is executed from `main` using sync_wait(_fun_(loop.get_scheduler())). +2. Each coroutine prints the id of the thread it is executing on prior to any `co_await` and after all `co_await` expression. +3. Each coroutine `co_await`s the result of `scheduler(sched) | then([]{ ... })` where the function passed to `then` just prints the thread id. +4. `work2` additionally changes the coroutines scheduler to be an `inline_scheduler` and later restores the original scehduler using `change_coroutine_scheduler`. +5. While `work1` and `work2` use the default scheduler (`task_scheduler` a +type-erased scheduler which gets initialized from the receiver's environment's `get_scheduler`), `work3` sets the coroutine's scheduler up to be `inline_scheduler`, effectively causing the coroutine to resume wherever +the `co_await`'s expression resumed. + +The output of the program is someting like the below: + +``` +before id=0x1fd635f00 +then id =0x16b64b000 +after id =0x1fd635f00 + +before id=0x1fd635f00 +then id =0x16b64b000 +after1 id=0x16b64b000 +then id =0x16b64b000 +after2 id=0x1fd635f00 + +before id=0x1fd635f00 +then id =0x16b64b000 +after id =0x16b64b000 +``` + +It shows that: + +1. The thread on which the `then`'s function is executed is always the +same and different from the thread each of the coroutines started on. +2. For `work1` the `co_await` resumes on the same thread as the one the +coroutine was started on. +3. For `work2` the first `co_await` after `schedule(sched)` resumes on +the thread used by `sched`. After restoring the original scheduler the +`co_await` resumes on the original thread. +4. For `work3` the `co_await` resumes on the thread used by `sched` as +the `inline_scheduler` doesn't do any actual scheduling. + +
+ - [`c++now-allocator.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-allocator.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/719v7en6a) - [`c++now-basic.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-basic.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/7Pn5TEhfK) - [`c++now-cancel.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-cancel.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/vx4PqYvE6) @@ -12,3 +65,24 @@ - [`c++now-return.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-return.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/f5YE5W4Ta) - [`c++now-stop_token.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-stop_token.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/TxYe3jEs7) - [`c++now-with_error.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-with_error.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/6oqox6zf8) + +## Tools Used By The Examples + +
+ +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) is a `run_loop` whose `run()` is called from a `std::thread`. + + +Technically [`demo::thread_loop`](../examples/demo-thread_loop.hpp) +is a class `public`ly derived from `execution::run_loop` which is +also owning a `std::thread`. The `std::thread` is constructed with +a function object calling `run()` on the +[`demo::thread_loop`](../examples/demo-thread_loop.hpp) object. +Destroying the object calls `finish()` and then `join()`s the +`std::thread`: the destructor will block until the `execution::run_loop`'s +`run()` returns. + +The important bit is that work executed on the +[`demo::thread_loop`](../examples/demo-thread_loop.hpp)'s `scheduler` +will be executed on a corresponding `std::thread`. +
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index ecae2c5..ba10af6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception set(ALL_EXAMPLES + tls-scheduler customize issue-symmetric-transfer affinity diff --git a/examples/affinity.cpp b/examples/affinity.cpp index 0475e48..77742d7 100644 --- a/examples/affinity.cpp +++ b/examples/affinity.cpp @@ -3,7 +3,7 @@ #include #include -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" #include #include @@ -30,11 +30,11 @@ struct non_affine { int main() { std::cout << std::unitbuf; - demo::thread_pool pool; + demo::thread_loop loop; ex::sync_wait(ex::just() | ex::then([]() noexcept { std::cout << "main:" << fmt_id << "\n"; })); - ex::sync_wait(ex::schedule(pool.get_scheduler()) | - ex::then([]() noexcept { std::cout << "pool:" << fmt_id << "\n"; })); - ex::sync_wait(ex::schedule(ex::task_scheduler(pool.get_scheduler())) | + ex::sync_wait(ex::schedule(loop.get_scheduler()) | + ex::then([]() noexcept { std::cout << "loop:" << fmt_id << "\n"; })); + ex::sync_wait(ex::schedule(ex::task_scheduler(loop.get_scheduler())) | ex::then([]() noexcept { std::cout << "any: " << fmt_id << "\n"; })); ex::sync_wait([]() -> ex::task { std::cout << "coro:" << fmt_id << "\n"; @@ -45,19 +45,19 @@ int main() { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool)); + }(loop)); std::cout << "not scheduler affine:\n"; ex::sync_wait([](auto& pl) -> ex::task { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool)); + }(loop)); std::cout << "use inline_scheduler:\n"; ex::sync_wait(ex::starts_on(ex::inline_scheduler{}, [](auto& pl) -> ex::task { std::cout << "cor1:" << fmt_id << "\n"; co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; })); std::cout << "cor2:" << fmt_id << "\n"; - }(pool))); + }(loop))); } diff --git a/examples/c++now-affinity.cpp b/examples/c++now-affinity.cpp index 0d35431..642fde8 100644 --- a/examples/c++now-affinity.cpp +++ b/examples/c++now-affinity.cpp @@ -4,6 +4,7 @@ #include #include #include +#include "demo-thread_loop.hpp" #include #include #include @@ -13,117 +14,7 @@ namespace ex = beman::execution; -// ---------------------------------------------------------------------------- - namespace { -class thread_context { - struct base { - base* next{}; - - virtual void do_run() = 0; - base() = default; - base(const base&) = delete; - base(base&&) = delete; - base& operator=(const base&) = delete; - base& operator=(base&&) = delete; - virtual ~base() = default; - }; - - ex::stop_source source; - std::mutex mutex; - std::condition_variable condition; - std::thread thread{[this] { this->run(this->source.get_token()); }}; - base* queue{}; - - void run(auto token) { - while (true) { - base* next(std::invoke([&]() -> base* { - std::unique_lock cerberus(this->mutex); - this->condition.wait(cerberus, - [this, &token] { return token.stop_requested() || nullptr != this->queue; }); - if (nullptr == this->queue) { - return nullptr; - } - base* n = std::exchange(queue, queue->next); - return n; - })); - if (next) { - next->do_run(); - } else { - break; - } - } - } - - public: - thread_context() = default; - thread_context(const thread_context&) = delete; - thread_context(thread_context&&) = delete; - ~thread_context() { - this->source.request_stop(); - this->condition.notify_one(); - this->thread.join(); - } - thread_context& operator=(const thread_context&) = delete; - thread_context& operator=(thread_context&&) = delete; - - void enqueue(base* work) noexcept { - { - std::lock_guard cerberus(this->mutex); - work->next = this->queue; - queue = work; - } - this->condition.notify_one(); - } - - template - struct state final : base { - using operation_state_concept = ex::operation_state_t; - Receiver receiver; - thread_context* context; - - template - state(thread_context* ctxt, R&& r) : receiver(std::forward(r)), context(ctxt) {} - void start() noexcept { - static_assert(ex::operation_state); - this->context->enqueue(this); - } - void do_run() override { ex::set_value(std::move(this->receiver)); } - }; - - struct scheduler; - struct env { - thread_context* context; - template - scheduler query(const ex::get_completion_scheduler_t&) const noexcept { - return {this->context}; - } - }; - struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; - thread_context* context; - template - auto connect(Receiver&& r) { - return state>(this->context, std::forward(r)); - } - env get_env() const noexcept { return {this->context}; } - }; - static_assert(ex::sender); - // static_assert(ex::sender_in); - - struct scheduler { - using scheduler_concept = ex::scheduler_t; - thread_context* context; - - sender schedule() noexcept { return {this->context}; } - bool operator==(const scheduler&) const = default; - }; - static_assert(ex::scheduler); - scheduler get_scheduler() { return {this}; } -}; - -// ---------------------------------------------------------------------------- ex::task<> work1(auto sched) { std::cout << "before id=" << std::this_thread::get_id() << "\n"; @@ -155,10 +46,10 @@ ex::task work3(auto sched) { int main() { try { - thread_context context; - ex::sync_wait(work1(context.get_scheduler())); - ex::sync_wait(work2(context.get_scheduler())); - ex::sync_wait(work3(context.get_scheduler())); + demo::thread_loop loop; + ex::sync_wait(work1(loop.get_scheduler())); + ex::sync_wait(work2(loop.get_scheduler())); + ex::sync_wait(work3(loop.get_scheduler())); } catch (...) { std::cout << "ERROR: unexpected exception\n"; } diff --git a/examples/customize.cpp b/examples/customize.cpp index b9d9778..0a5e163 100644 --- a/examples/customize.cpp +++ b/examples/customize.cpp @@ -3,7 +3,6 @@ #include #include -#include "demo-thread_pool.hpp" #include #include #include diff --git a/examples/demo-thread_loop.hpp b/examples/demo-thread_loop.hpp new file mode 100644 index 0000000..5194dd3 --- /dev/null +++ b/examples/demo-thread_loop.hpp @@ -0,0 +1,33 @@ +// examples/demo-thread_loop.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_THREAD_LOOP +#define INCLUDED_EXAMPLES_DEMO_THREAD_LOOP + +// ---------------------------------------------------------------------------- + +#include +#include +#include +#include +#include +#include + +namespace demo { + +class thread_loop : public ::beman::execution::run_loop { + private: + std::thread thread{std::bind(&thread_loop::run, this)}; + + public: + ~thread_loop() { + this->finish(); + this->thread.join(); + } +}; + +} // namespace demo + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/demo-thread_pool.hpp b/examples/demo-thread_pool.hpp deleted file mode 100644 index 898cdc0..0000000 --- a/examples/demo-thread_pool.hpp +++ /dev/null @@ -1,105 +0,0 @@ -// examples/demo-thread_pool.hpp -*-C++-*- -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - -#ifndef INCLUDED_EXAMPLES_DEMO_THREAD_POOL -#define INCLUDED_EXAMPLES_DEMO_THREAD_POOL - -// ---------------------------------------------------------------------------- - -#include -#include -#include -#include -#include - -namespace demo { -struct thread_pool { - - struct node { - node* next; - virtual void run() = 0; - - protected: - ~node() = default; - }; - - std::mutex mutex; - std::condition_variable condition; - node* stack{}; - bool stopped{false}; - std::thread driver{[this] { - while (std::optional n = [this] { - std::unique_lock cerberus(mutex); - condition.wait(cerberus, [this] { return stopped || stack; }); - return this->stack ? std::optional(std::exchange(this->stack, this->stack->next)) - : std::optional(); - }()) { - (*n)->run(); - } - }}; - - thread_pool() = default; - ~thread_pool() { - this->stop(); - this->driver.join(); - } - void stop() { - { - std::lock_guard cerberus(this->mutex); - stopped = true; - } - this->condition.notify_one(); - } - - struct scheduler { - using scheduler_concept = beman::execution::scheduler_t; - struct env { - thread_pool* pool; - - template - scheduler query(const beman::execution::get_completion_scheduler_t&) const noexcept { - return {this->pool}; - } - }; - template - struct state final : thread_pool::node { - using operation_state_concept = beman::execution::operation_state_t; - std::remove_cvref_t receiver; - thread_pool* pool; - - template - state(R&& r, thread_pool* p) : node{}, receiver(std::forward(r)), pool(p) {} - void start() & noexcept { - { - std::lock_guard cerberus(this->pool->mutex); - this->next = std::exchange(this->pool->stack, this); - } - this->pool->condition.notify_one(); - } - void run() override { beman::execution::set_value(std::move(this->receiver)); } - }; - struct sender { - using sender_concept = beman::execution::sender_t; - using completion_signatures = beman::execution::completion_signatures; - thread_pool* pool; - template - state connect(Receiver&& receiver) { - return state(std::forward(receiver), pool); - } - - env get_env() const noexcept { return {this->pool}; } - }; - thread_pool* pool; - sender schedule() { return {this->pool}; } - bool operator==(const scheduler&) const = default; - }; - scheduler get_scheduler() { return {this}; } -}; - -static_assert(beman::execution::scheduler); - -} // namespace demo - -// ---------------------------------------------------------------------------- - -#endif diff --git a/examples/demo-tls_scheduler.hpp b/examples/demo-tls_scheduler.hpp new file mode 100644 index 0000000..eecbcea --- /dev/null +++ b/examples/demo-tls_scheduler.hpp @@ -0,0 +1,171 @@ +// examples/demo-tls_scheduler.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_EXAMPLES_DEMO_TLS_SCHEDULER +#define INCLUDED_EXAMPLES_DEMO_TLS_SCHEDULER + +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace demo { +struct tls_domain { + template <::beman::execution::receiver Rcvr, typename Data> + struct affine_state_base { + std::remove_cvref_t rcvr; + Data data{}; + + auto complete() { + std::cout << "affine_state::complete\n"; + this->data.restore(); + ::beman::execution::set_value(::std::move(this->rcvr)); + } + }; + + template <::beman::execution::receiver Rcvr, typename Data> + struct affine_receiver { + using receiver_concept = ::beman::execution::receiver_t; + struct env_t { + affine_state_base* st; + template + requires requires(affine_state_base* self, Q&& q, A&&... a) { + std::forward(q)(::beman::execution::get_env(self->st->rcvr), ::std::forward(a)...); + } + auto query(Q&& q, A&&... a) const noexcept { + std::forward(q)(::beman::execution::get_env(this->st->rcvr, ::std::forward(a)...)); + } + }; + affine_state_base* st; + + template + auto set_error(E&& e) && noexcept -> void { + ::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward(e)); + } + auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); } + auto set_value() && noexcept -> void { this->st->complete(); } + + auto get_env() const noexcept -> env_t { return {this->st}; } + }; + + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch, ::beman::execution::receiver Rcvr> + struct affine_state : affine_state_base { + using operation_state_concept = ::beman::execution::operation_state_t; + using env_t = decltype(::beman::execution::get_env(std::declval())); + using base_t = affine_state_base; + using data_t = typename Sch::type; + using state_t = decltype(::beman::execution::connect( + ::beman::execution::affine_on(::std::declval(), ::std::declval()), + ::std::declval>())); + + state_t state; + + template <::beman::execution::sender S, ::beman::execution::scheduler SC, ::beman::execution::receiver R> + affine_state(S&& s, SC&& sc, R&& r) + : base_t{::std::forward(r)}, + state(::beman::execution::connect( + ::beman::execution::affine_on(::std::forward(s), ::std::forward(sc)), + affine_receiver{this})) {} + auto start() & noexcept -> void { + std::cout << "affine_state::start\n"; + this->data.save(); + ::beman::execution::start(this->state); + } + }; + template <::beman::execution::sender Sndr, ::beman::execution::scheduler Sch> + struct affine_sender { + using sender_concept = ::beman::execution::sender_t; + + std::remove_cvref_t sndr; + std::remove_cvref_t sch; + + template + auto get_completion_signatures(const Env& env) const noexcept { + return ::beman::execution::get_completion_signatures(this->sndr, env); + } + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) const& { + return affine_state(sndr, sch, std::forward(rcvr)); + } + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) && { + return affine_state(std::move(sndr), std::move(sch), std::forward(rcvr)); + } + }; + template <::beman::execution::sender Sndr, typename... Env> + requires std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, + ::beman::execution::affine_on_t> + auto transform_sender(Sndr&& s, const Env&...) const noexcept { + auto [tag, sch, sndr] = s; + return affine_sender{::beman::execution::detail::forward_like(sndr), + ::beman::execution::detail::forward_like(sch)}; + } +}; + +template +struct tls_scheduler { + using scheduler_concept = ::beman::execution::scheduler_t; + using type = Data; + struct env { + Scheduler sched; + template + auto query(const ::beman::execution::get_completion_scheduler_t&) const noexcept -> tls_scheduler { + return {this->sched}; + } + }; + template <::beman::execution::receiver Receiver> + struct state { + using operation_state_concept = ::beman::execution::operation_state_t; + using upstream_state_t = decltype(::beman::execution::connect( + ::beman::execution::schedule(std::declval()), std::declval>())); + + upstream_state_t upstream; + + template <::beman::execution::sender Sndr, ::beman::execution::receiver Rcvr> + state(Sndr&& sndr, Rcvr&& rcvr) + : upstream(::beman::execution::connect(std::forward(sndr), std::forward(rcvr))) {} + auto start() & noexcept -> void { ::beman::execution::start(this->upstream); } + }; + struct tls_sender { + using sender_concept = ::beman::execution::sender_t; + using sender_type = decltype(::beman::execution::schedule(std::declval())); + + Scheduler sched; + + template + auto get_completion_signatures(const Env&... env) const noexcept { + return ::beman::execution::get_completion_signatures(::beman::execution::schedule(Scheduler(this->sched)), + env...); + } + auto get_env() const noexcept -> env { return {this->sched}; } + + template <::beman::execution::receiver Receiver> + auto connect(Receiver&& rcvr) -> state { + return state(::beman::execution::schedule(this->sched), std::forward(rcvr)); + } + }; + + Scheduler sched; + template + requires(!std::same_as>) + tls_scheduler(Sch&& sch) : sched(sch) {} + tls_scheduler(const tls_scheduler&) = default; + tls_scheduler(tls_scheduler&&) = default; + tls_scheduler& operator=(const tls_scheduler&) = default; + tls_scheduler& operator=(tls_scheduler&&) = default; + + auto schedule() -> tls_sender { return {this->sched}; } + + auto query(const ::beman::execution::get_domain_t&) const noexcept -> tls_domain { return {}; } + + auto operator==(const tls_scheduler&) const -> bool = default; +}; +static_assert(::beman::execution::scheduler< + tls_scheduler().get_scheduler())>>); + +} // namespace demo + +// ---------------------------------------------------------------------------- + +#endif diff --git a/examples/environment.cpp b/examples/environment.cpp index 006e08b..8cccafc 100644 --- a/examples/environment.cpp +++ b/examples/environment.cpp @@ -11,7 +11,7 @@ #include #include #include "demo-scope.hpp" -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" namespace ex = beman::execution; namespace net = beman::net; @@ -135,20 +135,20 @@ const std::string text("####"); [[maybe_unused]] const std::string white("\x1b[37m" + text + "\x1b[0m:"); int main() { - demo::thread_pool pool1; - demo::thread_pool pool2; + demo::thread_loop loop1; + demo::thread_loop loop2; net::io_context context; demo::scope scope; environment::set("main"); - ex::sync_wait(ex::schedule(pool1.get_scheduler()) | ex::then([] { environment::set("thread1"); }) | + ex::sync_wait(ex::schedule(loop1.get_scheduler()) | ex::then([] { environment::set("thread1"); }) | ex::then([] { std::cout << print_env << "\n"; })); std::cout << print_env << "\n"; - spawn(env_scheduler(magenta, pool1.get_scheduler()), scope, run(context.get_scheduler(), 100ms)); - spawn(env_scheduler(green, pool1.get_scheduler()), scope, run(context.get_scheduler(), 150ms)); - spawn(env_scheduler(blue, pool1.get_scheduler()), scope, run(context.get_scheduler(), 250ms)); + spawn(env_scheduler(magenta, loop1.get_scheduler()), scope, run(context.get_scheduler(), 100ms)); + spawn(env_scheduler(green, loop1.get_scheduler()), scope, run(context.get_scheduler(), 150ms)); + spawn(env_scheduler(blue, loop1.get_scheduler()), scope, run(context.get_scheduler(), 250ms)); while (!scope.empty()) { context.run(); diff --git a/examples/issue-start-reschedules.cpp b/examples/issue-start-reschedules.cpp index 750a948..f960d41 100644 --- a/examples/issue-start-reschedules.cpp +++ b/examples/issue-start-reschedules.cpp @@ -3,7 +3,7 @@ #include #include -#include "demo-thread_pool.hpp" +#include "demo-thread_loop.hpp" #include #include @@ -16,15 +16,15 @@ ex::task<> test(auto sched) { } int main() { - demo::thread_pool pool1; - demo::thread_pool pool2; + demo::thread_loop loop1; + demo::thread_loop loop2; std::cout << "main =" << std::this_thread::get_id() << "\n"; - ex::sync_wait(ex::schedule(pool1.get_scheduler()) | - ex::then([] { std::cout << "pool1=" << std::this_thread::get_id() << "\n"; })); - ex::sync_wait(ex::schedule(pool2.get_scheduler()) | - ex::then([] { std::cout << "pool2=" << std::this_thread::get_id() << "\n"; })); + ex::sync_wait(ex::schedule(loop1.get_scheduler()) | + ex::then([] { std::cout << "loop1=" << std::this_thread::get_id() << "\n"; })); + ex::sync_wait(ex::schedule(loop2.get_scheduler()) | + ex::then([] { std::cout << "loop2=" << std::this_thread::get_id() << "\n"; })); std::cout << "--- use 1 ---\n"; - ex::sync_wait(test(pool2.get_scheduler())); + ex::sync_wait(test(loop2.get_scheduler())); std::cout << "--- use 2 ---\n"; - ex::sync_wait(ex::starts_on(pool1.get_scheduler(), test(pool2.get_scheduler()))); + ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); } diff --git a/examples/tls-scheduler.cpp b/examples/tls-scheduler.cpp new file mode 100644 index 0000000..eb8f5a2 --- /dev/null +++ b/examples/tls-scheduler.cpp @@ -0,0 +1,60 @@ +// examples/hello.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "demo-tls_scheduler.hpp" +#include +#include +#include +#include + +namespace ex = beman::execution; + +// ---------------------------------------------------------------------------- + +class tls_data { + static thread_local std::string value; + + public: + static std::string get() { return tls_data::value; } + static void set(std::string_view v) { tls_data::value = v; } +}; + +thread_local std::string tls_data::value{}; + +struct tls_save { + std::optional value; + void save() { this->value = tls_data::get(); } + void restore() { tls_data::set(*this->value); } +}; + +struct tls_env { + using scheduler_type = demo::tls_scheduler; + // using scheduler_type = ex::inline_scheduler; +}; + +ex::task run_timer(std::string name) { + tls_data::set(name); + + for (int i = 0; i != 3; ++i) { + co_await ex::just(); + std::cout << "data=" << tls_data::get() << "\n"; + } +} + +int main() { + ex::counting_scope scope; + ex::sync_wait([](auto& scope) -> ex::task { + auto scheduler = co_await ex::read_env(ex::get_scheduler); + + ex::spawn(ex::write_env(run_timer("timer 1") | ex::upon_error([](auto&&) noexcept {}) | + ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), + scope.get_token()); + ex::spawn(ex::write_env(run_timer("timer 2") | ex::upon_error([](auto&&) noexcept {}) | + ex::then([]() noexcept { std::cout << "loop done\n"; }), + ex::detail::make_env(ex::get_scheduler, scheduler)), + scope.get_token()); + co_return; + }(scope)); + ex::sync_wait(scope.join()); +} diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 843a43d..950b7d3 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -19,12 +19,13 @@ struct affine_on_t { auto operator()(Sender&& sndr, Scheduler&& scheduler) const { using result_t = sender<::std::remove_cvref_t, ::std::remove_cvref_t>; static_assert(::beman::execution::sender); - return result_t{*this, ::std::forward(sndr), ::std::forward(scheduler)}; + return result_t(::std::forward(scheduler), ::std::forward(sndr)); } }; template <::beman::execution::sender Sender, ::beman::execution::scheduler Scheduler> -struct affine_on_t::sender { +struct affine_on_t::sender + : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Scheduler, Sender> { using sender_concept = ::beman::execution::sender_t; static constexpr bool elide_schedule = ::std::same_as<::beman::task::detail::inline_scheduler, Scheduler>; @@ -32,44 +33,51 @@ struct affine_on_t::sender { auto get_completion_signatures(const Env& env) const& noexcept { if constexpr (elide_schedule) { return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->upstream)), env); + ::std::remove_cvref_t(::std::move(this->template get<2>())), env); } else { return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(this->upstream, this->scheduler), env); + ::beman::execution::continues_on(this->template get<2>(), this->template get<1>()), env); } } template auto get_completion_signatures(const Env& env) && noexcept { if constexpr (elide_schedule) { return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->upstream)), env); + ::std::remove_cvref_t(::std::move(this->template get<2>())), env); } else { return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(::std::move(this->upstream), ::std::move(this->scheduler)), env); + ::beman::execution::continues_on(::std::move(this->template get<2>()), + ::std::move(this->template get<1>())), + env); } } - affine_on_t tag{}; - Sender upstream; - Scheduler scheduler; + template + sender(Sch&& sch, S&& s) + : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Scheduler, Sender>{ + {{::beman::task::detail::affine_on_t{}}, + {Scheduler(::std::forward(sch))}, + {Sender(::std::forward(s))}}} {} template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) const& { if constexpr (elide_schedule) { - return ::beman::execution::connect(this->upstream, ::std::forward(receiver)); + return ::beman::execution::connect(this->template get<2>(), ::std::forward(receiver)); } else { - return ::beman::execution::connect(::beman::execution::continues_on(this->upstream, this->scheduler), - ::std::forward(receiver)); + return ::beman::execution::connect( + ::beman::execution::continues_on(this->template get<2>(), this->template get<1>()), + ::std::forward(receiver)); } } template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) && { if constexpr (elide_schedule) { - return ::beman::execution::connect(::std::move(this->upstream), ::std::forward(receiver)); + return ::beman::execution::connect(::std::move(this->template get<2>()), + ::std::forward(receiver)); } else { - return ::beman::execution::connect( - ::beman::execution::continues_on(::std::move(this->upstream), ::std::move(this->scheduler)), - ::std::forward(receiver)); + return ::beman::execution::connect(::beman::execution::continues_on(::std::move(this->template get<2>()), + ::std::move(this->template get<1>())), + ::std::forward(receiver)); } } }; diff --git a/include/beman/task/detail/promise_env.hpp b/include/beman/task/detail/promise_env.hpp index 33c3ee4..7b7a4ae 100644 --- a/include/beman/task/detail/promise_env.hpp +++ b/include/beman/task/detail/promise_env.hpp @@ -14,13 +14,13 @@ template struct promise_env { const Promise* promise; - auto query(::beman::execution::get_allocator_t) const noexcept -> typename Promise::allocator_type { + auto query(const ::beman::execution::get_allocator_t&) const noexcept -> typename Promise::allocator_type { return this->promise->get_allocator(); } - auto query(::beman::execution::get_scheduler_t) const noexcept -> typename Promise::scheduler_type { + auto query(const ::beman::execution::get_scheduler_t&) const noexcept -> typename Promise::scheduler_type { return this->promise->get_scheduler(); } - auto query(::beman::execution::get_stop_token_t) const noexcept -> typename Promise::stop_token_type { + auto query(const ::beman::execution::get_stop_token_t&) const noexcept -> typename Promise::stop_token_type { return this->promise->get_stop_token(); } diff --git a/include/beman/task/detail/promise_type.hpp b/include/beman/task/detail/promise_type.hpp index 084b7df..309619f 100644 --- a/include/beman/task/detail/promise_type.hpp +++ b/include/beman/task/detail/promise_type.hpp @@ -74,9 +74,12 @@ class promise_type auto await_transform(Sender&& sender) noexcept { if constexpr (requires { ::std::forward(sender).as_awaitable(*this); - typename ::std::remove_cvref_t::task_concept; + // typename ::std::remove_cvref_t::task_concept; }) { return ::std::forward(sender).as_awaitable(*this); + } else if constexpr (::std::same_as<::beman::execution::tag_of_t<::std::remove_cvref_t>, + ::beman::execution::read_env_t>) { + return ::beman::execution::as_awaitable(::std::forward(sender), *this); } else { return ::beman::execution::as_awaitable( ::beman::task::affine_on(::std::forward(sender), this->get_scheduler()), *this); diff --git a/include/beman/task/detail/state.hpp b/include/beman/task/detail/state.hpp index daaf9b7..df2b315 100644 --- a/include/beman/task/detail/state.hpp +++ b/include/beman/task/detail/state.hpp @@ -72,6 +72,7 @@ struct state : ::beman::task::detail::state_base, ::beman::task::detail::s auto start() & noexcept -> void { this->handle.start(this).resume(); } std::coroutine_handle<> do_complete() override { + this->handle.reset(); this->result_complete(::std::move(this->receiver)); return std::noop_coroutine(); } diff --git a/infra/cmake/msvc-toolchain.cmake b/infra/cmake/msvc-toolchain.cmake index c2fffa7..9b8f1b8 100644 --- a/infra/cmake/msvc-toolchain.cmake +++ b/infra/cmake/msvc-toolchain.cmake @@ -23,7 +23,8 @@ set(CMAKE_CXX_COMPILER cl) if(BEMAN_BUILDSYS_SANITIZER STREQUAL "MaxSan") # /Zi flag (add debug symbol) is needed when using address sanitizer # See C5072: https://learn.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-c5072 - set(SANITIZER_FLAGS "/fsanitize=address /Zi") + set(SANITIZER_FLAGS "/Zi") + # set(SANITIZER_FLAGS "/fsanitize=address /Zi") endif() set(CMAKE_CXX_FLAGS_DEBUG_INIT "/EHsc /permissive- ${SANITIZER_FLAGS}") diff --git a/tests/beman/task/CMakeLists.txt b/tests/beman/task/CMakeLists.txt index b807fd8..1d69bbe 100644 --- a/tests/beman/task/CMakeLists.txt +++ b/tests/beman/task/CMakeLists.txt @@ -3,11 +3,10 @@ list( APPEND task_tests - single_thread_context + msvc-asan-issue affine_on allocator_of allocator_support - task_scheduler completion error_types_of final_awaiter @@ -20,9 +19,11 @@ list( promise_type result_type scheduler_of + single_thread_context state_base sub_visit task + task_scheduler with_error ) diff --git a/tests/beman/task/msvc-asan-issue.test.cpp b/tests/beman/task/msvc-asan-issue.test.cpp new file mode 100644 index 0000000..0414de4 --- /dev/null +++ b/tests/beman/task/msvc-asan-issue.test.cpp @@ -0,0 +1,24 @@ +#include + +struct task { + struct final_awaiter { + static constexpr bool await_ready() noexcept { return false; } + static auto await_suspend(std::coroutine_handle<> h) noexcept { + h.destroy(); + return std::noop_coroutine(); + } + static constexpr void await_resume() noexcept {} + }; + struct promise_type { + constexpr std::suspend_always initial_suspend() noexcept { return {}; } + constexpr final_awaiter final_suspend() noexcept { return {}; } + void unhandled_exception() {} + task get_return_object() { return {std::coroutine_handle::from_promise(*this)}; } + void return_void() {} + }; + std::coroutine_handle<> h; +}; + +int main() { + []() -> task { co_return; }().h.resume(); +}