diff --git a/src/RTSPServerApp/RTSPServerWorker.cs b/src/RTSPServerApp/RTSPServerWorker.cs index 54a45a3..95be57b 100644 --- a/src/RTSPServerApp/RTSPServerWorker.cs +++ b/src/RTSPServerApp/RTSPServerWorker.cs @@ -4,6 +4,7 @@ using SharpMp4; using SharpRTSPServer; using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -156,9 +157,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { - rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List)videoTrack[videoIndex++ % videoTrack.Count]); + using (var buffer = new PooledByteBuffer(initialBufferSize: 0)) + { + foreach (var trackBytes in videoTrack[videoIndex++ % videoTrack.Count]) + { + buffer.Write(trackBytes); + } + + rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), buffer.GetReadOnlySequence()); + } } - catch(Exception ex) + catch (Exception ex) { _logger.LogError(ex, $"FeedInRawSamples failed: {ex.Message}"); } @@ -167,7 +176,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { Reset(ref videoIndex, videoTimer, ref audioIndex, audioTimer); } - }; + } + ; } if (rtspAudioTrack != null) @@ -177,7 +187,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate); audioTimer.Elapsed += (s, e) => { - rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List() { audioTrack[0][audioIndex++ % audioTrack[0].Count] }); + rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence(audioTrack[0][audioIndex++ % audioTrack[0].Count])); if (audioIndex % audioTrack[0].Count == 0) { diff --git a/src/RTSPServerFFmpeg/Program.cs b/src/RTSPServerFFmpeg/Program.cs index a75fcc8..26058e9 100644 --- a/src/RTSPServerFFmpeg/Program.cs +++ b/src/RTSPServerFFmpeg/Program.cs @@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken { byte[] rtp = udpClient.Receive(ref remoteEndPoint); uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp); - track.FeedInRawSamples(rtpTimestamp, new List() { rtp }); + track.FeedInRawSamples(rtpTimestamp, new(rtp)); } catch (Exception e) { diff --git a/src/RTSPServerPcap/Program.cs b/src/RTSPServerPcap/Program.cs index 3383db1..61b516f 100644 --- a/src/RTSPServerPcap/Program.cs +++ b/src/RTSPServerPcap/Program.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Configuration; using SharpRTSPServer; using System; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet) { var ipHeader = ParseIPHeader(packet); - if(ipHeader.Protocol == 6) // TCP - RTSP + if (ipHeader.Protocol == 6) // TCP - RTSP { var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength); Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}"); @@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds) if (udp != null && data.Length > 1) // TODO { - if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp + if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp { long messageTime = seconds * 1000 + (microseconds / 1000); long realTime = (uint)_stopwatch.ElapsedMilliseconds; @@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds) { Thread.Sleep(sleep); } - videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List { data }); + videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence(data)); } - else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort)) + else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort)) { if (lastAudioMessageTime == -1) lastAudioMessageTime = messageTime; @@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds) Thread.Sleep(sleep); } - audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List { data }); + audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence(data)); } } } @@ -305,19 +306,19 @@ public bool Parse(string rtsp) { int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength); } - else if(line.StartsWith(TRANSPORT)) // SETUP response + else if (line.StartsWith(TRANSPORT)) // SETUP response { int[] clientPorts = null; int[] serverPorts = null; string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';'); - foreach(var s in split) + foreach (var s in split) { string str = s.Trim(); - if(str.StartsWith(CLIENT_PORT)) + if (str.StartsWith(CLIENT_PORT)) { clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray(); } - else if(str.StartsWith(SERVER_PORT)) + else if (str.StartsWith(SERVER_PORT)) { serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray(); } @@ -337,7 +338,7 @@ public bool Parse(string rtsp) if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position)) { return true; - } + } } } } @@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor public class TCPHeader { - public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer) + public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer) { SourcePort = sourcePort; DestinationPort = destinationPort; @@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, public class IPHeader { public IPHeader( - int family, - int version, + int family, + int version, int headerLength, - byte differentiatedServicesField, - ushort totalLength, - ushort identification, - byte flags, - ushort fragmentOffset, - byte ttl, - byte protocol, - ushort headerCheckSum, - IPAddress sourceIP, + byte differentiatedServicesField, + ushort totalLength, + ushort identification, + byte flags, + ushort fragmentOffset, + byte ttl, + byte protocol, + ushort headerCheckSum, + IPAddress sourceIP, IPAddress destintationIP) { Family = family; diff --git a/src/SharpRTSPClient/PooledByteBuffer.cs b/src/SharpRTSPClient/PooledByteBuffer.cs new file mode 100644 index 0000000..b905f7b --- /dev/null +++ b/src/SharpRTSPClient/PooledByteBuffer.cs @@ -0,0 +1,161 @@ +using System; +using System.Buffers; +using System.Collections.Generic; + +namespace SharpRTSPClient +{ + /// + /// A pooled buffer writer that implements using for efficient writing of byte data and + /// allows reading the written content through a using the method. + /// + public sealed class PooledByteBuffer : IBufferWriter, IDisposable + { + private const int DefaultBufferSize = 4096; + + private readonly List _buffers = new List(); + private readonly ArrayPool _pool; + private int _currentIndex; + private int _currentOffset; + private bool _disposed; + + /// + /// Initializes a new instance of the class with an optional initial buffer size and array pool. + /// + /// The initial size of the buffer to rent from the pool. Defaults to 4096 bytes. + /// The array pool to use. If null, is used. + public PooledByteBuffer(int initialBufferSize = DefaultBufferSize, ArrayPool pool = null) + { + _pool = pool ?? ArrayPool.Shared; + AddNewBuffer(initialBufferSize); + } + + /// + /// Notifies the buffer writer that bytes were written. + /// + /// The number of bytes written. + /// Thrown if count is negative or exceeds the current buffer capacity. + public void Advance(int count) + { + if (count < 0 || _currentOffset + count > _buffers[_currentIndex].Length) + { + throw new ArgumentOutOfRangeException(nameof(count)); + } + + _currentOffset += count; + } + + /// + /// Returns a buffer to write to, ensuring at least bytes are available. + /// + /// The minimum number of bytes required. May be 0. + /// A writable memory buffer. + public Memory GetMemory(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return _buffers[_currentIndex].AsMemory(_currentOffset); + } + + /// + /// Returns a buffer to write to, ensuring at least bytes are available. + /// + /// The minimum number of bytes required. May be 0. + /// A writable span buffer. + public Span GetSpan(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return _buffers[_currentIndex].AsSpan(_currentOffset); + } + + /// + /// Returns a representing the written data across all buffers. + /// + /// A read-only sequence of bytes. + public ReadOnlySequence GetReadOnlySequence() + { + SequenceSegment first = null; + SequenceSegment last = null; + + for (var i = 0; i < _buffers.Count; i++) + { + var buffer = _buffers[i]; + var length = (i == _currentIndex) ? _currentOffset : buffer.Length; + + if (length == 0) + { + continue; + } + + var segment = new SequenceSegment(buffer.AsMemory(0, length)); + + if (first == null) + { + first = segment; + } + + if (last != null) + { + last.SetNext(segment); + } + + last = segment; + } + + if (first == null || last == null) + { + return ReadOnlySequence.Empty; + } + + return new ReadOnlySequence(first, 0, last, last.Memory.Length); + } + + /// + /// Releases all buffers back to the pool and clears internal state. + /// + public void Dispose() + { + if (_disposed) + { + return; + } + + foreach (var buffer in _buffers) + { + _pool.Return(buffer); + } + + _buffers.Clear(); + _disposed = true; + } + + private void EnsureCapacity(int sizeHint) + { + if (_currentOffset + sizeHint > _buffers[_currentIndex].Length) + { + var newSize = Math.Max(sizeHint, DefaultBufferSize); + AddNewBuffer(newSize); + } + } + + private void AddNewBuffer(int size) + { + var buffer = _pool.Rent(size); + _buffers.Add(buffer); + _currentIndex = _buffers.Count - 1; + _currentOffset = 0; + } + + private class SequenceSegment : ReadOnlySequenceSegment + { + public SequenceSegment(ReadOnlyMemory memory) + { + Memory = memory; + } + + public void SetNext(SequenceSegment next) + { + Next = next; + next.RunningIndex = RunningIndex + Memory.Length; + } + } + } +} diff --git a/src/SharpRTSPClient/RTSPClient.cs b/src/SharpRTSPClient/RTSPClient.cs index c7b3fa0..12feda0 100644 --- a/src/SharpRTSPClient/RTSPClient.cs +++ b/src/SharpRTSPClient/RTSPClient.cs @@ -5,6 +5,7 @@ using Rtsp.Rtp; using Rtsp.Sdp; using System; +using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -59,7 +60,7 @@ private enum RtspStatus { WaitingToConnect, Connecting, ConnectFailed, Connected private IRtspTransport _rtspSocket; // RTSP connection private RtspStatus _rtspSocketStatus = RtspStatus.WaitingToConnect; - + // this wraps around a the RTSP tcpSocket stream private RtspListener _rtspClient; private RTPTransport _rtpTransport = RTPTransport.UDP; // Mode, either RTP over UDP or RTP over TCP using the RTSP socket @@ -101,7 +102,7 @@ private enum RtspStatus { WaitingToConnect, Connecting, ConnectFailed, Connected /// Called when the Setup command are completed, so we can start the right Play message (with or without playback informations) /// public event EventHandler SetupMessageCompleted; - + /// /// Video SSRC. /// @@ -118,9 +119,9 @@ static RTSPClient() { RtspUtils.RegisterUri(); } - catch(Exception ex) + catch (Exception ex) { - if(Log.ErrorEnabled) Log.Error(ex.Message); + if (Log.ErrorEnabled) Log.Error(ex.Message); } } @@ -152,16 +153,16 @@ public RTSPClient(ILoggerFactory loggerFactory) /// Callback for user certificate selection. /// Automatically try to reconnect after losing the connection. public void Connect( - string url, - RTPTransport rtpTransport, - string username = null, - string password = null, - MediaRequest mediaRequest = MediaRequest.VIDEO_AND_AUDIO, + string url, + RTPTransport rtpTransport, + string username = null, + string password = null, + MediaRequest mediaRequest = MediaRequest.VIDEO_AND_AUDIO, bool playbackSession = false, - RemoteCertificateValidationCallback userCertificateSelectionCallback = null, + RemoteCertificateValidationCallback userCertificateSelectionCallback = null, bool autoReconnect = false) { - if (string.IsNullOrEmpty(url)) + if (string.IsNullOrEmpty(url)) throw new ArgumentNullException(nameof(url)); Connect(new Uri(url), rtpTransport, username, password, mediaRequest, playbackSession, userCertificateSelectionCallback, autoReconnect); @@ -179,16 +180,16 @@ public void Connect( /// Callback for user certificate selection. /// Automatically try to reconnect after losing the connection. public void Connect( - Uri uri, - RTPTransport rtpTransport, - string username = null, - string password = null, + Uri uri, + RTPTransport rtpTransport, + string username = null, + string password = null, MediaRequest mediaRequest = MediaRequest.VIDEO_AND_AUDIO, - bool playbackSession = false, - RemoteCertificateValidationCallback userCertificateSelectionCallback = null, + bool playbackSession = false, + RemoteCertificateValidationCallback userCertificateSelectionCallback = null, bool autoReconnect = false) { - if (uri == null) + if (uri == null) throw new ArgumentNullException(nameof(uri)); // Use URI to extract username and password and to make a new URL without the username and password @@ -220,12 +221,12 @@ public void Connect( /// Callback for user certificate selection. /// Automatically try to reconnect after losing the connection. public void Connect( - Uri uri, - RTPTransport rtpTransport, - NetworkCredential credentials = null, - MediaRequest mediaRequest = MediaRequest.VIDEO_AND_AUDIO, - bool playbackSession = false, - RemoteCertificateValidationCallback userCertificateSelectionCallback = null, + Uri uri, + RTPTransport rtpTransport, + NetworkCredential credentials = null, + MediaRequest mediaRequest = MediaRequest.VIDEO_AND_AUDIO, + bool playbackSession = false, + RemoteCertificateValidationCallback userCertificateSelectionCallback = null, bool autoReconnect = false) { if (_rtspClient != null) @@ -349,9 +350,9 @@ public void TryReconnect() /// Returns true if this connection failed, or if it connected but is no longer connected. /// /// - public bool StreamingFinished() + public bool StreamingFinished() { - switch(_rtspSocketStatus) + switch (_rtspSocketStatus) { case RtspStatus.ConnectFailed: return true; @@ -475,7 +476,7 @@ private void StopClient() // Stop the keepalive timer var keepaliveTimer = _keepaliveTimer; - if(keepaliveTimer != null) + if (keepaliveTimer != null) { keepaliveTimer.Elapsed -= SendKeepAlive; keepaliveTimer.Dispose(); @@ -536,12 +537,12 @@ public byte[] BuildRtcpReceiverReport(uint ssrc) // TODO: do not send just an empty report // https://www.rfc-editor.org/rfc/rfc3550.txt // https://learn.microsoft.com/en-us/openspecs/office_protocols/ms-rtp/953b588a-4e9d-4ec8-b4d1-913f9b9d04ef - byte[] rtcp_receiver_report = new byte[8]; - int version = 2; - int paddingBit = 0; - int reportCount = 0; // an empty report - int packetType = 201; // Receiver Report - int length = rtcp_receiver_report.Length / 4 - 1; // num 32 bit words minus 1 + var rtcp_receiver_report = new byte[8]; + var version = 2; + var paddingBit = 0; + var reportCount = 0; // an empty report + var packetType = 201; // Receiver Report + var length = rtcp_receiver_report.Length / 4 - 1; // num 32 bit words minus 1 rtcp_receiver_report[0] = (byte)((version << 6) + (paddingBit << 5) + reportCount); rtcp_receiver_report[1] = (byte)packetType; rtcp_receiver_report[2] = (byte)(length >> 8 & 0xFF); @@ -598,11 +599,25 @@ private void VideoRtpDataReceived(object sender, RtspDataEventArgs e) return; } - using (RawMediaFrame nalUnits = _videoPayloadProcessor.ProcessPacket(rtpPacket)) // this will cache the Packets until there is a Frame + var receivedVideoData = ReceivedVideoData; + if (receivedVideoData != null) { - if (nalUnits.Any()) + using (var nalUnits = _videoPayloadProcessor.ProcessPacket(rtpPacket)) // this will cache the Packets until there is a Frame { - ReceivedVideoData?.Invoke(this, new SimpleDataEventArgs(nalUnits.Data, nalUnits.ClockTimestamp, nalUnits.RtpTimestamp)); + using (var buffer = new PooledByteBuffer()) + { + foreach (var segment in nalUnits.Data) + { + buffer.Write(segment.Span); + } + + var sequence = buffer.GetReadOnlySequence(); + + if (!sequence.IsEmpty) + { + receivedVideoData(this, new SimpleDataEventArgs(sequence, nalUnits.ClockTimestamp, nalUnits.RtpTimestamp)); + } + } } } } @@ -629,7 +644,7 @@ private void AudioRtpDataReceived(object sender, RtspDataEventArgs e) if (rtpPacket.PayloadType != _audioPayload) { _logger.LogDebug("Ignoring this Audio RTP payload"); - return; + return; } ReceivedRawAudioRTP?.Invoke(this, @@ -661,11 +676,25 @@ private void AudioRtpDataReceived(object sender, RtspDataEventArgs e) return; } - using (var audioFrames = _audioPayloadProcessor.ProcessPacket(rtpPacket)) + var receivedAudioData = ReceivedAudioData; + if (receivedAudioData != null) { - if (audioFrames.Any()) + using (var audioFrames = _audioPayloadProcessor.ProcessPacket(rtpPacket)) { - ReceivedAudioData?.Invoke(this, new SimpleDataEventArgs(audioFrames.Data, audioFrames.ClockTimestamp, audioFrames.RtpTimestamp)); + using (var buffer = new PooledByteBuffer()) + { + foreach (var segment in audioFrames.Data) + { + buffer.Write(segment.Span); + } + + var sequence = buffer.GetReadOnlySequence(); + + if (!sequence.IsEmpty) + { + receivedAudioData(this, new SimpleDataEventArgs(sequence, audioFrames.ClockTimestamp, audioFrames.RtpTimestamp)); + } + } } } } @@ -1013,7 +1042,7 @@ private void HandleDescribeResponse(RtspResponse message) _logger.LogDebug("SDP:\n{sdp}", Encoding.UTF8.GetString(message.Data.Span.ToArray())); SdpFile sdpData; - using(var ms = new MemoryStream(message.Data.Span.ToArray())) + using (var ms = new MemoryStream(message.Data.Span.ToArray())) using (StreamReader sdpStream = new StreamReader(ms)) { sdpData = SdpFile.ReadLoose(sdpStream); @@ -1063,7 +1092,7 @@ private void HandleDescribeResponse(RtspResponse message) { // found a valid codec payloadName = rtpmap.EncodingName.ToUpperInvariant(); - switch(payloadName) + switch (payloadName) { case "H264": _videoPayloadProcessor = new H264Payload(_loggerFactory.CreateLogger()); @@ -1080,7 +1109,8 @@ private void HandleDescribeResponse(RtspResponse message) default: _videoPayloadProcessor = null; break; - }; + } + ; _videoPayload = media.PayloadType; } else @@ -1089,7 +1119,7 @@ private void HandleDescribeResponse(RtspResponse message) if (media.PayloadType < 96) { // PayloadType is a static value, so we can use it to determine the codec - switch(media.PayloadType) + switch (media.PayloadType) { case 26: { @@ -1109,7 +1139,8 @@ private void HandleDescribeResponse(RtspResponse message) payloadName = string.Empty; } break; - }; + } + ; } } @@ -1164,7 +1195,7 @@ private void HandleDescribeResponse(RtspResponse message) setupMessage.AddTransport(transport); setupMessage.AddAuthorization(_authentication, _uri, _rtspSocket.NextCommandIndex()); if (_playbackSession) { setupMessage.AddRequireOnvifRequest(); } - + // Add SETUP message to list of mesages to send _setupMessages.Enqueue(setupMessage); NewVideoStream?.Invoke(this, new NewStreamEventArgs(media.PayloadType, payloadName, streamConfigurationData)); @@ -1189,7 +1220,7 @@ private void HandleDescribeResponse(RtspResponse message) if (media.PayloadType < 96) { // fixed payload type - switch(media.PayloadType) + switch (media.PayloadType) { case 0: (_audioPayloadProcessor, _audioCodec) = (new G711Payload(), "PCMU"); @@ -1200,13 +1231,14 @@ private void HandleDescribeResponse(RtspResponse message) default: (_audioPayloadProcessor, _audioCodec) = (null, ""); break; - }; + } + ; } else { // dynamic payload type _audioCodec = rtpmap?.EncodingName?.ToUpperInvariant() ?? string.Empty; - switch(_audioCodec) + switch (_audioCodec) { // Create AAC RTP Parser // Example fmtp is "96 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490" @@ -1229,7 +1261,8 @@ private void HandleDescribeResponse(RtspResponse message) default: _audioPayloadProcessor = null; break; - }; + } + ; if (_audioPayloadProcessor is AACPayload aacPayloadProcessor) { _audioCodec = "AAC"; @@ -1246,12 +1279,12 @@ private void HandleDescribeResponse(RtspResponse message) // Send the SETUP RTSP command if we have a matching Payload Decoder if (_audioPayloadProcessor != null) { - RtspTransport transport = CalculateTransport(_audioRtpTransport); + var transport = CalculateTransport(_audioRtpTransport); // Generate SETUP messages if (transport != null) { - RtspRequestSetup setupMessage = new RtspRequestSetup() + var setupMessage = new RtspRequestSetup() { RtspUri = _audioUri, }; @@ -1288,14 +1321,14 @@ private Uri GetControlUri(Media media) var attrib = media.Attributs.FirstOrDefault(a => a.Key == "control"); if (attrib != null) { - string sdpControl = attrib.Value; + var sdpControl = attrib.Value; if (sdpControl.StartsWith("rtsp://", StringComparison.OrdinalIgnoreCase) || sdpControl.StartsWith("rtsps://", StringComparison.OrdinalIgnoreCase) || sdpControl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) { // the "track" or "stream id" - string control = sdpControl; //absolute path + var control = sdpControl; //absolute path controlUri = new Uri(control); } else @@ -1311,7 +1344,7 @@ private Uri GetControlUri(Media media) private RtspTransport CalculateTransport(IRtpTransport transport) { - switch(_rtpTransport) + switch (_rtpTransport) { // Server interleaves the RTP packets over the RTSP connection // Example for TCP mode (RTP over RTSP) Transport: RTP/AVP/TCP;interleaved=0-1 @@ -1341,7 +1374,7 @@ private RtspTransport CalculateTransport(IRtpTransport transport) }; default: return null; - }; + } } private void SendKeepAlive(object sender, System.Timers.ElapsedEventArgs e) @@ -1364,7 +1397,7 @@ private void SendKeepAlive(object sender, System.Timers.ElapsedEventArgs e) else { keepAliveMessage = new RtspRequestOptions - { + { RtspUri = _uri, Session = _session }; @@ -1422,20 +1455,20 @@ public interface IStreamConfigurationData public class SimpleDataEventArgs : EventArgs { - public SimpleDataEventArgs(IEnumerable> data, DateTime timestamp, uint rtpTimestamp) + public SimpleDataEventArgs(ReadOnlySequence data, DateTime timestamp, uint rtpTimestamp) { Data = data; Timestamp = timestamp; - RtpTimestamp = rtpTimestamp; + RtpTimestamp = rtpTimestamp; } public DateTime Timestamp { get; } public uint RtpTimestamp { get; } - public IEnumerable> Data { get; } + public ReadOnlySequence Data { get; } public override string ToString() { - return $"{Timestamp}: Data {Data.Count()}"; + return $"{Timestamp}: Data lenght {Data.Length} bytes"; } } @@ -1458,14 +1491,14 @@ public class RawRtpDataEventArgs : EventArgs public RawRtpDataEventArgs( ReadOnlyMemory data, int csrcCount, - int? extensionHeaderId, - bool hasPadding, - bool isMarker, - bool isWellFormed, - int payloadSize, + int? extensionHeaderId, + bool hasPadding, + bool isMarker, + bool isWellFormed, + int payloadSize, int payloadType, - int sequenceNumber, - uint ssrc, + int sequenceNumber, + uint ssrc, uint timestamp, int version, int payloadStart) @@ -1516,7 +1549,7 @@ public static void AddAuthorization(this RtspMessage message, Authentication aut } string authorization = authentication.GetResponse(commandCounter, uri.AbsoluteUri, message.Method, new byte[0]); - + // remove if already one... message.Headers.Remove(RtspHeaderNames.Authorization); message.Headers.Add(RtspHeaderNames.Authorization, authorization); @@ -1526,7 +1559,7 @@ public static void AddAuthorization(this RtspMessage message, Authentication aut public class CustomLoggerFactory : ILoggerFactory { public void AddProvider(ILoggerProvider provider) - { } + { } public ILogger CreateLogger(string categoryName) { @@ -1534,7 +1567,7 @@ public ILogger CreateLogger(string categoryName) } public void Dispose() - { } + { } } public class CustomLogger : ILogger diff --git a/src/SharpRTSPServer/AACTrack.cs b/src/SharpRTSPServer/AACTrack.cs index ed60d00..9ca3e60 100644 --- a/src/SharpRTSPServer/AACTrack.cs +++ b/src/SharpRTSPServer/AACTrack.cs @@ -117,53 +117,49 @@ public override StringBuilder BuildSDP(StringBuilder sdp) /// An array of AAC fragments. By default single fragment is expected. /// RTP timestamp in the timescale of the track. /// RTP packets. - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); - for (int i = 0; i < samples.Count; i++) + foreach (var sample in samples) { // append AU header (required for AAC) - var audioPacket = AppendAUHeader(samples[i]); + var audioPacket = AppendAUHeader(sample.Span); // Put the whole Audio Packet into one RTP packet. // 12 is header size when there are no CSRCs or extensions var size = 12 + audioPacket.Length; - var owner = MemoryPool.Shared.Rent(size); - memoryOwners.Add(owner); - - var rtpPacket = owner.Memory.Slice(0, size); + var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size); + byteBuffer.Advance(size); const bool rtpPadding = false; const bool rtpHasExtension = false; int rtpCsrcCount = 0; const bool rtpMarker = true; // always 1 as this is the last (and only) RTP packet for this audio timestamp - RTPPacketUtil.WriteHeader(rtpPacket.Span, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType); // sequence number is set just before send - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); // Now append the audio packet audioPacket.CopyTo(rtpPacket.Slice(12)); - - rtpPackets.Add(rtpPacket); } - return (rtpPackets, memoryOwners); + return byteBuffer; } - private static byte[] AppendAUHeader(byte[] frame) + private static byte[] AppendAUHeader(ReadOnlySpan frame) { short frameLen = (short)(frame.Length << 3); - byte[] header = new byte[4]; + byte[] header = new byte[4 + frame.Length]; header[0] = 0x00; header[1] = 0x10; // 16 bits size of the header header[2] = (byte)((frameLen >> 8) & 0xFF); header[3] = (byte)(frameLen & 0xFF); - return header.Concat(frame).ToArray(); + frame.CopyTo(header.AsSpan(4)); + return header; } private static int GetAACLevel(int samplingFrequency, int channelConfiguration) diff --git a/src/SharpRTSPServer/BufferSegment.cs b/src/SharpRTSPServer/BufferSegment.cs new file mode 100644 index 0000000..3020ae5 --- /dev/null +++ b/src/SharpRTSPServer/BufferSegment.cs @@ -0,0 +1,54 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using static System.Net.Mime.MediaTypeNames; + +namespace SharpRTSPServer +{ + /// + /// Represents a segment of a of backed by a of . + /// + public sealed class BufferSegment : ReadOnlySequenceSegment, IDisposable + { + public static readonly BufferSegment Empty = new BufferSegment(); + private readonly IMemoryOwner _owner; + + private BufferSegment() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The memory owner that provides the memory for the segment. + /// The size of the memory slice to use for the segment. + public BufferSegment(IMemoryOwner owner, int size) + { + Memory = owner.Memory.Slice(0, size); + _owner = owner; + } + + /// + /// Appends a new segment to the current segment. + /// + /// The memory owner that provides the memory for the new segment. + /// The size of the memory slice to use for the new segment. + /// The newly created . + public BufferSegment Append(IMemoryOwner owner, int size) + { + var segment = new BufferSegment(owner, size); + segment.RunningIndex = RunningIndex + Memory.Length; + Next = segment; + return segment; + } + + /// + /// Releases the resources used by the . + /// + public void Dispose() + { + _owner?.Dispose(); + } + } +} diff --git a/src/SharpRTSPServer/G711Track.cs b/src/SharpRTSPServer/G711Track.cs index 07adc65..417731b 100644 --- a/src/SharpRTSPServer/G711Track.cs +++ b/src/SharpRTSPServer/G711Track.cs @@ -59,34 +59,29 @@ public override StringBuilder BuildSDP(StringBuilder sdp) /// An array of PCMU fragments. By default single fragment is expected. /// RTP timestamp in the timescale of the track. /// RTP packets. - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); - for (int i = 0; i < samples.Count; i++) + foreach (var audioPacket in samples) { - var audioPacket = samples[i]; var size = 12 + audioPacket.Length; - var owner = MemoryPool.Shared.Rent(size); - memoryOwners.Add(owner); - - var rtpPacket = owner.Memory.Slice(0, size); + var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size); + byteBuffer.Advance(size); const bool rtpPadding = false; const bool rtpHasExtension = false; int rtpCsrcCount = 0; const bool rtpMarker = true; - RTPPacketUtil.WriteHeader(rtpPacket.Span, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType); - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); - audioPacket.CopyTo(rtpPacket.Slice(12)); - rtpPackets.Add(rtpPacket); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); + audioPacket.Span.CopyTo(rtpPacket.Slice(12)); } - return (rtpPackets, memoryOwners); + return byteBuffer; } } @@ -144,34 +139,29 @@ public override StringBuilder BuildSDP(StringBuilder sdp) /// An array of PCMA fragments. By default single fragment is expected. /// RTP timestamp in the timescale of the track. /// RTP packets. - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); - for (int i = 0; i < samples.Count; i++) + foreach (var audioPacket in samples) { - var audioPacket = samples[i]; var size = 12 + audioPacket.Length; - var owner = MemoryPool.Shared.Rent(size); - memoryOwners.Add(owner); - - var rtpPacket = owner.Memory.Slice(0, size); + var rtpPacket = byteBuffer.GetSpan(size).Slice(0, size); + byteBuffer.Advance(size); const bool rtpPadding = false; const bool rtpHasExtension = false; int rtpCsrcCount = 0; const bool rtpMarker = true; - RTPPacketUtil.WriteHeader(rtpPacket.Span, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, rtpMarker, PayloadType); - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); - audioPacket.CopyTo(rtpPacket.Slice(12)); - rtpPackets.Add(rtpPacket); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); + audioPacket.Span.CopyTo(rtpPacket.Slice(12)); } - return (rtpPackets, memoryOwners); + return byteBuffer; } } } diff --git a/src/SharpRTSPServer/H264Track.cs b/src/SharpRTSPServer/H264Track.cs index 80cbd72..0bd4359 100644 --- a/src/SharpRTSPServer/H264Track.cs +++ b/src/SharpRTSPServer/H264Track.cs @@ -160,18 +160,13 @@ public override StringBuilder BuildSDP(StringBuilder sdp) /// An array of H264 NALUs. /// RTP timestamp in the timescale of the track. /// RTP packets. - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); - for (int x = 0; x < samples.Count; x++) + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); + var length = samples.Length; + foreach (var rawNal in samples) { - var rawNal = samples[x]; - bool lastNal = false; - if (x == samples.Count - 1) - { - lastNal = true; // last NAL in our nal_array - } + bool lastNal = (length -= rawNal.Length) == 0; // The H264 Payload could be sent as one large RTP packet (assuming the receiver can handle it) // or as a Fragmented Data, split over several RTP packets with the same Timestamp. @@ -196,7 +191,7 @@ public override (List>, List>) CreateRtpPackets( int endBit = 0; // consume first byte of the raw_nal. It is used in the FU header - byte firstByte = rawNal[0]; + byte firstByte = rawNal.Span[0]; nalPointer++; dataRemaining--; @@ -210,9 +205,8 @@ public override (List>, List>) CreateRtpPackets( // 12 is header size. 2 bytes for H264 FU-A header var fuHeader = 2; var destSize = 12 + fuHeader + payloadSize; - var owner = MemoryPool.Shared.Rent(destSize); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, destSize); + var rtpPacket = byteBuffer.GetSpan( destSize).Slice(0, destSize); + byteBuffer.Advance(destSize); // RTP Packet Header // 0 - Version, P, X, CC, M, PT and Sequence Number @@ -225,13 +219,11 @@ public override (List>, List>) CreateRtpPackets( const bool rtpHasExtension = false; const int rtpCsrcCount = 0; - RTPPacketUtil.WriteHeader(rtpPacket.Span, RTPPacketUtil.RTP_VERSION, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, lastNal && endBit == 1, PayloadType); // sequence number and SSRC are set just before send - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); - - // For H264 we need https://www.rfc-editor.org/rfc/rfc6184#page-29 + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); // Now append the Fragmentation Header (with Start and End marker) and part of the raw_nal const byte fBit = 0; @@ -239,16 +231,14 @@ public override (List>, List>) CreateRtpPackets( const byte type = 28; // FU-A Fragmentation // FU-A header - rtpPacket.Span[12] = (byte)((fBit << 7) + (nri << 5) + type); - rtpPacket.Span[13] = (byte)((startBit << 7) + (endBit << 6) + (0 << 5) + (firstByte & 0x1F)); + rtpPacket[12] = (byte)((fBit << 7) + (nri << 5) + type); + rtpPacket[13] = (byte)((startBit << 7) + (endBit << 6) + (0 << 5) + (firstByte & 0x1F)); - rawNal.AsSpan(nalPointer, payloadSize).CopyTo(rtpPacket.Slice(14).Span); + rawNal.Span.Slice(nalPointer, payloadSize).CopyTo(rtpPacket.Slice(14)); nalPointer += payloadSize; dataRemaining -= payloadSize; - rtpPackets.Add(rtpPacket); - startBit = 0; } } @@ -259,30 +249,28 @@ public override (List>, List>) CreateRtpPackets( // Also with RTP over RTSP there is a limit of 65535 bytes for the RTP packet. // 12 is header size when there are no CSRCs or extensions - var owner = MemoryPool.Shared.Rent(12 + rawNal.Length); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, 12 + rawNal.Length); + var destSize = 12 + rawNal.Length; + var rtpPacket = byteBuffer.GetSpan(destSize).Slice(0, destSize); + byteBuffer.Advance(destSize); const bool rtpPadding = false; const bool rtpHasExtension = false; const int rtpCsrcCount = 0; - RTPPacketUtil.WriteHeader(rtpPacket.Span, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, lastNal, PayloadType); // sequence number and SSRC are set just before send - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); // Now append the raw NAL - rawNal.CopyTo(rtpPacket.Slice(12)); - - rtpPackets.Add(rtpPacket); + rawNal.Span.CopyTo(rtpPacket.Slice(12)); } } - return (rtpPackets, memoryOwners); + return byteBuffer; } } } diff --git a/src/SharpRTSPServer/H265Track.cs b/src/SharpRTSPServer/H265Track.cs index 42af4b5..14a4024 100644 --- a/src/SharpRTSPServer/H265Track.cs +++ b/src/SharpRTSPServer/H265Track.cs @@ -132,18 +132,13 @@ public override StringBuilder BuildSDP(StringBuilder sdp) /// An array of H265 NALUs. /// RTP timestamp in the timescale of the track. /// RTP packets. - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); - for (int x = 0; x < samples.Count; x++) + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); + var length = samples.Length; + foreach (var rawNal in samples) { - var rawNal = samples[x]; - bool lastNal = false; - if (x == samples.Count - 1) - { - lastNal = true; // last NAL in our nal_array - } + bool lastNal = (length -= rawNal.Length) == 0; // The H265 Payload could be sent as one large RTP packet (assuming the receiver can handle it) // or as a Fragmented Data, split over several RTP packets with the same Timestamp. @@ -165,11 +160,11 @@ public override (List>, List>) CreateRtpPackets( int endBit = 0; // consume first byte of the raw_nal. It is used in the FU header - byte firstByte = rawNal[0]; + byte firstByte = rawNal.Span[0]; nalPointer++; dataRemaining--; - byte secondByte = rawNal[1]; + byte secondByte = rawNal.Span[1]; nalPointer++; dataRemaining--; @@ -183,9 +178,8 @@ public override (List>, List>) CreateRtpPackets( // 12 is header size. 3 bytes for H265 FU-A header var fuHeader = 3; var destSize = 12 + fuHeader + payloadSize; - var owner = MemoryPool.Shared.Rent(destSize); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, destSize); + var rtpPacket = byteBuffer.GetSpan(destSize).Slice(0, destSize); + byteBuffer.Advance(destSize); // RTP Packet Header // 0 - Version, P, X, CC, M, PT and Sequence Number @@ -198,11 +192,11 @@ public override (List>, List>) CreateRtpPackets( const bool rtpHasExtension = false; const int rtpCsrcCount = 0; - RTPPacketUtil.WriteHeader(rtpPacket.Span, RTPPacketUtil.RTP_VERSION, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, lastNal && endBit == 1, PayloadType); // sequence number and SSRC are set just before send - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); // For H265 we need https://www.rfc-editor.org/rfc/rfc7798#section-4.4.3 @@ -212,19 +206,17 @@ public override (List>, List>) CreateRtpPackets( const byte type = 49; // FU Fragmentation // PayloadHdr - rtpPacket.Span[12] = (byte)((fBit << 7) | ((type << 1) & 0x7E) | (firstByte & 0x1)); - rtpPacket.Span[13] = secondByte; + rtpPacket[12] = (byte)((fBit << 7) | ((type << 1) & 0x7E) | (firstByte & 0x1)); + rtpPacket[13] = secondByte; // FU header - rtpPacket.Span[14] = (byte)((startBit << 7) | (endBit << 6) | nalType); + rtpPacket[14] = (byte)((startBit << 7) | (endBit << 6) | nalType); - rawNal.AsSpan(nalPointer, payloadSize).CopyTo(rtpPacket.Slice(15).Span); + rawNal.Span.Slice(nalPointer, payloadSize).CopyTo(rtpPacket.Slice(15)); nalPointer += payloadSize; dataRemaining -= payloadSize; - rtpPackets.Add(rtpPacket); - startBit = 0; } } @@ -235,30 +227,28 @@ public override (List>, List>) CreateRtpPackets( // Also with RTP over RTSP there is a limit of 65535 bytes for the RTP packet. // 12 is header size when there are no CSRCs or extensions - var owner = MemoryPool.Shared.Rent(12 + rawNal.Length); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, 12 + rawNal.Length); + int destSize = 12 + rawNal.Length; + var rtpPacket = byteBuffer.GetSpan(destSize).Slice(0, destSize); + byteBuffer.Advance(destSize); const bool rtpPadding = false; const bool rtpHasExtension = false; const int rtpCsrcCount = 0; - RTPPacketUtil.WriteHeader(rtpPacket.Span, + RTPPacketUtil.WriteHeader(rtpPacket, RTPPacketUtil.RTP_VERSION, rtpPadding, rtpHasExtension, rtpCsrcCount, lastNal, PayloadType); // sequence number and SSRC are set just before send - RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp); + RTPPacketUtil.WriteTS(rtpPacket, rtpTimestamp); // Now append the raw NAL - rawNal.CopyTo(rtpPacket.Slice(12)); - - rtpPackets.Add(rtpPacket); + rawNal.Span.CopyTo(rtpPacket.Slice(12)); } } - return (rtpPackets, memoryOwners); + return byteBuffer; } } } diff --git a/src/SharpRTSPServer/IByteBuffer.cs b/src/SharpRTSPServer/IByteBuffer.cs new file mode 100644 index 0000000..dd98fc8 --- /dev/null +++ b/src/SharpRTSPServer/IByteBuffer.cs @@ -0,0 +1,10 @@ +using System; +using System.Buffers; + +namespace SharpRTSPServer +{ + public interface IByteBuffer : IBufferWriter, IDisposable + { + ReadOnlySequence GetReadOnlySequence(); + } +} \ No newline at end of file diff --git a/src/SharpRTSPServer/MJpegTrack.cs b/src/SharpRTSPServer/MJpegTrack.cs index fb14bd1..312585d 100644 --- a/src/SharpRTSPServer/MJpegTrack.cs +++ b/src/SharpRTSPServer/MJpegTrack.cs @@ -32,17 +32,16 @@ public override StringBuilder BuildSDP(StringBuilder sdp) } /// - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - if (samples.Count != 1) + if (samples.Length != 1) { throw new InvalidOperationException("Only 1 sample is supported."); } - var rtpPackets = new List>(); - var memoryOwners = new List>(); + var byteBuffer = new PooledByteBuffer(initialBufferSize: 0); - for (int i = 0; i < samples.Count; i++) + foreach (var sample in samples) { // https://en.wikipedia.org/wiki/JPEG_File_Interchange_Format const ushort SoiMarker = 0xFFD8; // SOI - Start of image header @@ -51,7 +50,7 @@ public override (List>, List>) CreateRtpPackets( //const ushort SosMarker = 0xFFDA; // SOS - Start of Scan marker const ushort EoiMarker = 0xFFD9; // EOI - End of Image marker - var jpegImage = samples[i].AsSpan(); + var jpegImage = sample.Span; var header = BinaryPrimitives.ReadUInt16BigEndian(jpegImage); if (header != SoiMarker) @@ -71,131 +70,131 @@ public override (List>, List>) CreateRtpPackets( var firstQuantizationtable = ReadOnlySpan.Empty; var secondQuantizationtable = ReadOnlySpan.Empty; - Span reader; - var jpegInfo = ParseJpeg(jpegImage, out firstQuantizationtable, out secondQuantizationtable, out reader); - - // Build a list of 1 or more RTP packets - // The last packet will have the M bit set to '1' - - var endOfFrame = false; - var firstFrame = true; - - // -8 for UDP header, -20 for IP header, -16 normal RTP header len. ** LESS RTP EXTENSIONS !!! - var packetMTU = 1400; // 65535; - - var dataPointer = 0; - - while (reader.Length > 0) + using (var imageDataBuffer = new PooledByteBuffer(initialBufferSize: 0)) { - bool shouldSendQuantizationTables = firstFrame && q > 127; + var jpegInfo = ParseJpeg(jpegImage, out firstQuantizationtable, out secondQuantizationtable, imageDataBuffer); - firstFrame = false; + // Build a list of 1 or more RTP packets + // The last packet will have the M bit set to '1' - int payloadSize = Math.Min(packetMTU, reader.Length); + var endOfFrame = false; + var firstFrame = true; - endOfFrame = payloadSize == reader.Length; + // -8 for UDP header, -20 for IP header, -16 normal RTP header len. ** LESS RTP EXTENSIONS !!! + var packetMTU = 1400; // 65535; - // 12 is header size. then jpeg header, then payload - var destSize = 12 + 8 + payloadSize; - var owner = MemoryPool.Shared.Rent(destSize); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, destSize); + var dataPointer = 0; - // RTP Packet Header - // 0 - Version, P, X, CC, M, PT and Sequence Number - //32 - Timestamp. H264 uses a 90kHz clock - //64 - SSRC - //96 - CSRCs (optional) - //nn - Extension ID and Length - //nn - Extension header + var imageDataReader = imageDataBuffer.GetReadOnlySequence(); + while (imageDataReader.Length > 0) + { + bool shouldSendQuantizationTables = firstFrame && q > 127; - var rtpPacketSpan = rtpPacket.Span; + firstFrame = false; - rtpPacketSpan.Slice(3, 9).Clear(); + int payloadSize = Math.Min(packetMTU, (int)imageDataReader.Length); - RTPPacketUtil.WriteHeader( - rtpPacket: rtpPacketSpan, - rtpVersion: RTPPacketUtil.RTP_VERSION, - rtpPadding: false, - rtpExtension: false, - rtpCsrcCount: 0, - rtpMarker: endOfFrame, - rtpPayloadType: PayloadType); + endOfFrame = payloadSize == imageDataReader.Length; - // sequence number and SSRC are set just before send - RTPPacketUtil.WriteTS(rtpPacketSpan, rtpTimestamp); + // 12 is header size. then jpeg header, then payload + var destSize = 12 + 8 + payloadSize; + var rtpPacket = byteBuffer.GetSpan(destSize).Slice(0, destSize); + byteBuffer.Advance(destSize); - rtpPacketSpan = rtpPacketSpan.Slice(12); + // RTP Packet Header + // 0 - Version, P, X, CC, M, PT and Sequence Number + //32 - Timestamp. H264 uses a 90kHz clock + //64 - SSRC + //96 - CSRCs (optional) + //nn - Extension ID and Length + //nn - Extension header - // For JPEG we need https://www.rfc-editor.org/rfc/rfc2435 + var rtpPacketSpan = rtpPacket; - BinaryPrimitives.WriteInt32BigEndian(rtpPacketSpan, dataPointer & 0x00FFFFFF); - rtpPacketSpan = rtpPacketSpan.Slice(4); + rtpPacketSpan.Slice(3, 9).Clear(); - // Write JPEG Header - https://datatracker.ietf.org/doc/html/rfc2435#section-3.1 - rtpPacketSpan[0] = jpegInfo.type; - rtpPacketSpan[1] = q; - rtpPacketSpan[2] = (byte)(jpegInfo.width >> 3); - rtpPacketSpan[3] = (byte)(jpegInfo.height >> 3); - rtpPacketSpan = rtpPacketSpan.Slice(4); + RTPPacketUtil.WriteHeader( + rtpPacket: rtpPacketSpan, + rtpVersion: RTPPacketUtil.RTP_VERSION, + rtpPadding: false, + rtpExtension: false, + rtpCsrcCount: 0, + rtpMarker: endOfFrame, + rtpPayloadType: PayloadType); - // write quantization tables - if (shouldSendQuantizationTables) - { - // Write Restart Marker header - https://datatracker.ietf.org/doc/html/rfc2435#section-3.1.7 - // Not present for type 1 + // sequence number and SSRC are set just before send + RTPPacketUtil.WriteTS(rtpPacketSpan, rtpTimestamp); - // Write Quantization Table header https://datatracker.ietf.org/doc/html/rfc2435#section-3.1.8 + rtpPacketSpan = rtpPacketSpan.Slice(12); - if (secondQuantizationtable.IsEmpty) - { - // MBZ - rtpPacketSpan[0] = (byte)(firstQuantizationtable[0] & 0xf); + // For JPEG we need https://www.rfc-editor.org/rfc/rfc2435 - // Precision - rtpPacketSpan[1] = (byte)(firstQuantizationtable[0] >> 4); + BinaryPrimitives.WriteInt32BigEndian(rtpPacketSpan, dataPointer & 0x00FFFFFF); + rtpPacketSpan = rtpPacketSpan.Slice(4); - // Length - var qtSize = firstQuantizationtable.Length - 1; - BinaryPrimitives.WriteInt16BigEndian(rtpPacketSpan.Slice(2), (short)qtSize); + // Write JPEG Header - https://datatracker.ietf.org/doc/html/rfc2435#section-3.1 + rtpPacketSpan[0] = jpegInfo.type; + rtpPacketSpan[1] = q; + rtpPacketSpan[2] = (byte)(jpegInfo.width >> 3); + rtpPacketSpan[3] = (byte)(jpegInfo.height >> 3); + rtpPacketSpan = rtpPacketSpan.Slice(4); - // Quantization Table Data - firstQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(4)); - qtSize += 4; - rtpPacketSpan = rtpPacketSpan.Slice(qtSize); - payloadSize -= qtSize; - } - else // nbQuantizationTables == 2 + // write quantization tables + if (shouldSendQuantizationTables) { - // MBZ - rtpPacketSpan[0] = 0; - - // Precision - rtpPacketSpan[1] = (byte)(firstQuantizationtable[0] >> 4); - - // Length - var qtSize = firstQuantizationtable.Length + secondQuantizationtable.Length - 2; - BinaryPrimitives.WriteInt16BigEndian(rtpPacketSpan.Slice(2), (short)qtSize); - - // Quantization Table Data - firstQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(4)); - secondQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(3 + firstQuantizationtable.Length)); - qtSize += 4; - rtpPacketSpan = rtpPacketSpan.Slice(qtSize); - payloadSize -= qtSize; + // Write Restart Marker header - https://datatracker.ietf.org/doc/html/rfc2435#section-3.1.7 + // Not present for type 1 + + // Write Quantization Table header https://datatracker.ietf.org/doc/html/rfc2435#section-3.1.8 + + if (secondQuantizationtable.IsEmpty) + { + // MBZ + rtpPacketSpan[0] = (byte)(firstQuantizationtable[0] & 0xf); + + // Precision + rtpPacketSpan[1] = (byte)(firstQuantizationtable[0] >> 4); + + // Length + var qtSize = firstQuantizationtable.Length - 1; + BinaryPrimitives.WriteInt16BigEndian(rtpPacketSpan.Slice(2), (short)qtSize); + + // Quantization Table Data + firstQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(4)); + qtSize += 4; + rtpPacketSpan = rtpPacketSpan.Slice(qtSize); + payloadSize -= qtSize; + } + else // nbQuantizationTables == 2 + { + // MBZ + rtpPacketSpan[0] = 0; + + // Precision + rtpPacketSpan[1] = (byte)(firstQuantizationtable[0] >> 4); + + // Length + var qtSize = firstQuantizationtable.Length + secondQuantizationtable.Length - 2; + BinaryPrimitives.WriteInt16BigEndian(rtpPacketSpan.Slice(2), (short)qtSize); + + // Quantization Table Data + firstQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(4)); + secondQuantizationtable.Slice(1).CopyTo(rtpPacketSpan.Slice(3 + firstQuantizationtable.Length)); + qtSize += 4; + rtpPacketSpan = rtpPacketSpan.Slice(qtSize); + payloadSize -= qtSize; + } } - } - - // Write JPEG Payload - reader.Slice(0, rtpPacketSpan.Length).CopyTo(rtpPacketSpan); - reader = reader.Slice(rtpPacketSpan.Length); - dataPointer += rtpPacketSpan.Length; - rtpPackets.Add(rtpPacket); + // Write JPEG Payload + imageDataReader.Slice(0, rtpPacketSpan.Length).CopyTo(rtpPacketSpan); + imageDataReader = imageDataReader.Slice(rtpPacketSpan.Length); + dataPointer += rtpPacketSpan.Length; + } } } - return (rtpPackets, memoryOwners); + return byteBuffer; } public struct JpgComponent @@ -224,11 +223,11 @@ public JpgComponent(byte id, byte samp, byte qt) /// Thrown when JPEG image passed to this function does not meet the criteria for being used in RTP without re-encoding. /// The criteria are: image dimensions must be less than or equal to 2040 x 2040 pixels and chroma subsampling must be set to 4:2:0 or 4:2:2. /// - public static (int width, int height, int bpp, byte type) ParseJpeg(Span jpegImage, out ReadOnlySpan firstQuantizationTable, out ReadOnlySpan secondQuantizationTable, out Span imageData) + public static (int width, int height, int bpp, byte type) ParseJpeg(ReadOnlySpan jpegImage, out ReadOnlySpan firstQuantizationTable, out ReadOnlySpan secondQuantizationTable, IBufferWriter imageData) { firstQuantizationTable = ReadOnlySpan.Empty; secondQuantizationTable = ReadOnlySpan.Empty; - Span br = jpegImage; + var br = jpegImage; bool isDriPresent = false; // JPG magic bytes @@ -244,7 +243,7 @@ public static (int width, int height, int bpp, byte type) ParseJpeg(Span j // Start-Of-Frame (SOF) has 4 possible values if (br[1] == 0xc0 || br[1] == 0xc1 || br[1] == 0xc2 || br[1] == 0xc3) { - imageData = br; + var imageDataSpan = br; br = br.Slice(2); br = br.Slice(2); @@ -293,7 +292,7 @@ public static (int width, int height, int bpp, byte type) ParseJpeg(Span j { type = 0; } - else if(sortedComponents.First().samp == 0x22) + else if (sortedComponents.First().samp == 0x22) { type = 1; } @@ -302,7 +301,7 @@ public static (int width, int height, int bpp, byte type) ParseJpeg(Span j throw new NotSupportedException($"Unsupported chroma subsampling 0x{sortedComponents.First().samp:X2}. Supported chroma subsampling values are 4:2:0 (0x22,0x11,0x11) or 4:2:2 (0x21,0x11,0x11)."); } - foreach(var comp in sortedComponents.Skip(1)) + foreach (var comp in sortedComponents.Skip(1)) { if (comp.samp != 0x11) { @@ -316,6 +315,7 @@ public static (int width, int height, int bpp, byte type) ParseJpeg(Span j type += 64; } + imageData.Write(imageDataSpan); return (width, height, bpp, type); } @@ -341,7 +341,7 @@ public static (int width, int height, int bpp, byte type) ParseJpeg(Span j } // restart marker - if(marker == 0xdd) + if (marker == 0xdd) { isDriPresent = true; } diff --git a/src/SharpRTSPServer/MemoryExtensions.cs b/src/SharpRTSPServer/MemoryExtensions.cs new file mode 100644 index 0000000..fffc2bb --- /dev/null +++ b/src/SharpRTSPServer/MemoryExtensions.cs @@ -0,0 +1,15 @@ +using System; +using System.Buffers; + +namespace SharpRTSPServer +{ + internal static class MemoryExtensions + { + public static ReadOnlySpan GetFirstSpan(this ReadOnlySequence sequence) +#if NET8_0_OR_GREATER + => sequence.FirstSpan; +#else + => sequence.GetFirstSpan(); +#endif + } +} diff --git a/src/SharpRTSPServer/PooledByteBuffer.cs b/src/SharpRTSPServer/PooledByteBuffer.cs new file mode 100644 index 0000000..81f65d1 --- /dev/null +++ b/src/SharpRTSPServer/PooledByteBuffer.cs @@ -0,0 +1,161 @@ +using System; +using System.Buffers; +using System.Collections.Generic; + +namespace SharpRTSPServer +{ + /// + /// A pooled buffer writer that implements using for efficient writing of byte data and + /// allows reading the written content through a using the method. + /// + public sealed class PooledByteBuffer : IByteBuffer + { + private const int DefaultBufferSize = 4096; + + private readonly List _buffers = new List(); + private readonly ArrayPool _pool; + private int _currentIndex; + private int _currentOffset; + private bool _disposed; + + /// + /// Initializes a new instance of the class with an optional initial buffer size and array pool. + /// + /// The initial size of the buffer to rent from the pool. Defaults to 4096 bytes. + /// The array pool to use. If null, is used. + public PooledByteBuffer(int initialBufferSize = DefaultBufferSize, ArrayPool pool = null) + { + _pool = pool ?? ArrayPool.Shared; + AddNewBuffer(initialBufferSize); + } + + /// + /// Notifies the buffer writer that bytes were written. + /// + /// The number of bytes written. + /// Thrown if count is negative or exceeds the current buffer capacity. + public void Advance(int count) + { + if (count < 0 || _currentOffset + count > _buffers[_currentIndex].Length) + { + throw new ArgumentOutOfRangeException(nameof(count)); + } + + _currentOffset += count; + } + + /// + /// Returns a buffer to write to, ensuring at least bytes are available. + /// + /// The minimum number of bytes required. May be 0. + /// A writable memory buffer. + public Memory GetMemory(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return _buffers[_currentIndex].AsMemory(_currentOffset); + } + + /// + /// Returns a buffer to write to, ensuring at least bytes are available. + /// + /// The minimum number of bytes required. May be 0. + /// A writable span buffer. + public Span GetSpan(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return _buffers[_currentIndex].AsSpan(_currentOffset); + } + + /// + /// Returns a representing the written data across all buffers. + /// + /// A read-only sequence of bytes. + public ReadOnlySequence GetReadOnlySequence() + { + SequenceSegment first = null; + SequenceSegment last = null; + + for (var i = 0; i < _buffers.Count; i++) + { + var buffer = _buffers[i]; + var length = (i == _currentIndex) ? _currentOffset : buffer.Length; + + if (length == 0) + { + continue; + } + + var segment = new SequenceSegment(buffer.AsMemory(0, length)); + + if (first == null) + { + first = segment; + } + + if (last != null) + { + last.SetNext(segment); + } + + last = segment; + } + + if (first == null || last == null) + { + return ReadOnlySequence.Empty; + } + + return new ReadOnlySequence(first, 0, last, last.Memory.Length); + } + + /// + /// Releases all buffers back to the pool and clears internal state. + /// + public void Dispose() + { + if (_disposed) + { + return; + } + + foreach (var buffer in _buffers) + { + _pool.Return(buffer); + } + + _buffers.Clear(); + _disposed = true; + } + + private void EnsureCapacity(int sizeHint) + { + if (_currentOffset + sizeHint > _buffers[_currentIndex].Length) + { + var newSize = Math.Max(sizeHint, DefaultBufferSize); + AddNewBuffer(newSize); + } + } + + private void AddNewBuffer(int size) + { + var buffer = _pool.Rent(size); + _buffers.Add(buffer); + _currentIndex = _buffers.Count - 1; + _currentOffset = 0; + } + + private class SequenceSegment : ReadOnlySequenceSegment + { + public SequenceSegment(ReadOnlyMemory memory) + { + Memory = memory; + } + + public void SetNext(SequenceSegment next) + { + Next = next; + next.RunningIndex = RunningIndex + Memory.Length; + } + } + } +} diff --git a/src/SharpRTSPServer/ProxyTrack.cs b/src/SharpRTSPServer/ProxyTrack.cs index 19cbc72..931e434 100644 --- a/src/SharpRTSPServer/ProxyTrack.cs +++ b/src/SharpRTSPServer/ProxyTrack.cs @@ -16,11 +16,11 @@ public class ProxyTrack : TrackBase, IDisposable private bool _disposedValue; public override bool IsReady - { - get + { + get { return _isReady; - } + } } public Uri Uri { get; } @@ -40,23 +40,20 @@ public void Start() _isReady = true; } - public override (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp) + public override IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp) { - List> rtpPackets = new List>(); - List> memoryOwners = new List>(); - var owner = MemoryPool.Shared.Rent(samples[0].Length); - memoryOwners.Add(owner); - var rtpPacket = owner.Memory.Slice(0, samples[0].Length); - MemoryExtensions.CopyTo(samples[0], rtpPacket); - rtpPackets.Add(rtpPacket); - return (rtpPackets, memoryOwners); + int length = (int)samples.Length; + var byteBuffer = new PooledByteBuffer(length); + samples.CopyTo(byteBuffer.GetSpan(length)); + byteBuffer.Advance(length); + return byteBuffer; } protected virtual void Dispose(bool disposing) { if (!_disposedValue) { - if(disposing) + if (disposing) { _isReady = false; } diff --git a/src/SharpRTSPServer/RTPPacketUtil.cs b/src/SharpRTSPServer/RTPPacketUtil.cs index 0172462..05dba7b 100644 --- a/src/SharpRTSPServer/RTPPacketUtil.cs +++ b/src/SharpRTSPServer/RTPPacketUtil.cs @@ -30,7 +30,7 @@ public static void WriteSSRC(Span rtp_packet, uint ssrc) BinaryPrimitives.WriteUInt32BigEndian(rtp_packet.Slice(8), ssrc); } - public static uint ReadTS(byte[] data) + public static uint ReadTS(ReadOnlySpan data) { uint rtpTimestamp = ((uint)data[4] << 24) + (uint)(data[5] << 16) + (uint)(data[6] << 8) + data[7]; return rtpTimestamp; diff --git a/src/SharpRTSPServer/RTSPServer.cs b/src/SharpRTSPServer/RTSPServer.cs index 65ffc61..a0f0058 100644 --- a/src/SharpRTSPServer/RTSPServer.cs +++ b/src/SharpRTSPServer/RTSPServer.cs @@ -499,7 +499,7 @@ private void HandleDescribe(RtspListener listener, RtspRequest message) private string GenerateSDP() { - if(!string.IsNullOrEmpty(_sdp)) + if (!string.IsNullOrEmpty(_sdp)) return _sdp; // sdp StringBuilder sdp = new StringBuilder(); @@ -523,7 +523,7 @@ private string GenerateSDP() private RTSPConnection ConnectionByRtpTransport(IRtpTransport rtpTransport) { - if (rtpTransport == null) + if (rtpTransport == null) return null; lock (_connectionList) @@ -534,7 +534,7 @@ private RTSPConnection ConnectionByRtpTransport(IRtpTransport rtpTransport) private RTSPConnection ConnectionBySessionId(string sessionId) { - if (sessionId == null) + if (sessionId == null) return null; lock (_connectionList) @@ -578,7 +578,7 @@ public bool CanAcceptNewSamples() return true; } - public void FeedInRawRTP(int streamType, uint rtpTimestamp, List> rtpPackets) + public void FeedInRawRTP(int streamType, uint rtpTimestamp, ReadOnlySequence rtpPackets) { if (streamType != 0 && streamType != 1) throw new ArgumentException("Invalid streamType! Video = 0, Audio = 1"); @@ -614,7 +614,7 @@ public void FeedInRawRTP(int streamType, uint rtpTimestamp, List> r } } - public void SendRawRTP(RTSPConnection connection, RTPStream stream, List> rtpPackets) + public void SendRawRTP(RTSPConnection connection, RTPStream stream, ReadOnlySequence rtpPackets) { if (!connection.Play) return; @@ -622,38 +622,44 @@ public void SendRawRTP(RTSPConnection connection, RTPStream stream, List.Shared.Rent(28)) + using (var rtcpBuffer = new PooledByteBuffer(28)) { - var rtcpSenderReport = rtcpOwner.Memory.Slice(0, 28).Span; + const int size = 28; // 7 32-bit words + var rtcpSenderReport = rtcpBuffer.GetMemory(size).Slice(0, size).Span; + rtcpBuffer.Advance(size); const bool hasPadding = false; const int reportCount = 0; // an empty report int length = (rtcpSenderReport.Length / 4) - 1; // num 32 bit words minus 1 @@ -715,7 +723,7 @@ private void RemoveSession(RTSPConnection connection) private static string TransportLogName(IRtpTransport transport) { - switch(transport) + switch (transport) { case RtpTcpTransport _: return "TCP"; @@ -725,7 +733,8 @@ private static string TransportLogName(IRtpTransport transport) return "UDP"; default: return ""; - }; + } + ; } #region IDisposable @@ -746,7 +755,7 @@ protected virtual void Dispose(bool disposing) StopListen(); _stopping?.Dispose(); - if(VideoTrack is IDisposable disposableVideoTrack) + if (VideoTrack is IDisposable disposableVideoTrack) { disposableVideoTrack.Dispose(); VideoTrack = null; @@ -772,7 +781,7 @@ public void OverrideSDP(string sdp, bool mungleSDP = true) // we have to fill in the trackID to identify the session in RTSP using (var textReader = new StringReader(sdp)) { - while(true) + while (true) { string line = textReader.ReadLine(); @@ -781,7 +790,7 @@ public void OverrideSDP(string sdp, bool mungleSDP = true) builder.AppendLine(line); - if(line.StartsWith("m=")) + if (line.StartsWith("m=")) { builder.AppendLine($"a=control:trackID={mediaIndex++}"); } @@ -859,7 +868,7 @@ public class RTSPConnection /// /// RTSP Session ID used with this client connection. /// - public string SessionId { get; set; } = ""; + public string SessionId { get; set; } = ""; /// /// Video stream. @@ -889,7 +898,7 @@ public void UpdateKeepAlive() public interface IRtpSender { - void FeedInRawRTP(int streamType, uint rtpTimestamp, List> rtpPackets); + void FeedInRawRTP(int streamType, uint rtpTimestamp, ReadOnlySequence rtpPackets); bool CanAcceptNewSamples(); } @@ -930,9 +939,9 @@ public interface ITrack /// An array of samples. /// RTP timestamp in the timescale of the track. /// RTP packets. - (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp); - - void FeedInRawSamples(uint rtpTimestamp, List samples); + IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp); + + void FeedInRawSamples(uint rtpTimestamp, ReadOnlySequence samples); } public class CustomLoggerFactory : ILoggerFactory diff --git a/src/SharpRTSPServer/TrackBase.cs b/src/SharpRTSPServer/TrackBase.cs index c9e913b..ca9a6a2 100644 --- a/src/SharpRTSPServer/TrackBase.cs +++ b/src/SharpRTSPServer/TrackBase.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Runtime.InteropServices; using System.Text; namespace SharpRTSPServer @@ -22,9 +23,9 @@ public abstract class TrackBase : ITrack public abstract StringBuilder BuildSDP(StringBuilder sdp); - public abstract (List>, List>) CreateRtpPackets(List samples, uint rtpTimestamp); + public abstract IByteBuffer CreateRtpPackets(ReadOnlySequence samples, uint rtpTimestamp); - public virtual void FeedInRawSamples(uint rtpTimestamp, List samples) + public virtual void FeedInRawSamples(uint rtpTimestamp, ReadOnlySequence samples) { if (Sink == null) throw new InvalidOperationException("Sink is null!!!"); @@ -35,13 +36,9 @@ public virtual void FeedInRawSamples(uint rtpTimestamp, List samples) if (ID != (int)TrackType.Video && ID != (int)TrackType.Audio) throw new ArgumentOutOfRangeException("ID must be 0 for video or 1 for audio"); - (List> rtpPackets, List> memoryOwners) = CreateRtpPackets(samples, rtpTimestamp); - - Sink.FeedInRawRTP(ID, rtpTimestamp, rtpPackets); - - foreach (var owner in memoryOwners) + using (var rtpPackets = CreateRtpPackets(samples, rtpTimestamp)) { - owner.Dispose(); + Sink.FeedInRawRTP(ID, rtpTimestamp, rtpPackets.GetReadOnlySequence()); } } }