Browse Source

Improved WebSocket receive performance and reduced allocations

pull/97/head
RogueException 9 years ago
parent
commit
ab9a702218
1 changed files with 45 additions and 32 deletions
  1. +45
    -32
      src/Discord.Net/Net/WebSockets/DefaultWebsocketClient.cs

+ 45
- 32
src/Discord.Net/Net/WebSockets/DefaultWebsocketClient.cs View File

@@ -1,5 +1,4 @@
using Discord.Extensions;
using System;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
@@ -12,7 +11,7 @@ namespace Discord.Net.WebSockets
{
public class DefaultWebSocketClient : IWebSocketClient
{
public const int ReceiveChunkSize = 12 * 1024; //12KB
public const int ReceiveChunkSize = 16 * 1024; //16KB
public const int SendChunkSize = 4 * 1024; //4KB
private const int HR_TIMEOUT = -2147012894;

@@ -137,50 +136,64 @@ namespace Discord.Net.WebSockets
private async Task RunAsync(CancellationToken cancelToken)
{
var buffer = new ArraySegment<byte>(new byte[ReceiveChunkSize]);
var stream = new MemoryStream();

try
{
while (!cancelToken.IsCancellationRequested)
{
WebSocketReceiveResult result = null;
do
WebSocketReceiveResult socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
byte[] result;
int resultCount;
if (socketResult.MessageType == WebSocketMessageType.Close)
{
if (cancelToken.IsCancellationRequested) return;
var _ = Closed(new WebSocketClosedException((int)socketResult.CloseStatus, socketResult.CloseStatusDescription));
return;
}

try
{
result = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
}
catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT)
if (!socketResult.EndOfMessage)
{
//This is a large message (likely just READY), lets create a temporary expandable stream
using (var stream = new MemoryStream())
{
throw new Exception("Connection timed out.");
stream.Write(buffer.Array, 0, socketResult.Count);
do
{
if (cancelToken.IsCancellationRequested) return;
socketResult = await _client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false);
stream.Write(buffer.Array, 0, socketResult.Count);
}
while (socketResult == null || !socketResult.EndOfMessage);

//Use the internal buffer if we can get it
resultCount = (int)stream.Length;
ArraySegment<byte> streamBuffer;
if (stream.TryGetBuffer(out streamBuffer))
result = streamBuffer.Array;
else
result = stream.ToArray();
}

if (result.Count > 0)
stream.Write(buffer.Array, 0, result.Count);
}
while (result == null || !result.EndOfMessage);

var array = stream.ToArray();
stream.Position = 0;
stream.SetLength(0);
else
{
//Small message
resultCount = socketResult.Count;
result = buffer.Array;
}

switch (result.MessageType)
if (socketResult.MessageType == WebSocketMessageType.Text)
{
case WebSocketMessageType.Binary:
await BinaryMessage(array, 0, array.Length).ConfigureAwait(false);
break;
case WebSocketMessageType.Text:
string text = Encoding.UTF8.GetString(array, 0, array.Length);
await TextMessage(text).ConfigureAwait(false);
break;
case WebSocketMessageType.Close:
var _ = Closed(new WebSocketClosedException((int)result.CloseStatus, result.CloseStatusDescription));
return;
string text = Encoding.UTF8.GetString(result, 0, resultCount);
await TextMessage(text).ConfigureAwait(false);
}
else
await BinaryMessage(result, 0, resultCount).ConfigureAwait(false);
}
}
catch (Win32Exception ex) when (ex.HResult == HR_TIMEOUT)
{
var _ = Closed(new Exception("Connection timed out.", ex));
}
catch (OperationCanceledException) { }
catch (Exception ex)
{


Loading…
Cancel
Save