Browse Source

Add a timeout to guild member chunking, fix voice connections, delay READY.

pull/46/merge
RogueException 9 years ago
parent
commit
8f0bc1933a
4 changed files with 119 additions and 76 deletions
  1. +2
    -1
      src/Discord.Net.Audio/Net/VoiceSocket.cs
  2. +46
    -5
      src/Discord.Net/DiscordClient.cs
  3. +70
    -70
      src/Discord.Net/Net/WebSockets/WebSocket.cs
  4. +1
    -0
      src/Discord.Net/TaskManager.cs

+ 2
- 1
src/Discord.Net.Audio/Net/VoiceSocket.cs View File

@@ -418,7 +418,8 @@ namespace Discord.Net.WebSockets
var payload = (msg.Payload as JToken).ToObject<ReadyEvent>(_serializer);
_heartbeatInterval = payload.HeartbeatInterval;
_ssrc = payload.SSRC;
var address = (await Dns.GetHostAddressesAsync(Host.Replace("wss://", "")).ConfigureAwait(false)).FirstOrDefault();
string hostname = Host.Substring(0, Host.IndexOf('?')).Replace("wss://", "");
var address = (await Dns.GetHostAddressesAsync(hostname).ConfigureAwait(false)).FirstOrDefault();
_endpoint = new IPEndPoint(address, payload.Port);

if (_audioConfig.EnableEncryption)


+ 46
- 5
src/Discord.Net/DiscordClient.cs View File

@@ -33,6 +33,7 @@ namespace Discord
private ConcurrentDictionary<ulong, Channel> _privateChannels; //Key = RecipientId
private Dictionary<string, Region> _regions;
private Stopwatch _connectionStopwatch;
private ConcurrentQueue<ulong> _largeServers;

internal Logger Logger { get; }

@@ -125,6 +126,7 @@ namespace Discord
_servers = new ConcurrentDictionary<ulong, Server>(2, 0);
_channels = new ConcurrentDictionary<ulong, Channel>(2, 0);
_privateChannels = new ConcurrentDictionary<ulong, Channel>(2, 0);
_largeServers = new ConcurrentQueue<ulong>();

//Serialization
Serializer = new JsonSerializer();
@@ -285,6 +287,9 @@ namespace Discord
catch (OperationCanceledException) { }
}

ulong serverId;
while (_largeServers.TryDequeue(out serverId)) { }

MessageQueue.Clear();

await GatewaySocket.Disconnect().ConfigureAwait(false);
@@ -507,6 +512,8 @@ namespace Discord
var server = AddServer(model.Id);
server.Update(model);
}
if (model.IsLarge)
_largeServers.Enqueue(model.Id);
}
for (int i = 0; i < data.PrivateChannels.Length; i++)
{
@@ -514,7 +521,36 @@ namespace Discord
var channel = AddPrivateChannel(model.Id, model.Recipient.Id);
channel.Update(model);
}
EndConnect();

//Temporary hotfix to download all large guilds before raising READY
CancellationToken cancelToken = _taskManager.CancelToken;
Task.Run(async () =>
{
try
{
ulong serverId;
ulong[] serverIds = new ulong[50];
int i = 0;

await Task.Delay(2500, cancelToken);
while (true)
{
while (_largeServers.TryDequeue(out serverId) && i < 50)
serverIds[i++] = serverId;
if (i > 0)
{
cancelToken.ThrowIfCancellationRequested();
GatewaySocket.SendRequestMembers(serverIds, "", 0);
await Task.Delay(1500, cancelToken);
}
if (i < 50)
break;
}
await Task.Delay(2500, cancelToken);
EndConnect();
}
catch (OperationCanceledException) { }
});
}
break;

@@ -528,16 +564,21 @@ namespace Discord
server.Update(data);

if (data.Unavailable != false)
{
Logger.Info($"GUILD_CREATE: {server.Path}");
OnJoinedServer(server);
}
else
Logger.Info($"GUILD_AVAILABLE: {server.Path}");

