Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ FetchContent_Declare(
execution
# SOURCE_DIR <path-to>/execution
GIT_REPOSITORY https://github.com/bemanproject/execution
GIT_TAG fa6d441
GIT_TAG 330be95
)
FetchContent_MakeAvailable(execution)
FetchContent_Declare(
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int main() {
}
```

Full runnable examples can be found in `examples/` (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)).
Full runnable examples can be found in [`examples/`](`./example`) (e.g., [`./examples/hello.cpp`](./examples/hello.cpp)). For some explanation see [`./docs/examples.md`](./docs/examples.md).

## Help Welcome

Expand Down
76 changes: 75 additions & 1 deletion docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,60 @@

## Code used to prepare Dietmar's C++Now 2025 presentation

- [`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz)
<details>
<summary>
[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz): demo scheduler affinity
</summary>

The example program
[`c++now-affinity.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-affinity.cpp)
[![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/8qEG5x7sz) uses
[`demo::thread_loop`](../examples/demo-thread_loop.hpp) to demonstrate
the behavior of _scheduler affinity_: the idea is that scheduler
affinity causes the coroutine to resume on the same scheduler as
the one the coroutine was started on. The program implements three
coroutines which have most of their behavior in common:

1. Each coroutine is executed from `main` using <code>sync_wait(_fun_(loop.get_scheduler()))</code>.
2. Each coroutine prints the id of the thread it is executing on prior to any `co_await` and after all `co_await` expression.
3. Each coroutine `co_await`s the result of `scheduler(sched) | then([]{ ... })` where the function passed to `then` just prints the thread id.
4. `work2` additionally changes the coroutines scheduler to be an `inline_scheduler` and later restores the original scehduler using `change_coroutine_scheduler`.
5. While `work1` and `work2` use the default scheduler (`task_scheduler` a
type-erased scheduler which gets initialized from the receiver's environment's `get_scheduler`), `work3` sets the coroutine's scheduler up to be `inline_scheduler`, effectively causing the coroutine to resume wherever
the `co_await`'s expression resumed.

The output of the program is someting like the below:

```
before id=0x1fd635f00
then id =0x16b64b000
after id =0x1fd635f00

before id=0x1fd635f00
then id =0x16b64b000
after1 id=0x16b64b000
then id =0x16b64b000
after2 id=0x1fd635f00

before id=0x1fd635f00
then id =0x16b64b000
after id =0x16b64b000
```

It shows that:

1. The thread on which the `then`'s function is executed is always the
same and different from the thread each of the coroutines started on.
2. For `work1` the `co_await` resumes on the same thread as the one the
coroutine was started on.
3. For `work2` the first `co_await` after `schedule(sched)` resumes on
the thread used by `sched`. After restoring the original scheduler the
`co_await` resumes on the original thread.
4. For `work3` the `co_await` resumes on the thread used by `sched` as
the `inline_scheduler` doesn't do any actual scheduling.

</details>

- [`c++now-allocator.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-allocator.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/719v7en6a)
- [`c++now-basic.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-basic.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/7Pn5TEhfK)
- [`c++now-cancel.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-cancel.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/vx4PqYvE6)
Expand All @@ -12,3 +65,24 @@
- [`c++now-return.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-return.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/f5YE5W4Ta)
- [`c++now-stop_token.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-stop_token.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/TxYe3jEs7)
- [`c++now-with_error.cpp`](https://github.com/bemanproject/task/blob/main/examples/c%2B%2Bnow-with_error.cpp) [![Compiler Explorer](compiler-explorer.ico)](https://godbolt.org/z/6oqox6zf8)

## Tools Used By The Examples

<details>
<summary>
[`demo::thread_loop`](../examples/demo-thread_loop.hpp) is a `run_loop` whose `run()` is called from a `std::thread`.
</summary>

Technically [`demo::thread_loop`](../examples/demo-thread_loop.hpp)
is a class `public`ly derived from `execution::run_loop` which is
also owning a `std::thread`. The `std::thread` is constructed with
a function object calling `run()` on the
[`demo::thread_loop`](../examples/demo-thread_loop.hpp) object.
Destroying the object calls `finish()` and then `join()`s the
`std::thread`: the destructor will block until the `execution::run_loop`'s
`run()` returns.

The important bit is that work executed on the
[`demo::thread_loop`](../examples/demo-thread_loop.hpp)'s `scheduler`
will be executed on a corresponding `std::thread`.
</details>
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

set(ALL_EXAMPLES
tls-scheduler
customize
issue-symmetric-transfer
affinity
Expand Down
16 changes: 8 additions & 8 deletions examples/affinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include <beman/execution/execution.hpp>
#include <beman/task/task.hpp>
#include "demo-thread_pool.hpp"
#include "demo-thread_loop.hpp"
#include <iostream>
#include <cassert>

Expand All @@ -30,11 +30,11 @@ struct non_affine {

int main() {
std::cout << std::unitbuf;
demo::thread_pool pool;
demo::thread_loop loop;
ex::sync_wait(ex::just() | ex::then([]() noexcept { std::cout << "main:" << fmt_id << "\n"; }));
ex::sync_wait(ex::schedule(pool.get_scheduler()) |
ex::then([]() noexcept { std::cout << "pool:" << fmt_id << "\n"; }));
ex::sync_wait(ex::schedule(ex::task_scheduler(pool.get_scheduler())) |
ex::sync_wait(ex::schedule(loop.get_scheduler()) |
ex::then([]() noexcept { std::cout << "loop:" << fmt_id << "\n"; }));
ex::sync_wait(ex::schedule(ex::task_scheduler(loop.get_scheduler())) |
ex::then([]() noexcept { std::cout << "any: " << fmt_id << "\n"; }));
ex::sync_wait([]() -> ex::task<void> {
std::cout << "coro:" << fmt_id << "\n";
Expand All @@ -45,19 +45,19 @@ int main() {
std::cout << "cor1:" << fmt_id << "\n";
co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; }));
std::cout << "cor2:" << fmt_id << "\n";
}(pool));
}(loop));

std::cout << "not scheduler affine:\n";
ex::sync_wait([](auto& pl) -> ex::task<void, non_affine> {
std::cout << "cor1:" << fmt_id << "\n";
co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; }));
std::cout << "cor2:" << fmt_id << "\n";
}(pool));
}(loop));

