@@ -38,7 +38,7 @@ | |||
<AllowUnsafeBlocks>true</AllowUnsafeBlocks> | |||
<LangVersion>6</LangVersion> | |||
</PropertyGroup> | |||
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'FullDebug|AnyCPU'"> | |||
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'TestResponses|AnyCPU' "> | |||
<DebugSymbols>true</DebugSymbols> | |||
<OutputPath>bin\FullDebug\</OutputPath> | |||
<DefineConstants>TRACE;DEBUG;NET45,TEST_RESPONSES</DefineConstants> | |||
@@ -515,6 +515,9 @@ | |||
<Compile Include="..\Discord.Net\Net\WebSocketException.cs"> | |||
<Link>Net\WebSockets\WebSocketException.cs</Link> | |||
</Compile> | |||
<Compile Include="..\Discord.Net\Net\WebSockets\BuiltInEngine.cs"> | |||
<Link>Net\WebSockets\BuiltInEngine.cs</Link> | |||
</Compile> | |||
<Compile Include="..\Discord.Net\Net\WebSockets\GatewaySocket.cs"> | |||
<Link>Net\WebSockets\GatewaySocket.cs</Link> | |||
</Compile> | |||
@@ -0,0 +1,137 @@ | |||
#if DOTNET5_4 | |||
using Discord.Logging; | |||
using System; | |||
using System.IO; | |||
using System.Linq; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using System.Net.Http; | |||
using System.Net; | |||
using System.Text; | |||
using System.Globalization; | |||
namespace Discord.Net.Rest | |||
{ | |||
internal sealed class BuiltInEngine : IRestEngine | |||
{ | |||
private readonly DiscordConfig _config; | |||
private readonly HttpClient _client; | |||
private readonly string _baseUrl; | |||
private readonly object _rateLimitLock; | |||
private DateTime _rateLimitTime; | |||
internal Logger Logger { get; } | |||
public BuiltInEngine(DiscordConfig config, string baseUrl, Logger logger) | |||
{ | |||
_config = config; | |||
_baseUrl = baseUrl; | |||
_rateLimitLock = new object(); | |||
_client = new HttpClient(new HttpClientHandler | |||
{ | |||
AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip, | |||
UseCookies = false, | |||
UseProxy = false, | |||
PreAuthenticate = false //We do auth ourselves | |||
}); | |||
_client.DefaultRequestHeaders.Add("accept", "*/*"); | |||
_client.DefaultRequestHeaders.Add("accept-encoding", "gzip,deflate"); | |||
_client.DefaultRequestHeaders.Add("user-agent", config.UserAgent); | |||
} | |||
public void SetToken(string token) | |||
{ | |||
_client.DefaultRequestHeaders.Remove("authorization"); | |||
if (token != null) | |||
_client.DefaultRequestHeaders.Add("authorization", token); | |||
} | |||
public async Task<string> Send(string method, string path, string json, CancellationToken cancelToken) | |||
{ | |||
using (var request = new HttpRequestMessage(GetMethod(method), _baseUrl + path)) | |||
{ | |||
if (json != null) | |||
request.Content = new StringContent(json, Encoding.UTF8, "application/json"); | |||
return await Send(request, cancelToken); | |||
} | |||
} | |||
public async Task<string> SendFile(string method, string path, string filename, Stream stream, CancellationToken cancelToken) | |||
{ | |||
using (var request = new HttpRequestMessage(GetMethod(method), _baseUrl + path)) | |||
{ | |||
var content = new MultipartFormDataContent("Upload----" + DateTime.Now.ToString(CultureInfo.InvariantCulture)); | |||
content.Add(new StreamContent(File.OpenRead(path)), "file", filename); | |||
request.Content = content; | |||
return await Send(request, cancelToken); | |||
} | |||
} | |||
private async Task<string> Send(HttpRequestMessage request, CancellationToken cancelToken) | |||
{ | |||
int retryCount = 0; | |||
while (true) | |||
{ | |||
HttpResponseMessage response; | |||
try | |||
{ | |||
response = await _client.SendAsync(request, cancelToken).ConfigureAwait(false); | |||
} | |||
catch (WebException ex) | |||
{ | |||
//The request was aborted: Could not create SSL/TLS secure channel. | |||
if (ex.HResult == -2146233079 && retryCount++ < 5) | |||
continue; //Retrying seems to fix this somehow? | |||
throw; | |||
} | |||
int statusCode = (int)response.StatusCode; | |||
if (statusCode == 429) //Rate limit | |||
{ | |||
var retryAfter = response.Headers | |||
.Where(x => x.Key.Equals("Retry-After", StringComparison.OrdinalIgnoreCase)) | |||
.Select(x => x.Value.FirstOrDefault()) | |||
.FirstOrDefault(); | |||
int milliseconds; | |||
if (retryAfter != null && int.TryParse(retryAfter, out milliseconds)) | |||
{ | |||
var now = DateTime.UtcNow; | |||
if (now >= _rateLimitTime) | |||
{ | |||
lock (_rateLimitLock) | |||
{ | |||
if (now >= _rateLimitTime) | |||
{ | |||
_rateLimitTime = now.AddMilliseconds(milliseconds); | |||
Logger.Warning($"Rate limit hit, waiting {Math.Round(milliseconds / 1000.0f, 2)} seconds"); | |||
} | |||
} | |||
} | |||
await Task.Delay(milliseconds, cancelToken).ConfigureAwait(false); | |||
continue; | |||
} | |||
throw new HttpException(response.StatusCode); | |||
} | |||
else if (statusCode < 200 || statusCode >= 300) //2xx = Success | |||
throw new HttpException(response.StatusCode); | |||
else | |||
return await response.Content.ReadAsStringAsync(); | |||
} | |||
} | |||
private static readonly HttpMethod _patch = new HttpMethod("PATCH"); | |||
private HttpMethod GetMethod(string method) | |||
{ | |||
switch (method) | |||
{ | |||
case "DELETE": return HttpMethod.Delete; | |||
case "GET": return HttpMethod.Get; | |||
case "PATCH": return _patch; | |||
case "POST": return HttpMethod.Post; | |||
case "PUT": return HttpMethod.Put; | |||
default: throw new InvalidOperationException($"Unknown HttpMethod: {method}"); | |||
} | |||
} | |||
} | |||
} | |||
#endif |
@@ -51,7 +51,7 @@ namespace Discord.Net.Rest | |||
#if !DOTNET5_4 | |||
_engine = new RestSharpEngine(config, baseUrl, logger); | |||
#else | |||
//_engine = new BuiltInRestEngine(config, baseUrl, logger); | |||
_engine = new BuiltInEngine(config, baseUrl, logger); | |||
#endif | |||
} | |||
@@ -51,7 +51,7 @@ namespace Discord.Net.Rest | |||
} | |||
public Task<string> SendFile(string method, string path, string filename, Stream stream, CancellationToken cancelToken) | |||
{ | |||
var request = new RestRequest(path, Method.POST); | |||
var request = new RestRequest(path, GetMethod(method)); | |||
request.AddHeader("content-length", (stream.Length - stream.Position).ToString()); | |||
byte[] bytes = new byte[stream.Length - stream.Position]; | |||
@@ -79,6 +79,7 @@ namespace Discord.Net.Rest | |||
{ | |||
var retryAfter = response.Headers | |||
.FirstOrDefault(x => x.Name.Equals("Retry-After", StringComparison.OrdinalIgnoreCase)); | |||
int milliseconds; | |||
if (retryAfter != null && int.TryParse((string)retryAfter.Value, out milliseconds)) | |||
{ | |||
@@ -0,0 +1,163 @@ | |||
#if DOTNET5_4 | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.ComponentModel; | |||
using System.IO; | |||
using System.Net.WebSockets; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using WebSocketClient = System.Net.WebSockets.ClientWebSocket; | |||
namespace Discord.Net.WebSockets | |||
{ | |||
internal class BuiltInEngine : IWebSocketEngine | |||
{ | |||
private const int ReceiveChunkSize = 4096; | |||
private const int SendChunkSize = 4096; | |||
private const int HR_TIMEOUT = -2147012894; | |||
private readonly DiscordConfig _config; | |||
private readonly ConcurrentQueue<string> _sendQueue; | |||
private WebSocketClient _webSocket; | |||
public event EventHandler<WebSocketBinaryMessageEventArgs> BinaryMessage = delegate { }; | |||
public event EventHandler<WebSocketTextMessageEventArgs> TextMessage = delegate { }; | |||
private void OnBinaryMessage(byte[] data) | |||
=> BinaryMessage(this, new WebSocketBinaryMessageEventArgs(data)); | |||
private void OnTextMessage(string msg) | |||
=> TextMessage(this, new WebSocketTextMessageEventArgs(msg)); | |||
internal BuiltInEngine(DiscordConfig config) | |||
{ | |||
_config = config; | |||
_sendQueue = new ConcurrentQueue<string>(); | |||
} | |||
public Task Connect(string host, CancellationToken cancelToken) | |||
{ | |||
return Task.Run(async () => | |||
{ | |||
_webSocket = new WebSocketClient(); | |||
_webSocket.Options.Proxy = null; | |||
_webSocket.Options.SetRequestHeader("User-Agent", _config.UserAgent); | |||
_webSocket.Options.KeepAliveInterval = TimeSpan.Zero; | |||
await _webSocket.ConnectAsync(new Uri(host), cancelToken)//.ConfigureAwait(false); | |||
.ContinueWith(t => ReceiveAsync(cancelToken)).ConfigureAwait(false); | |||
//TODO: ContinueWith is a temporary hack, may be a bug related to https://github.com/dotnet/corefx/issues/4429 | |||
}); | |||
} | |||
public Task Disconnect() | |||
{ | |||
string ignored; | |||
while (_sendQueue.TryDequeue(out ignored)) { } | |||
var socket = _webSocket; | |||
_webSocket = null; | |||
return TaskHelper.CompletedTask; | |||
} | |||
public IEnumerable<Task> GetTasks(CancellationToken cancelToken) | |||
=> new Task[] { /*ReceiveAsync(cancelToken),*/ SendAsync(cancelToken) }; | |||
private Task ReceiveAsync(CancellationToken cancelToken) | |||
{ | |||
return Task.Run(async () => | |||
{ | |||
var sendInterval = _config.WebSocketInterval; | |||
//var buffer = new ArraySegment<byte>(new byte[ReceiveChunkSize]); | |||
var buffer = new byte[ReceiveChunkSize]; | |||
var stream = new MemoryStream(); | |||
try | |||
{ | |||
while (!cancelToken.IsCancellationRequested) | |||
{ | |||
WebSocketReceiveResult result = null; | |||
do | |||
{ | |||
if (cancelToken.IsCancellationRequested) return; | |||
try | |||
{ | |||
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancelToken);//.ConfigureAwait(false); | |||
} | |||
catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT) | |||
{ | |||
throw new Exception($"Connection timed out."); | |||
} | |||
if (result.MessageType == WebSocketMessageType.Close) | |||
throw new WebSocketException((int)result.CloseStatus.Value, result.CloseStatusDescription); | |||
else | |||
stream.Write(buffer, 0, result.Count); | |||
} | |||
while (result == null || !result.EndOfMessage); | |||
var array = stream.ToArray(); | |||
if (result.MessageType == WebSocketMessageType.Binary) | |||
OnBinaryMessage(array); | |||
else if (result.MessageType == WebSocketMessageType.Text) | |||
OnTextMessage(Encoding.UTF8.GetString(array, 0, array.Length)); | |||
stream.Position = 0; | |||
stream.SetLength(0); | |||
} | |||
} | |||
catch (OperationCanceledException) { } | |||
}); | |||
} | |||
private Task SendAsync(CancellationToken cancelToken) | |||
{ | |||
return Task.Run(async () => | |||
{ | |||
byte[] bytes = new byte[SendChunkSize]; | |||
var sendInterval = _config.WebSocketInterval; | |||
try | |||
{ | |||
while (!cancelToken.IsCancellationRequested) | |||
{ | |||
string json; | |||
while (_sendQueue.TryDequeue(out json)) | |||
{ | |||
int byteCount = Encoding.UTF8.GetBytes(json, 0, json.Length, bytes, 0); | |||
int frameCount = (int)Math.Ceiling((double)byteCount / SendChunkSize); | |||
int offset = 0; | |||
for (var i = 0; i < frameCount; i++, offset += SendChunkSize) | |||
{ | |||
bool isLast = i == (frameCount - 1); | |||
int count; | |||
if (isLast) | |||
count = byteCount - (i * SendChunkSize); | |||
else | |||
count = SendChunkSize; | |||
try | |||
{ | |||
await _webSocket.SendAsync(new ArraySegment<byte>(bytes, offset, count), WebSocketMessageType.Text, isLast, cancelToken).ConfigureAwait(false); | |||
} | |||
catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT) | |||
{ | |||
return; | |||
} | |||
} | |||
} | |||
await Task.Delay(sendInterval, cancelToken).ConfigureAwait(false); | |||
} | |||
} | |||
catch (OperationCanceledException) { } | |||
}); | |||
} | |||
public void QueueMessage(string message) | |||
=> _sendQueue.Enqueue(message); | |||
} | |||
} | |||
#endif |
@@ -6,7 +6,7 @@ using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using WebSocket4Net; | |||
using WS4NetWebSocket = WebSocket4Net.WebSocket; | |||
using WebSocketClient = WebSocket4Net.WebSocket; | |||
namespace Discord.Net.WebSockets | |||
{ | |||
@@ -15,7 +15,7 @@ namespace Discord.Net.WebSockets | |||
private readonly DiscordConfig _config; | |||
private readonly ConcurrentQueue<string> _sendQueue; | |||
private readonly TaskManager _taskManager; | |||
private WS4NetWebSocket _webSocket; | |||
private WebSocketClient _webSocket; | |||
private ManualResetEventSlim _waitUntilConnect; | |||
public event EventHandler<WebSocketBinaryMessageEventArgs> BinaryMessage = delegate { }; | |||
@@ -35,7 +35,7 @@ namespace Discord.Net.WebSockets | |||
public Task Connect(string host, CancellationToken cancelToken) | |||
{ | |||
_webSocket = new WS4NetWebSocket(host); | |||
_webSocket = new WebSocketClient(host); | |||
_webSocket.EnableAutoSendPing = false; | |||
_webSocket.NoDelay = true; | |||
_webSocket.Proxy = null; | |||
@@ -96,7 +96,8 @@ namespace Discord.Net.WebSockets | |||
private void OnWebSocketBinary(object sender, DataReceivedEventArgs e) | |||
=> OnBinaryMessage(e.Data); | |||
public IEnumerable<Task> GetTasks(CancellationToken cancelToken) => new Task[] { SendAsync(cancelToken) }; | |||
public IEnumerable<Task> GetTasks(CancellationToken cancelToken) | |||
=> new Task[] { SendAsync(cancelToken) }; | |||
private Task SendAsync(CancellationToken cancelToken) | |||
{ | |||
@@ -53,7 +53,7 @@ namespace Discord.Net.WebSockets | |||
#if !DOTNET5_4 | |||
_engine = new WS4NetEngine(client.Config, _taskManager); | |||
#else | |||
//_engine = new BuiltInWebSocketEngine(this, client.Config); | |||
_engine = new BuiltInEngine(client.Config); | |||
#endif | |||
_engine.BinaryMessage += (s, e) => | |||
{ | |||
@@ -179,7 +179,8 @@ namespace Discord.Net.WebSockets | |||
{ | |||
//Cancel if either DiscordClient.Disconnect is called, data socket errors or timeout is reached | |||
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, CancelToken).Token; | |||
_connectedEvent.Wait(cancelToken); | |||
if (!_connectedEvent.Wait(_client.Config.ConnectionTimeout, cancelToken)) | |||
throw new TimeoutException(); | |||
} | |||
catch (OperationCanceledException) | |||
{ | |||
@@ -32,14 +32,8 @@ | |||
"Newtonsoft.Json": "7.0.1" | |||
}, | |||
"frameworks": { | |||
"net45": { | |||
"dependencies": { | |||
"WebSocket4Net": "0.14.1", | |||
"RestSharp": "105.2.3" | |||
} | |||
}, | |||
"dotnet5.4": { | |||
"frameworks": { | |||
"dotnet5.4": { | |||
"dependencies": { | |||
"System.Collections": "4.0.11-beta-23516", | |||
"System.Collections.Concurrent": "4.0.11-beta-23516", | |||
@@ -47,6 +41,7 @@ | |||
"System.IO.FileSystem": "4.0.1-beta-23516", | |||
"System.IO.Compression": "4.1.0-beta-23516", | |||
"System.Linq": "4.0.1-beta-23516", | |||
"System.Net.Http": "4.0.1-beta-23516", | |||
"System.Net.NameResolution": "4.0.0-beta-23516", | |||
"System.Net.Sockets": "4.1.0-beta-23409", | |||
"System.Net.Requests": "4.0.11-beta-23516", | |||
@@ -57,6 +52,12 @@ | |||
"System.Threading": "4.0.11-beta-23516", | |||
"System.Threading.Thread": "4.0.0-beta-23516" | |||
} | |||
} | |||
} | |||
}, | |||
"net45": { | |||
"dependencies": { | |||
"WebSocket4Net": "0.14.1", | |||
"RestSharp": "105.2.3" | |||
} | |||
} | |||
} | |||
} |