if (data.IsLarge)
GatewaySocket.SendRequestMembers(new ulong[] { data.Id }, "", 0);
if (!data.IsLarge)
OnServerAvailable(server);
else
{
OnJoinedServer(server);
OnServerAvailable(server);
if (State == ConnectionState.Connected)
GatewaySocket.SendRequestMembers(new ulong[] { data.Id }, "", 0);
else
_largeServers.Enqueue(data.Id);
}
}
}


+ 70
- 70
src/Discord.Net/Net/WebSockets/WebSocket.cs View File

@@ -10,36 +10,36 @@ using System.Threading.Tasks;

namespace Discord.Net.WebSockets
{
public abstract partial class WebSocket
public abstract partial class WebSocket
{
private readonly AsyncLock _lock;
protected readonly IWebSocketEngine _engine;
protected readonly DiscordConfig _config;
protected readonly ManualResetEventSlim _connectedEvent;
protected readonly DiscordConfig _config;
protected readonly ManualResetEventSlim _connectedEvent;
protected readonly TaskManager _taskManager;
protected readonly JsonSerializer _serializer;
protected CancellationTokenSource _cancelSource;
protected CancellationToken _parentCancelToken;
protected int _heartbeatInterval;
private DateTime _lastHeartbeat;
private DateTime _lastHeartbeat;

/// <summary> Gets the logger used for this client. </summary>
protected internal Logger Logger { get; }
public CancellationToken CancelToken { get; private set; }

public string Host { get; set; }
public string Host { get; set; }
/// <summary> Gets the current connection state of this client. </summary>
public ConnectionState State { get; private set; }

public event EventHandler Connected = delegate { };
private void OnConnected()
=> Connected(this, EventArgs.Empty);
private void OnConnected()
=> Connected(this, EventArgs.Empty);
public event EventHandler<DisconnectedEventArgs> Disconnected = delegate { };
private void OnDisconnected(bool wasUnexpected, Exception error)
private void OnDisconnected(bool wasUnexpected, Exception error)
=> Disconnected(this, new DisconnectedEventArgs(wasUnexpected, error));

public WebSocket(DiscordConfig config, JsonSerializer serializer, Logger logger)
{
public WebSocket(DiscordConfig config, JsonSerializer serializer, Logger logger)
{
_config = config;
_serializer = serializer;
Logger = logger;
@@ -47,30 +47,30 @@ namespace Discord.Net.WebSockets
_lock = new AsyncLock();
_taskManager = new TaskManager(Cleanup);
CancelToken = new CancellationToken(true);
_connectedEvent = new ManualResetEventSlim(false);
_connectedEvent = new ManualResetEventSlim(false);

#if !DOTNET5_4
_engine = new WS4NetEngine(config, _taskManager);
_engine = new WS4NetEngine(config, _taskManager);
#else
_engine = new BuiltInEngine(config);
#endif
_engine.BinaryMessage += (s, e) =>
{
using (var compressed = new MemoryStream(e.Data, 2, e.Data.Length - 2))
using (var decompressed = new MemoryStream())
{
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
zlib.CopyTo(decompressed);
decompressed.Position = 0;
using (var compressed = new MemoryStream(e.Data, 2, e.Data.Length - 2))
using (var decompressed = new MemoryStream())
{
using (var zlib = new DeflateStream(compressed, CompressionMode.Decompress))
zlib.CopyTo(decompressed);
decompressed.Position = 0;
using (var reader = new StreamReader(decompressed))
ProcessMessage(reader.ReadToEnd()).GetAwaiter().GetResult();
}
ProcessMessage(reader.ReadToEnd()).GetAwaiter().GetResult();
}
};
_engine.TextMessage += (s, e) => ProcessMessage(e.Message).Wait();
}
_engine.TextMessage += (s, e) => ProcessMessage(e.Message).Wait();
}

protected async Task BeginConnect(CancellationToken parentCancelToken)
{
protected async Task BeginConnect(CancellationToken parentCancelToken)
{
try
{
using (await _lock.LockAsync().ConfigureAwait(false))
@@ -80,7 +80,7 @@ namespace Discord.Net.WebSockets
await _taskManager.Stop().ConfigureAwait(false);
_taskManager.ClearException();
State = ConnectionState.Connecting;
_cancelSource = new CancellationTokenSource();
CancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelSource.Token, parentCancelToken).Token;
_lastHeartbeat = DateTime.UtcNow;
@@ -88,39 +88,39 @@ namespace Discord.Net.WebSockets
await _engine.Connect(Host, CancelToken).ConfigureAwait(false);
await Run().ConfigureAwait(false);
}
}
catch (Exception ex)
{
}
catch (Exception ex)
{
//TODO: Should this be inside the lock?
await _taskManager.SignalError(ex).ConfigureAwait(false);
throw;
}
}
protected async Task EndConnect()
{
try
}
}
protected async Task EndConnect()
{
try
{
State = ConnectionState.Connected;
Logger.Info($"Connected");

OnConnected();
_connectedEvent.Set();
}
catch (Exception ex)
}
catch (Exception ex)
{
await _taskManager.SignalError(ex).ConfigureAwait(false);
}
}
}

