Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
65e70e3
added async_scope_token
dietmarkuehl May 14, 2025
170901f
clang-format
dietmarkuehl May 14, 2025
7227374
updated the simple_counting_scope implementation
dietmarkuehl May 26, 2025
30bfa8f
added missing token tests
dietmarkuehl May 26, 2025
38efec7
working on bring the scope up to date
dietmarkuehl May 16, 2025
87bd8ec
added state_base
dietmarkuehl Jun 2, 2025
131457b
added spawn_future receiver
dietmarkuehl Jun 2, 2025
d5a84cf
added exec.prop
dietmarkuehl Jun 3, 2025
54e9cc9
added stop_when
dietmarkuehl Jun 3, 2025
58bd68c
fix test issues with Linux and Windows
dietmarkuehl Jun 4, 2025
3d2595e
some progress on implementing spawn_future
dietmarkuehl Jun 4, 2025
909b6a3
fix an issue with lambda upsetting MSVC++
dietmarkuehl Jun 5, 2025
9b8d0dc
added got a first cut at spawn_future
dietmarkuehl Jun 7, 2025
a7a2bae
added typenames needed by gcc
dietmarkuehl Jun 8, 2025
15e99d6
fix the spawn_future completion behavior
dietmarkuehl Jun 8, 2025
c2ef806
fixed the completion signatures for spawn_future
dietmarkuehl Jun 8, 2025
6057be8
clang format
dietmarkuehl Jun 8, 2025
97cf943
implemented and initial version of spawn
dietmarkuehl Jun 10, 2025
65779de
renamed async_scope_token -> scope_token
dietmarkuehl Jun 10, 2025
1a55414
added missing associate implementation (tests are still missing)
dietmarkuehl Jun 22, 2025
8c79f51
a first version of the async_scope components
dietmarkuehl Jun 26, 2025
da08ace
temporarily remove associate test
dietmarkuehl Jun 26, 2025
630c20a
various fixes
dietmarkuehl Jun 30, 2025
7117f8a
Merge branch 'main' into async-scope
dietmarkuehl Jun 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/beman/execution/detail/as_tuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@
// ----------------------------------------------------------------------------

