Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v5
- uses: ./.github/actions/python-uv-setup
with:
sync-flags: "--all-extras --dev"
- name: Install from PyPI using uv
run: |
uv pip install getstream==${{ needs.build.outputs.version }}
30 changes: 27 additions & 3 deletions getstream/video/rtc/pc.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def __init__(
self.connection = connection

self.track_map = {} # track_id -> (MediaRelay, original_track)
self.video_frame_trackers = {} # track_id -> VideoFrameTracker

@self.on("track")
async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
Expand All @@ -144,7 +145,16 @@ async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
)

relay = MediaRelay()
self.track_map[track.id] = (relay, track)
tracked_track = track

# For video tracks, wrap with VideoFrameTracker to capture frame metrics
if track.kind == "video":
from getstream.video.rtc.track_util import VideoFrameTracker

tracked_track = VideoFrameTracker(track)
self.video_frame_trackers[track.id] = tracked_track

self.track_map[track.id] = (relay, tracked_track)

if track.kind == "audio":
from getstream.video.rtc import PcmData
Expand All @@ -154,10 +164,10 @@ def _emit_pcm(pcm: PcmData):
pcm.participant = user
self.emit("audio", pcm)

handler = AudioTrackHandler(relay.subscribe(track), _emit_pcm)
handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm)
asyncio.create_task(handler.start())

self.emit("track_added", relay.subscribe(track), user)
self.emit("track_added", relay.subscribe(tracked_track), user)

@self.on("icegatheringstatechange")
def on_icegatheringstatechange():
Expand All @@ -182,6 +192,20 @@ def handle_track_ended(self, track: aiortc.mediastreams.MediaStreamTrack) -> Non
# Clean up stored references when track ends
if track.id in self.track_map:
del self.track_map[track.id]
if track.id in self.video_frame_trackers:
del self.video_frame_trackers[track.id]

def get_video_frame_tracker(self) -> Optional[Any]:
"""Get a video frame tracker for stats collection.

Note: Returns the first tracker by insertion order. When multiple video
tracks exist simultaneously (e.g., webcam + screenshare), this may not
match the track being actively consumed. Performance stats calculation
in StatsTracer mitigates this by selecting the highest-resolution track.
"""
if self.video_frame_trackers:
return next(iter(self.video_frame_trackers.values()))
return None

async def restartIce(self):
"""Restart ICE connection for reconnection scenarios."""
Expand Down
12 changes: 10 additions & 2 deletions getstream/video/rtc/peer_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from getstream.video.rtc.track_util import patch_sdp_offer
from getstream.video.rtc.twirp_client_wrapper import SfuRpcError
from getstream.video.rtc.pc import PublisherPeerConnection, SubscriberPeerConnection
from getstream.video.rtc.stats_reporter import DEFAULT_STATS_INTERVAL_MS
from getstream.video.rtc.stats_tracer import StatsTracer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,7 +58,9 @@ async def setup_subscriber(self):
self._setup_pc_tracing(self.subscriber_pc, pc_id)

# Create stats tracer
self.subscriber_stats = StatsTracer(self.subscriber_pc, "subscriber")
self.subscriber_stats = StatsTracer(
self.subscriber_pc, "subscriber", DEFAULT_STATS_INTERVAL_MS / 1000
)