std::cout << "use inline_scheduler:\n";
ex::sync_wait(ex::starts_on(ex::inline_scheduler{}, [](auto& pl) -> ex::task<void> {
std::cout << "cor1:" << fmt_id << "\n";
co_await (ex::schedule(pl.get_scheduler()) | ex::then([] { std::cout << "then:" << fmt_id << "\n"; }));
std::cout << "cor2:" << fmt_id << "\n";
}(pool)));
}(loop)));
}
119 changes: 5 additions & 114 deletions examples/c++now-affinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <beman/task/task.hpp>
#include <beman/execution/execution.hpp>
#include <beman/execution/stop_token.hpp>
#include "demo-thread_loop.hpp"
#include <condition_variable>
#include <iostream>
#include <mutex>
Expand All @@ -13,117 +14,7 @@

namespace ex = beman::execution;

// ----------------------------------------------------------------------------

namespace {
class thread_context {
struct base {
base* next{};

virtual void do_run() = 0;
base() = default;
base(const base&) = delete;
base(base&&) = delete;
base& operator=(const base&) = delete;
base& operator=(base&&) = delete;
virtual ~base() = default;
};

ex::stop_source source;
std::mutex mutex;
std::condition_variable condition;
std::thread thread{[this] { this->run(this->source.get_token()); }};
base* queue{};

void run(auto token) {
while (true) {
base* next(std::invoke([&]() -> base* {
std::unique_lock cerberus(this->mutex);
this->condition.wait(cerberus,
[this, &token] { return token.stop_requested() || nullptr != this->queue; });
if (nullptr == this->queue) {
return nullptr;
}
base* n = std::exchange(queue, queue->next);
return n;
}));
if (next) {
next->do_run();
} else {
break;
}
}
}

public:
thread_context() = default;
thread_context(const thread_context&) = delete;
thread_context(thread_context&&) = delete;
~thread_context() {
this->source.request_stop();
this->condition.notify_one();
this->thread.join();
}
thread_context& operator=(const thread_context&) = delete;
thread_context& operator=(thread_context&&) = delete;

void enqueue(base* work) noexcept {
{
std::lock_guard cerberus(this->mutex);
work->next = this->queue;
queue = work;
}
this->condition.notify_one();
}

template <typename Receiver>
struct state final : base {
using operation_state_concept = ex::operation_state_t;
Receiver receiver;
thread_context* context;

template <typename R>
state(thread_context* ctxt, R&& r) : receiver(std::forward<R>(r)), context(ctxt) {}
void start() noexcept {
static_assert(ex::operation_state<state>);
this->context->enqueue(this);
}
void do_run() override { ex::set_value(std::move(this->receiver)); }
};

struct scheduler;
struct env {
thread_context* context;
template <typename Tag>
scheduler query(const ex::get_completion_scheduler_t<Tag>&) const noexcept {
return {this->context};
}
};
struct sender {
using sender_concept = ex::sender_t;
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
thread_context* context;
template <ex::receiver Receiver>
auto connect(Receiver&& r) {
return state<std::remove_cvref_t<Receiver>>(this->context, std::forward<Receiver>(r));
}
env get_env() const noexcept { return {this->context}; }
};
static_assert(ex::sender<sender>);
// static_assert(ex::sender_in<sender>);

struct scheduler {
using scheduler_concept = ex::scheduler_t;
thread_context* context;

sender schedule() noexcept { return {this->context}; }
bool operator==(const scheduler&) const = default;
};
static_assert(ex::scheduler<scheduler>);
scheduler get_scheduler() { return {this}; }
};

// ----------------------------------------------------------------------------

ex::task<> work1(auto sched) {
std::cout << "before id=" << std::this_thread::get_id() << "\n";
Expand Down Expand Up @@ -155,10 +46,10 @@ ex::task<void, inline_context> work3(auto sched) {

int main() {
try {
thread_context context;
ex::sync_wait(work1(context.get_scheduler()));
ex::sync_wait(work2(context.get_scheduler()));
ex::sync_wait(work3(context.get_scheduler()));
demo::thread_loop loop;
ex::sync_wait(work1(loop.get_scheduler()));
ex::sync_wait(work2(loop.get_scheduler()));
ex::sync_wait(work3(loop.get_scheduler()));
} catch (...) {
std::cout << "ERROR: unexpected exception\n";
}
Expand Down
1 change: 0 additions & 1 deletion examples/customize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include <beman/execution/execution.hpp>
#include <beman/task/task.hpp>
#include "demo-thread_pool.hpp"
#include <iostream>
#include <utility>
#include <cassert>
Expand Down
33 changes: 33 additions & 0 deletions examples/demo-thread_loop.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// examples/demo-thread_loop.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_EXAMPLES_DEMO_THREAD_LOOP
#define INCLUDED_EXAMPLES_DEMO_THREAD_LOOP

// ----------------------------------------------------------------------------

#include <beman/execution/execution.hpp>
#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <thread>

namespace demo {

class thread_loop : public ::beman::execution::run_loop {
private:
std::thread thread{std::bind(&thread_loop::run, this)};

public:
~thread_loop() {
this->finish();
this->thread.join();
}
};

} // namespace demo

// ----------------------------------------------------------------------------

#endif
Loading
Loading