From faf9d76a6704075d2bce71851bfbc511983e4ce1 Mon Sep 17 00:00:00 2001 From: Karthik Swaminathan Date: Wed, 28 Jan 2026 22:30:30 +1300 Subject: [PATCH 1/2] Fixed #12458 and added test to validate --- crates/wasi-http/src/p3/body.rs | 6 ++ crates/wasi-http/tests/all/p3/mod.rs | 90 ++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/crates/wasi-http/src/p3/body.rs b/crates/wasi-http/src/p3/body.rs index e4e9f28547ac..bff8042ec1db 100644 --- a/crates/wasi-http/src/p3/body.rs +++ b/crates/wasi-http/src/p3/body.rs @@ -492,6 +492,12 @@ where Poll::Ready(Some(Ok(frame))) => { match frame.into_data().map_err(http_body::Frame::into_trailers) { Ok(mut frame) => { + // Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream, + // so we have to check for the body's end-of-stream indicator here too + if self.body.is_end_stream() { + break 'result Ok(None); + } + if let Some(cap) = cap { let n = frame.len(); let cap = cap.into(); diff --git a/crates/wasi-http/tests/all/p3/mod.rs b/crates/wasi-http/tests/all/p3/mod.rs index f78d9b54559a..2674046bb08d 100644 --- a/crates/wasi-http/tests/all/p3/mod.rs +++ b/crates/wasi-http/tests/all/p3/mod.rs @@ -9,6 +9,8 @@ use http_body::Body; use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody}; use std::io::Write; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use test_programs_artifacts::*; use tokio::{fs, try_join}; use wasm_compose::composer::ComponentComposer; @@ -591,3 +593,91 @@ async fn p3_http_proxy() -> Result<()> { assert_eq!(request_body, body.as_slice()); Ok(()) } + +// Custom body wrapper that sends an empty frame at EOS while reporting is_end_stream() = true +struct BodyWithEmptyFrameAtEos { + inner: http_body_util::StreamBody< + futures::channel::mpsc::Receiver, ErrorCode>>, + >, + sent_empty: bool, + at_eos: bool, +} + +impl http_body::Body for BodyWithEmptyFrameAtEos { + type Data = Bytes; + type Error = ErrorCode; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + // First, poll the underlying body + let this = &mut *self; + match Pin::new(&mut this.inner).poll_frame(cx) { + Poll::Ready(None) if !this.sent_empty => { + // When the underlying body ends, send an empty frame + // This simulates HTTP implementations that send empty frames at EOS + this.sent_empty = true; + this.at_eos = true; + Poll::Ready(Some(Ok(http_body::Frame::data(Bytes::new())))) + } + other => other, + } + } + + fn is_end_stream(&self) -> bool { + // Report end of stream once we've reached it + // This ensures is_end_stream() = true when we send the empty frame + self.at_eos + } +} + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> { + _ = env_logger::try_init(); + + // This test verifies the fix which handles the case where a zero-length frame is + // received when is_end_stream() is true. Without the fix, the StreamProducer would + // crash when the WASM guest tries to read such a frame. + + let body = b"test"; + let raw_body = Bytes::copy_from_slice(body); + + let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(1); + + let wrapped_body = BodyWithEmptyFrameAtEos { + inner: http_body_util::StreamBody::new(body_rx), + sent_empty: false, + at_eos: false, + }; + + let request = http::Request::builder() + .uri("http://localhost/") + .method(http::Method::GET); + + // Use the echo component which actually reads from the stream + let response = futures::join!( + run_http( + P3_HTTP_ECHO_COMPONENT, + request.body(wrapped_body)?, + oneshot::channel().0 + ), + async { + body_tx + .send(Ok(http_body::Frame::data(raw_body))) + .await + .unwrap(); + drop(body_tx); + } + ) + .0? + .unwrap(); + + assert_eq!(response.status().as_u16(), 200); + + // Verify the body was echoed correctly (empty frames should be filtered out by the fix) + let (_, collected_body) = response.into_parts(); + let collected_body = collected_body.to_bytes(); + assert_eq!(collected_body, body.as_slice()); + Ok(()) +} From 5ade6df730865e899164d67127a9dcf46c0a6853 Mon Sep 17 00:00:00 2001 From: Karthik Swaminathan Date: Fri, 30 Jan 2026 08:03:14 +1300 Subject: [PATCH 2/2] Updated frame processing condition and test --- crates/wasi-http/src/p3/body.rs | 2 +- crates/wasi-http/tests/all/p3/mod.rs | 83 ++++++++++++++++++++++++---- 2 files changed, 72 insertions(+), 13 deletions(-) diff --git a/crates/wasi-http/src/p3/body.rs b/crates/wasi-http/src/p3/body.rs index bff8042ec1db..84dcadab80ef 100644 --- a/crates/wasi-http/src/p3/body.rs +++ b/crates/wasi-http/src/p3/body.rs @@ -494,7 +494,7 @@ where Ok(mut frame) => { // Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream, // so we have to check for the body's end-of-stream indicator here too - if self.body.is_end_stream() { + if frame.len() == 0 && self.body.is_end_stream() { break 'result Ok(None); } diff --git a/crates/wasi-http/tests/all/p3/mod.rs b/crates/wasi-http/tests/all/p3/mod.rs index 2674046bb08d..31a017739a6d 100644 --- a/crates/wasi-http/tests/all/p3/mod.rs +++ b/crates/wasi-http/tests/all/p3/mod.rs @@ -594,16 +594,17 @@ async fn p3_http_proxy() -> Result<()> { Ok(()) } -// Custom body wrapper that sends an empty frame at EOS while reporting is_end_stream() = true -struct BodyWithEmptyFrameAtEos { +// Custom body wrapper that sends a configurable frame at EOS while reporting is_end_stream() = true +struct BodyWithFrameAtEos { inner: http_body_util::StreamBody< futures::channel::mpsc::Receiver, ErrorCode>>, >, - sent_empty: bool, + final_frame: Option, + sent_final: bool, at_eos: bool, } -impl http_body::Body for BodyWithEmptyFrameAtEos { +impl http_body::Body for BodyWithFrameAtEos { type Data = Bytes; type Error = ErrorCode; @@ -614,12 +615,16 @@ impl http_body::Body for BodyWithEmptyFrameAtEos { // First, poll the underlying body let this = &mut *self; match Pin::new(&mut this.inner).poll_frame(cx) { - Poll::Ready(None) if !this.sent_empty => { - // When the underlying body ends, send an empty frame - // This simulates HTTP implementations that send empty frames at EOS - this.sent_empty = true; + Poll::Ready(None) if !this.sent_final => { + // When the underlying body ends, send the configured final frame + // This simulates HTTP implementations that send frames at EOS + this.sent_final = true; this.at_eos = true; - Poll::Ready(Some(Ok(http_body::Frame::data(Bytes::new())))) + if let Some(data) = this.final_frame.take() { + Poll::Ready(Some(Ok(http_body::Frame::data(data)))) + } else { + Poll::Ready(None) + } } other => other, } @@ -627,7 +632,7 @@ impl http_body::Body for BodyWithEmptyFrameAtEos { fn is_end_stream(&self) -> bool { // Report end of stream once we've reached it - // This ensures is_end_stream() = true when we send the empty frame + // This ensures is_end_stream() = true when we send the final frame self.at_eos } } @@ -645,9 +650,10 @@ async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> { let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(1); - let wrapped_body = BodyWithEmptyFrameAtEos { + let wrapped_body = BodyWithFrameAtEos { inner: http_body_util::StreamBody::new(body_rx), - sent_empty: false, + final_frame: Some(Bytes::new()), // Send empty frame at EOS + sent_final: false, at_eos: false, }; @@ -681,3 +687,56 @@ async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> { assert_eq!(collected_body, body.as_slice()); Ok(()) } + +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn p3_http_data_frame_at_end_of_stream() -> Result<()> { + _ = env_logger::try_init(); + + // This test verifies that when is_end_stream() is true but the frame contains data, + // we still process the data. + + let body = b"test"; + let final_data = b" final"; + let raw_body = Bytes::copy_from_slice(body); + let final_frame = Bytes::copy_from_slice(final_data); + + let (mut body_tx, body_rx) = futures::channel::mpsc::channel::>(1); + + let wrapped_body = BodyWithFrameAtEos { + inner: http_body_util::StreamBody::new(body_rx), + final_frame: Some(final_frame), // Send data frame at EOS with is_end_stream() = true + sent_final: false, + at_eos: false, + }; + + let request = http::Request::builder() + .uri("http://localhost/") + .method(http::Method::GET); + + // Use the echo component which actually reads from the stream + let response = futures::join!( + run_http( + P3_HTTP_ECHO_COMPONENT, + request.body(wrapped_body)?, + oneshot::channel().0 + ), + async { + body_tx + .send(Ok(http_body::Frame::data(raw_body))) + .await + .unwrap(); + drop(body_tx); + } + ) + .0? + .unwrap(); + + assert_eq!(response.status().as_u16(), 200); + + // Verify the body was echoed correctly (the final frame's data should not be lost) + let (_, collected_body) = response.into_parts(); + let collected_body = collected_body.to_bytes(); + let expected = [body.as_slice(), final_data.as_slice()].concat(); + assert_eq!(collected_body, expected.as_slice()); + Ok(()) +}