diff --git a/src/Discord.Net.Audio/Net/VoiceSocket.cs b/src/Discord.Net.Audio/Net/VoiceSocket.cs index 544a7d710..f6b15055b 100644 --- a/src/Discord.Net.Audio/Net/VoiceSocket.cs +++ b/src/Discord.Net.Audio/Net/VoiceSocket.cs @@ -418,7 +418,8 @@ namespace Discord.Net.WebSockets var payload = (msg.Payload as JToken).ToObject(_serializer); _heartbeatInterval = payload.HeartbeatInterval; _ssrc = payload.SSRC; - var address = (await Dns.GetHostAddressesAsync(Host.Replace("wss://", "")).ConfigureAwait(false)).FirstOrDefault(); + string hostname = Host.Substring(0, Host.IndexOf('?')).Replace("wss://", ""); + var address = (await Dns.GetHostAddressesAsync(hostname).ConfigureAwait(false)).FirstOrDefault(); _endpoint = new IPEndPoint(address, payload.Port); if (_audioConfig.EnableEncryption) diff --git a/src/Discord.Net/DiscordClient.cs b/src/Discord.Net/DiscordClient.cs index bc9c2f537..aa3d402c8 100644 --- a/src/Discord.Net/DiscordClient.cs +++ b/src/Discord.Net/DiscordClient.cs @@ -33,6 +33,7 @@ namespace Discord private ConcurrentDictionary _privateChannels; //Key = RecipientId private Dictionary _regions; private Stopwatch _connectionStopwatch; + private ConcurrentQueue _largeServers; internal Logger Logger { get; } @@ -125,6 +126,7 @@ namespace Discord _servers = new ConcurrentDictionary(2, 0); _channels = new ConcurrentDictionary(2, 0); _privateChannels = new ConcurrentDictionary(2, 0); + _largeServers = new ConcurrentQueue(); //Serialization Serializer = new JsonSerializer(); @@ -285,6 +287,9 @@ namespace Discord catch (OperationCanceledException) { } } + ulong serverId; + while (_largeServers.TryDequeue(out serverId)) { } + MessageQueue.Clear(); await GatewaySocket.Disconnect().ConfigureAwait(false); @@ -507,6 +512,8 @@ namespace Discord var server = AddServer(model.Id); server.Update(model); } + if (model.IsLarge) + _largeServers.Enqueue(model.Id); } for (int i = 0; i < data.PrivateChannels.Length; i++) { @@ -514,7 +521,36 @@ namespace Discord var channel = AddPrivateChannel(model.Id, model.Recipient.Id); channel.Update(model); } - EndConnect(); + + //Temporary hotfix to download all large guilds before raising READY + CancellationToken cancelToken = _taskManager.CancelToken; + Task.Run(async () => + { + try + { + ulong serverId; + ulong[] serverIds = new ulong[50]; + int i = 0; + + await Task.Delay(2500, cancelToken); + while (true) + { + while (_largeServers.TryDequeue(out serverId) && i < 50) + serverIds[i++] = serverId; + if (i > 0) + { + cancelToken.ThrowIfCancellationRequested(); + GatewaySocket.SendRequestMembers(serverIds, "", 0); + await Task.Delay(1500, cancelToken); + } + if (i < 50) + break; + } + await Task.Delay(2500, cancelToken); + EndConnect(); + } + catch (OperationCanceledException) { } + }); } break; @@ -528,16 +564,21 @@ namespace Discord server.Update(data); if (data.Unavailable != false) + { Logger.Info($"GUILD_CREATE: {server.Path}"); + OnJoinedServer(server); + } else Logger.Info($"GUILD_AVAILABLE: {server.Path}"); - if (data.IsLarge) - GatewaySocket.SendRequestMembers(new ulong[] { data.Id }, "", 0); + if (!data.IsLarge) + OnServerAvailable(server); else { - OnJoinedServer(server); - OnServerAvailable(server); + if (State == ConnectionState.Connected) + GatewaySocket.SendRequestMembers(new ulong[] { data.Id }, "", 0); + else + _largeServers.Enqueue(data.Id); } } } diff --git a/src/Discord.Net/Net/WebSockets/WebSocket.cs b/src/Discord.Net/Net/WebSockets/WebSocket.cs index aa2c9a98b..d3f70f14c 100644 --- a/src/Discord.Net/Net/WebSockets/WebSocket.cs +++ b/src/Discord.Net/Net/WebSockets/WebSocket.cs @@ -10,36 +10,36 @@ using System.Threading.Tasks; namespace Discord.Net.WebSockets { - public abstract partial class WebSocket + public abstract partial class WebSocket { private readonly AsyncLock _lock; protected readonly IWebSocketEngine _engine; - protected readonly DiscordConfig _config; - protected readonly ManualResetEventSlim _connectedEvent; + protected readonly DiscordConfig _config; + protected readonly ManualResetEventSlim _connectedEvent; protected readonly TaskManager _taskManager; protected readonly JsonSerializer _serializer; protected CancellationTokenSource _cancelSource; protected CancellationToken _parentCancelToken; protected int _heartbeatInterval; - private DateTime _lastHeartbeat; + private DateTime _lastHeartbeat; /// Gets the logger used for this client. protected internal Logger Logger { get; } public CancellationToken CancelToken { get; private set; } - public string Host { get; set; } + public string Host { get; set; } /// Gets the current connection state of this client. public ConnectionState State { get; private set; } public event EventHandler Connected = delegate { }; - private void OnConnected() - => Connected(this, EventArgs.Empty); + private void OnConnected() + => Connected(this, EventArgs.Empty); public event EventHandler Disconnected = delegate { }; - private void OnDisconnected(bool wasUnexpected, Exception error) + private void OnDisconnected(bool wasUnexpected, Exception error) => Disconnected(this, new DisconnectedEventArgs(wasUnexpected, error)); - public WebSocket(DiscordConfig config, JsonSerializer serializer, Logger logger) - { + public WebSocket(DiscordConfig config, JsonSerializer serializer, Logger logger) + { _config = config; _serializer = serializer; Logger = logger; @@ -47,30 +47,30 @@ namespace Discord.Net.WebSockets _lock = new AsyncLock(); _taskManager = new TaskManager(Cleanup); CancelToken = new CancellationToken(true); - _connectedEvent = new ManualResetEventSlim(false); + _connectedEvent = new ManualResetEventSlim(false); #if !DOTNET5_4 - _engine = new WS4NetEngine(config, _taskManager); + _engine = new WS4NetEngine(config, _taskManager); #else _engine = new BuiltInEngine(config); #endif _engine.BinaryMessage += (s, e) => { - using (var compressed = new MemoryStream(e.Data, 2, e.Data.Length - 2)) - using (var decompressed = new MemoryStream()) - { - using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) - zlib.CopyTo(decompressed); - decompressed.Position = 0; + using (var compressed = new MemoryStream(e.Data, 2, e.Data.Length - 2)) + using (var decompressed = new MemoryStream()) + { + using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress)) + zlib.CopyTo(decompressed); + decompressed.Position = 0; using (var reader = new StreamReader(decompressed)) - ProcessMessage(reader.ReadToEnd()).GetAwaiter().GetResult(); - } + ProcessMessage(reader.ReadToEnd()).GetAwaiter().GetResult(); + } }; - _engine.TextMessage += (s, e) => ProcessMessage(e.Message).Wait(); - } + _engine.TextMessage += (s, e) => ProcessMessage(e.Message).Wait(); + } - protected async Task BeginConnect(CancellationToken parentCancelToken) - { + protected async Task BeginConnect(CancellationToken parentCancelToken) + { try { using (await _lock.LockAsync().ConfigureAwait(false)) @@ -80,7 +80,7 @@ namespace Discord.Net.WebSockets await _taskManager.Stop().ConfigureAwait(false); _taskManager.ClearException(); State = ConnectionState.Connecting; - + _cancelSource = new CancellationTokenSource(); CancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelSource.Token, parentCancelToken).Token; _lastHeartbeat = DateTime.UtcNow; @@ -88,39 +88,39 @@ namespace Discord.Net.WebSockets await _engine.Connect(Host, CancelToken).ConfigureAwait(false); await Run().ConfigureAwait(false); } - } - catch (Exception ex) - { + } + catch (Exception ex) + { //TODO: Should this be inside the lock? await _taskManager.SignalError(ex).ConfigureAwait(false); throw; - } - } - protected async Task EndConnect() - { - try + } + } + protected async Task EndConnect() + { + try { State = ConnectionState.Connected; Logger.Info($"Connected"); OnConnected(); _connectedEvent.Set(); - } - catch (Exception ex) + } + catch (Exception ex) { await _taskManager.SignalError(ex).ConfigureAwait(false); } - } + } - protected abstract Task Run(); - protected virtual async Task Cleanup() - { + protected abstract Task Run(); + protected virtual async Task Cleanup() + { var oldState = State; State = ConnectionState.Disconnecting; await _engine.Disconnect().ConfigureAwait(false); - _cancelSource = null; - _connectedEvent.Reset(); + _cancelSource = null; + _connectedEvent.Reset(); if (oldState == ConnectionState.Connecting || oldState == ConnectionState.Connected) { @@ -136,35 +136,35 @@ namespace Discord.Net.WebSockets State = ConnectionState.Disconnected; } - protected virtual Task ProcessMessage(string json) - { - return TaskHelper.CompletedTask; - } - protected void QueueMessage(IWebSocketMessage message) - { - string json = JsonConvert.SerializeObject(new WebSocketMessage(message)); - _engine.QueueMessage(json); - } - - protected Task HeartbeatAsync(CancellationToken cancelToken) - { - return Task.Run(async () => - { - try - { - while (!cancelToken.IsCancellationRequested) - { - if (this.State == ConnectionState.Connected && _heartbeatInterval > 0) - { + protected virtual Task ProcessMessage(string json) + { + return TaskHelper.CompletedTask; + } + protected void QueueMessage(IWebSocketMessage message) + { + string json = JsonConvert.SerializeObject(new WebSocketMessage(message)); + _engine.QueueMessage(json); + } + + protected Task HeartbeatAsync(CancellationToken cancelToken) + { + return Task.Run(async () => + { + try + { + while (!cancelToken.IsCancellationRequested) + { + if (this.State == ConnectionState.Connected && _heartbeatInterval > 0) + { SendHeartbeat(); - await Task.Delay(_heartbeatInterval, cancelToken).ConfigureAwait(false); - } - else - await Task.Delay(1000, cancelToken).ConfigureAwait(false); - } - } - catch (OperationCanceledException) { } - }); + await Task.Delay(_heartbeatInterval, cancelToken).ConfigureAwait(false); + } + else + await Task.Delay(1000, cancelToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException) { } + }); } public abstract void SendHeartbeat(); @@ -184,5 +184,5 @@ namespace Discord.Net.WebSockets throw; } } - } + } } diff --git a/src/Discord.Net/TaskManager.cs b/src/Discord.Net/TaskManager.cs index d43372396..fa4d0a52f 100644 --- a/src/Discord.Net/TaskManager.cs +++ b/src/Discord.Net/TaskManager.cs @@ -20,6 +20,7 @@ namespace Discord public bool StopOnCompletion { get; } public bool WasStopExpected { get; private set; } + public CancellationToken CancelToken => _cancelSource.Token; public Exception Exception => _stopReason?.SourceException;