@@ -31,5 +31,8 @@ namespace Discord.Audio | |||
AudioOutStream CreatePCMStream(AudioApplication application, int? bitrate = null, int bufferMillis = 1000, int packetLoss = 30); | |||
/// <summary>Creates a new direct outgoing stream accepting PCM (raw) data. This is a direct stream with no internal timer.</summary> | |||
AudioOutStream CreateDirectPCMStream(AudioApplication application, int? bitrate = null, int packetLoss = 30); | |||
/// <summary>Recycles an RTPFrame's payload buffer. Do not call more than once for a given frame.</summary> | |||
void RecycleFrame(RTPFrame frame); | |||
} | |||
} |
@@ -5,7 +5,7 @@ namespace Discord.Audio | |||
public readonly ushort Sequence; | |||
public readonly uint Timestamp; | |||
public readonly byte[] Payload; | |||
public readonly bool Missed; | |||
public readonly bool Missed; | |||
public RTPFrame(ushort sequence, uint timestamp, byte[] payload, bool missed) | |||
{ | |||
@@ -38,6 +38,7 @@ namespace Discord.Audio | |||
private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes; | |||
private readonly ConcurrentDictionary<uint, ulong> _ssrcMap; | |||
private readonly ConcurrentDictionary<ulong, StreamPair> _streams; | |||
private readonly ConcurrentQueue<byte[]> _frameBuffers; | |||
private Task _heartbeatTask, _keepaliveTask; | |||
private long _lastMessageTime; | |||
@@ -79,6 +80,7 @@ namespace Discord.Audio | |||
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>(); | |||
_ssrcMap = new ConcurrentDictionary<uint, ulong>(); | |||
_streams = new ConcurrentDictionary<ulong, StreamPair>(); | |||
_frameBuffers = new ConcurrentQueue<byte[]>(); | |||
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; | |||
_serializer.Error += (s, e) => | |||
@@ -174,7 +176,7 @@ namespace Discord.Audio | |||
//Assume Thread-safe | |||
if (!_streams.ContainsKey(userId)) | |||
{ | |||
var readerStream = new InputStream(); //Consumes header | |||
var readerStream = new InputStream(this); //Consumes header | |||
var opusDecoder = new OpusDecodeStream(readerStream); //Passes header | |||
//var jitterBuffer = new JitterBuffer(opusDecoder, _audioLogger); | |||
var rtpReader = new RTPReadStream(opusDecoder); //Generates header | |||
@@ -283,13 +285,13 @@ namespace Discord.Audio | |||
return; | |||
} | |||
} | |||
private async Task ProcessPacketAsync(byte[] packet) | |||
private async Task ProcessPacketAsync(byte[] packet, int offset, int count) | |||
{ | |||
try | |||
{ | |||
if (_connection.State == ConnectionState.Connecting) | |||
{ | |||
if (packet.Length != 70) | |||
if (count != 70) | |||
{ | |||
await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false); | |||
return; | |||
@@ -298,8 +300,8 @@ namespace Discord.Audio | |||
int port; | |||
try | |||
{ | |||
ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); | |||
port = (packet[69] << 8) | packet[68]; | |||
ip = Encoding.UTF8.GetString(packet, offset + 4, 70 - 6).TrimEnd('\0'); | |||
port = (packet[offset + 69] << 8) | packet[offset + 68]; | |||
} | |||
catch (Exception ex) | |||
{ | |||
@@ -312,19 +314,19 @@ namespace Discord.Audio | |||
} | |||
else if (_connection.State == ConnectionState.Connected) | |||
{ | |||
if (packet.Length == 8) | |||
if (count == 8) | |||
{ | |||
await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false); | |||
ulong value = | |||
((ulong)packet[0] >> 0) | | |||
((ulong)packet[1] >> 8) | | |||
((ulong)packet[2] >> 16) | | |||
((ulong)packet[3] >> 24) | | |||
((ulong)packet[4] >> 32) | | |||
((ulong)packet[5] >> 40) | | |||
((ulong)packet[6] >> 48) | | |||
((ulong)packet[7] >> 56); | |||
((ulong)packet[offset + 0] >> 0) | | |||
((ulong)packet[offset + 1] >> 8) | | |||
((ulong)packet[offset + 2] >> 16) | | |||
((ulong)packet[offset + 3] >> 24) | | |||
((ulong)packet[offset + 4] >> 32) | | |||
((ulong)packet[offset + 5] >> 40) | | |||
((ulong)packet[offset + 6] >> 48) | | |||
((ulong)packet[offset + 7] >> 56); | |||
while (_keepaliveTimes.TryDequeue(out var pair)) | |||
{ | |||
@@ -341,7 +343,7 @@ namespace Discord.Audio | |||
} | |||
else | |||
{ | |||
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc)) | |||
if (!RTPReadStream.TryReadSsrc(packet, offset, out var ssrc)) | |||
{ | |||
await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false); | |||
return; | |||
@@ -358,14 +360,14 @@ namespace Discord.Audio | |||
} | |||
try | |||
{ | |||
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); | |||
await pair.Writer.WriteAsync(packet, offset, count).ConfigureAwait(false); | |||
} | |||
catch (Exception ex) | |||
{ | |||
await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false); | |||
return; | |||
} | |||
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); | |||
//await _audioLogger.DebugAsync($"Received {count} bytes from user {userId}").ConfigureAwait(false); | |||
} | |||
} | |||
} | |||
@@ -461,6 +463,18 @@ namespace Discord.Audio | |||
} | |||
} | |||
public void RecycleFrame(RTPFrame frame) | |||
{ | |||
if (_frameBuffers.Count < 100) //2s of audio | |||
_frameBuffers.Enqueue(frame.Payload); | |||
} | |||
internal byte[] GetFrameBuffer() | |||
{ | |||
if (_frameBuffers.TryDequeue(out var buffer)) | |||
return buffer; | |||
return new byte[OpusConverter.FrameBytes]; | |||
} | |||
internal void Dispose(bool disposing) | |||
{ | |||
if (disposing) | |||
@@ -10,6 +10,7 @@ namespace Discord.Audio.Streams | |||
{ | |||
private const int MaxFrames = 100; //1-2 Seconds | |||
private readonly AudioClient _client; | |||
private ConcurrentQueue<RTPFrame> _frames; | |||
private SemaphoreSlim _signal; | |||
private ushort _nextSeq; | |||
@@ -23,8 +24,9 @@ namespace Discord.Audio.Streams | |||
public override bool CanWrite => false; | |||
public override int AvailableFrames => _signal.CurrentCount; | |||
public InputStream() | |||
public InputStream(IAudioClient client) | |||
{ | |||
_client = (AudioClient)client; | |||
_frames = new ConcurrentQueue<RTPFrame>(); | |||
_signal = new SemaphoreSlim(0, MaxFrames); | |||
} | |||
@@ -70,30 +72,35 @@ namespace Discord.Audio.Streams | |||
_nextMissed = missed; | |||
} | |||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) | |||
=> WriteAsync(buffer, offset, count, cancelToken, false); | |||
internal Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken, bool claimBuffer) | |||
{ | |||
cancelToken.ThrowIfCancellationRequested(); | |||
if (_signal.CurrentCount >= MaxFrames) //1-2 seconds | |||
{ | |||
_hasHeader = false; | |||
return Task.Delay(0); //Buffer overloaded | |||
} | |||
if (!_hasHeader) | |||
throw new InvalidOperationException("Received payload without an RTP header"); | |||
_hasHeader = false; | |||
byte[] payload = new byte[count]; | |||
Buffer.BlockCopy(buffer, offset, payload, 0, count); | |||
if (_signal.CurrentCount >= MaxFrames) //1-2 seconds | |||
return Task.Delay(0); //Buffer overloaded | |||
_frames.Enqueue(new RTPFrame( | |||
sequence: _nextSeq, | |||
timestamp: _nextTimestamp, | |||
missed: _nextMissed, | |||
payload: payload | |||
payload: CopyBuffer(buffer, offset, count) | |||
)); | |||
_signal.Release(); | |||
return Task.Delay(0); | |||
} | |||
protected byte[] CopyBuffer(byte[] buffer, int offset, int count) | |||
{ | |||
byte[] payload = _client.GetFrameBuffer(); | |||
Buffer.BlockCopy(buffer, offset, payload, 0, count); | |||
return payload; | |||
} | |||
protected override void Dispose(bool isDisposing) | |||
{ | |||
_isDisposed = true; | |||
@@ -7,7 +7,7 @@ namespace Discord.Audio.Streams | |||
public class RTPReadStream : AudioOutStream | |||
{ | |||
private readonly AudioStream _next; | |||
private readonly byte[] _buffer, _nonce; | |||
private readonly byte[] _nonce; | |||
public override bool CanRead => true; | |||
public override bool CanSeek => false; | |||
@@ -16,7 +16,6 @@ namespace Discord.Audio.Streams | |||
public RTPReadStream(AudioStream next, int bufferSize = 4000) | |||
{ | |||
_next = next; | |||
_buffer = new byte[bufferSize]; | |||
_nonce = new byte[24]; | |||
} | |||
@@ -26,6 +26,8 @@ namespace Discord.API | |||
public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>(); | |||
private readonly MemoryStream _decompressionStream; | |||
private readonly JsonTextReader _decompressionJsonReader; | |||
private CancellationTokenSource _connectCancelToken; | |||
private string _gatewayUrl; | |||
private bool _isExplicitUrl; | |||
@@ -41,23 +43,23 @@ namespace Discord.API | |||
_gatewayUrl = url; | |||
if (url != null) | |||
_isExplicitUrl = true; | |||
_decompressionStream = new MemoryStream(10 * 1024); //10 KB | |||
_decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream)); | |||
WebSocketClient = webSocketProvider(); | |||
//WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+) | |||
WebSocketClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
_decompressionStream.Position = 0; | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<SocketFrame>(jsonReader); | |||
if (msg != null) | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
zlib.CopyTo(_decompressionStream); | |||
_decompressionStream.Position = 0; | |||
var msg = _serializer.Deserialize<SocketFrame>(_decompressionJsonReader); | |||
if (msg != null) | |||
await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false); | |||
} | |||
}; | |||
WebSocketClient.TextMessage += async text => | |||
@@ -32,13 +32,15 @@ namespace Discord.Audio | |||
public event Func<VoiceOpCode, object, Task> ReceivedEvent { add { _receivedEvent.Add(value); } remove { _receivedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<VoiceOpCode, object, Task>> _receivedEvent = new AsyncEvent<Func<VoiceOpCode, object, Task>>(); | |||
public event Func<byte[], Task> ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<byte[], Task>> _receivedPacketEvent = new AsyncEvent<Func<byte[], Task>>(); | |||
public event Func<byte[], int, int, Task> ReceivedPacket { add { _receivedPacketEvent.Add(value); } remove { _receivedPacketEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<byte[], int, int, Task>> _receivedPacketEvent = new AsyncEvent<Func<byte[], int, int, Task>>(); | |||
public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } } | |||
private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>(); | |||
private readonly JsonSerializer _serializer; | |||
private readonly SemaphoreSlim _connectionLock; | |||
private readonly MemoryStream _decompressionStream; | |||
private readonly JsonTextReader _decompressionJsonReader; | |||
private CancellationTokenSource _connectCancelToken; | |||
private IUdpSocket _udp; | |||
private bool _isDisposed; | |||
@@ -55,38 +57,35 @@ namespace Discord.Audio | |||
GuildId = guildId; | |||
_connectionLock = new SemaphoreSlim(1, 1); | |||
_udp = udpSocketProvider(); | |||
_udp.ReceivedDatagram += async (data, index, count) => | |||
{ | |||
if (index != 0 || count != data.Length) | |||
{ | |||
var newData = new byte[count]; | |||
Buffer.BlockCopy(data, index, newData, 0, count); | |||
data = newData; | |||
} | |||
await _receivedPacketEvent.InvokeAsync(data).ConfigureAwait(false); | |||
}; | |||
_udp.ReceivedDatagram += (data, index, count) => _receivedPacketEvent.InvokeAsync(data, index, count); | |||
_decompressionStream = new MemoryStream(10 * 1024); //10 KB | |||
_decompressionJsonReader = new JsonTextReader(new StreamReader(_decompressionStream)); | |||
WebSocketClient = webSocketProvider(); | |||
//_gatewayClient.SetHeader("user-agent", DiscordConfig.UserAgent); //(Causes issues in .Net 4.6+) | |||
WebSocketClient.BinaryMessage += async (data, index, count) => | |||
{ | |||
using (var compressed = new MemoryStream(data, index + 2, count - 2)) | |||
using (var decompressed = new MemoryStream()) | |||
{ | |||
_decompressionStream.Position = 0; | |||
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) | |||
zlib.CopyTo(decompressed); | |||
decompressed.Position = 0; | |||
using (var reader = new StreamReader(decompressed)) | |||
{ | |||
var msg = JsonConvert.DeserializeObject<SocketFrame>(reader.ReadToEnd()); | |||
zlib.CopyTo(_decompressionStream); | |||
_decompressionStream.Position = 0; | |||
var msg = _serializer.Deserialize<SocketFrame>(_decompressionJsonReader); | |||
if (msg != null) | |||
await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); | |||
} | |||
} | |||
}; | |||
WebSocketClient.TextMessage += async text => | |||
{ | |||
var msg = JsonConvert.DeserializeObject<SocketFrame>(text); | |||
await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); | |||
using (var reader = new StringReader(text)) | |||
using (var jsonReader = new JsonTextReader(reader)) | |||
{ | |||
var msg = _serializer.Deserialize<SocketFrame>(jsonReader); | |||
if (msg != null) | |||
await _receivedEvent.InvokeAsync((VoiceOpCode)msg.Operation, msg.Payload).ConfigureAwait(false); | |||
} | |||
}; | |||
WebSocketClient.Closed += async ex => | |||
{ | |||
@@ -212,7 +212,7 @@ namespace Discord.Net.WebSockets | |||
else | |||
result = stream.ToArray(); | |||
#else | |||
result = stream.GetBuffer(); | |||
result = stream.GetBuffer(); | |||
#endif | |||
} | |||
} | |||