From f51e86e73678327809264ea72af3dc8c042fa395 Mon Sep 17 00:00:00 2001 From: RogueException Date: Fri, 21 Jul 2017 22:47:30 -0300 Subject: [PATCH] Reduce voice buffer allocations, add frame recycling --- src/Discord.Net.Core/Audio/IAudioClient.cs | 3 ++ src/Discord.Net.Core/Audio/RTPFrame.cs | 2 +- src/Discord.Net.WebSocket/Audio/AudioClient.cs | 48 ++++++++++++++-------- .../Audio/Streams/InputStream.cs | 25 +++++++---- .../Audio/Streams/RTPReadStream.cs | 3 +- .../DiscordSocketApiClient.cs | 22 +++++----- src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs | 41 +++++++++--------- .../Net/DefaultWebSocketClient.cs | 2 +- 8 files changed, 85 insertions(+), 61 deletions(-) diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs index 9be8ceef5..ad347d00d 100644 --- a/src/Discord.Net.Core/Audio/IAudioClient.cs +++ b/src/Discord.Net.Core/Audio/IAudioClient.cs @@ -31,5 +31,8 @@ namespace Discord.Audio AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate = null, int bufferMillis = 1000, int packetLoss = 30); /// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer. AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate = null, int packetLoss = 30); + + /// Recycles an RTPFrame's payload buffer. Do not call more than once for a given frame. + void RecycleFrame(RTPFrame frame); } } diff --git a/src/Discord.Net.Core/Audio/RTPFrame.cs b/src/Discord.Net.Core/Audio/RTPFrame.cs index 6254b7173..3323a2bfb 100644 --- a/src/Discord.Net.Core/Audio/RTPFrame.cs +++ b/src/Discord.Net.Core/Audio/RTPFrame.cs @@ -5,7 +5,7 @@ namespace Discord.Audio public readonly ushort Sequence; public readonly uint Timestamp; public readonly byte[] Payload; - public readonly bool Missed; + public readonly bool Missed; public RTPFrame(ushort sequence, uint timestamp, byte[] payload, bool missed) { diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 1f33b3cc5..c08bef3ae 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -38,6 +38,7 @@ namespace Discord.Audio private readonly ConcurrentQueue> _keepaliveTimes; private readonly ConcurrentDictionary _ssrcMap; private readonly ConcurrentDictionary _streams; + private readonly ConcurrentQueue _frameBuffers; private Task _heartbeatTask, _keepaliveTask; private long _lastMessageTime; @@ -79,6 +80,7 @@ namespace Discord.Audio _keepaliveTimes = new ConcurrentQueue>(); _ssrcMap = new ConcurrentDictionary(); _streams = new ConcurrentDictionary(); + _frameBuffers = new ConcurrentQueue(); _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; _serializer.Error += (s, e) => @@ -174,7 +176,7 @@ namespace Discord.Audio //Assume Thread-safe if (!_streams.ContainsKey(userId)) { - var readerStream = new InputStream(); //Consumes header + var readerStream = new InputStream(this); //Consumes header var opusDecoder = new OpusDecodeStream(readerStream); //Passes header //var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); var rtpReader = new RTPReadStream(opusDecoder); //Generates header @@ -283,13 +285,13 @@ namespace Discord.Audio return; } } - private async Task ProcessPacketAsync(byte[] packet) + private async Task ProcessPacketAsync(byte[] packet, int offset, int count) { try { if (_connection.State == ConnectionState.Connecting) { - if (packet.Length != 70) + if (count != 70) { await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false); return; @@ -298,8 +300,8 @@ namespace Discord.Audio int port; try { - ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); - port = (packet[69] << 8) | packet[68]; + ip = Encoding.UTF8.GetString(packet, offset + 4, 70 - 6).TrimEnd('\0'); + port = (packet[offset + 69] << 8) | packet[offset + 68]; } catch (Exception ex) { @@ -312,19 +314,19 @@ namespace Discord.Audio } else if (_connection.State == ConnectionState.Connected) { - if (packet.Length == 8) + if (count == 8) { await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false); ulong value = - ((ulong)packet[0] >> 0) | - ((ulong)packet[1] >> 8) | - ((ulong)packet[2] >> 16) | - ((ulong)packet[3] >> 24) | - ((ulong)packet[4] >> 32) | - ((ulong)packet[5] >> 40) | - ((ulong)packet[6] >> 48) | - ((ulong)packet[7] >> 56); + ((ulong)packet[offset + 0] >> 0) | + ((ulong)packet[offset + 1] >> 8) | + ((ulong)packet[offset + 2] >> 16) | + ((ulong)packet[offset + 3] >> 24) | + ((ulong)packet[offset + 4] >> 32) | + ((ulong)packet[offset + 5] >> 40) | + ((ulong)packet[offset + 6] >> 48) | + ((ulong)packet[offset + 7] >> 56); while (_keepaliveTimes.TryDequeue(out var pair)) { @@ -341,7 +343,7 @@ namespace Discord.Audio } else { - if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc)) + if (!RTPReadStream.TryReadSsrc(packet, offset, out var ssrc)) { await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false); return; @@ -358,14 +360,14 @@ namespace Discord.Audio } try { - await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); + await pair.Writer.WriteAsync(packet, offset, count).ConfigureAwait(false); } catch (Exception ex) { await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false); return; } - //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); + //await _audioLogger.DebugAsync($"Received {count} bytes from user {userId}").ConfigureAwait(false); } } } @@ -461,6 +463,18 @@ namespace Discord.Audio } } + public void RecycleFrame(RTPFrame frame) + { + if (_frameBuffers.Count < 100) //2s of audio + _frameBuffers.Enqueue(frame.Payload); + } + internal byte[] GetFrameBuffer() + { + if (_frameBuffers.TryDequeue(out var buffer)) + return buffer; + return new byte[OpusConverter.FrameBytes]; + } + internal void Dispose(bool disposing) { if (disposing) diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs index b9d6157ea..b876f31be 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs @@ -10,6 +10,7 @@ namespace Discord.Audio.Streams { private const int MaxFrames = 100; //1-2 Seconds + private readonly AudioClient _client; private ConcurrentQueue _frames; private SemaphoreSlim _signal; private ushort _nextSeq; @@ -23,8 +24,9 @@ namespace Discord.Audio.Streams public override bool CanWrite => false; public override int AvailableFrames => _signal.CurrentCount; - public InputStream() + public InputStream(IAudioClient client) { + _client = (AudioClient)client; _frames = new ConcurrentQueue(); _signal = new SemaphoreSlim(0, MaxFrames); } @@ -70,30 +72,35 @@ namespace Discord.Audio.Streams _nextMissed = missed; } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) + => WriteAsync(buffer, offset, count, cancelToken, false); + internal Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken, bool claimBuffer) { cancelToken.ThrowIfCancellationRequested(); - if (_signal.CurrentCount >= MaxFrames) //1-2 seconds - { - _hasHeader = false; - return Task.Delay(0); //Buffer overloaded - } if (!_hasHeader) throw new InvalidOperationException("Received payload without an RTP header"); _hasHeader = false; - byte[] payload = new byte[count]; - Buffer.BlockCopy(buffer, offset, payload, 0, count); + + if (_signal.CurrentCount >= MaxFrames) //1-2 seconds + return Task.Delay(0); //Buffer overloaded _frames.Enqueue(new RTPFrame( sequence: _nextSeq, timestamp: _nextTimestamp, missed: _nextMissed, - payload: payload + payload: CopyBuffer(buffer, offset, count) )); _signal.Release(); return Task.Delay(0); } + protected byte[] CopyBuffer(byte[] buffer, int offset, int count) + { + byte[] payload = _client.GetFrameBuffer(); + Buffer.BlockCopy(buffer, offset, payload, 0, count); + return payload; + } + protected override void Dispose(bool isDisposing) { _isDisposed = true; diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs index 2cedea114..c6b704b1a 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs @@ -7,7 +7,7 @@ namespace Discord.Audio.Streams public class RTPReadStream : AudioOutStream { private readonly AudioStream _next; - private readonly byte[] _buffer, _nonce; + private readonly byte[] _nonce; public override bool CanRead => true; public override bool CanSeek => false; @@ -16,7 +16,6 @@ namespace Discord.Audio.Streams public RTPReadStream(AudioStream next, int bufferSize = 4000) { _next = next; - _buffer = new byte[bufferSize]; _nonce = new byte[24]; } diff --git a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs index 7d680eaf2..fc8f010c2 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs @@ -26,6 +26,8 @@ namespace Discord.API public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>(); + private readonly MemoryStream _decompressionStream; + private readonly JsonTextReader _decompressionJsonReader; private CancellationTokenSource _connectCancelToken; private string _gatewayUrl; private bool _isExplicitUrl; @@ -41,23 +43,23 @@ namespace Discord.API _gatewayUrl = url; if (url != null) _isExplicitUrl = true; + _decompressionStream = new MemoryStream(10 * 1024); //10 KB + _decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream)); + WebSocketClient = webSocketProvider(); //WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+) WebSocketClient.BinaryMessage += async (data, index, count) => { using (var compressed = new MemoryStream(data, index + 2, count - 2)) - using (var decompressed = new MemoryStream()) { + _decompressionStream.Position = 0; using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) - zlib.CopyTo(decompressed); - decompressed.Position = 0; - using (var reader = new StreamReader(decompressed)) - using (var jsonReader = new JsonTextReader(reader)) - { - var msg = _serializer.Deserialize(jsonReader); - if (msg != null) - await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); - } + zlib.CopyTo(_decompressionStream); + + _decompressionStream.Position = 0; + var msg = _serializer.Deserialize(_decompressionJsonReader); + if (msg != null) + await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); } }; WebSocketClient.TextMessage += async text => diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs index 25dc2cf7b..a4d6daaeb 100644 --- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs @@ -32,13 +32,15 @@ namespace Discord.Audio public event Func ReceivedEvent { add { _receivedEvent.Add(value); } remove { _receivedEvent.Remove(value); } } private readonly AsyncEvent> _receivedEvent = new AsyncEvent>(); - public event Func ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } } - private readonly AsyncEvent> _receivedPacketEvent = new AsyncEvent>(); + public event Func ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } } + private readonly AsyncEvent> _receivedPacketEvent = new AsyncEvent>(); public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>(); private readonly JsonSerializer _serializer; private readonly SemaphoreSlim _connectionLock; + private readonly MemoryStream _decompressionStream; + private readonly JsonTextReader _decompressionJsonReader; private CancellationTokenSource _connectCancelToken; private IUdpSocket _udp; private bool _isDisposed; @@ -55,38 +57,35 @@ namespace Discord.Audio GuildId = guildId; _connectionLock = new SemaphoreSlim(1, 1); _udp = udpSocketProvider(); - _udp.ReceivedDatagram += async (data, index, count) => - { - if (index != 0 || count != data.Length) - { - var newData = new byte[count]; - Buffer.BlockCopy(data, index, newData, 0, count); - data = newData; - } - await _receivedPacketEvent.InvokeAsync(data).ConfigureAwait(false); - }; + _udp.ReceivedDatagram += (data, index, count) => _receivedPacketEvent.InvokeAsync(data, index, count); + _decompressionStream = new MemoryStream(10 * 1024); //10 KB + _decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream)); WebSocketClient = webSocketProvider(); //_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); //(Causes issues in .Net 4.6+) WebSocketClient.BinaryMessage += async (data, index, count) => { using (var compressed = new MemoryStream(data, index + 2, count - 2)) - using (var decompressed = new MemoryStream()) { + _decompressionStream.Position = 0; using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) - zlib.CopyTo(decompressed); - decompressed.Position = 0; - using (var reader = new StreamReader(decompressed)) - { - var msg = JsonConvert.DeserializeObject(reader.ReadToEnd()); + zlib.CopyTo(_decompressionStream); + + _decompressionStream.Position = 0; + var msg = _serializer.Deserialize(_decompressionJsonReader); + if (msg != null) await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); - } } }; WebSocketClient.TextMessage += async text => { - var msg = JsonConvert.DeserializeObject(text); - await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); + using (var reader = new StringReader(text)) + using (var jsonReader = new JsonTextReader(reader)) + { + var msg = _serializer.Deserialize(jsonReader); + if (msg != null) + await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); + } }; WebSocketClient.Closed += async ex => { diff --git a/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs b/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs index 282ae210a..7d3f51083 100644 --- a/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs +++ b/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs @@ -212,7 +212,7 @@ namespace Discord.Net.WebSockets else result = stream.ToArray(); #else - result = stream.GetBuffer(); + result = stream.GetBuffer(); #endif } }