diff --git a/src/Discord.Net.Core/Audio/IAudioClient.cs b/src/Discord.Net.Core/Audio/IAudioClient.cs
index 9be8ceef5..ad347d00d 100644
--- a/src/Discord.Net.Core/Audio/IAudioClient.cs
+++ b/src/Discord.Net.Core/Audio/IAudioClient.cs
@@ -31,5 +31,8 @@ namespace Discord.Audio
AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate = null, int bufferMillis = 1000, int packetLoss = 30);
/// Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer.
AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate = null, int packetLoss = 30);
+
+ /// Recycles an RTPFrame's payload buffer. Do not call more than once for a given frame.
+ void RecycleFrame(RTPFrame frame);
}
}
diff --git a/src/Discord.Net.Core/Audio/RTPFrame.cs b/src/Discord.Net.Core/Audio/RTPFrame.cs
index 6254b7173..3323a2bfb 100644
--- a/src/Discord.Net.Core/Audio/RTPFrame.cs
+++ b/src/Discord.Net.Core/Audio/RTPFrame.cs
@@ -5,7 +5,7 @@ namespace Discord.Audio
public readonly ushort Sequence;
public readonly uint Timestamp;
public readonly byte[] Payload;
- public readonly bool Missed;
+ public readonly bool Missed;
public RTPFrame(ushort sequence, uint timestamp, byte[] payload, bool missed)
{
diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs
index 1f33b3cc5..c08bef3ae 100644
--- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs
+++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs
@@ -38,6 +38,7 @@ namespace Discord.Audio
private readonly ConcurrentQueue> _keepaliveTimes;
private readonly ConcurrentDictionary _ssrcMap;
private readonly ConcurrentDictionary _streams;
+ private readonly ConcurrentQueue _frameBuffers;
private Task _heartbeatTask, _keepaliveTask;
private long _lastMessageTime;
@@ -79,6 +80,7 @@ namespace Discord.Audio
_keepaliveTimes = new ConcurrentQueue>();
_ssrcMap = new ConcurrentDictionary();
_streams = new ConcurrentDictionary();
+ _frameBuffers = new ConcurrentQueue();
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
_serializer.Error += (s, e) =>
@@ -174,7 +176,7 @@ namespace Discord.Audio
//Assume Thread-safe
if (!_streams.ContainsKey(userId))
{
- var readerStream = new InputStream(); //Consumes header
+ var readerStream = new InputStream(this); //Consumes header
var opusDecoder = new OpusDecodeStream(readerStream); //Passes header
//var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger);
var rtpReader = new RTPReadStream(opusDecoder); //Generates header
@@ -283,13 +285,13 @@ namespace Discord.Audio
return;
}
}
- private async Task ProcessPacketAsync(byte[] packet)
+ private async Task ProcessPacketAsync(byte[] packet, int offset, int count)
{
try
{
if (_connection.State == ConnectionState.Connecting)
{
- if (packet.Length != 70)
+ if (count != 70)
{
await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false);
return;
@@ -298,8 +300,8 @@ namespace Discord.Audio
int port;
try
{
- ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
- port = (packet[69] << 8) | packet[68];
+ ip = Encoding.UTF8.GetString(packet, offset + 4, 70 - 6).TrimEnd('\0');
+ port = (packet[offset + 69] << 8) | packet[offset + 68];
}
catch (Exception ex)
{
@@ -312,19 +314,19 @@ namespace Discord.Audio
}
else if (_connection.State == ConnectionState.Connected)
{
- if (packet.Length == 8)
+ if (count == 8)
{
await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false);
ulong value =
- ((ulong)packet[0] >> 0) |
- ((ulong)packet[1] >> 8) |
- ((ulong)packet[2] >> 16) |
- ((ulong)packet[3] >> 24) |
- ((ulong)packet[4] >> 32) |
- ((ulong)packet[5] >> 40) |
- ((ulong)packet[6] >> 48) |
- ((ulong)packet[7] >> 56);
+ ((ulong)packet[offset + 0] >> 0) |
+ ((ulong)packet[offset + 1] >> 8) |
+ ((ulong)packet[offset + 2] >> 16) |
+ ((ulong)packet[offset + 3] >> 24) |
+ ((ulong)packet[offset + 4] >> 32) |
+ ((ulong)packet[offset + 5] >> 40) |
+ ((ulong)packet[offset + 6] >> 48) |
+ ((ulong)packet[offset + 7] >> 56);
while (_keepaliveTimes.TryDequeue(out var pair))
{
@@ -341,7 +343,7 @@ namespace Discord.Audio
}
else
{
- if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
+ if (!RTPReadStream.TryReadSsrc(packet, offset, out var ssrc))
{
await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false);
return;
@@ -358,14 +360,14 @@ namespace Discord.Audio
}
try
{
- await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
+ await pair.Writer.WriteAsync(packet, offset, count).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false);
return;
}
- //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
+ //await _audioLogger.DebugAsync($"Received {count} bytes from user {userId}").ConfigureAwait(false);
}
}
}
@@ -461,6 +463,18 @@ namespace Discord.Audio
}
}
+ public void RecycleFrame(RTPFrame frame)
+ {
+ if (_frameBuffers.Count < 100) //2s of audio
+ _frameBuffers.Enqueue(frame.Payload);
+ }
+ internal byte[] GetFrameBuffer()
+ {
+ if (_frameBuffers.TryDequeue(out var buffer))
+ return buffer;
+ return new byte[OpusConverter.FrameBytes];
+ }
+
internal void Dispose(bool disposing)
{
if (disposing)
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs
index b9d6157ea..b876f31be 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/InputStream.cs
@@ -10,6 +10,7 @@ namespace Discord.Audio.Streams
{
private const int MaxFrames = 100; //1-2 Seconds
+ private readonly AudioClient _client;
private ConcurrentQueue _frames;
private SemaphoreSlim _signal;
private ushort _nextSeq;
@@ -23,8 +24,9 @@ namespace Discord.Audio.Streams
public override bool CanWrite => false;
public override int AvailableFrames => _signal.CurrentCount;
- public InputStream()
+ public InputStream(IAudioClient client)
{
+ _client = (AudioClient)client;
_frames = new ConcurrentQueue();
_signal = new SemaphoreSlim(0, MaxFrames);
}
@@ -70,30 +72,35 @@ namespace Discord.Audio.Streams
_nextMissed = missed;
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
+ => WriteAsync(buffer, offset, count, cancelToken, false);
+ internal Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken, bool claimBuffer)
{
cancelToken.ThrowIfCancellationRequested();
- if (_signal.CurrentCount >= MaxFrames) //1-2 seconds
- {
- _hasHeader = false;
- return Task.Delay(0); //Buffer overloaded
- }
if (!_hasHeader)
throw new InvalidOperationException("Received payload without an RTP header");
_hasHeader = false;
- byte[] payload = new byte[count];
- Buffer.BlockCopy(buffer, offset, payload, 0, count);
+
+ if (_signal.CurrentCount >= MaxFrames) //1-2 seconds
+ return Task.Delay(0); //Buffer overloaded
_frames.Enqueue(new RTPFrame(
sequence: _nextSeq,
timestamp: _nextTimestamp,
missed: _nextMissed,
- payload: payload
+ payload: CopyBuffer(buffer, offset, count)
));
_signal.Release();
return Task.Delay(0);
}
+ protected byte[] CopyBuffer(byte[] buffer, int offset, int count)
+ {
+ byte[] payload = _client.GetFrameBuffer();
+ Buffer.BlockCopy(buffer, offset, payload, 0, count);
+ return payload;
+ }
+
protected override void Dispose(bool isDisposing)
{
_isDisposed = true;
diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
index 2cedea114..c6b704b1a 100644
--- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
+++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
@@ -7,7 +7,7 @@ namespace Discord.Audio.Streams
public class RTPReadStream : AudioOutStream
{
private readonly AudioStream _next;
- private readonly byte[] _buffer, _nonce;
+ private readonly byte[] _nonce;
public override bool CanRead => true;
public override bool CanSeek => false;
@@ -16,7 +16,6 @@ namespace Discord.Audio.Streams
public RTPReadStream(AudioStream next, int bufferSize = 4000)
{
_next = next;
- _buffer = new byte[bufferSize];
_nonce = new byte[24];
}
diff --git a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs
index 7d680eaf2..fc8f010c2 100644
--- a/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs
+++ b/src/Discord.Net.WebSocket/DiscordSocketApiClient.cs
@@ -26,6 +26,8 @@ namespace Discord.API
public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>();
+ private readonly MemoryStream _decompressionStream;
+ private readonly JsonTextReader _decompressionJsonReader;
private CancellationTokenSource _connectCancelToken;
private string _gatewayUrl;
private bool _isExplicitUrl;
@@ -41,23 +43,23 @@ namespace Discord.API
_gatewayUrl = url;
if (url != null)
_isExplicitUrl = true;
+ _decompressionStream = new MemoryStream(10 * 1024); //10 KB
+ _decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream));
+
WebSocketClient = webSocketProvider();
//WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+)
WebSocketClient.BinaryMessage += async (data, index, count) =>
{
using (var compressed = new MemoryStream(data, index + 2, count - 2))
- using (var decompressed = new MemoryStream())
{
+ _decompressionStream.Position = 0;
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
- zlib.CopyTo(decompressed);
- decompressed.Position = 0;
- using (var reader = new StreamReader(decompressed))
- using (var jsonReader = new JsonTextReader(reader))
- {
- var msg = _serializer.Deserialize(jsonReader);
- if (msg != null)
- await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false);
- }
+ zlib.CopyTo(_decompressionStream);
+
+ _decompressionStream.Position = 0;
+ var msg = _serializer.Deserialize(_decompressionJsonReader);
+ if (msg != null)
+ await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false);
}
};
WebSocketClient.TextMessage += async text =>
diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs
index 25dc2cf7b..a4d6daaeb 100644
--- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs
+++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs
@@ -32,13 +32,15 @@ namespace Discord.Audio
public event Func ReceivedEvent { add { _receivedEvent.Add(value); } remove { _receivedEvent.Remove(value); } }
private readonly AsyncEvent> _receivedEvent = new AsyncEvent>();
- public event Func ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } }
- private readonly AsyncEvent> _receivedPacketEvent = new AsyncEvent>();
+ public event Func ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } }
+ private readonly AsyncEvent> _receivedPacketEvent = new AsyncEvent>();
public event Func Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
private readonly AsyncEvent> _disconnectedEvent = new AsyncEvent>();
private readonly JsonSerializer _serializer;
private readonly SemaphoreSlim _connectionLock;
+ private readonly MemoryStream _decompressionStream;
+ private readonly JsonTextReader _decompressionJsonReader;
private CancellationTokenSource _connectCancelToken;
private IUdpSocket _udp;
private bool _isDisposed;
@@ -55,38 +57,35 @@ namespace Discord.Audio
GuildId = guildId;
_connectionLock = new SemaphoreSlim(1, 1);
_udp = udpSocketProvider();
- _udp.ReceivedDatagram += async (data, index, count) =>
- {
- if (index != 0 || count != data.Length)
- {
- var newData = new byte[count];
- Buffer.BlockCopy(data, index, newData, 0, count);
- data = newData;
- }
- await _receivedPacketEvent.InvokeAsync(data).ConfigureAwait(false);
- };
+ _udp.ReceivedDatagram += (data, index, count) => _receivedPacketEvent.InvokeAsync(data, index, count);
+ _decompressionStream = new MemoryStream(10 * 1024); //10 KB
+ _decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream));
WebSocketClient = webSocketProvider();
//_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); //(Causes issues in .Net 4.6+)
WebSocketClient.BinaryMessage += async (data, index, count) =>
{
using (var compressed = new MemoryStream(data, index + 2, count - 2))
- using (var decompressed = new MemoryStream())
{
+ _decompressionStream.Position = 0;
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
- zlib.CopyTo(decompressed);
- decompressed.Position = 0;
- using (var reader = new StreamReader(decompressed))
- {
- var msg = JsonConvert.DeserializeObject(reader.ReadToEnd());
+ zlib.CopyTo(_decompressionStream);
+
+ _decompressionStream.Position = 0;
+ var msg = _serializer.Deserialize(_decompressionJsonReader);
+ if (msg != null)
await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false);
- }
}
};
WebSocketClient.TextMessage += async text =>
{
- var msg = JsonConvert.DeserializeObject(text);
- await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false);
+ using (var reader = new StringReader(text))
+ using (var jsonReader = new JsonTextReader(reader))
+ {
+ var msg = _serializer.Deserialize(jsonReader);
+ if (msg != null)
+ await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false);
+ }
};
WebSocketClient.Closed += async ex =>
{
diff --git a/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs b/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs
index 282ae210a..7d3f51083 100644
--- a/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs
+++ b/src/Discord.Net.WebSocket/Net/DefaultWebSocketClient.cs
@@ -212,7 +212,7 @@ namespace Discord.Net.WebSockets
else
result = stream.ToArray();
#else
- result = stream.GetBuffer();
+ result = stream.GetBuffer();
#endif
}
}