@self.subscriber_pc.on("audio")
async def on_audio(pcm_data):
Expand Down Expand Up @@ -131,14 +134,19 @@ async def add_tracks(
self._setup_pc_tracing(self.publisher_pc, pc_id)

# Create stats tracer
self.publisher_stats = StatsTracer(self.publisher_pc, "publisher")
self.publisher_stats = StatsTracer(
self.publisher_pc, "publisher", DEFAULT_STATS_INTERVAL_MS / 1000
)

if audio and relayed_audio:
self.publisher_pc.addTrack(relayed_audio)
logger.info(f"Added relayed audio track {relayed_audio.id}")
if video and relayed_video:
self.publisher_pc.addTrack(relayed_video)
logger.info(f"Added relayed video track {relayed_video.id}")
# Set frame tracker for video stats (BufferedMediaTrack has frame tracking)
if self.publisher_stats:
self.publisher_stats.set_frame_tracker(relayed_video)

# Trace createOffer
tracer.trace("createOffer", pc_id, [])
Expand Down
95 changes: 94 additions & 1 deletion getstream/video/rtc/stats_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,28 @@ class StatsTracer:
- Delta compression to reduce size by ~90%
- Performance stats calculation (encode/decode metrics)
- Frame time and FPS history for averaging
- Integration with VideoFrameTracker/BufferedMediaTrack for frame metrics
"""

def __init__(self, pc, peer_type: str):
def __init__(self, pc, peer_type: str, interval_s: float = 8.0):
"""Initialize StatsTracer for a peer connection.

Args:
pc: The RTCPeerConnection to collect stats from
peer_type: "publisher" or "subscriber"
interval_s: Interval between stats collections in seconds (for FPS calculation)
"""
self._pc = pc
self._peer_type = peer_type
self._interval_s = interval_s
self._previous_stats: Dict[str, Dict] = {}
self._frame_time_history: List[float] = []
self._fps_history: List[float] = []
self._frame_tracker: Optional[Any] = None

def set_frame_tracker(self, tracker: Any) -> None:
"""Set the frame tracker for publisher stats (video track wrapper)."""
self._frame_tracker = tracker

async def get(self) -> ComputedStats:
"""Get stats with delta compression and performance metrics.
Expand Down Expand Up @@ -131,6 +139,9 @@ def _report_to_dict(self, report) -> Dict[str, Dict]:
# Add ICE candidate stats (not provided by aiortc getStats)
self._add_ice_candidate_stats(result)

# Inject frame stats from tracker (not provided by aiortc)
self._inject_frame_stats(result)

return result

def _delta_compress(
Expand Down Expand Up @@ -579,3 +590,85 @@ def _codec_id(self, codec, mid: Optional[str]) -> str:
clock_rate = getattr(codec, "clockRate", 0)
key = f"{mime_type}:{payload_type}:{clock_rate}:{mid}"
return hashlib.md5(key.encode()).hexdigest()[:8]

def _inject_frame_stats(self, result: Dict[str, Dict]) -> None:
"""Inject frame stats from trackers into RTP stats.

aiortc doesn't provide frame metrics (dimensions, frame count, encode/decode time).
We inject these from our frame trackers into the appropriate RTP stats entries.
"""
if self._peer_type == "publisher":
self._inject_publisher_stats(result)
else:
self._inject_subscriber_stats(result)

def _inject_publisher_stats(self, result: Dict[str, Dict]) -> None:
"""Inject stats for publisher (outbound-rtp)."""
if not self._frame_tracker:
return

try:
frame_stats = self._frame_tracker.get_frame_stats()
if frame_stats.get("framesSent", 0) == 0:
return

for stat in result.values():
if not isinstance(stat, dict):
continue
if stat.get("kind") != "video" or stat.get("type") != "outbound-rtp":
continue

stat["framesSent"] = frame_stats["framesSent"]
stat["frameWidth"] = frame_stats["frameWidth"]
stat["frameHeight"] = frame_stats["frameHeight"]
stat["totalEncodeTime"] = frame_stats["totalEncodeTime"]

if self._previous_stats:
prev = self._previous_stats.get(stat.get("id", ""), {})
delta = frame_stats["framesSent"] - prev.get("framesSent", 0)
if delta > 0:
stat["framesPerSecond"] = delta / self._interval_s

except Exception as e:
logger.debug(f"Failed to inject publisher stats: {e}")

def _inject_subscriber_stats(self, result: Dict[str, Dict]) -> None:
"""Inject frame stats for subscriber (inbound-rtp video).

Note: When multiple video tracks exist (e.g., webcam + screenshare),
get_video_frame_tracker() returns the first by insertion order, which
may not match the actively consumed track. This is a known limitation;
_get_decode_stats() mitigates by selecting the highest-resolution track
for performance calculations.
"""
# Get video tracker from PC if not set
if not self._frame_tracker and hasattr(self._pc, "get_video_frame_tracker"):
self._frame_tracker = self._pc.get_video_frame_tracker()

if not self._frame_tracker:
return

try:
frame_stats = self._frame_tracker.get_frame_stats()
if frame_stats.get("framesDecoded", 0) == 0:
return

for stat in result.values():
if not isinstance(stat, dict):
continue
if stat.get("type") != "inbound-rtp" or stat.get("kind") != "video":
continue

stat["framesDecoded"] = frame_stats["framesDecoded"]
stat["frameWidth"] = frame_stats["frameWidth"]
stat["frameHeight"] = frame_stats["frameHeight"]
stat["totalDecodeTime"] = frame_stats["totalDecodeTime"]

if self._previous_stats:
prev = self._previous_stats.get(stat.get("id", ""), {})
delta = frame_stats["framesDecoded"] - prev.get("framesDecoded", 0)
if delta > 0:
stat["framesPerSecond"] = delta / self._interval_s

except Exception as e:
logger.debug(f"Failed to inject subscriber stats: {e}")
118 changes: 115 additions & 3 deletions getstream/video/rtc/track_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import logging
import re
import time
import wave
from enum import Enum
from fractions import Fraction
Expand Down Expand Up @@ -2143,7 +2144,13 @@ def parse_track_stream_mapping(sdp: str) -> dict:


class BufferedMediaTrack(aiortc.mediastreams.MediaStreamTrack):
"""A wrapper for MediaStreamTrack that buffers one peeked frame."""
"""A wrapper for MediaStreamTrack that buffers one peeked frame.

Also tracks video frame statistics when kind is 'video':
- frames_processed: total frames that passed through recv()
- frame_width, frame_height: dimensions of the last frame
- total_processing_time_ms: cumulative time spent in recv()
"""

def __init__(self, track):
super().__init__()
Expand All @@ -2153,6 +2160,12 @@ def __init__(self, track):
self._id = track.id
self._ended = False

# Frame statistics (for video tracks)
self.frames_processed: int = 0
self.frame_width: int = 0
self.frame_height: int = 0
self.total_processing_time_ms: float = 0.0

@property
def kind(self):
return self._kind
Expand All @@ -2165,17 +2178,44 @@ def id(self):
def readyState(self):
return "ended" if self._ended else self._track.readyState

def get_frame_stats(self) -> Dict[str, Any]:
"""Get current frame statistics for StatsTracer injection."""
return {
"framesSent": self.frames_processed,
"frameWidth": self.frame_width,
"frameHeight": self.frame_height,
"totalEncodeTime": self.total_processing_time_ms / 1000.0,
}

def _update_frame_stats(self, frame, processing_time_ms: float) -> None:
"""Update frame statistics from a video frame."""
if (
self._kind == "video"
and hasattr(frame, "width")
and hasattr(frame, "height")
):
self.frames_processed += 1
self.frame_width = frame.width
self.frame_height = frame.height
self.total_processing_time_ms += processing_time_ms

async def recv(self):
"""Returns the next buffered frame if available, otherwise gets a new frame from the track."""
if self._ended:
raise MediaStreamError("Track is ended")

if self._buffered_frames:
# Return the oldest buffered frame (FIFO order)
return self._buffered_frames.pop(0)
frame = self._buffered_frames.pop(0)
self._update_frame_stats(frame, 0.0)
return frame

start_time = time.monotonic()
try:
return await self._track.recv()
frame = await self._track.recv()
elapsed_ms = (time.monotonic() - start_time) * 1000
self._update_frame_stats(frame, elapsed_ms)
return frame
except Exception as e:
logger.error(f"Error receiving frame from track: {e}")
self._ended = True
Expand Down Expand Up @@ -2214,6 +2254,78 @@ def stop(self):
logger.error(f"Error stopping track: {e}")


class VideoFrameTracker(aiortc.mediastreams.MediaStreamTrack):
"""A transparent wrapper that tracks video frame statistics.

Used for subscriber video tracks to capture frame metrics that aiortc
doesn't provide natively (dimensions, frame count, decode time).
"""

kind = "video"

def __init__(self, track: MediaStreamTrack):
super().__init__()
self._track = track
self._id = track.id
self._ended = False

# Frame statistics
self.frames_processed: int = 0
self.frame_width: int = 0
self.frame_height: int = 0
self.total_processing_time_ms: float = 0.0

@property
def id(self):
return self._id

@property
def readyState(self):
return "ended" if self._ended else self._track.readyState

def get_frame_stats(self) -> Dict[str, Any]:
"""Get current frame statistics for StatsTracer injection."""
return {
"framesDecoded": self.frames_processed,
"frameWidth": self.frame_width,
"frameHeight": self.frame_height,
"totalDecodeTime": self.total_processing_time_ms / 1000.0,
}

async def recv(self):
"""Receive a frame, tracking statistics."""
if self._ended:
raise MediaStreamError("Track is ended")

start_time = time.monotonic()
try:
frame = await self._track.recv()
elapsed_ms = (time.monotonic() - start_time) * 1000

# Update stats for video frames
if isinstance(frame, av.VideoFrame):
self.frames_processed += 1
self.frame_width = frame.width
self.frame_height = frame.height
self.total_processing_time_ms += elapsed_ms

return frame
except MediaStreamError:
self._ended = True
raise
except Exception as e:
logger.error(f"Error receiving frame: {e}")
self._ended = True
raise MediaStreamError(f"Error receiving frame: {e}") from e

def stop(self):
"""Stop the track."""
if not self._ended:
self._ended = True
if hasattr(self._track, "stop"):
self._track.stop()


async def detect_video_properties(
video_track: aiortc.mediastreams.MediaStreamTrack,
) -> Dict[str, Any]:
Expand Down
Loading
Loading