diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.Events.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.Events.cs index 20ebe7e6d..b3e438a01 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.Events.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.Events.cs @@ -23,6 +23,12 @@ namespace Discord.Audio remove { _latencyUpdatedEvent.Remove(value); } } private readonly AsyncEvent> _latencyUpdatedEvent = new AsyncEvent>(); + public event Func UdpLatencyUpdated + { + add { _udpLatencyUpdatedEvent.Add(value); } + remove { _udpLatencyUpdatedEvent.Remove(value); } + } + private readonly AsyncEvent> _udpLatencyUpdatedEvent = new AsyncEvent>(); public event Func StreamCreated { add { _streamCreatedEvent.Add(value); } diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 717393951..39814f9bf 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Collections.Generic; namespace Discord.Audio { @@ -34,10 +35,11 @@ namespace Discord.Audio private readonly ConnectionManager _connection; private readonly SemaphoreSlim _stateLock; private readonly ConcurrentQueue _heartbeatTimes; + private readonly ConcurrentQueue> _keepaliveTimes; private readonly ConcurrentDictionary _ssrcMap; private readonly ConcurrentDictionary _streams; - private Task _heartbeatTask; + private Task _heartbeatTask, _keepaliveTask; private long _lastMessageTime; private string _url, _sessionId, _token; private ulong _userId; @@ -46,6 +48,7 @@ namespace Discord.Audio public SocketGuild Guild { get; } public DiscordVoiceAPIClient ApiClient { get; private set; } public int Latency { get; private set; } + public int UdpLatency { get; private set; } public ulong ChannelId { get; internal set; } internal byte[] SecretKey { get; private set; } @@ -72,6 +75,7 @@ namespace Discord.Audio _connection.Connected += () => _connectedEvent.InvokeAsync(); _connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex); _heartbeatTimes = new ConcurrentQueue(); + _keepaliveTimes = new ConcurrentQueue>(); _ssrcMap = new ConcurrentDictionary(); _streams = new ConcurrentDictionary(); @@ -83,6 +87,7 @@ namespace Discord.Audio }; LatencyUpdated += async (old, val) => await _audioLogger.DebugAsync($"Latency = {val} ms").ConfigureAwait(false); + UdpLatencyUpdated += async (old, val) => await _audioLogger.DebugAsync($"UDP Latency = {val} ms").ConfigureAwait(false); } internal async Task StartAsync(string url, ulong userId, string sessionId, string token) @@ -119,6 +124,10 @@ namespace Discord.Audio if (heartbeatTask != null) await heartbeatTask.ConfigureAwait(false); _heartbeatTask = null; + var keepaliveTask = _keepaliveTask; + if (keepaliveTask != null) + await keepaliveTask.ConfigureAwait(false); + _keepaliveTask = null; long time; while (_heartbeatTimes.TryDequeue(out time)) { } @@ -242,6 +251,7 @@ namespace Discord.Audio SecretKey = data.SecretKey; await ApiClient.SendSetSpeaking(false).ConfigureAwait(false); + _keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken); var _ = _connection.CompleteAsync(); } @@ -284,60 +294,94 @@ namespace Discord.Audio } private async Task ProcessPacketAsync(byte[] packet) { - if (_connection.State == ConnectionState.Connecting) + try { - if (packet.Length != 70) - { - await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false); - return; - } - string ip; - int port; - try + if (_connection.State == ConnectionState.Connecting) { - ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); - port = (packet[69] << 8) | packet[68]; + if (packet.Length != 70) + { + await _audioLogger.DebugAsync($"Malformed Packet").ConfigureAwait(false); + return; + } + string ip; + int port; + try + { + ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0'); + port = (packet[69] << 8) | packet[68]; + } + catch (Exception ex) + { + await _audioLogger.DebugAsync($"Malformed Packet", ex).ConfigureAwait(false); + return; + } + + await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false); + await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false); } - catch (Exception ex) + else if (_connection.State == ConnectionState.Connected) { - await _audioLogger.DebugAsync($"Malformed Packet", ex).ConfigureAwait(false); - return; + if (packet.Length == 8) + { + await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false); + + ulong value = + ((ulong)packet[0] >> 0) | + ((ulong)packet[1] >> 8) | + ((ulong)packet[2] >> 16) | + ((ulong)packet[3] >> 24) | + ((ulong)packet[4] >> 32) | + ((ulong)packet[5] >> 40) | + ((ulong)packet[6] >> 48) | + ((ulong)packet[7] >> 56); + + while (_keepaliveTimes.TryDequeue(out var pair)) + { + if (pair.Key == value) + { + int latency = (int)(Environment.TickCount - pair.Value); + int before = UdpLatency; + UdpLatency = latency; + + await _udpLatencyUpdatedEvent.InvokeAsync(before, latency).ConfigureAwait(false); + break; + } + } + } + else + { + if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc)) + { + await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false); + return; + } + if (!_ssrcMap.TryGetValue(ssrc, out var userId)) + { + await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); + return; + } + if (!_streams.TryGetValue(userId, out var pair)) + { + await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false); + return; + } + try + { + await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); + } + catch (Exception ex) + { + await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false); + return; + } + //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); + } } - - await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false); - await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false); } - else if (_connection.State == ConnectionState.Connected) + catch (Exception ex) { - uint ssrc; - ulong userId; - StreamPair pair; - - if (!RTPReadStream.TryReadSsrc(packet, 0, out ssrc)) - { - await _audioLogger.DebugAsync($"Malformed Frame").ConfigureAwait(false); - return; - } - if (!_ssrcMap.TryGetValue(ssrc, out userId)) - { - await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); - return; - } - if (!_streams.TryGetValue(userId, out pair)) - { - await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false); - return; - } - try - { - await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); - } - catch (Exception ex) - { - await _audioLogger.DebugAsync($"Malformed Frame", ex).ConfigureAwait(false); - return; - } - //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); + await _audioLogger.WarningAsync($"Failed to process UDP packet", ex).ConfigureAwait(false); + return; } } @@ -366,7 +410,7 @@ namespace Discord.Audio } catch (Exception ex) { - await _audioLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false); + await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false); } await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false); @@ -382,6 +426,40 @@ namespace Discord.Audio await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false); } } + private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken) + { + var packet = new byte[8]; + try + { + await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false); + while (!cancelToken.IsCancellationRequested) + { + var now = Environment.TickCount; + + try + { + ulong value = await ApiClient.SendKeepaliveAsync().ConfigureAwait(false); + if (_keepaliveTimes.Count < 12) //No reply for 60 Seconds + _keepaliveTimes.Enqueue(new KeyValuePair(value, now)); + } + catch (Exception ex) + { + await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false); + } + + await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false); + } + await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false); + } + catch (OperationCanceledException) + { + await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false); + } + catch (Exception ex) + { + await _audioLogger.ErrorAsync("Keepalive Errored", ex).ConfigureAwait(false); + } + } internal void Dispose(bool disposing) { diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs index f66050bc1..05671d71d 100644 --- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs @@ -42,6 +42,7 @@ namespace Discord.Audio private CancellationTokenSource _connectCancelToken; private IUdpSocket _udp; private bool _isDisposed; + private ulong _nextKeepalive; public ulong GuildId { get; } internal IWebSocketClient WebSocketClient { get; } @@ -227,6 +228,21 @@ namespace Discord.Audio await SendAsync(packet, 0, 70).ConfigureAwait(false); await _sentDiscoveryEvent.InvokeAsync().ConfigureAwait(false); } + public async Task SendKeepaliveAsync() + { + var value = _nextKeepalive++; + var packet = new byte[8]; + packet[0] = (byte)(value >> 0); + packet[1] = (byte)(value >> 8); + packet[2] = (byte)(value >> 16); + packet[3] = (byte)(value >> 24); + packet[4] = (byte)(value >> 32); + packet[5] = (byte)(value >> 40); + packet[6] = (byte)(value >> 48); + packet[7] = (byte)(value >> 56); + await SendAsync(packet, 0, 8).ConfigureAwait(false); + return value; + } public void SetUdpEndpoint(string ip, int port) {