From f37d00762dd93a574acd4c0c13e3670ef7f0e2a3 Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Fri, 14 Mar 2025 16:05:37 +0100 Subject: [PATCH 1/5] Give type access to manually implemented notifier futures --- src/prod_cons/framed.rs | 36 +++++++++++++++++++- src/prod_cons/stream.rs | 45 ++++++++++++++++++++++++- src/traits/notifier/mod.rs | 1 + src/traits/notifier/typed.rs | 65 ++++++++++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 src/traits/notifier/typed.rs diff --git a/src/prod_cons/framed.rs b/src/prod_cons/framed.rs index 6347c36..399e35a 100644 --- a/src/prod_cons/framed.rs +++ b/src/prod_cons/framed.rs @@ -9,7 +9,10 @@ use crate::{ traits::{ bbqhdl::BbqHandle, coordination::Coord, - notifier::{AsyncNotifier, Notifier}, + notifier::{ + typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, TypedWrapper}, + AsyncNotifier, Notifier, + }, storage::Storage, }, }; @@ -134,6 +137,22 @@ where } } +impl TypedWrapper> +where + S: Storage, + C: Coord, + N: AsyncNotifierTyped, + Q: BbqHandle, + H: LenHeader, +{ + pub fn wait_grant( + &self, + sz: H, + ) -> ::NotFull>> { + self.bbq.not.wait_for_not_full(move || self.grant(sz).ok()) + } +} + pub struct FramedConsumer where S: Storage, @@ -210,6 +229,21 @@ where } } +impl TypedWrapper> +where + S: Storage, + C: Coord, + N: AsyncNotifierTyped, + Q: BbqHandle, + H: LenHeader, +{ + pub fn wait_read( + &self, + ) -> ::NotEmpty>> { + self.bbq.not.wait_for_not_empty(move || self.read().ok()) + } +} + pub struct FramedGrantW where S: Storage, diff --git a/src/prod_cons/stream.rs b/src/prod_cons/stream.rs index e944e3a..a9316fc 100644 --- a/src/prod_cons/stream.rs +++ b/src/prod_cons/stream.rs @@ -9,7 +9,10 @@ use crate::{ traits::{ bbqhdl::BbqHandle, coordination::Coord, - notifier::{AsyncNotifier, Notifier}, + notifier::{ + typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, TypedWrapper}, + AsyncNotifier, Notifier, + }, storage::Storage, }, }; @@ -118,6 +121,32 @@ where } } +impl TypedWrapper> +where + S: Storage, + C: Coord, + N: AsyncNotifierTyped, + Q: BbqHandle, +{ + pub fn wait_grant_max_remaining( + &self, + max: usize, + ) -> ::NotFull> + '_> { + self.bbq + .not + .wait_for_not_full(move || self.grant_max_remaining(max).ok()) + } + + pub fn wait_grant_exact( + &self, + sz: usize, + ) -> ::NotFull> + '_> { + self.bbq + .not + .wait_for_not_full(move || self.grant_exact(sz).ok()) + } +} + pub struct StreamConsumer where S: Storage, @@ -164,6 +193,20 @@ where } } +impl TypedWrapper> +where + S: Storage, + C: Coord, + N: AsyncNotifierTyped, + Q: BbqHandle, +{ + pub fn wait_read( + &self, + ) -> ::NotEmpty> + '_> { + self.bbq.not.wait_for_not_empty(move || self.read().ok()) + } +} + pub struct StreamGrantW where S: Storage, diff --git a/src/traits/notifier/mod.rs b/src/traits/notifier/mod.rs index cd63a21..9d76b71 100644 --- a/src/traits/notifier/mod.rs +++ b/src/traits/notifier/mod.rs @@ -2,6 +2,7 @@ pub mod maitake; pub mod blocking; +pub mod typed; pub trait Notifier { const INIT: Self; diff --git a/src/traits/notifier/typed.rs b/src/traits/notifier/typed.rs new file mode 100644 index 0000000..f5ef5f9 --- /dev/null +++ b/src/traits/notifier/typed.rs @@ -0,0 +1,65 @@ +use crate::traits::notifier::{AsyncNotifier, Notifier}; +use core::{future::Future, ops::Deref}; + +pub trait AsyncNotifierTyped: Notifier { + type FutNotEmpty: Future; + type FutNotFull: Future; + + fn wait_for_not_empty Option>(&self, f: F) -> Self::FutNotEmpty; + fn wait_for_not_full Option>(&self, f: F) -> Self::FutNotFull; +} + +impl AsyncNotifier for N +where + N: AsyncNotifierTyped, +{ + async fn wait_for_not_empty Option>(&self, f: F) -> T { + self.wait_for_not_empty(f).await + } + + async fn wait_for_not_full Option>(&self, f: F) -> T { + self.wait_for_not_full(f).await + } +} + +pub trait Typed: Sized { + fn typed(self) -> TypedWrapper { + TypedWrapper(self) + } +} + +impl Typed for T {} + +pub struct TypedWrapper(T); + +impl Deref for TypedWrapper { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub trait ConstrFnMut<'a>: FnMut() -> Option { + type Out; +} + +impl ConstrFnMut<'_> for F +where + F: FnMut() -> Option, +{ + type Out = Out; +} + +pub trait ConstrFut<'a>: AsyncNotifierTyped { + type NotFull>; + type NotEmpty>; +} + +impl<'a, N> ConstrFut<'a> for N +where + N: AsyncNotifierTyped, +{ + type NotFull> = ::FutNotFull; + type NotEmpty> = ::FutNotEmpty; +} From c304fe7b3030df63d4d9c582f4a56f84fbb29eaf Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Sun, 16 Mar 2025 02:31:38 +0100 Subject: [PATCH 2/5] Only implement `Typed` for types that actually use it --- src/prod_cons/framed.rs | 21 ++++++++++++++++++++- src/prod_cons/stream.rs | 20 +++++++++++++++++++- src/traits/notifier/typed.rs | 2 -- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/prod_cons/framed.rs b/src/prod_cons/framed.rs index 399e35a..29af839 100644 --- a/src/prod_cons/framed.rs +++ b/src/prod_cons/framed.rs @@ -10,7 +10,7 @@ use crate::{ bbqhdl::BbqHandle, coordination::Coord, notifier::{ - typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, TypedWrapper}, + typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, AsyncNotifier, Notifier, }, storage::Storage, @@ -137,6 +137,15 @@ where } } +impl Typed for FramedProducer +where + S: Storage, + C: Coord, + N: Notifier, + Q: BbqHandle, +{ +} + impl TypedWrapper> where S: Storage, @@ -229,6 +238,16 @@ where } } +impl Typed for FramedConsumer +where + S: Storage, + C: Coord, + N: AsyncNotifier, + Q: BbqHandle, + H: LenHeader, +{ +} + impl TypedWrapper> where S: Storage, diff --git a/src/prod_cons/stream.rs b/src/prod_cons/stream.rs index a9316fc..5d04627 100644 --- a/src/prod_cons/stream.rs +++ b/src/prod_cons/stream.rs @@ -10,7 +10,7 @@ use crate::{ bbqhdl::BbqHandle, coordination::Coord, notifier::{ - typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, TypedWrapper}, + typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, AsyncNotifier, Notifier, }, storage::Storage, @@ -121,6 +121,15 @@ where } } +impl Typed for StreamProducer +where + S: Storage, + C: Coord, + N: Notifier, + Q: BbqHandle, +{ +} + impl TypedWrapper> where S: Storage, @@ -193,6 +202,15 @@ where } } +impl Typed for StreamConsumer +where + S: Storage, + C: Coord, + N: Notifier, + Q: BbqHandle, +{ +} + impl TypedWrapper> where S: Storage, diff --git a/src/traits/notifier/typed.rs b/src/traits/notifier/typed.rs index f5ef5f9..d9e9bde 100644 --- a/src/traits/notifier/typed.rs +++ b/src/traits/notifier/typed.rs @@ -28,8 +28,6 @@ pub trait Typed: Sized { } } -impl Typed for T {} - pub struct TypedWrapper(T); impl Deref for TypedWrapper { From 284e2ed942714fd7c3750d62405688964eeac6c4 Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Mon, 17 Mar 2025 03:14:06 +0100 Subject: [PATCH 3/5] dev --- src/prod_cons/framed.rs | 23 +++++----------------- src/traits/notifier/typed.rs | 38 +++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/prod_cons/framed.rs b/src/prod_cons/framed.rs index 29af839..7f661f9 100644 --- a/src/prod_cons/framed.rs +++ b/src/prod_cons/framed.rs @@ -10,7 +10,7 @@ use crate::{ bbqhdl::BbqHandle, coordination::Coord, notifier::{ - typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, + typed::{AsyncNotifierTyped, BbqSync, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, AsyncNotifier, Notifier, }, storage::Storage, @@ -88,22 +88,16 @@ impl crate::queue::ArcBBQueue { pub struct FramedProducer where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, + Self: BbqSync, H: LenHeader, { - bbq: Q::Target, + bbq: >::Target, pd: PhantomData<(S, C, N, H)>, } impl FramedProducer where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, + Self: BbqSync, H: LenHeader, { pub fn grant(&self, sz: H) -> Result, ()> { @@ -137,14 +131,7 @@ where } } -impl Typed for FramedProducer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ -} +impl Typed for FramedProducer where Self: BbqSync {} impl TypedWrapper> where diff --git a/src/traits/notifier/typed.rs b/src/traits/notifier/typed.rs index d9e9bde..45eb4e1 100644 --- a/src/traits/notifier/typed.rs +++ b/src/traits/notifier/typed.rs @@ -1,4 +1,9 @@ -use crate::traits::notifier::{AsyncNotifier, Notifier}; +use crate::traits::{ + bbqhdl::BbqHandle, + coordination::Coord, + notifier::{AsyncNotifier, Notifier}, + storage::Storage, +}; use core::{future::Future, ops::Deref}; pub trait AsyncNotifierTyped: Notifier { @@ -49,6 +54,37 @@ where type Out = Out; } +#[allow(private_bounds)] +pub trait BbqSync +where + Self: Imply, + Self: Imply, + Self: Imply, + Self: Imply>, +{ +} + +impl BbqSync for T +where + Self: Imply, + Self: Imply, + Self: Imply, + Self: Imply>, +{ +} + +pub(crate) trait Imply: ImplyInner {} + +impl Imply for S {} + +pub(crate) trait ImplyInner { + type Is; +} + +impl ImplyInner for S { + type Is = T; +} + pub trait ConstrFut<'a>: AsyncNotifierTyped { type NotFull>; type NotEmpty>; From 79b0083feebc6ada2e5884a84301a76dc0c91aab Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Mon, 17 Mar 2025 03:15:34 +0100 Subject: [PATCH 4/5] fix: add required `FnMut` bound to the associated types as well --- src/traits/notifier/typed.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/traits/notifier/typed.rs b/src/traits/notifier/typed.rs index 45eb4e1..8fc538e 100644 --- a/src/traits/notifier/typed.rs +++ b/src/traits/notifier/typed.rs @@ -7,8 +7,8 @@ use crate::traits::{ use core::{future::Future, ops::Deref}; pub trait AsyncNotifierTyped: Notifier { - type FutNotEmpty: Future; - type FutNotFull: Future; + type FutNotEmpty Option, T>: Future; + type FutNotFull Option, T>: Future; fn wait_for_not_empty Option>(&self, f: F) -> Self::FutNotEmpty; fn wait_for_not_full Option>(&self, f: F) -> Self::FutNotFull; From 512c7695a3272009095f0c76312a2ade7bfb36e8 Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Tue, 18 Mar 2025 15:08:21 +0100 Subject: [PATCH 5/5] feat: refactor `BbqHandle` to use associated types instead of generics --- src/prod_cons/framed.rs | 200 ++++++-------------------------- src/prod_cons/stream.rs | 216 +++++++---------------------------- src/traits/bbqhdl.rs | 39 ++++++- src/traits/notifier/typed.rs | 38 +----- 4 files changed, 112 insertions(+), 381 deletions(-) diff --git a/src/prod_cons/framed.rs b/src/prod_cons/framed.rs index 7f661f9..2871651 100644 --- a/src/prod_cons/framed.rs +++ b/src/prod_cons/framed.rs @@ -10,7 +10,7 @@ use crate::{ bbqhdl::BbqHandle, coordination::Coord, notifier::{ - typed::{AsyncNotifierTyped, BbqSync, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, + typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, Typed, TypedWrapper}, AsyncNotifier, Notifier, }, storage::Storage, @@ -54,14 +54,14 @@ unsafe impl LenHeader for usize { } impl BBQueue { - pub fn framed_producer(&self) -> FramedProducer<&'_ Self, S, C, N> { + pub fn framed_producer(&self) -> FramedProducer<&'_ Self> { FramedProducer { bbq: self.bbq_ref(), pd: PhantomData, } } - pub fn framed_consumer(&self) -> FramedConsumer<&'_ Self, S, C, N> { + pub fn framed_consumer(&self) -> FramedConsumer<&'_ Self> { FramedConsumer { bbq: self.bbq_ref(), pd: PhantomData, @@ -71,14 +71,14 @@ impl BBQueue { #[cfg(feature = "std")] impl crate::queue::ArcBBQueue { - pub fn framed_producer(&self) -> FramedProducer>, S, C, N> { + pub fn framed_producer(&self) -> FramedProducer { FramedProducer { bbq: self.0.bbq_ref(), pd: PhantomData, } } - pub fn framed_consumer(&self) -> FramedConsumer>, S, C, N> { + pub fn framed_consumer(&self) -> FramedConsumer { FramedConsumer { bbq: self.0.bbq_ref(), pd: PhantomData, @@ -86,21 +86,13 @@ impl crate::queue::ArcBBQueue { } } -pub struct FramedProducer -where - Self: BbqSync, - H: LenHeader, -{ - bbq: >::Target, - pd: PhantomData<(S, C, N, H)>, +pub struct FramedProducer { + bbq: Q::Target, + pd: PhantomData, } -impl FramedProducer -where - Self: BbqSync, - H: LenHeader, -{ - pub fn grant(&self, sz: H) -> Result, ()> { +impl FramedProducer { + pub fn grant(&self, sz: H) -> Result, ()> { let (ptr, cap) = self.bbq.sto.ptr_len(); let needed = sz.into() + core::mem::size_of::(); @@ -118,58 +110,30 @@ where } } -impl FramedProducer -where - S: Storage, - C: Coord, - N: AsyncNotifier, - Q: BbqHandle, - H: LenHeader, -{ - pub async fn wait_grant(&self, sz: H) -> FramedGrantW { +impl, H: LenHeader> FramedProducer { + pub async fn wait_grant(&self, sz: H) -> FramedGrantW { self.bbq.not.wait_for_not_full(|| self.grant(sz).ok()).await } } -impl Typed for FramedProducer where Self: BbqSync {} +impl Typed for FramedProducer {} -impl TypedWrapper> -where - S: Storage, - C: Coord, - N: AsyncNotifierTyped, - Q: BbqHandle, - H: LenHeader, -{ +impl, H: LenHeader> TypedWrapper> { pub fn wait_grant( &self, sz: H, - ) -> ::NotFull>> { - self.bbq.not.wait_for_not_full(move || self.grant(sz).ok()) + ) -> ::NotFull>> { + AsyncNotifierTyped::wait_for_not_full(&self.bbq.not, move || self.grant(sz).ok()) } } -pub struct FramedConsumer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +pub struct FramedConsumer { bbq: Q::Target, - pd: PhantomData<(S, C, N, H)>, + pd: PhantomData, } -impl FramedConsumer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ - pub fn read(&self) -> Result, ()> { +impl FramedConsumer { + pub fn read(&self) -> Result, ()> { let (ptr, _cap) = self.bbq.sto.ptr_len(); let (offset, grant_len) = self.bbq.cor.read()?; @@ -212,65 +176,29 @@ where } } -impl FramedConsumer -where - S: Storage, - C: Coord, - N: AsyncNotifier, - Q: BbqHandle, - H: LenHeader, -{ - pub async fn wait_read(&self) -> FramedGrantR { +impl, H: LenHeader> FramedConsumer { + pub async fn wait_read(&self) -> FramedGrantR { self.bbq.not.wait_for_not_empty(|| self.read().ok()).await } } -impl Typed for FramedConsumer -where - S: Storage, - C: Coord, - N: AsyncNotifier, - Q: BbqHandle, - H: LenHeader, -{ -} +impl Typed for FramedConsumer {} -impl TypedWrapper> -where - S: Storage, - C: Coord, - N: AsyncNotifierTyped, - Q: BbqHandle, - H: LenHeader, -{ +impl, H: LenHeader> TypedWrapper> { pub fn wait_read( &self, - ) -> ::NotEmpty>> { - self.bbq.not.wait_for_not_empty(move || self.read().ok()) + ) -> ::NotEmpty>> { + AsyncNotifierTyped::wait_for_not_empty(&self.bbq.not, move || self.read().ok()) } } -pub struct FramedGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +pub struct FramedGrantW { bbq: Q::Target, base_ptr: NonNull, hdr: H, } -impl Deref for FramedGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl Deref for FramedGrantW { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -283,14 +211,7 @@ where } } -impl DerefMut for FramedGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl DerefMut for FramedGrantW { fn deref_mut(&mut self) -> &mut Self::Target { let len = self.hdr.into(); let body_ptr = unsafe { @@ -301,14 +222,7 @@ where } } -impl Drop for FramedGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl Drop for FramedGrantW { fn drop(&mut self) { // Default drop performs an "abort" let (_ptr, cap) = self.bbq.sto.ptr_len(); @@ -318,14 +232,7 @@ where } } -impl FramedGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl FramedGrantW { pub fn commit(self, used: H) { let (_ptr, cap) = self.bbq.sto.ptr_len(); let hdrlen: usize = const { core::mem::size_of::() }; @@ -351,27 +258,13 @@ where } } -pub struct FramedGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +pub struct FramedGrantR { bbq: Q::Target, body_ptr: NonNull, hdr: H, } -impl Deref for FramedGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl Deref for FramedGrantR { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -380,42 +273,21 @@ where } } -impl DerefMut for FramedGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl DerefMut for FramedGrantR { fn deref_mut(&mut self) -> &mut Self::Target { let len: usize = self.hdr.into(); unsafe { core::slice::from_raw_parts_mut(self.body_ptr.as_ptr(), len) } } } -impl Drop for FramedGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl Drop for FramedGrantR { fn drop(&mut self) { // Default behavior is "keep" - release zero bytes self.bbq.cor.release_inner(0); } } -impl FramedGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, - H: LenHeader, -{ +impl FramedGrantR { pub fn release(self) { let len: usize = self.hdr.into(); let hdrlen: usize = const { core::mem::size_of::() }; diff --git a/src/prod_cons/stream.rs b/src/prod_cons/stream.rs index 5d04627..5cf60de 100644 --- a/src/prod_cons/stream.rs +++ b/src/prod_cons/stream.rs @@ -1,11 +1,10 @@ use core::{ - marker::PhantomData, ops::{Deref, DerefMut}, ptr::NonNull, }; use crate::{ - queue::BBQueue, + queue::{ArcBBQueue, BBQueue}, traits::{ bbqhdl::BbqHandle, coordination::Coord, @@ -18,57 +17,39 @@ use crate::{ }; impl BBQueue { - pub fn stream_producer(&self) -> StreamProducer<&'_ Self, S, C, N> { + pub fn stream_producer(&self) -> StreamProducer<&Self> { StreamProducer { bbq: self.bbq_ref(), - pd: PhantomData, } } - pub fn stream_consumer(&self) -> StreamConsumer<&'_ Self, S, C, N> { + pub fn stream_consumer(&self) -> StreamConsumer<&Self> { StreamConsumer { bbq: self.bbq_ref(), - pd: PhantomData, } } } -#[cfg(feature = "std")] -impl crate::queue::ArcBBQueue { - pub fn stream_producer(&self) -> StreamProducer>, S, C, N> { +impl ArcBBQueue { + pub fn stream_producer(&self) -> StreamProducer { StreamProducer { - bbq: self.0.bbq_ref(), - pd: PhantomData, + bbq: self.bbq_ref(), } } - pub fn stream_consumer(&self) -> StreamConsumer>, S, C, N> { + pub fn stream_consumer(&self) -> StreamConsumer { StreamConsumer { - bbq: self.0.bbq_ref(), - pd: PhantomData, + bbq: self.bbq_ref(), } } } -pub struct StreamProducer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +pub struct StreamProducer { bbq: Q::Target, - pd: PhantomData<(S, C, N)>, } -impl StreamProducer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ - pub fn grant_max_remaining(&self, max: usize) -> Result, ()> { +impl StreamProducer { + pub fn grant_max_remaining(&self, max: usize) -> Result, ()> { let (ptr, cap) = self.bbq.sto.ptr_len(); let (offset, len) = self.bbq.cor.grant_max_remaining(cap, max)?; let ptr = unsafe { @@ -83,7 +64,7 @@ where }) } - pub fn grant_exact(&self, sz: usize) -> Result, ()> { + pub fn grant_exact(&self, sz: usize) -> Result, ()> { let (ptr, cap) = self.bbq.sto.ptr_len(); let offset = self.bbq.cor.grant_exact(cap, sz)?; let ptr = unsafe { @@ -99,21 +80,15 @@ where } } -impl StreamProducer -where - S: Storage, - C: Coord, - N: AsyncNotifier, - Q: BbqHandle, -{ - pub async fn wait_grant_max_remaining(&self, max: usize) -> StreamGrantW { +impl> StreamProducer { + pub async fn wait_grant_max_remaining(&self, max: usize) -> StreamGrantW { self.bbq .not .wait_for_not_full(|| self.grant_max_remaining(max).ok()) .await } - pub async fn wait_grant_exact(&self, sz: usize) -> StreamGrantW { + pub async fn wait_grant_exact(&self, sz: usize) -> StreamGrantW { self.bbq .not .wait_for_not_full(|| self.grant_exact(sz).ok()) @@ -121,60 +96,32 @@ where } } -impl Typed for StreamProducer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ -} +impl Typed for StreamProducer {} -impl TypedWrapper> -where - S: Storage, - C: Coord, - N: AsyncNotifierTyped, - Q: BbqHandle, -{ +impl> TypedWrapper> { pub fn wait_grant_max_remaining( &self, max: usize, - ) -> ::NotFull> + '_> { - self.bbq - .not - .wait_for_not_full(move || self.grant_max_remaining(max).ok()) + ) -> ::NotFull> + '_> { + AsyncNotifierTyped::wait_for_not_full(&self.bbq.not, move || { + self.grant_max_remaining(max).ok() + }) } pub fn wait_grant_exact( &self, sz: usize, - ) -> ::NotFull> + '_> { - self.bbq - .not - .wait_for_not_full(move || self.grant_exact(sz).ok()) + ) -> ::NotFull> + '_> { + AsyncNotifierTyped::wait_for_not_full(&self.bbq.not, move || self.grant_exact(sz).ok()) } } -pub struct StreamConsumer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +pub struct StreamConsumer { bbq: Q::Target, - pd: PhantomData<(S, C, N)>, } -impl StreamConsumer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ - pub fn read(&self) -> Result, ()> { +impl StreamConsumer { + pub fn read(&self) -> Result, ()> { let (ptr, _cap) = self.bbq.sto.ptr_len(); let (offset, len) = self.bbq.cor.read()?; let ptr = unsafe { @@ -190,61 +137,30 @@ where } } -impl StreamConsumer -where - S: Storage, - C: Coord, - N: AsyncNotifier, - Q: BbqHandle, -{ - pub async fn wait_read(&self) -> StreamGrantR { +impl> StreamConsumer { + pub async fn wait_read(&self) -> StreamGrantR { self.bbq.not.wait_for_not_empty(|| self.read().ok()).await } } -impl Typed for StreamConsumer -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ -} +impl Typed for StreamConsumer {} -impl TypedWrapper> -where - S: Storage, - C: Coord, - N: AsyncNotifierTyped, - Q: BbqHandle, -{ +impl> TypedWrapper> { pub fn wait_read( &self, - ) -> ::NotEmpty> + '_> { - self.bbq.not.wait_for_not_empty(move || self.read().ok()) + ) -> ::NotEmpty>> { + AsyncNotifierTyped::wait_for_not_empty(&self.bbq.not, move || self.read().ok()) } } -pub struct StreamGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +pub struct StreamGrantW { bbq: Q::Target, ptr: NonNull, len: usize, to_commit: usize, } -impl Deref for StreamGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl Deref for StreamGrantW { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -252,25 +168,13 @@ where } } -impl DerefMut for StreamGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl DerefMut for StreamGrantW { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { core::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } } } -impl Drop for StreamGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl Drop for StreamGrantW { fn drop(&mut self) { let StreamGrantW { bbq, @@ -288,13 +192,7 @@ where } } -impl StreamGrantW -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl StreamGrantW { pub fn commit(self, used: usize) { let (_, cap) = self.bbq.sto.ptr_len(); let used = used.min(self.len); @@ -306,26 +204,14 @@ where } } -pub struct StreamGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +pub struct StreamGrantR { bbq: Q::Target, ptr: NonNull, len: usize, to_release: usize, } -impl Deref for StreamGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl Deref for StreamGrantR { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -333,25 +219,13 @@ where } } -impl DerefMut for StreamGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl DerefMut for StreamGrantR { fn deref_mut(&mut self) -> &mut Self::Target { unsafe { core::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } } } -impl Drop for StreamGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl Drop for StreamGrantR { fn drop(&mut self) { let StreamGrantR { bbq, @@ -368,13 +242,7 @@ where } } -impl StreamGrantR -where - S: Storage, - C: Coord, - N: Notifier, - Q: BbqHandle, -{ +impl StreamGrantR { pub fn release(self, used: usize) { let used = used.min(self.len); self.bbq.cor.release_inner(used); diff --git a/src/traits/bbqhdl.rs b/src/traits/bbqhdl.rs index 2ce17e4..6c35d35 100644 --- a/src/traits/bbqhdl.rs +++ b/src/traits/bbqhdl.rs @@ -1,29 +1,56 @@ use core::ops::Deref; +use std::sync::Arc; -use crate::queue::BBQueue; +use crate::queue::{ArcBBQueue, BBQueue}; use super::{coordination::Coord, notifier::Notifier, storage::Storage}; -pub trait BbqHandle { - type Target: Deref> + Clone; +pub trait BbqHandle { + type Target: Deref> + Clone; fn bbq_ref(&self) -> Self::Target; + + type Storage: Storage; + type Coord: Coord; + type Notifier: Notifier; } -impl<'a, S: Storage, C: Coord, N: Notifier> BbqHandle for &'a BBQueue { +impl BbqHandle for &BBQueue { type Target = Self; #[inline(always)] fn bbq_ref(&self) -> Self::Target { *self } + + type Storage = S; + type Coord = C; + type Notifier = N; +} + +#[cfg(feature = "std")] +impl BbqHandle for ArcBBQueue { + type Target = Arc>; + + #[inline(always)] + fn bbq_ref(&self) -> Self::Target { + self.0.clone() + } + + type Storage = S; + type Coord = C; + type Notifier = N; } #[cfg(feature = "std")] -impl BbqHandle for std::sync::Arc> { - type Target = std::sync::Arc>; +impl BbqHandle for Arc> { + type Target = Self; #[inline(always)] fn bbq_ref(&self) -> Self::Target { self.clone() } + + type Storage = S; + type Coord = C; + type Notifier = N; } diff --git a/src/traits/notifier/typed.rs b/src/traits/notifier/typed.rs index 8fc538e..678ad8f 100644 --- a/src/traits/notifier/typed.rs +++ b/src/traits/notifier/typed.rs @@ -1,9 +1,4 @@ -use crate::traits::{ - bbqhdl::BbqHandle, - coordination::Coord, - notifier::{AsyncNotifier, Notifier}, - storage::Storage, -}; +use crate::traits::notifier::{AsyncNotifier, Notifier}; use core::{future::Future, ops::Deref}; pub trait AsyncNotifierTyped: Notifier { @@ -54,37 +49,6 @@ where type Out = Out; } -#[allow(private_bounds)] -pub trait BbqSync -where - Self: Imply, - Self: Imply, - Self: Imply, - Self: Imply>, -{ -} - -impl BbqSync for T -where - Self: Imply, - Self: Imply, - Self: Imply, - Self: Imply>, -{ -} - -pub(crate) trait Imply: ImplyInner {} - -impl Imply for S {} - -pub(crate) trait ImplyInner { - type Is; -} - -impl ImplyInner for S { - type Is = T; -} - pub trait ConstrFut<'a>: AsyncNotifierTyped { type NotFull>; type NotEmpty>;