protected abstract Task Run();
protected virtual async Task Cleanup()
{
protected abstract Task Run();
protected virtual async Task Cleanup()
{
var oldState = State;
State = ConnectionState.Disconnecting;

await _engine.Disconnect().ConfigureAwait(false);
_cancelSource = null;
_connectedEvent.Reset();
_cancelSource = null;
_connectedEvent.Reset();

if (oldState == ConnectionState.Connecting || oldState == ConnectionState.Connected)
{
@@ -136,35 +136,35 @@ namespace Discord.Net.WebSockets
State = ConnectionState.Disconnected;
}

protected virtual Task ProcessMessage(string json)
{
return TaskHelper.CompletedTask;
}
protected void QueueMessage(IWebSocketMessage message)
{
string json = JsonConvert.SerializeObject(new WebSocketMessage(message));
_engine.QueueMessage(json);
}
protected Task HeartbeatAsync(CancellationToken cancelToken)
{
return Task.Run(async () =>
{
try
{
while (!cancelToken.IsCancellationRequested)
{
if (this.State == ConnectionState.Connected && _heartbeatInterval > 0)
{
protected virtual Task ProcessMessage(string json)
{
return TaskHelper.CompletedTask;
}
protected void QueueMessage(IWebSocketMessage message)
{
string json = JsonConvert.SerializeObject(new WebSocketMessage(message));
_engine.QueueMessage(json);
}
protected Task HeartbeatAsync(CancellationToken cancelToken)
{
return Task.Run(async () =>
{
try
{
while (!cancelToken.IsCancellationRequested)
{
if (this.State == ConnectionState.Connected && _heartbeatInterval > 0)
{
SendHeartbeat();
await Task.Delay(_heartbeatInterval, cancelToken).ConfigureAwait(false);
}
else
await Task.Delay(1000, cancelToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
});
await Task.Delay(_heartbeatInterval, cancelToken).ConfigureAwait(false);
}
else
await Task.Delay(1000, cancelToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
});
}
public abstract void SendHeartbeat();

@@ -184,5 +184,5 @@ namespace Discord.Net.WebSockets
throw;
}
}
}
}
}

+ 1
- 0
src/Discord.Net/TaskManager.cs View File

@@ -20,6 +20,7 @@ namespace Discord

public bool StopOnCompletion { get; }
public bool WasStopExpected { get; private set; }
public CancellationToken CancelToken => _cancelSource.Token;

public Exception Exception => _stopReason?.SourceException;



Loading…
Cancel
Save