You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

DiscordSocketApiClient.cs 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. #pragma warning disable CS1591
  2. using Discord.API.Gateway;
  3. using Discord.API.Rest;
  4. using Discord.Net.Queue;
  5. using Discord.Net.Rest;
  6. using Discord.Net.WebSockets;
  7. using Discord.WebSocket;
  8. using Newtonsoft.Json;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.IO;
  12. using System.IO.Compression;
  13. using System.Text;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace Discord.API
  17. {
  18. internal class DiscordSocketApiClient : DiscordRestApiClient
  19. {
  20. public event Func<GatewayOpCode, Task> SentGatewayMessage { add { _sentGatewayMessageEvent.Add(value); } remove { _sentGatewayMessageEvent.Remove(value); } }
  21. private readonly AsyncEvent<Func<GatewayOpCode, Task>> _sentGatewayMessageEvent = new AsyncEvent<Func<GatewayOpCode, Task>>();
  22. public event Func<GatewayOpCode, int?, string, object, Task> ReceivedGatewayEvent { add { _receivedGatewayEvent.Add(value); } remove { _receivedGatewayEvent.Remove(value); } }
  23. private readonly AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>> _receivedGatewayEvent = new AsyncEvent<Func<GatewayOpCode, int?, string, object, Task>>();
  24. public event Func<Exception, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
  25. private readonly AsyncEvent<Func<Exception, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, Task>>();
  26. private readonly bool _isExplicitUrl;
  27. private CancellationTokenSource _connectCancelToken;
  28. private string _gatewayUrl;
  29. //Store our decompression streams for zlib shared state
  30. private MemoryStream _compressed;
  31. private DeflateStream _decompressor;
  32. internal IWebSocketClient WebSocketClient { get; }
  33. public ConnectionState ConnectionState { get; private set; }
  34. public DiscordSocketApiClient(RestClientProvider restClientProvider, WebSocketProvider webSocketProvider, string userAgent,
  35. string url = null, RetryMode defaultRetryMode = RetryMode.AlwaysRetry, JsonSerializer serializer = null)
  36. : base(restClientProvider, userAgent, defaultRetryMode, serializer)
  37. {
  38. _gatewayUrl = url;
  39. if (url != null)
  40. _isExplicitUrl = true;
  41. WebSocketClient = webSocketProvider();
  42. //WebSocketClient.SetHeader("user-agent", DiscordConfig.UserAgent); (Causes issues in .NET Framework 4.6+)
  43. WebSocketClient.BinaryMessage += async (data, index, count) =>
  44. {
  45. using (var decompressed = new MemoryStream())
  46. {
  47. if (data[0] == 0x78)
  48. {
  49. //Strip the zlib header
  50. _compressed.Write(data, index + 2, count - 2);
  51. _compressed.SetLength(count - 2);
  52. }
  53. else
  54. {
  55. _compressed.Write(data, index, count);
  56. _compressed.SetLength(count);
  57. }
  58. //Reset positions so we don't run out of memory
  59. _compressed.Position = 0;
  60. _decompressor.CopyTo(decompressed);
  61. _compressed.Position = 0;
  62. decompressed.Position = 0;
  63. using (var reader = new StreamReader(decompressed))
  64. using (var jsonReader = new JsonTextReader(reader))
  65. {
  66. var msg = _serializer.Deserialize<SocketFrame>(jsonReader);
  67. if (msg != null)
  68. await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false);
  69. }
  70. }
  71. };
  72. WebSocketClient.TextMessage += async text =>
  73. {
  74. using (var reader = new StringReader(text))
  75. using (var jsonReader = new JsonTextReader(reader))
  76. {
  77. var msg = _serializer.Deserialize<SocketFrame>(jsonReader);
  78. if (msg != null)
  79. await _receivedGatewayEvent.InvokeAsync((GatewayOpCode)msg.Operation, msg.Sequence, msg.Type, msg.Payload).ConfigureAwait(false);
  80. }
  81. };
  82. WebSocketClient.Closed += async ex =>
  83. {
  84. await DisconnectAsync().ConfigureAwait(false);
  85. await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false);
  86. };
  87. }
  88. internal override void Dispose(bool disposing)
  89. {
  90. if (!_isDisposed)
  91. {
  92. if (disposing)
  93. {
  94. _connectCancelToken?.Dispose();
  95. (WebSocketClient as IDisposable)?.Dispose();
  96. _decompressor?.Dispose();
  97. _compressed?.Dispose();
  98. }
  99. _isDisposed = true;
  100. }
  101. }
  102. public async Task ConnectAsync()
  103. {
  104. await _stateLock.WaitAsync().ConfigureAwait(false);
  105. try
  106. {
  107. await ConnectInternalAsync().ConfigureAwait(false);
  108. }
  109. finally { _stateLock.Release(); }
  110. }
  111. internal override async Task ConnectInternalAsync()
  112. {
  113. if (LoginState != LoginState.LoggedIn)
  114. throw new InvalidOperationException("You must log in before connecting.");
  115. if (WebSocketClient == null)
  116. throw new NotSupportedException("This client is not configured with websocket support.");
  117. //Re-create streams to reset the zlib state
  118. _compressed?.Dispose();
  119. _decompressor?.Dispose();
  120. _compressed = new MemoryStream();
  121. _decompressor = new DeflateStream(_compressed, CompressionMode.Decompress);
  122. ConnectionState = ConnectionState.Connecting;
  123. try
  124. {
  125. _connectCancelToken = new CancellationTokenSource();
  126. if (WebSocketClient != null)
  127. WebSocketClient.SetCancelToken(_connectCancelToken.Token);
  128. if (!_isExplicitUrl)
  129. {
  130. var gatewayResponse = await GetGatewayAsync().ConfigureAwait(false);
  131. _gatewayUrl = $"{gatewayResponse.Url}?v={DiscordConfig.APIVersion}&encoding={DiscordSocketConfig.GatewayEncoding}&compress=zlib-stream";
  132. }
  133. await WebSocketClient.ConnectAsync(_gatewayUrl).ConfigureAwait(false);
  134. ConnectionState = ConnectionState.Connected;
  135. }
  136. catch
  137. {
  138. if (!_isExplicitUrl)
  139. _gatewayUrl = null; //Uncache in case the gateway url changed
  140. await DisconnectInternalAsync().ConfigureAwait(false);
  141. throw;
  142. }
  143. }
  144. public async Task DisconnectAsync()
  145. {
  146. await _stateLock.WaitAsync().ConfigureAwait(false);
  147. try
  148. {
  149. await DisconnectInternalAsync().ConfigureAwait(false);
  150. }
  151. finally { _stateLock.Release(); }
  152. }
  153. public async Task DisconnectAsync(Exception ex)
  154. {
  155. await _stateLock.WaitAsync().ConfigureAwait(false);
  156. try
  157. {
  158. await DisconnectInternalAsync().ConfigureAwait(false);
  159. }
  160. finally { _stateLock.Release(); }
  161. }
  162. internal override async Task DisconnectInternalAsync()
  163. {
  164. if (WebSocketClient == null)
  165. throw new NotSupportedException("This client is not configured with websocket support.");
  166. if (ConnectionState == ConnectionState.Disconnected) return;
  167. ConnectionState = ConnectionState.Disconnecting;
  168. try { _connectCancelToken?.Cancel(false); }
  169. catch { }
  170. await WebSocketClient.DisconnectAsync().ConfigureAwait(false);
  171. ConnectionState = ConnectionState.Disconnected;
  172. }
  173. //Core
  174. public Task SendGatewayAsync(GatewayOpCode opCode, object payload, RequestOptions options = null)
  175. => SendGatewayInternalAsync(opCode, payload, options);
  176. private async Task SendGatewayInternalAsync(GatewayOpCode opCode, object payload, RequestOptions options)
  177. {
  178. CheckState();
  179. //TODO: Add ETF
  180. byte[] bytes = null;
  181. payload = new SocketFrame { Operation = (int)opCode, Payload = payload };
  182. if (payload != null)
  183. bytes = Encoding.UTF8.GetBytes(SerializeJson(payload));
  184. await RequestQueue.SendAsync(new WebSocketRequest(WebSocketClient, null, bytes, true, options)).ConfigureAwait(false);
  185. await _sentGatewayMessageEvent.InvokeAsync(opCode).ConfigureAwait(false);
  186. }
  187. public async Task SendIdentifyAsync(int largeThreshold = 100, int shardID = 0, int totalShards = 1, RequestOptions options = null)
  188. {
  189. options = RequestOptions.CreateOrClone(options);
  190. var props = new Dictionary<string, string>
  191. {
  192. ["$device"] = "Discord.Net"
  193. };
  194. var msg = new IdentifyParams()
  195. {
  196. Token = AuthToken,
  197. Properties = props,
  198. LargeThreshold = largeThreshold
  199. };
  200. if (totalShards > 1)
  201. msg.ShardingParams = new int[] { shardID, totalShards };
  202. await SendGatewayAsync(GatewayOpCode.Identify, msg, options: options).ConfigureAwait(false);
  203. }
  204. public async Task SendResumeAsync(string sessionId, int lastSeq, RequestOptions options = null)
  205. {
  206. options = RequestOptions.CreateOrClone(options);
  207. var msg = new ResumeParams()
  208. {
  209. Token = AuthToken,
  210. SessionId = sessionId,
  211. Sequence = lastSeq
  212. };
  213. await SendGatewayAsync(GatewayOpCode.Resume, msg, options: options).ConfigureAwait(false);
  214. }
  215. public async Task SendHeartbeatAsync(int lastSeq, RequestOptions options = null)
  216. {
  217. options = RequestOptions.CreateOrClone(options);
  218. await SendGatewayAsync(GatewayOpCode.Heartbeat, lastSeq, options: options).ConfigureAwait(false);
  219. }
  220. public async Task SendStatusUpdateAsync(UserStatus status, bool isAFK, long? since, Game game, RequestOptions options = null)
  221. {
  222. options = RequestOptions.CreateOrClone(options);
  223. var args = new StatusUpdateParams
  224. {
  225. Status = status,
  226. IdleSince = since,
  227. IsAFK = isAFK,
  228. Game = game
  229. };
  230. await SendGatewayAsync(GatewayOpCode.StatusUpdate, args, options: options).ConfigureAwait(false);
  231. }
  232. public async Task SendRequestMembersAsync(IEnumerable<ulong> guildIds, RequestOptions options = null)
  233. {
  234. options = RequestOptions.CreateOrClone(options);
  235. await SendGatewayAsync(GatewayOpCode.RequestGuildMembers, new RequestMembersParams { GuildIds = guildIds, Query = "", Limit = 0 }, options: options).ConfigureAwait(false);
  236. }
  237. public async Task SendVoiceStateUpdateAsync(ulong guildId, ulong? channelId, bool selfDeaf, bool selfMute, RequestOptions options = null)
  238. {
  239. options = RequestOptions.CreateOrClone(options);
  240. var payload = new VoiceStateUpdateParams
  241. {
  242. GuildId = guildId,
  243. ChannelId = channelId,
  244. SelfDeaf = selfDeaf,
  245. SelfMute = selfMute
  246. };
  247. await SendGatewayAsync(GatewayOpCode.VoiceStateUpdate, payload, options: options).ConfigureAwait(false);
  248. }
  249. public async Task SendGuildSyncAsync(IEnumerable<ulong> guildIds, RequestOptions options = null)
  250. {
  251. options = RequestOptions.CreateOrClone(options);
  252. await SendGatewayAsync(GatewayOpCode.GuildSync, guildIds, options: options).ConfigureAwait(false);
  253. }
  254. }
  255. }