Skip to content

Commit d382efa

Browse files
committed
fix: add lifetime bound to YieldFut
1 parent 7abf348 commit d382efa

File tree

4 files changed

+91
-84
lines changed

4 files changed

+91
-84
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use futures_util::{pin_mut, stream::StreamExt};
88

99
#[tokio::main]
1010
async fn main() {
11-
let stream = async_stream(|r#yield| async move {
11+
let stream = async_stream(|yielder| async move {
1212
for i in 0..3 {
13-
r#yield(i).await;
13+
yielder.r#yield(i).await;
1414
}
1515
});
1616
pin_mut!(stream);

src/lib.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,28 @@ thread_local! {
2727
#[thread_local]
2828
static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut());
2929

30-
pub(crate) fn r#yield<T>(value: T) -> YieldFut<T> {
31-
YieldFut { value: Some(value) }
30+
pub struct Yielder<T> {
31+
_p: PhantomData<T>
32+
}
33+
34+
impl<T> Yielder<T> {
35+
pub fn r#yield(&self, value: T) -> YieldFut<'_, T> {
36+
YieldFut { value: Some(value), _p: PhantomData }
37+
}
3238
}
3339

3440
/// Future returned by an [`AsyncStream`]'s yield function.
3541
///
3642
/// This future must be `.await`ed inside the generator in order for the item to be yielded by the stream.
3743
#[must_use = "stream will not yield this item unless the future returned by yield is awaited"]
38-
pub struct YieldFut<T> {
39-
value: Option<T>
44+
pub struct YieldFut<'y, T> {
45+
value: Option<T>,
46+
_p: PhantomData<&'y ()>
4047
}
4148

42-
impl<T> Unpin for YieldFut<T> {}
49+
impl<T> Unpin for YieldFut<'_, T> {}
4350

44-
impl<T> Future for YieldFut<T> {
51+
impl<T> Future for YieldFut<'_, T> {
4552
type Output = ();
4653

4754
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -153,16 +160,16 @@ where
153160

154161
/// Create an asynchronous [`Stream`] from an asynchronous generator function.
155162
///
156-
/// The provided function will be given a "yielder" function, which, when called, causes the stream to yield an item:
163+
/// The provided function will be given a [`Yielder`], which, when called, causes the stream to yield an item:
157164
/// ```
158165
/// use async_stream_lite::async_stream;
159166
/// use futures::{pin_mut, stream::StreamExt};
160167
///
161168
/// #[tokio::main]
162169
/// async fn main() {
163-
/// let stream = async_stream(|r#yield| async move {
170+
/// let stream = async_stream(|yielder| async move {
164171
/// for i in 0..3 {
165-
/// r#yield(i).await;
172+
/// yielder.r#yield(i).await;
166173
/// }
167174
/// });
168175
/// pin_mut!(stream);
@@ -181,9 +188,9 @@ where
181188
/// };
182189
///
183190
/// fn zero_to_three() -> impl Stream<Item = u32> {
184-
/// async_stream(|r#yield| async move {
191+
/// async_stream(|yielder| async move {
185192
/// for i in 0..3 {
186-
/// r#yield(i).await;
193+
/// yielder.r#yield(i).await;
187194
/// }
188195
/// })
189196
/// }
@@ -207,9 +214,9 @@ where
207214
/// };
208215
///
209216
/// fn zero_to_three() -> BoxStream<'static, u32> {
210-
/// Box::pin(async_stream(|r#yield| async move {
217+
/// Box::pin(async_stream(|yielder| async move {
211218
/// for i in 0..3 {
212-
/// r#yield(i).await;
219+
/// yielder.r#yield(i).await;
213220
/// }
214221
/// }))
215222
/// }
@@ -232,18 +239,18 @@ where
232239
/// };
233240
///
234241
/// fn zero_to_three() -> impl Stream<Item = u32> {
235-
/// async_stream(|r#yield| async move {
242+
/// async_stream(|yielder| async move {
236243
/// for i in 0..3 {
237-
/// r#yield(i).await;
244+
/// yielder.r#yield(i).await;
238245
/// }
239246
/// })
240247
/// }
241248
///
242249
/// fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
243-
/// async_stream(|r#yield| async move {
250+
/// async_stream(|yielder| async move {
244251
/// pin_mut!(input);
245252
/// while let Some(value) = input.next().await {
246-
/// r#yield(value * 2).await;
253+
/// yielder.r#yield(value * 2).await;
247254
/// }
248255
/// })
249256
/// }
@@ -261,10 +268,10 @@ where
261268
/// See also [`try_async_stream`], a variant of [`async_stream`] which supports try notation (`?`).
262269
pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
263270
where
264-
F: FnOnce(fn(value: T) -> YieldFut<T>) -> U,
271+
F: FnOnce(Yielder<T>) -> U,
265272
U: Future<Output = ()>
266273
{
267-
let generator = generator(r#yield::<T>);
274+
let generator = generator(Yielder { _p: PhantomData });
268275
AsyncStream {
269276
_p: PhantomData,
270277
done: false,

src/tests.rs

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use futures::{
66
};
77
use tokio::sync::mpsc;
88

9-
use super::{YieldFut, async_stream};
9+
use super::{Yielder, async_stream};
1010

1111
#[tokio::test]
1212
async fn noop_stream() {
13-
let s = async_stream(|_yield: fn(()) -> YieldFut<()>| async move {});
13+
let s = async_stream(|_yielder: Yielder<()>| async move {});
1414
pin_mut!(s);
1515

1616
#[allow(clippy::never_loop)]
@@ -25,7 +25,7 @@ async fn empty_stream() {
2525

2626
{
2727
let r = &mut ran;
28-
let s = async_stream(|_yield: fn(()) -> YieldFut<()>| async move {
28+
let s = async_stream(|_yield: Yielder<()>| async move {
2929
*r = true;
3030
println!("hello world!");
3131
});
@@ -42,8 +42,8 @@ async fn empty_stream() {
4242

4343
#[tokio::test]
4444
async fn yield_single_value() {
45-
let s = async_stream(|r#yield| async move {
46-
r#yield("hello").await;
45+
let s = async_stream(|yielder| async move {
46+
yielder.r#yield("hello").await;
4747
});
4848

4949
let values: Vec<_> = s.collect().await;
@@ -54,8 +54,8 @@ async fn yield_single_value() {
5454

5555
#[tokio::test]
5656
async fn fused() {
57-
let s = async_stream(|r#yield| async move {
58-
r#yield("hello").await;
57+
let s = async_stream(|yielder| async move {
58+
yielder.r#yield("hello").await;
5959
});
6060
pin_mut!(s);
6161

@@ -69,10 +69,10 @@ async fn fused() {
6969

7070
#[tokio::test]
7171
async fn yield_multi_value() {
72-
let stream = async_stream(|r#yield| async move {
73-
r#yield("hello").await;
74-
r#yield("world").await;
75-
r#yield("foobar").await;
72+
let stream = async_stream(|yielder| async move {
73+
yielder.r#yield("hello").await;
74+
yielder.r#yield("world").await;
75+
yielder.r#yield("foobar").await;
7676
});
7777

7878
let values: Vec<_> = stream.collect().await;
@@ -88,10 +88,10 @@ async fn unit_yield_in_select() {
8888
#[allow(clippy::unused_async)]
8989
async fn do_stuff_async() {}
9090

91-
let stream = async_stream(|r#yield| async move {
91+
let stream = async_stream(|yielder| async move {
9292
tokio::select! {
93-
() = do_stuff_async() => r#yield(()).await,
94-
else => r#yield(()).await
93+
() = do_stuff_async() => yielder.r#yield(()).await,
94+
else => yielder.r#yield(()).await
9595
}
9696
});
9797

@@ -105,11 +105,11 @@ async fn yield_with_select() {
105105
async fn do_stuff_async() {}
106106
async fn more_async_work() {}
107107

108-
let stream = async_stream(|r#yield| async move {
108+
let stream = async_stream(|yielder| async move {
109109
tokio::select! {
110-
() = do_stuff_async() => r#yield("hey").await,
111-
() = more_async_work() => r#yield("hey").await,
112-
else => r#yield("hey").await
110+
() = do_stuff_async() => yielder.r#yield("hey").await,
111+
() = more_async_work() => yielder.r#yield("hey").await,
112+
else => yielder.r#yield("hey").await
113113
}
114114
});
115115

@@ -120,10 +120,10 @@ async fn yield_with_select() {
120120
#[tokio::test]
121121
async fn return_stream() {
122122
fn build_stream() -> impl Stream<Item = u32> {
123-
async_stream(|r#yield| async move {
124-
r#yield(1).await;
125-
r#yield(2).await;
126-
r#yield(3).await;
123+
async_stream(|yielder| async move {
124+
yielder.r#yield(1).await;
125+
yielder.r#yield(2).await;
126+
yielder.r#yield(3).await;
127127
})
128128
}
129129

@@ -139,10 +139,10 @@ async fn return_stream() {
139139
#[tokio::test]
140140
async fn boxed_stream() {
141141
fn build_stream() -> BoxStream<'static, u32> {
142-
Box::pin(async_stream(|r#yield| async move {
143-
r#yield(1).await;
144-
r#yield(2).await;
145-
r#yield(3).await;
142+
Box::pin(async_stream(|yielder| async move {
143+
yielder.r#yield(1).await;
144+
yielder.r#yield(2).await;
145+
yielder.r#yield(3).await;
146146
}))
147147
}
148148

@@ -159,9 +159,9 @@ async fn boxed_stream() {
159159
async fn consume_channel() {
160160
let (tx, mut rx) = mpsc::channel(10);
161161

162-
let stream = async_stream(|r#yield| async move {
162+
let stream = async_stream(|yielder| async move {
163163
while let Some(v) = rx.recv().await {
164-
r#yield(v).await;
164+
yielder.r#yield(v).await;
165165
}
166166
});
167167

@@ -182,8 +182,8 @@ async fn borrow_self() {
182182

183183
impl Data {
184184
fn stream(&self) -> impl Stream<Item = &str> + '_ {
185-
async_stream(|r#yield| async move {
186-
r#yield(&self.0[..]).await;
185+
async_stream(|yielder| async move {
186+
yielder.r#yield(&self.0[..]).await;
187187
})
188188
}
189189
}
@@ -201,8 +201,8 @@ async fn borrow_self_boxed() {
201201

202202
impl Data {
203203
fn stream(&self) -> BoxStream<'_, &str> {
204-
Box::pin(async_stream(|r#yield| async move {
205-
r#yield(&self.0[..]).await;
204+
Box::pin(async_stream(|yielder| async move {
205+
yielder.r#yield(&self.0[..]).await;
206206
}))
207207
}
208208
}
@@ -216,16 +216,16 @@ async fn borrow_self_boxed() {
216216

217217
#[tokio::test]
218218
async fn stream_in_stream() {
219-
let s = async_stream(|r#yield| async move {
220-
let s = async_stream(|r#yield| async move {
219+
let s = async_stream(|yielder| async move {
220+
let s = async_stream(|yielder| async move {
221221
for i in 0..3 {
222-
r#yield(i).await;
222+
yielder.r#yield(i).await;
223223
}
224224
});
225225
pin_mut!(s);
226226

227227
while let Some(v) = s.next().await {
228-
r#yield(v).await;
228+
yielder.r#yield(v).await;
229229
}
230230
});
231231

@@ -235,37 +235,37 @@ async fn stream_in_stream() {
235235

236236
#[tokio::test]
237237
async fn streamception() {
238-
let s = async_stream(|r#yield| async move {
239-
let s = async_stream(|r#yield| async move {
240-
let s = async_stream(|r#yield| async move {
241-
let s = async_stream(|r#yield| async move {
242-
let s = async_stream(|r#yield| async move {
238+
let s = async_stream(|yielder| async move {
239+
let s = async_stream(|yielder| async move {
240+
let s = async_stream(|yielder| async move {
241+
let s = async_stream(|yielder| async move {
242+
let s = async_stream(|yielder| async move {
243243
for i in 0..3 {
244-
r#yield(i).await;
244+
yielder.r#yield(i).await;
245245
}
246246
});
247247
pin_mut!(s);
248248

249249
while let Some(v) = s.next().await {
250-
r#yield(v).await;
250+
yielder.r#yield(v).await;
251251
}
252252
});
253253
pin_mut!(s);
254254

255255
while let Some(v) = s.next().await {
256-
r#yield(v).await;
256+
yielder.r#yield(v).await;
257257
}
258258
});
259259
pin_mut!(s);
260260

261261
while let Some(v) = s.next().await {
262-
r#yield(v).await;
262+
yielder.r#yield(v).await;
263263
}
264264
});
265265
pin_mut!(s);
266266

267267
while let Some(v) = s.next().await {
268-
r#yield(v).await;
268+
yielder.r#yield(v).await;
269269
}
270270
});
271271

@@ -275,9 +275,9 @@ async fn streamception() {
275275

276276
#[tokio::test]
277277
async fn yield_non_unpin_value() {
278-
let s: Vec<_> = async_stream(|r#yield| async move {
278+
let s: Vec<_> = async_stream(|yielder| async move {
279279
for i in 0..3 {
280-
r#yield(async move { i }).await;
280+
yielder.r#yield(async move { i }).await;
281281
}
282282
})
283283
.buffered(1)
@@ -290,10 +290,10 @@ async fn yield_non_unpin_value() {
290290
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
291291
async fn multithreaded() {
292292
fn build_stream() -> impl Stream<Item = u32> {
293-
async_stream(|r#yield| async move {
294-
r#yield(1).await;
295-
r#yield(2).await;
296-
r#yield(3).await;
293+
async_stream(|yielder| async move {
294+
yielder.r#yield(1).await;
295+
yielder.r#yield(2).await;
296+
yielder.r#yield(3).await;
297297
})
298298
}
299299

0 commit comments

Comments
 (0)