Skip to content
This repository was archived by the owner on Jan 4, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 51 additions & 139 deletions src/prod_cons/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::{
traits::{
bbqhdl::BbqHandle,
coordination::Coord,
notifier::{AsyncNotifier, Notifier},
notifier::{
typed::{AsyncNotifierTyped, ConstrFnMut, ConstrFut, Typed, TypedWrapper},
AsyncNotifier, Notifier,
},
storage::Storage,
},
};
Expand Down Expand Up @@ -51,14 +54,14 @@ unsafe impl LenHeader for usize {
}

impl<S: Storage, C: Coord, N: Notifier> BBQueue<S, C, N> {
pub fn framed_producer(&self) -> FramedProducer<&'_ Self, S, C, N> {
pub fn framed_producer(&self) -> FramedProducer<&'_ Self> {
FramedProducer {
bbq: self.bbq_ref(),
pd: PhantomData,
}
}

pub fn framed_consumer(&self) -> FramedConsumer<&'_ Self, S, C, N> {
pub fn framed_consumer(&self) -> FramedConsumer<&'_ Self> {
FramedConsumer {
bbq: self.bbq_ref(),
pd: PhantomData,
Expand All @@ -68,42 +71,28 @@ impl<S: Storage, C: Coord, N: Notifier> BBQueue<S, C, N> {

#[cfg(feature = "std")]
impl<S: Storage, C: Coord, N: Notifier> crate::queue::ArcBBQueue<S, C, N> {
pub fn framed_producer(&self) -> FramedProducer<std::sync::Arc<BBQueue<S, C, N>>, S, C, N> {
pub fn framed_producer(&self) -> FramedProducer<Self> {
FramedProducer {
bbq: self.0.bbq_ref(),
pd: PhantomData,
}
}

pub fn framed_consumer(&self) -> FramedConsumer<std::sync::Arc<BBQueue<S, C, N>>, S, C, N> {
pub fn framed_consumer(&self) -> FramedConsumer<Self> {
FramedConsumer {
bbq: self.0.bbq_ref(),
pd: PhantomData,
}
}
}

pub struct FramedProducer<Q, S, C, N, H = u16>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub struct FramedProducer<Q: BbqHandle, H = u16> {
bbq: Q::Target,
pd: PhantomData<(S, C, N, H)>,
pd: PhantomData<H>,
}

impl<Q, S, C, N, H> FramedProducer<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub fn grant(&self, sz: H) -> Result<FramedGrantW<Q, S, C, N, H>, ()> {
impl<Q: BbqHandle, H: LenHeader> FramedProducer<Q, H> {
pub fn grant(&self, sz: H) -> Result<FramedGrantW<Q, H>, ()> {
let (ptr, cap) = self.bbq.sto.ptr_len();
let needed = sz.into() + core::mem::size_of::<H>();

Expand All @@ -121,40 +110,30 @@ where
}
}

impl<Q, S, C, N, H> FramedProducer<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: AsyncNotifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub async fn wait_grant(&self, sz: H) -> FramedGrantW<Q, S, C, N, H> {
impl<Q: BbqHandle<Notifier: AsyncNotifier>, H: LenHeader> FramedProducer<Q, H> {
pub async fn wait_grant(&self, sz: H) -> FramedGrantW<Q, H> {
self.bbq.not.wait_for_not_full(|| self.grant(sz).ok()).await
}
}

pub struct FramedConsumer<Q, S, C, N, H = u16>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Typed for FramedProducer<Q, H> {}

impl<Q: BbqHandle<Notifier: AsyncNotifierTyped>, H: LenHeader> TypedWrapper<FramedProducer<Q, H>> {
pub fn wait_grant(
&self,
sz: H,
) -> <Q::Notifier as ConstrFut>::NotFull<impl ConstrFnMut<Out = FramedGrantW<Q, H>>> {
AsyncNotifierTyped::wait_for_not_full(&self.bbq.not, move || self.grant(sz).ok())
}
}

pub struct FramedConsumer<Q: BbqHandle, H: LenHeader = u16> {
bbq: Q::Target,
pd: PhantomData<(S, C, N, H)>,
pd: PhantomData<H>,
}

impl<Q, S, C, N, H> FramedConsumer<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub fn read(&self) -> Result<FramedGrantR<Q, S, C, N, H>, ()> {
impl<Q: BbqHandle, H: LenHeader> FramedConsumer<Q, H> {
pub fn read(&self) -> Result<FramedGrantR<Q, H>, ()> {
let (ptr, _cap) = self.bbq.sto.ptr_len();
let (offset, grant_len) = self.bbq.cor.read()?;

Expand Down Expand Up @@ -197,40 +176,29 @@ where
}
}

impl<Q, S, C, N, H> FramedConsumer<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: AsyncNotifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub async fn wait_read(&self) -> FramedGrantR<Q, S, C, N, H> {
impl<Q: BbqHandle<Notifier: AsyncNotifier>, H: LenHeader> FramedConsumer<Q, H> {
pub async fn wait_read(&self) -> FramedGrantR<Q, H> {
self.bbq.not.wait_for_not_empty(|| self.read().ok()).await
}
}

pub struct FramedGrantW<Q, S, C, N, H = u16>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Typed for FramedConsumer<Q, H> {}

impl<Q: BbqHandle<Notifier: AsyncNotifierTyped>, H: LenHeader> TypedWrapper<FramedConsumer<Q, H>> {
pub fn wait_read(
&self,
) -> <Q::Notifier as ConstrFut>::NotEmpty<impl ConstrFnMut<Out = FramedGrantR<Q, H>>> {
AsyncNotifierTyped::wait_for_not_empty(&self.bbq.not, move || self.read().ok())
}
}

pub struct FramedGrantW<Q: BbqHandle, H: LenHeader = u16> {
bbq: Q::Target,
base_ptr: NonNull<u8>,
hdr: H,
}

impl<Q, S, C, N, H> Deref for FramedGrantW<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Deref for FramedGrantW<Q, H> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
Expand All @@ -243,14 +211,7 @@ where
}
}

impl<Q, S, C, N, H> DerefMut for FramedGrantW<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> DerefMut for FramedGrantW<Q, H> {
fn deref_mut(&mut self) -> &mut Self::Target {
let len = self.hdr.into();
let body_ptr = unsafe {
Expand All @@ -261,14 +222,7 @@ where
}
}

impl<Q, S, C, N, H> Drop for FramedGrantW<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Drop for FramedGrantW<Q, H> {
fn drop(&mut self) {
// Default drop performs an "abort"
let (_ptr, cap) = self.bbq.sto.ptr_len();
Expand All @@ -278,14 +232,7 @@ where
}
}

impl<Q, S, C, N, H> FramedGrantW<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> FramedGrantW<Q, H> {
pub fn commit(self, used: H) {
let (_ptr, cap) = self.bbq.sto.ptr_len();
let hdrlen: usize = const { core::mem::size_of::<H>() };
Expand All @@ -311,27 +258,13 @@ where
}
}

pub struct FramedGrantR<Q, S, C, N, H = u16>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
pub struct FramedGrantR<Q: BbqHandle, H: LenHeader> {
bbq: Q::Target,
body_ptr: NonNull<u8>,
hdr: H,
}

impl<Q, S, C, N, H> Deref for FramedGrantR<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Deref for FramedGrantR<Q, H> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
Expand All @@ -340,42 +273,21 @@ where
}
}

impl<Q, S, C, N, H> DerefMut for FramedGrantR<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> DerefMut for FramedGrantR<Q, H> {
fn deref_mut(&mut self) -> &mut Self::Target {
let len: usize = self.hdr.into();
unsafe { core::slice::from_raw_parts_mut(self.body_ptr.as_ptr(), len) }
}
}

impl<Q, S, C, N, H> Drop for FramedGrantR<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> Drop for FramedGrantR<Q, H> {
fn drop(&mut self) {
// Default behavior is "keep" - release zero bytes
self.bbq.cor.release_inner(0);
}
}

impl<Q, S, C, N, H> FramedGrantR<Q, S, C, N, H>
where
S: Storage,
C: Coord,
N: Notifier,
Q: BbqHandle<S, C, N>,
H: LenHeader,
{
impl<Q: BbqHandle, H: LenHeader> FramedGrantR<Q, H> {
pub fn release(self) {
let len: usize = self.hdr.into();
let hdrlen: usize = const { core::mem::size_of::<H>() };
Expand Down
Loading