diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index 3642270b4..fd9eec368 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -99,9 +99,10 @@ pub fn webm_stream( } } - const MAX_RETRY_COUNT: usize = 3; - // To make sure we don't retry forever - // Retry is set to 0 when we successfully read a tag + // With a 3-second timeout per EOF wait, 25 retries gives a 75-second maximum stall + // before giving up. The counter resets to 0 on every successful tag read, so this + // only triggers on continuous EOF with zero progress. + const MAX_RETRY_COUNT: usize = 25; let mut retry_count = 0; let result = loop { @@ -190,6 +191,10 @@ pub fn webm_stream( }, _ = stop_notifier.notified() => { let _ = tx.send(WhenEofControlFlow::Break); + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { + trace!("EOF wait timed out, retrying"); + let _ = tx.send(WhenEofControlFlow::Continue); } } }); diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index a98001fe1..bb43dd7e4 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Context; use cadeau::xmf::vpx::{VpxCodec, VpxDecoder, VpxEncoder}; @@ -14,7 +14,7 @@ use crate::debug::mastroka_spec_name; const VPX_EFLAG_FORCE_KF: u32 = 0x00000001; #[cfg(feature = "perf-diagnostics")] -fn duration_as_millis_u64(duration: std::time::Duration) -> u64 { +fn duration_as_millis_u64(duration: Duration) -> u64 { u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } @@ -105,7 +105,9 @@ where last_encoded_abs_time: Option, // Adaptive frame skipping state + #[cfg(feature = "perf-diagnostics")] stream_start: Instant, + processing_time: Duration, last_ratio: f64, frames_since_last_encode: u32, adaptive_frame_skip: bool, @@ -192,7 +194,9 @@ where decoder, cut_block_state: CutBlockState::HaventMet, last_encoded_abs_time: None, + #[cfg(feature = "perf-diagnostics")] stream_start: Instant::now(), + processing_time: Duration::ZERO, last_ratio: 1.0, frames_since_last_encode: 0, adaptive_frame_skip: config.adaptive_frame_skip, @@ -263,7 +267,9 @@ where "VideoBlock created" ); + let processing_started = Instant::now(); self.process_current_block(&video_block)?; + self.processing_time += processing_started.elapsed(); Ok(WriterResult::Continue) } @@ -369,13 +375,15 @@ where Ok(frame) } + /// Ratio of media time to active processing time (excluding idle waits like EOF retries), + /// so that temporary stalls do not permanently corrupt the frame skip decision. fn current_realtime_ratio(&self, media_advanced_ms: u64) -> f64 { #[allow(clippy::cast_possible_truncation)] // u64 max is ~584 million years in ms; no real truncation risk - let wall_ms = self.stream_start.elapsed().as_millis() as u64; - if wall_ms == 0 { + let processing_ms = self.processing_time.as_millis() as u64; + if processing_ms == 0 { 1.0 } else { - media_advanced_ms as f64 / wall_ms as f64 + media_advanced_ms as f64 / processing_ms as f64 } }