diff --git a/crates/wasi-http/src/p3/body.rs b/crates/wasi-http/src/p3/body.rs index e4e9f28547ac..84dcadab80ef 100644 --- a/crates/wasi-http/src/p3/body.rs +++ b/crates/wasi-http/src/p3/body.rs @@ -492,6 +492,12 @@ where Poll::Ready(Some(Ok(frame))) => { match frame.into_data().map_err(http_body::Frame::into_trailers) { Ok(mut frame) => { + // Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream, + // so we have to check for the body's end-of-stream indicator here too + if frame.len() == 0 && self.body.is_end_stream() { + break 'result Ok(None); + } + if let Some(cap) = cap { let n = frame.len(); let cap = cap.into(); diff --git a/crates/wasi-http/tests/all/p3/mod.rs b/crates/wasi-http/tests/all/p3/mod.rs index f78d9b54559a..31a017739a6d 100644 --- a/crates/wasi-http/tests/all/p3/mod.rs +++ b/crates/wasi-http/tests/all/p3/mod.rs @@ -9,6 +9,8 @@ use http_body::Body; use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody}; use std::io::Write; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use test_programs_artifacts::*; use tokio::{fs, try_join}; use wasm_compose::composer::ComponentComposer; @@ -591,3 +593,150 @@ async fn p3_http_proxy() -> Result<()> { assert_eq!(request_body, body.as_slice()); Ok(()) } + +// Custom body wrapper that sends a configurable frame at EOS while reporting is_end_stream() = true +struct BodyWithFrameAtEos { + inner: http_body_util::StreamBody< + futures::channel::mpsc::Receiver, ErrorCode>>, + >, + final_frame: Option, + sent_final: bool, + at_eos: bool, +} + +impl http_body::Body for BodyWithFrameAtEos { + type Data = Bytes; + type Error = ErrorCode; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + // First, poll the underlying body + let this = &mut *self; + match Pin::new(&mut this.inner).poll_frame(cx) { + Poll::Ready(None) if !this.sent_final => { + // When the underlying body ends, send the configured final frame + // This simulates HTTP implementations that send frames at EOS + this.sent_final = true; + this.at_eos = true; + if let Some(data) = this.final_frame.take() { + Poll::Ready(Some(Ok(http_body::Frame::data(data)))) + } else { + Poll::Ready(None) + } + } + other => other, + } + } + + fn is_end_stream(&self) -> bool { + // Report end of stream once we've reached it + // This ensures is_end_stream() = true when we send the final frame + self.at_eos + } +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> { + _ = env_logger::try_init(); + + // This test verifies the fix which handles the case where a zero-length frame is + // received when is_end_stream() is true. Without the fix, the StreamProducer would + // crash when the WASM guest tries to read such a frame. + + let body = b"test"; + let raw_body = Bytes::copy_from_slice(body); + + let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(1); + + let wrapped_body = BodyWithFrameAtEos { + inner: http_body_util::StreamBody::new(body_rx), + final_frame: Some(Bytes::new()), // Send empty frame at EOS + sent_final: false, + at_eos: false, + }; + + let request = http::Request::builder() + .uri("http://localhost/") + .method(http::Method::GET); + + // Use the echo component which actually reads from the stream + let response = futures::join!( + run_http( + P3_HTTP_ECHO_COMPONENT, + request.body(wrapped_body)?, + oneshot::channel().0 + ), + async { + body_tx + .send(Ok(http_body::Frame::data(raw_body))) + .await + .unwrap(); + drop(body_tx); + } + ) + .0? + .unwrap(); + + assert_eq!(response.status().as_u16(), 200); + + // Verify the body was echoed correctly (empty frames should be filtered out by the fix) + let (_, collected_body) = response.into_parts(); + let collected_body = collected_body.to_bytes(); + assert_eq!(collected_body, body.as_slice()); + Ok(()) +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_data_frame_at_end_of_stream() -> Result<()> { + _ = env_logger::try_init(); + + // This test verifies that when is_end_stream() is true but the frame contains data, + // we still process the data. + + let body = b"test"; + let final_data = b" final"; + let raw_body = Bytes::copy_from_slice(body); + let final_frame = Bytes::copy_from_slice(final_data); + + let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(1); + + let wrapped_body = BodyWithFrameAtEos { + inner: http_body_util::StreamBody::new(body_rx), + final_frame: Some(final_frame), // Send data frame at EOS with is_end_stream() = true + sent_final: false, + at_eos: false, + }; + + let request = http::Request::builder() + .uri("http://localhost/") + .method(http::Method::GET); + + // Use the echo component which actually reads from the stream + let response = futures::join!( + run_http( + P3_HTTP_ECHO_COMPONENT, + request.body(wrapped_body)?, + oneshot::channel().0 + ), + async { + body_tx + .send(Ok(http_body::Frame::data(raw_body))) + .await + .unwrap(); + drop(body_tx); + } + ) + .0? + .unwrap(); + + assert_eq!(response.status().as_u16(), 200); + + // Verify the body was echoed correctly (the final frame's data should not be lost) + let (_, collected_body) = response.into_parts(); + let collected_body = collected_body.to_bytes(); + let expected = [body.as_slice(), final_data.as_slice()].concat(); + assert_eq!(collected_body, expected.as_slice()); + Ok(()) +}