From 737e8340b64723d8d6e6d70e68ba73b65cfdc558 Mon Sep 17 00:00:00 2001 From: Max Kahan Date: Fri, 30 Jan 2026 01:41:53 +0000 Subject: [PATCH 1/3] add video frame tracking stats --- getstream/video/rtc/pc.py | 30 ++++++- getstream/video/rtc/peer_connection.py | 8 +- getstream/video/rtc/stats_tracer.py | 95 +++++++++++++++++++- getstream/video/rtc/track_util.py | 118 ++++++++++++++++++++++++- 4 files changed, 242 insertions(+), 9 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index cc01139b..d8ad1488 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -129,6 +129,7 @@ def __init__( self.connection = connection self.track_map = {} # track_id -> (MediaRelay, original_track) + self.video_frame_trackers = {} # track_id -> VideoFrameTracker @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -144,7 +145,16 @@ async def on_track(track: aiortc.mediastreams.MediaStreamTrack): ) relay = MediaRelay() - self.track_map[track.id] = (relay, track) + tracked_track = track + + # For video tracks, wrap with VideoFrameTracker to capture frame metrics + if track.kind == "video": + from getstream.video.rtc.track_util import VideoFrameTracker + + tracked_track = VideoFrameTracker(track) + self.video_frame_trackers[track.id] = tracked_track + + self.track_map[track.id] = (relay, tracked_track) if track.kind == "audio": from getstream.video.rtc import PcmData @@ -154,10 +164,10 @@ def _emit_pcm(pcm: PcmData): pcm.participant = user self.emit("audio", pcm) - handler = AudioTrackHandler(relay.subscribe(track), _emit_pcm) + handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm) asyncio.create_task(handler.start()) - self.emit("track_added", relay.subscribe(track), user) + self.emit("track_added", relay.subscribe(tracked_track), user) @self.on("icegatheringstatechange") def on_icegatheringstatechange(): @@ -182,6 +192,20 @@ def handle_track_ended(self, track: aiortc.mediastreams.MediaStreamTrack) -> Non # Clean up stored references when track ends if track.id in self.track_map: del self.track_map[track.id] + if track.id in self.video_frame_trackers: + del self.video_frame_trackers[track.id] + + def get_video_frame_tracker(self) -> Optional[Any]: + """Get a video frame tracker for stats collection. + + Note: Returns the first tracker by insertion order. When multiple video + tracks exist simultaneously (e.g., webcam + screenshare), this may not + match the track being actively consumed. Performance stats calculation + in StatsTracer mitigates this by selecting the highest-resolution track. + """ + if self.video_frame_trackers: + return next(iter(self.video_frame_trackers.values())) + return None async def restartIce(self): """Restart ICE connection for reconnection scenarios.""" diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index 89021700..a4007fa0 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -19,6 +19,7 @@ from getstream.video.rtc.track_util import patch_sdp_offer from getstream.video.rtc.twirp_client_wrapper import SfuRpcError from getstream.video.rtc.pc import PublisherPeerConnection, SubscriberPeerConnection +from getstream.video.rtc.stats_reporter import DEFAULT_STATS_INTERVAL_MS from getstream.video.rtc.stats_tracer import StatsTracer logger = logging.getLogger(__name__) @@ -57,7 +58,7 @@ async def setup_subscriber(self): self._setup_pc_tracing(self.subscriber_pc, pc_id) # Create stats tracer - self.subscriber_stats = StatsTracer(self.subscriber_pc, "subscriber") + self.subscriber_stats = StatsTracer(self.subscriber_pc, "subscriber", DEFAULT_STATS_INTERVAL_MS / 1000) @self.subscriber_pc.on("audio") async def on_audio(pcm_data): @@ -131,7 +132,7 @@ async def add_tracks( self._setup_pc_tracing(self.publisher_pc, pc_id) # Create stats tracer - self.publisher_stats = StatsTracer(self.publisher_pc, "publisher") + self.publisher_stats = StatsTracer(self.publisher_pc, "publisher", DEFAULT_STATS_INTERVAL_MS / 1000) if audio and relayed_audio: self.publisher_pc.addTrack(relayed_audio) @@ -139,6 +140,9 @@ async def add_tracks( if video and relayed_video: self.publisher_pc.addTrack(relayed_video) logger.info(f"Added relayed video track {relayed_video.id}") + # Set frame tracker for video stats (BufferedMediaTrack has frame tracking) + if self.publisher_stats: + self.publisher_stats.set_frame_tracker(relayed_video) # Trace createOffer tracer.trace("createOffer", pc_id, []) diff --git a/getstream/video/rtc/stats_tracer.py b/getstream/video/rtc/stats_tracer.py index a0a7aacd..4fa76757 100644 --- a/getstream/video/rtc/stats_tracer.py +++ b/getstream/video/rtc/stats_tracer.py @@ -45,20 +45,28 @@ class StatsTracer: - Delta compression to reduce size by ~90% - Performance stats calculation (encode/decode metrics) - Frame time and FPS history for averaging + - Integration with VideoFrameTracker/BufferedMediaTrack for frame metrics """ - def __init__(self, pc, peer_type: str): + def __init__(self, pc, peer_type: str, interval_s: float = 8.0): """Initialize StatsTracer for a peer connection. Args: pc: The RTCPeerConnection to collect stats from peer_type: "publisher" or "subscriber" + interval_s: Interval between stats collections in seconds (for FPS calculation) """ self._pc = pc self._peer_type = peer_type + self._interval_s = interval_s self._previous_stats: Dict[str, Dict] = {} self._frame_time_history: List[float] = [] self._fps_history: List[float] = [] + self._frame_tracker: Optional[Any] = None + + def set_frame_tracker(self, tracker: Any) -> None: + """Set the frame tracker for publisher stats (video track wrapper).""" + self._frame_tracker = tracker async def get(self) -> ComputedStats: """Get stats with delta compression and performance metrics. @@ -131,6 +139,9 @@ def _report_to_dict(self, report) -> Dict[str, Dict]: # Add ICE candidate stats (not provided by aiortc getStats) self._add_ice_candidate_stats(result) + # Inject frame stats from tracker (not provided by aiortc) + self._inject_frame_stats(result) + return result def _delta_compress( @@ -579,3 +590,85 @@ def _codec_id(self, codec, mid: Optional[str]) -> str: clock_rate = getattr(codec, "clockRate", 0) key = f"{mime_type}:{payload_type}:{clock_rate}:{mid}" return hashlib.md5(key.encode()).hexdigest()[:8] + + def _inject_frame_stats(self, result: Dict[str, Dict]) -> None: + """Inject frame stats from trackers into RTP stats. + + aiortc doesn't provide frame metrics (dimensions, frame count, encode/decode time). + We inject these from our frame trackers into the appropriate RTP stats entries. + """ + if self._peer_type == "publisher": + self._inject_publisher_stats(result) + else: + self._inject_subscriber_stats(result) + + def _inject_publisher_stats(self, result: Dict[str, Dict]) -> None: + """Inject stats for publisher (outbound-rtp).""" + if not self._frame_tracker: + return + + try: + frame_stats = self._frame_tracker.get_frame_stats() + if frame_stats.get("framesSent", 0) == 0: + return + + for stat in result.values(): + if not isinstance(stat, dict): + continue + if stat.get("kind") != "video" or stat.get("type") != "outbound-rtp": + continue + + stat["framesSent"] = frame_stats["framesSent"] + stat["frameWidth"] = frame_stats["frameWidth"] + stat["frameHeight"] = frame_stats["frameHeight"] + stat["totalEncodeTime"] = frame_stats["totalEncodeTime"] + + if self._previous_stats: + prev = self._previous_stats.get(stat.get("id", ""), {}) + delta = frame_stats["framesSent"] - prev.get("framesSent", 0) + if delta > 0: + stat["framesPerSecond"] = delta / self._interval_s + + except Exception as e: + logger.debug(f"Failed to inject publisher stats: {e}") + + def _inject_subscriber_stats(self, result: Dict[str, Dict]) -> None: + """Inject frame stats for subscriber (inbound-rtp video). + + Note: When multiple video tracks exist (e.g., webcam + screenshare), + get_video_frame_tracker() returns the first by insertion order, which + may not match the actively consumed track. This is a known limitation; + _get_decode_stats() mitigates by selecting the highest-resolution track + for performance calculations. + """ + # Get video tracker from PC if not set + if not self._frame_tracker and hasattr(self._pc, "get_video_frame_tracker"): + self._frame_tracker = self._pc.get_video_frame_tracker() + + if not self._frame_tracker: + return + + try: + frame_stats = self._frame_tracker.get_frame_stats() + if frame_stats.get("framesDecoded", 0) == 0: + return + + for stat in result.values(): + if not isinstance(stat, dict): + continue + if stat.get("type") != "inbound-rtp" or stat.get("kind") != "video": + continue + + stat["framesDecoded"] = frame_stats["framesDecoded"] + stat["frameWidth"] = frame_stats["frameWidth"] + stat["frameHeight"] = frame_stats["frameHeight"] + stat["totalDecodeTime"] = frame_stats["totalDecodeTime"] + + if self._previous_stats: + prev = self._previous_stats.get(stat.get("id", ""), {}) + delta = frame_stats["framesDecoded"] - prev.get("framesDecoded", 0) + if delta > 0: + stat["framesPerSecond"] = delta / self._interval_s + + except Exception as e: + logger.debug(f"Failed to inject subscriber stats: {e}") diff --git a/getstream/video/rtc/track_util.py b/getstream/video/rtc/track_util.py index 958ceea8..353c020a 100644 --- a/getstream/video/rtc/track_util.py +++ b/getstream/video/rtc/track_util.py @@ -4,6 +4,7 @@ import io import logging import re +import time import wave from enum import Enum from fractions import Fraction @@ -2143,7 +2144,13 @@ def parse_track_stream_mapping(sdp: str) -> dict: class BufferedMediaTrack(aiortc.mediastreams.MediaStreamTrack): - """A wrapper for MediaStreamTrack that buffers one peeked frame.""" + """A wrapper for MediaStreamTrack that buffers one peeked frame. + + Also tracks video frame statistics when kind is 'video': + - frames_processed: total frames that passed through recv() + - frame_width, frame_height: dimensions of the last frame + - total_processing_time_ms: cumulative time spent in recv() + """ def __init__(self, track): super().__init__() @@ -2153,6 +2160,12 @@ def __init__(self, track): self._id = track.id self._ended = False + # Frame statistics (for video tracks) + self.frames_processed: int = 0 + self.frame_width: int = 0 + self.frame_height: int = 0 + self.total_processing_time_ms: float = 0.0 + @property def kind(self): return self._kind @@ -2165,6 +2178,27 @@ def id(self): def readyState(self): return "ended" if self._ended else self._track.readyState + def get_frame_stats(self) -> Dict[str, Any]: + """Get current frame statistics for StatsTracer injection.""" + return { + "framesSent": self.frames_processed, + "frameWidth": self.frame_width, + "frameHeight": self.frame_height, + "totalEncodeTime": self.total_processing_time_ms / 1000.0, + } + + def _update_frame_stats(self, frame, processing_time_ms: float) -> None: + """Update frame statistics from a video frame.""" + if ( + self._kind == "video" + and hasattr(frame, "width") + and hasattr(frame, "height") + ): + self.frames_processed += 1 + self.frame_width = frame.width + self.frame_height = frame.height + self.total_processing_time_ms += processing_time_ms + async def recv(self): """Returns the next buffered frame if available, otherwise gets a new frame from the track.""" if self._ended: @@ -2172,10 +2206,16 @@ async def recv(self): if self._buffered_frames: # Return the oldest buffered frame (FIFO order) - return self._buffered_frames.pop(0) + frame = self._buffered_frames.pop(0) + self._update_frame_stats(frame, 0.0) + return frame + start_time = time.monotonic() try: - return await self._track.recv() + frame = await self._track.recv() + elapsed_ms = (time.monotonic() - start_time) * 1000 + self._update_frame_stats(frame, elapsed_ms) + return frame except Exception as e: logger.error(f"Error receiving frame from track: {e}") self._ended = True @@ -2214,6 +2254,78 @@ def stop(self): logger.error(f"Error stopping track: {e}") +class VideoFrameTracker(aiortc.mediastreams.MediaStreamTrack): + """A transparent wrapper that tracks video frame statistics. + + Used for subscriber video tracks to capture frame metrics that aiortc + doesn't provide natively (dimensions, frame count, decode time). + """ + + kind = "video" + + def __init__(self, track: MediaStreamTrack): + super().__init__() + self._track = track + self._id = track.id + self._ended = False + + # Frame statistics + self.frames_processed: int = 0 + self.frame_width: int = 0 + self.frame_height: int = 0 + self.total_processing_time_ms: float = 0.0 + + @property + def id(self): + return self._id + + @property + def readyState(self): + return "ended" if self._ended else self._track.readyState + + def get_frame_stats(self) -> Dict[str, Any]: + """Get current frame statistics for StatsTracer injection.""" + return { + "framesDecoded": self.frames_processed, + "frameWidth": self.frame_width, + "frameHeight": self.frame_height, + "totalDecodeTime": self.total_processing_time_ms / 1000.0, + } + + async def recv(self): + """Receive a frame, tracking statistics.""" + if self._ended: + raise MediaStreamError("Track is ended") + + start_time = time.monotonic() + try: + frame = await self._track.recv() + elapsed_ms = (time.monotonic() - start_time) * 1000 + + # Update stats for video frames + if hasattr(frame, "width") and hasattr(frame, "height"): + self.frames_processed += 1 + self.frame_width = frame.width + self.frame_height = frame.height + self.total_processing_time_ms += elapsed_ms + + return frame + except MediaStreamError: + self._ended = True + raise + except Exception as e: + logger.error(f"Error receiving frame: {e}") + self._ended = True + raise MediaStreamError(f"Error receiving frame: {e}") from e + + def stop(self): + """Stop the track.""" + if not self._ended: + self._ended = True + if hasattr(self._track, "stop"): + self._track.stop() + + async def detect_video_properties( video_track: aiortc.mediastreams.MediaStreamTrack, ) -> Dict[str, Any]: From 42f90202959acbc86f1c635b91b6a9ca9e15e7e1 Mon Sep 17 00:00:00 2001 From: Max Kahan Date: Fri, 30 Jan 2026 01:47:38 +0000 Subject: [PATCH 2/3] fix release yaml bug --- .github/workflows/release.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 178fdb2f..ef33d736 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -52,6 +52,8 @@ jobs: - name: Checkout uses: actions/checkout@v5 - uses: ./.github/actions/python-uv-setup + with: + sync-flags: "--all-extras --dev" - name: Install from PyPI using uv run: | uv pip install getstream==${{ needs.build.outputs.version }} From d735c91964705e25628cf48518009b018f0ee5a2 Mon Sep 17 00:00:00 2001 From: Max Kahan Date: Fri, 30 Jan 2026 02:09:04 +0000 Subject: [PATCH 3/3] add tests, ruff and ty formatting --- getstream/video/rtc/peer_connection.py | 8 +- getstream/video/rtc/track_util.py | 2 +- tests/rtc/test_frame_tracking.py | 173 +++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 tests/rtc/test_frame_tracking.py diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index a4007fa0..3155c4bd 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -58,7 +58,9 @@ async def setup_subscriber(self): self._setup_pc_tracing(self.subscriber_pc, pc_id) # Create stats tracer - self.subscriber_stats = StatsTracer(self.subscriber_pc, "subscriber", DEFAULT_STATS_INTERVAL_MS / 1000) + self.subscriber_stats = StatsTracer( + self.subscriber_pc, "subscriber", DEFAULT_STATS_INTERVAL_MS / 1000 + ) @self.subscriber_pc.on("audio") async def on_audio(pcm_data): @@ -132,7 +134,9 @@ async def add_tracks( self._setup_pc_tracing(self.publisher_pc, pc_id) # Create stats tracer - self.publisher_stats = StatsTracer(self.publisher_pc, "publisher", DEFAULT_STATS_INTERVAL_MS / 1000) + self.publisher_stats = StatsTracer( + self.publisher_pc, "publisher", DEFAULT_STATS_INTERVAL_MS / 1000 + ) if audio and relayed_audio: self.publisher_pc.addTrack(relayed_audio) diff --git a/getstream/video/rtc/track_util.py b/getstream/video/rtc/track_util.py index 353c020a..6f23f66d 100644 --- a/getstream/video/rtc/track_util.py +++ b/getstream/video/rtc/track_util.py @@ -2303,7 +2303,7 @@ async def recv(self): elapsed_ms = (time.monotonic() - start_time) * 1000 # Update stats for video frames - if hasattr(frame, "width") and hasattr(frame, "height"): + if isinstance(frame, av.VideoFrame): self.frames_processed += 1 self.frame_width = frame.width self.frame_height = frame.height diff --git a/tests/rtc/test_frame_tracking.py b/tests/rtc/test_frame_tracking.py new file mode 100644 index 00000000..b148872e --- /dev/null +++ b/tests/rtc/test_frame_tracking.py @@ -0,0 +1,173 @@ +"""Tests for video frame tracking and stats injection.""" + +import asyncio +import pytest +import av +from unittest.mock import Mock +from aiortc import RTCPeerConnection, RTCConfiguration, RTCIceServer +from aiortc.mediastreams import MediaStreamError + + +def create_video_frame(width: int = 320, height: int = 240) -> av.VideoFrame: + """Create a real av.VideoFrame for testing.""" + return av.VideoFrame(width, height, "rgb24") + + +class MockVideoTrack: + """Mock video track for testing frame trackers.""" + + kind = "video" + + def __init__(self, frames: list, delay_ms: float = 0): + self.frames = frames + self.frame_index = 0 + self.delay_ms = delay_ms + self.id = "mock-track" + self._ended = False + + @property + def readyState(self): + return "ended" if self._ended else "live" + + async def recv(self): + if self.frame_index >= len(self.frames): + self._ended = True + raise MediaStreamError("Track ended") + if self.delay_ms > 0: + await asyncio.sleep(self.delay_ms / 1000) + frame = self.frames[self.frame_index] + self.frame_index += 1 + return frame + + def stop(self): + self._ended = True + + +class TestVideoFrameTracker: + """Tests for VideoFrameTracker (subscriber video track wrapper).""" + + @pytest.mark.asyncio + async def test_tracks_frames_and_dimensions(self): + """Should count frames, capture dimensions, and accumulate decode time.""" + from getstream.video.rtc.track_util import VideoFrameTracker + + frames = [ + create_video_frame(320, 240), + create_video_frame(320, 240), + create_video_frame(160, 120), # Resolution change + ] + tracker = VideoFrameTracker(MockVideoTrack(frames, delay_ms=5)) + + for _ in range(3): + await tracker.recv() + + stats = tracker.get_frame_stats() + assert stats["framesDecoded"] == 3 + assert stats["frameWidth"] == 160 # Last frame + assert stats["frameHeight"] == 120 + assert stats["totalDecodeTime"] > 0.01 # At least 15ms accumulated + + @pytest.mark.asyncio + async def test_handles_track_end(self): + """Should raise MediaStreamError when track ends.""" + from getstream.video.rtc.track_util import VideoFrameTracker + + tracker = VideoFrameTracker(MockVideoTrack([create_video_frame()])) + await tracker.recv() + + with pytest.raises(MediaStreamError): + await tracker.recv() + + # Stats still valid after track ends + assert tracker.get_frame_stats()["framesDecoded"] == 1 + + +class TestBufferedMediaTrackFrameTracking: + """Tests for BufferedMediaTrack frame tracking (publisher video).""" + + @pytest.mark.asyncio + async def test_tracks_frames_through_recv(self): + """Should track video frames passing through recv().""" + from getstream.video.rtc.track_util import BufferedMediaTrack + + frames = [create_video_frame(320, 240), create_video_frame(160, 120)] + buffered = BufferedMediaTrack(MockVideoTrack(frames)) + + await buffered.recv() + await buffered.recv() + + stats = buffered.get_frame_stats() + assert stats["framesSent"] == 2 + assert stats["frameWidth"] == 160 + assert stats["frameHeight"] == 120 + + +class TestStatsTracerFrameInjection: + """Tests for StatsTracer frame stats injection.""" + + @pytest.mark.asyncio + async def test_injects_publisher_frame_stats(self): + """Frame stats from tracker should be injected into outbound-rtp.""" + from getstream.video.rtc.stats_tracer import StatsTracer + + pc = RTCPeerConnection( + RTCConfiguration( + iceServers=[RTCIceServer(urls=["stun:stun.l.google.com:19302"])] + ) + ) + + try: + pc.addTransceiver("video", direction="sendonly") + await pc.setLocalDescription(await pc.createOffer()) + await asyncio.sleep(0.2) + + tracer = StatsTracer(pc, "publisher") + tracer.set_frame_tracker( + Mock( + get_frame_stats=lambda: { + "framesSent": 100, + "frameWidth": 320, + "frameHeight": 240, + "totalEncodeTime": 0.5, + } + ) + ) + + # First get() populates stats with frame data + result = await tracer.get() + + # Find video outbound-rtp in delta (first call has all fields) + video_stats = [ + v + for v in result.delta.values() + if isinstance(v, dict) and v.get("kind") == "video" + ] + assert len(video_stats) > 0 + video_rtp = video_stats[0] + assert video_rtp["framesSent"] == 100 + assert video_rtp["frameWidth"] == 320 + assert video_rtp["frameHeight"] == 240 + + finally: + await pc.close() + + @pytest.mark.asyncio + async def test_works_without_frame_tracker(self): + """Stats collection should work without frame tracker set.""" + from getstream.video.rtc.stats_tracer import StatsTracer + + pc = RTCPeerConnection() + + try: + pc.addTransceiver("video", direction="sendonly") + await pc.setLocalDescription(await pc.createOffer()) + + tracer = StatsTracer(pc, "publisher") + # No frame tracker set + + result = await tracer.get() + assert result.delta is not None + assert "timestamp" in result.delta + + finally: + await pc.close()