namespace beman::execution::detail {
/*!
* \brief Turn a completion signatures into a std::tuple type.
* \internal
*/
template <typename T>
struct as_tuple;
/*!
* \brief The actual operational partial specialization of as_tuple.
* \internal
*/
template <typename Rc, typename... A>
struct as_tuple<Rc(A...)> {
using type = ::beman::execution::detail::decayed_tuple<Rc, A...>;
Expand Down
10 changes: 6 additions & 4 deletions include/beman/execution/detail/associate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ struct impls_for<associate_t> : ::beman::execution::detail::default_impls {
struct op_state {
using sop_t = op_t;
using sscope_token = scope_token;
struct assoc_t {
sscope_token tok;
sop_t op;
};

bool associated{false};
union {
Receiver* rcvr;
struct {
sscope_token tok;
sop_t op;
} assoc;
assoc_t assoc;
};
explicit op_state(Receiver& r) noexcept : rcvr(::std::addressof(r)) {}
explicit op_state(sscope_token tk, wrap_sender&& sndr, Receiver& r) try
Expand Down
17 changes: 9 additions & 8 deletions include/beman/execution/detail/counting_scope_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// ----------------------------------------------------------------------------

namespace beman::execution::detail {
struct counting_scope_base;
class counting_scope_base;
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -71,7 +71,7 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail

// ----------------------------------------------------------------------------

beman::execution::detail::counting_scope_base::~counting_scope_base() {
inline beman::execution::detail::counting_scope_base::~counting_scope_base() {
::std::lock_guard kerberos(this->mutex);
switch (this->state) {
default:
Expand All @@ -83,7 +83,7 @@ beman::execution::detail::counting_scope_base::~counting_scope_base() {
}
}

auto beman::execution::detail::counting_scope_base::close() noexcept -> void {
inline auto beman::execution::detail::counting_scope_base::close() noexcept -> void {
switch (this->state) {
default:
break;
Expand All @@ -99,12 +99,12 @@ auto beman::execution::detail::counting_scope_base::close() noexcept -> void {
}
}

auto beman::execution::detail::counting_scope_base::add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept
inline auto beman::execution::detail::counting_scope_base::add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept
-> void {
n->next = std::exchange(this->head, n);
}

auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> bool {
inline auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> bool {
::std::lock_guard lock(this->mutex);
switch (this->state) {
default:
Expand All @@ -118,7 +118,8 @@ auto beman::execution::detail::counting_scope_base::try_associate() noexcept ->
return true;
}
}
auto beman::execution::detail::counting_scope_base::disassociate() noexcept -> void {

inline auto beman::execution::detail::counting_scope_base::disassociate() noexcept -> void {
{
::std::lock_guard lock(this->mutex);
if (0u < --this->count)
Expand All @@ -128,7 +129,7 @@ auto beman::execution::detail::counting_scope_base::disassociate() noexcept -> v
this->complete();
}

auto beman::execution::detail::counting_scope_base::complete() noexcept -> void {
inline auto beman::execution::detail::counting_scope_base::complete() noexcept -> void {
node* current{[this] {
::std::lock_guard lock(this->mutex);
return ::std::exchange(this->head, nullptr);
Expand All @@ -138,7 +139,7 @@ auto beman::execution::detail::counting_scope_base::complete() noexcept -> void
}
}

auto beman::execution::detail::counting_scope_base::start_node(node* n) -> void {
inline auto beman::execution::detail::counting_scope_base::start_node(node* n) -> void {
::std::lock_guard kerberos(this->mutex);
switch (this->state) {
case ::beman::execution::detail::counting_scope_base::state_t::unused:
Expand Down
28 changes: 6 additions & 22 deletions include/beman/execution/detail/schedule_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,37 @@
#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_SCHEDULE_FROM
#define INCLUDED_BEMAN_EXECUTION_DETAIL_SCHEDULE_FROM

#include <beman/execution/detail/as_tuple.hpp>
#include <beman/execution/detail/child_type.hpp>
#include <beman/execution/detail/meta_combine.hpp>
#include <beman/execution/detail/completion_signatures_of_t.hpp>
#include <beman/execution/detail/connect.hpp>
#include <beman/execution/detail/decayed_tuple.hpp>
#include <beman/execution/detail/default_domain.hpp>
#include <beman/execution/detail/default_impls.hpp>
#include <beman/execution/detail/error_types_of_t.hpp>
#include <beman/execution/detail/env_of_t.hpp>
#include <beman/execution/detail/error_types_of_t.hpp>
#include <beman/execution/detail/fwd_env.hpp>
#include <beman/execution/detail/get_domain.hpp>
#include <beman/execution/detail/get_env.hpp>
#include <beman/execution/detail/impls_for.hpp>
#include <beman/execution/detail/join_env.hpp>
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/meta_combine.hpp>
#include <beman/execution/detail/meta_prepend.hpp>
#include <beman/execution/detail/meta_to.hpp>
#include <beman/execution/detail/meta_transform.hpp>
#include <beman/execution/detail/meta_unique.hpp>
#include <beman/execution/detail/query_with_default.hpp>
#include <beman/execution/detail/sched_attrs.hpp>
#include <beman/execution/detail/schedule.hpp>
#include <beman/execution/detail/schedule_result_t.hpp>
#include <beman/execution/detail/schedule.hpp>
#include <beman/execution/detail/scheduler.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/sender_in.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/set_error.hpp>
#include <beman/execution/detail/set_stopped.hpp>
#include <beman/execution/detail/start.hpp>
#include <beman/execution/detail/transform_sender.hpp>
#include <beman/execution/detail/meta_unique.hpp>

#include <exception>
#include <type_traits>
Expand All @@ -43,23 +44,6 @@
// ----------------------------------------------------------------------------

namespace beman::execution::detail {
/*!
* \brief Turn a completion signatures into a std::tuple type.
* \internal
*/
template <typename>
struct as_tuple;
/*!
* \brief The actual operational partial specialization of as_tuple.
* \internal
*/
template <typename Tag, typename... T>
struct as_tuple<Tag(T...)> {
using type = ::beman::execution::detail::decayed_tuple<Tag, T...>;
};
template <typename T>
using as_tuple_t = typename as_tuple<T>::type;

struct schedule_from_t {
template <::beman::execution::scheduler Scheduler, ::beman::execution::sender Sender>
auto operator()(Scheduler&& scheduler, Sender&& sender) const {
Expand Down
5 changes: 3 additions & 2 deletions include/beman/execution/detail/sender_decompose.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ auto get_sender_data(Sender&& sender) {
//-dk:TODO should use a dynamic/language approach:
auto&& [tag, data, ... children] = sender;
return sender_meta<decltype(tag), decltype(data), ::std::tuple<decltype(children)...>>;
#endif
#else
using sender_type = ::std::remove_cvref_t<Sender>;
static constexpr ::beman::execution::detail::sender_convert_to_any_t at{};

Expand All @@ -68,10 +68,11 @@ auto get_sender_data(Sender&& sender) {
return ::beman::execution::detail::sender_data{tag, data, ::std::tie(c0)};
} else if constexpr (requires { sender_type{at, at}; }) {
auto&& [tag, data] = sender;
return ::beman::execution::detail::sender_data{tag, data, ::std::tuple<>()};
return ::beman::execution::detail::sender_data{tag, data, ::std::tuple<>{}};
} else {
return ::beman::execution::detail::sender_meta<void, void, void>{};
}
#endif
}

template <typename Sender>
Expand Down
176 changes: 88 additions & 88 deletions include/beman/execution/detail/stop_when.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,105 +29,105 @@ namespace beman::execution::detail {
inline constexpr struct stop_when_t {

template <::beman::execution::sender Sndr, ::beman::execution::stoppable_token Tok>
struct sender {
using sender_concept = ::beman::execution::sender_t;

std::remove_cvref_t<Sndr> sndr;
std::remove_cvref_t<Tok> tok;

template <::beman::execution::receiver Rcvr>
struct state {
using operation_state_concept = ::beman::execution::operation_state_t;
using rcvr_t = ::std::remove_cvref_t<Rcvr>;
using token1_t = ::std::remove_cvref_t<Tok>;
using token2_t =
decltype(::beman::execution::get_stop_token(::beman::execution::get_env(::std::declval<rcvr_t>())));

struct cb_t {
::beman::execution::inplace_stop_source& source;
auto operator()() const noexcept { this->source.request_stop(); }
};
struct base_state {
rcvr_t rcvr;
::beman::execution::inplace_stop_source source{};
};
struct env {
base_state* st;
auto query(const ::beman::execution::get_stop_token_t&) const noexcept {
return this->st->source.get_token();
}
template <typename Q, typename... A>
requires requires(const Q& q, A&&... a, const rcvr_t& r) {
q(::beman::execution::get_env(r), ::std::forward<A>(a)...);
}
auto query(const Q& q, A&&... a) const noexcept {
return q(::beman::execution::get_env(this->st->rcvr), ::std::forward<A>(a)...);
}
};

struct receiver {
using receiver_concept = ::beman::execution::receiver_t;
base_state* st;

auto get_env() const noexcept -> env { return env{this->st}; }
template <typename... A>
auto set_value(A&&... a) const noexcept -> void {
::beman::execution::set_value(::std::move(this->st->rcvr), ::std::forward<A>(a)...);
}
template <typename E>
auto set_error(E&& e) const noexcept -> void {
::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward<E>(e));
}
auto set_stopped() const noexcept -> void {
::beman::execution::set_stopped(::std::move(this->st->rcvr));
}
};
using inner_state_t =
decltype(::beman::execution::connect(::std::declval<Sndr>(), ::std::declval<receiver>()));

token1_t tok;
base_state base;
std::optional<::beman::execution::stop_callback_for_t<token1_t, cb_t>> cb1;
std::optional<::beman::execution::stop_callback_for_t<token2_t, cb_t>> cb2;
inner_state_t inner_state;

template <::beman::execution::sender S,
::beman::execution::stoppable_token T,
::beman::execution::receiver R>
state(S&& s, T&& t, R&& r)
: tok(::std::forward<T>(t)),
base{::std::forward<R>(r)},
inner_state(::beman::execution::connect(::std::forward<S>(s), receiver(&this->base))) {}

auto start() & noexcept {
this->cb1.emplace(this->tok, cb_t(this->base.source));
this->cb2.emplace(::beman::execution::get_stop_token(::beman::execution::get_env(this->base.rcvr)),
cb_t(this->base.source));
::beman::execution::start(this->inner_state);
}
};

template <typename E>
auto get_completion_signatures(const E& e) const noexcept {
return ::beman::execution::get_completion_signatures(this->sndr, e);
}
template <::beman::execution::receiver Rcvr>
auto connect(Rcvr&& rcvr) && -> state<Rcvr> {
return state<Rcvr>{std::move(this->sndr), ::std::move(this->tok), ::std::forward<Rcvr>(rcvr)};
}
};
struct sender;

template <::beman::execution::sender Sndr, ::beman::execution::stoppable_token Tok>
auto operator()(Sndr&& sndr, Tok&& tok) const noexcept {
if constexpr (::beman::execution::unstoppable_token<Tok>) {
return ::std::forward<Sndr>(sndr);
} else {
return sender<Sndr, Tok>(::std::forward<Sndr>(sndr), ::std::forward<Tok>(tok));
return sender<Sndr, Tok>(*this, ::std::forward<Tok>(tok), ::std::forward<Sndr>(sndr));
}
}
} stop_when{};
} // namespace beman::execution::detail

template <::beman::execution::sender Sndr, ::beman::execution::stoppable_token Tok>
struct beman::execution::detail::stop_when_t::sender {
using sender_concept = ::beman::execution::sender_t;

stop_when_t stop_when{};
std::remove_cvref_t<Tok> tok;
std::remove_cvref_t<Sndr> sndr;

template <::beman::execution::receiver Rcvr>
struct state {
using operation_state_concept = ::beman::execution::operation_state_t;
using rcvr_t = ::std::remove_cvref_t<Rcvr>;
using token1_t = ::std::remove_cvref_t<Tok>;
using token2_t =
decltype(::beman::execution::get_stop_token(::beman::execution::get_env(::std::declval<rcvr_t>())));

struct cb_t {
::beman::execution::inplace_stop_source& source;
auto operator()() const noexcept { this->source.request_stop(); }
};
struct base_state {
rcvr_t rcvr;
::beman::execution::inplace_stop_source source{};
};
struct env {
base_state* st;
auto query(const ::beman::execution::get_stop_token_t&) const noexcept {
return this->st->source.get_token();
}
template <typename Q, typename... A>
requires requires(const Q& q, A&&... a, const rcvr_t& r) {
q(::beman::execution::get_env(r), ::std::forward<A>(a)...);
}
auto query(const Q& q, A&&... a) const noexcept {
return q(::beman::execution::get_env(this->st->rcvr), ::std::forward<A>(a)...);
}
};

struct receiver {
using receiver_concept = ::beman::execution::receiver_t;
base_state* st;

auto get_env() const noexcept -> env { return env{this->st}; }
template <typename... A>
auto set_value(A&&... a) const noexcept -> void {
::beman::execution::set_value(::std::move(this->st->rcvr), ::std::forward<A>(a)...);
}
template <typename E>
auto set_error(E&& e) const noexcept -> void {
::beman::execution::set_error(::std::move(this->st->rcvr), ::std::forward<E>(e));
}
auto set_stopped() const noexcept -> void { ::beman::execution::set_stopped(::std::move(this->st->rcvr)); }
};
using inner_state_t =
decltype(::beman::execution::connect(::std::declval<Sndr>(), ::std::declval<receiver>()));

token1_t tok;
base_state base;
std::optional<::beman::execution::stop_callback_for_t<token1_t, cb_t>> cb1;
std::optional<::beman::execution::stop_callback_for_t<token2_t, cb_t>> cb2;
inner_state_t inner_state;

template <::beman::execution::sender S, ::beman::execution::stoppable_token T, ::beman::execution::receiver R>
state(S&& s, T&& t, R&& r)
: tok(::std::forward<T>(t)),
base{::std::forward<R>(r)},
inner_state(::beman::execution::connect(::std::forward<S>(s), receiver(&this->base))) {}

auto start() & noexcept {
this->cb1.emplace(this->tok, cb_t(this->base.source));
this->cb2.emplace(::beman::execution::get_stop_token(::beman::execution::get_env(this->base.rcvr)),
cb_t(this->base.source));
::beman::execution::start(this->inner_state);
}
};

template <typename E>
auto get_completion_signatures(const E& e) const noexcept {
return ::beman::execution::get_completion_signatures(this->sndr, e);
}
template <::beman::execution::receiver Rcvr>
auto connect(Rcvr&& rcvr) && -> state<Rcvr> {
return state<Rcvr>{std::move(this->sndr), ::std::move(this->tok), ::std::forward<Rcvr>(rcvr)};
}
};

// ----------------------------------------------------------------------------

#endif
Loading
Loading