Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/wasi-http/src/p3/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ where
Poll::Ready(Some(Ok(frame))) => {
match frame.into_data().map_err(http_body::Frame::into_trailers) {
Ok(mut frame) => {
// Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream,
// so we have to check for the body's end-of-stream indicator here too
if frame.len() == 0 && self.body.is_end_stream() {
break 'result Ok(None);
}
Comment on lines 495 to 499
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it should also be checking the size of the frame received because if it's a nonzero-sized-frame but the body has ended it seems like we'd still want to yield/copy that frame and then only on the next poll sever the stream when is_end_stream is true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I have updated the check and added an extra test. Thanks.


if let Some(cap) = cap {
let n = frame.len();
let cap = cap.into();
Expand Down
149 changes: 149 additions & 0 deletions crates/wasi-http/tests/all/p3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use http_body::Body;
use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody};
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use test_programs_artifacts::*;
use tokio::{fs, try_join};
use wasm_compose::composer::ComponentComposer;
Expand Down Expand Up @@ -591,3 +593,150 @@ async fn p3_http_proxy() -> Result<()> {
assert_eq!(request_body, body.as_slice());
Ok(())
}

// Custom body wrapper that sends a configurable frame at EOS while reporting is_end_stream() = true
struct BodyWithFrameAtEos {
inner: http_body_util::StreamBody<
futures::channel::mpsc::Receiver<Result<http_body::Frame<Bytes>, ErrorCode>>,
>,
final_frame: Option<Bytes>,
sent_final: bool,
at_eos: bool,
}

impl http_body::Body for BodyWithFrameAtEos {
type Data = Bytes;
type Error = ErrorCode;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// First, poll the underlying body
let this = &mut *self;
match Pin::new(&mut this.inner).poll_frame(cx) {
Poll::Ready(None) if !this.sent_final => {
// When the underlying body ends, send the configured final frame
// This simulates HTTP implementations that send frames at EOS
this.sent_final = true;
this.at_eos = true;
if let Some(data) = this.final_frame.take() {
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
} else {
Poll::Ready(None)
}
}
other => other,
}
}

fn is_end_stream(&self) -> bool {
// Report end of stream once we've reached it
// This ensures is_end_stream() = true when we send the final frame
self.at_eos
}
}

#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> {
_ = env_logger::try_init();

// This test verifies the fix which handles the case where a zero-length frame is
// received when is_end_stream() is true. Without the fix, the StreamProducer would
// crash when the WASM guest tries to read such a frame.

let body = b"test";
let raw_body = Bytes::copy_from_slice(body);

let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);

let wrapped_body = BodyWithFrameAtEos {
inner: http_body_util::StreamBody::new(body_rx),
final_frame: Some(Bytes::new()), // Send empty frame at EOS
sent_final: false,
at_eos: false,
};

let request = http::Request::builder()
.uri("http://localhost/")
.method(http::Method::GET);

// Use the echo component which actually reads from the stream
let response = futures::join!(
run_http(
P3_HTTP_ECHO_COMPONENT,
request.body(wrapped_body)?,
oneshot::channel().0
),
async {
body_tx
.send(Ok(http_body::Frame::data(raw_body)))
.await
.unwrap();
drop(body_tx);
}
)
.0?
.unwrap();

assert_eq!(response.status().as_u16(), 200);

// Verify the body was echoed correctly (empty frames should be filtered out by the fix)
let (_, collected_body) = response.into_parts();
let collected_body = collected_body.to_bytes();
assert_eq!(collected_body, body.as_slice());
Ok(())
}

#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn p3_http_data_frame_at_end_of_stream() -> Result<()> {
_ = env_logger::try_init();

// This test verifies that when is_end_stream() is true but the frame contains data,
// we still process the data.

let body = b"test";
let final_data = b" final";
let raw_body = Bytes::copy_from_slice(body);
let final_frame = Bytes::copy_from_slice(final_data);

let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);

let wrapped_body = BodyWithFrameAtEos {
inner: http_body_util::StreamBody::new(body_rx),
final_frame: Some(final_frame), // Send data frame at EOS with is_end_stream() = true
sent_final: false,
at_eos: false,
};

let request = http::Request::builder()
.uri("http://localhost/")
.method(http::Method::GET);

// Use the echo component which actually reads from the stream
let response = futures::join!(
run_http(
P3_HTTP_ECHO_COMPONENT,
request.body(wrapped_body)?,
oneshot::channel().0
),
async {
body_tx
.send(Ok(http_body::Frame::data(raw_body)))
.await
.unwrap();
drop(body_tx);
}
)
.0?
.unwrap();

assert_eq!(response.status().as_u16(), 200);

// Verify the body was echoed correctly (the final frame's data should not be lost)
let (_, collected_body) = response.into_parts();
let collected_body = collected_body.to_bytes();
let expected = [body.as_slice(), final_data.as_slice()].concat();
assert_eq!(collected_body, expected.as_slice());
Ok(())
}