You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

VoiceSocket.cs 22 kB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. using Discord.API.Client;
  2. using Discord.API.Client.VoiceSocket;
  3. using Discord.Audio;
  4. using Discord.Audio.Opus;
  5. using Discord.Audio.Sodium;
  6. using Discord.Logging;
  7. using Newtonsoft.Json;
  8. using Newtonsoft.Json.Linq;
  9. using System;
  10. using System.Collections.Concurrent;
  11. using System.Collections.Generic;
  12. using System.Diagnostics;
  13. using System.IO;
  14. using System.Linq;
  15. using System.Net;
  16. using System.Net.Sockets;
  17. using System.Text;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. namespace Discord.Net.WebSockets
  21. {
  22. public partial class VoiceSocket : WebSocket
  23. {
  //Size, in bytes, of the scratch buffers used for Opus encoding and decoding
  private const int MaxOpusSize = 4000;
  //Encryption mode names exchanged with the voice server
  private const string EncryptedMode = "xsalsa20_poly1305";
  private const string UnencryptedMode = "plain";

  //Assigned once in the constructor
  private readonly AudioServiceConfig _audioConfig;
  private readonly ConcurrentDictionary<uint, OpusDecoder> _decoders;
  private readonly int _targetAudioBufferLength;

  //Background workers started by Run() and awaited by Cleanup()
  private Task _sendTask;
  private Task _receiveTask;

  //Outgoing audio
  private VoiceBuffer _sendBuffer;
  private OpusEncoder _encoder;
  private byte[] _encodingBuffer;
  private ushort _sequence;

  //UDP transport
  private UdpClient _udp;
  private IPEndPoint _endpoint;
  private uint _ssrc;
  private ConcurrentDictionary<uint, ulong> _ssrcMapping;

  //Encryption state
  private bool _isEncrypted;
  private string _encryptionMode;
  private byte[] _secretKey;

  //Session state
  private int _ping;
  private ulong? _userId;
  private string _sessionId;

  public string Token { get; internal set; }
  public Server Server { get; internal set; }
  public Channel Channel { get; internal set; }

  /// <summary> Value last computed from a heartbeat message in ProcessMessage. </summary>
  public int Ping { get { return _ping; } }
  /// <summary> The buffer that queued outgoing frames are read from. </summary>
  internal VoiceBuffer OutputBuffer { get { return _sendBuffer; } }

  //Both events start with an empty handler so they can be invoked without a null check
  internal event EventHandler<InternalIsSpeakingEventArgs> UserIsSpeaking = delegate { };
  internal event EventHandler<InternalFrameEventArgs> FrameReceived = delegate { };
  /// <summary> Raises <see cref="UserIsSpeaking"/> with this socket as the sender. </summary>
  /// <param name="userId"> Id forwarded to the event args. </param>
  /// <param name="isSpeaking"> Speaking flag forwarded to the event args. </param>
  private void OnUserIsSpeaking(ulong userId, bool isSpeaking)
  {
      UserIsSpeaking(this, new InternalIsSpeakingEventArgs(userId, isSpeaking));
  }
  /// <summary> Raises <see cref="FrameReceived"/> with this socket as the sender. </summary>
  /// <param name="userId"> Id of the user the frame is attributed to. </param>
  /// <param name="channelId"> Id of the channel the frame is attributed to. </param>
  /// <param name="buffer"> Array holding the frame data. </param>
  /// <param name="offset"> Index in <paramref name="buffer"/> at which the frame starts. </param>
  /// <param name="count"> Number of bytes of frame data. </param>
  internal void OnFrameReceived(ulong userId, ulong channelId, byte[] buffer, int offset, int count)
  {
      FrameReceived(this, new InternalFrameEventArgs(userId, channelId, buffer, offset, count));
  }
  /// <summary> Creates the socket and allocates the encoder and buffers used for outgoing audio. </summary>
  internal VoiceSocket(DiscordConfig config, AudioServiceConfig audioConfig, JsonSerializer serializer, Logger logger)
      : base(config, serializer, logger)
  {
      _audioConfig = audioConfig;

      //Lookup tables
      _decoders = new ConcurrentDictionary<uint, OpusDecoder>();
      _ssrcMapping = new ConcurrentDictionary<uint, ulong>();

      //Buffer sizing
      _targetAudioBufferLength = _audioConfig.BufferLength / 20; //20 ms frames
      _encodingBuffer = new byte[MaxOpusSize];

      //Encoder first, since the send buffer is sized from the encoder's frame length and frame size
      _encoder = new OpusEncoder(48000, _audioConfig.Channels, 20, _audioConfig.Bitrate, OpusApplication.MusicOrMixed);
      double framesNeeded = Math.Ceiling(_audioConfig.BufferLength / (double)_encoder.FrameLength);
      _sendBuffer = new VoiceBuffer((int)framesNeeded, _encoder.FrameSize);
  }
  /// <summary> Stores the connection parameters and begins connecting. </summary>
  /// <param name="host"> Value assigned to Host. </param>
  /// <param name="token"> Value assigned to <see cref="Token"/>. </param>
  /// <param name="userId"> User id later passed to SendIdentify by Run(). </param>
  /// <param name="sessionId"> Session id later passed to SendIdentify by Run(). </param>
  /// <param name="parentCancelToken"> Token handed to BeginConnect. </param>
  /// <returns> The task returned by BeginConnect. </returns>
  public Task Connect(string host, string token, ulong userId, string sessionId, CancellationToken parentCancelToken)
  {
      Host = host;
      Token = token;

      _userId = userId;
      _sessionId = sessionId;

      Task connecting = BeginConnect(parentCancelToken);
      return connecting;
  }
  /// <summary>
  /// Waits for the configured reconnect delay, then keeps calling BeginConnect until it succeeds
  /// or the parent cancel token is triggered. Cancellation ends the method silently.
  /// </summary>
  private async Task Reconnect()
  {
      try
      {
          var token = _parentCancelToken;
          await Task.Delay(_config.ReconnectDelay, token).ConfigureAwait(false);

          bool connected = false;
          while (!connected && !token.IsCancellationRequested)
          {
              try
              {
                  await BeginConnect(_parentCancelToken).ConfigureAwait(false);
                  connected = true;
              }
              //Cancellation is not a failed attempt; let it fall through to the outer handler
              catch (Exception ex) when (!(ex is OperationCanceledException))
              {
                  Logger.Error("Reconnect failed", ex);
                  //Net is down? We can keep trying to reconnect until the user runs Disconnect()
                  await Task.Delay(_config.FailedReconnectDelay, token).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException) { }
  }
  /// <summary> Stops the task manager, then clears the stored user id. </summary>
  public async Task Disconnect()
  {
      Task stopping = _taskManager.Stop(true);
      await stopping.ConfigureAwait(false);

      _userId = null;
  }
  /// <summary>
  /// Opens the UDP socket, starts the voice send/receive workers, queues the identify command
  /// and hands the remaining tasks to the task manager.
  /// </summary>
  protected override async Task Run()
  {
      var localEndpoint = new IPEndPoint(IPAddress.Any, 0);
      _udp = new UdpClient(localEndpoint);

      //The send worker is only needed when outgoing audio is enabled; the receive worker always runs
      bool sendsAudio = _audioConfig.Mode.HasFlag(AudioMode.Outgoing);
      if (sendsAudio)
      {
          _sendTask = Task.Run(() => SendVoiceAsync(CancelToken));
      }
      _receiveTask = Task.Run(() => ReceiveVoiceAsync(CancelToken));

      SendIdentify(_userId.Value, _sessionId);

      var tasks = new List<Task>();
#if !NETSTANDARD1_3
      tasks.Add(WatcherAsync());
#endif
      tasks.AddRange(_engine.GetTasks(CancelToken));
      tasks.Add(HeartbeatAsync(CancelToken));

      await _taskManager.Start(tasks, _cancelSource).ConfigureAwait(false);
  }
  /// <summary>
  /// Waits for the voice workers to finish, disposes all Opus decoders, clears queued PCM frames,
  /// releases the UDP socket and then runs the base cleanup.
  /// </summary>
  protected override async Task Cleanup()
  {
      var sendThread = _sendTask;
      if (sendThread != null)
      {
          try { await sendThread.ConfigureAwait(false); }
          catch (Exception) { } //Ignore any errors during cleanup
      }
      _sendTask = null;

      var receiveThread = _receiveTask;
      if (receiveThread != null)
      {
          try { await receiveThread.ConfigureAwait(false); }
          catch (Exception) { } //Ignore any errors during cleanup
      }
      _receiveTask = null;

      OpusDecoder decoder;
      foreach (var pair in _decoders)
      {
          if (_decoders.TryRemove(pair.Key, out decoder))
              decoder.Dispose();
      }

      ClearPCMFrames();

      //WatcherAsync is the only other place that closes the socket and it is compiled out under
      //NETSTANDARD1_3, so release it here as well. Both workers have already been awaited above.
      var udp = _udp;
      _udp = null;
      (udp as IDisposable)?.Dispose();

      await base.Cleanup().ConfigureAwait(false);
  }
  /// <summary>
  /// Polls the UDP socket until cancelled. Before the socket is connected it handles the 70-byte
  /// discovery reply (address string at offset 4, port in the last two bytes) by sending the
  /// select-protocol command; afterwards it validates, optionally decrypts and forwards voice packets.
  /// </summary>
  /// <param name="cancelToken"> Token that ends the loop. </param>
  /// <remarks>
  /// Packets that fail validation are skipped rather than ending the loop, so one stray datagram
  /// cannot stop all further receiving.
  /// </remarks>
  private async Task ReceiveVoiceAsync(CancellationToken cancelToken)
  {
      var closeTask = cancelToken.Wait();
      try
      {
          byte[] packet, decodingBuffer = null, nonce = null, result;
          int packetLength, resultOffset, resultLength;
          IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);

          //Decryption scratch space is only needed when incoming audio is enabled
          if ((_audioConfig.Mode & AudioMode.Incoming) != 0)
          {
              decodingBuffer = new byte[MaxOpusSize];
              nonce = new byte[24];
          }

          while (!cancelToken.IsCancellationRequested)
          {
              await Task.Delay(1).ConfigureAwait(false);
              if (_udp.Available <= 0)
                  continue;

#if !NETSTANDARD1_3
              packet = _udp.Receive(ref endpoint);
#else
              //TODO: Is this really the only way to end a Receive call in DOTNET5_4?
              var receiveTask = _udp.ReceiveAsync();
              var task = await Task.WhenAny(closeTask, receiveTask).ConfigureAwait(false);
              if (task == closeTask)
                  break;
              //Awaiting (instead of .Result) surfaces the original exception so the catch clauses below apply
              var udpPacket = await receiveTask.ConfigureAwait(false);
              packet = udpPacket.Buffer;
              endpoint = udpPacket.RemoteEndPoint;
#endif
              packetLength = packet.Length;

              //Only accept non-empty datagrams from the voice server's endpoint
              if (packetLength <= 0 || !endpoint.Equals(_endpoint))
                  continue;

              if (State != ConnectionState.Connected)
              {
                  //Anything that is not the 70-byte discovery reply is ignored while connecting
                  if (packetLength != 70)
                      continue;

                  string ip = Encoding.UTF8.GetString(packet, 4, 70 - 6).TrimEnd('\0');
                  int port = packet[68] | packet[69] << 8;

                  SendSelectProtocol(ip, port);
                  if ((_audioConfig.Mode & AudioMode.Incoming) == 0)
                      return; //We dont need this thread anymore
              }
              else
              {
                  //Parse RTP Data
                  if (packetLength < 12) continue;
                  if (packet[0] != 0x80) continue; //Flags
                  if (packet[1] != 0x78) continue; //Payload Type

                  //Bytes 2-3 (sequence) and 4-7 (timestamp) are not used by this method
                  uint ssrc = (uint)((packet[8] << 24) |
                                     (packet[9] << 16) |
                                     (packet[10] << 8) |
                                     (packet[11] << 0));

                  //Decrypt
                  if (_isEncrypted)
                  {
                      if (packetLength < 28) //12 + 16 (RTP + Poly1305 MAC)
                          continue;
                      //The decrypted payload is read back from decodingBuffer, so it has to fit
                      if (packetLength - 28 > decodingBuffer.Length)
                          continue;

                      Buffer.BlockCopy(packet, 0, nonce, 0, 12);
                      int ret = SecretBox.Decrypt(packet, 12, packetLength - 12, decodingBuffer, nonce, _secretKey);
                      if (ret != 0)
                          continue;
                      result = decodingBuffer;
                      resultOffset = 0;
                      resultLength = packetLength - 28;
                  }
                  else //Plain
                  {
                      result = packet;
                      resultOffset = 12;
                      resultLength = packetLength - 12;
                  }

                  //Frames from an SSRC with no known user are dropped
                  ulong userId;
                  if (_ssrcMapping.TryGetValue(ssrc, out userId))
                      OnFrameReceived(userId, Channel.Id, result, resultOffset, resultLength);
              }
          }
      }
      catch (OperationCanceledException) { }
      catch (InvalidOperationException) { } //Includes ObjectDisposedException
  }
  /// <summary>
  /// Waits until the socket is connected, then repeatedly pops PCM frames from the send buffer,
  /// encodes (and optionally encrypts) them into RTP packets and sends one packet per frame
  /// interval. A small ping packet is sent roughly every five seconds.
  /// </summary>
  /// <param name="cancelToken"> Token that ends the loop. </param>
  /// <remarks>
  /// A <see cref="SocketException"/> from either the voice or the ping send is logged and the loop
  /// keeps running.
  /// </remarks>
  private async Task SendVoiceAsync(CancellationToken cancelToken)
  {
      try
      {
          //Hold off until the handshake has completed (or we are cancelled)
          while (!cancelToken.IsCancellationRequested && State != ConnectionState.Connected)
              await Task.Delay(1).ConfigureAwait(false);

          if (cancelToken.IsCancellationRequested)
              return;

          byte[] frame = new byte[_encoder.FrameSize];
          byte[] encodedFrame = new byte[MaxOpusSize];
          byte[] voicePacket, pingPacket, nonce = null;
          uint timestamp = 0;
          double nextTicks = 0.0, nextPingTicks = 0.0;
          long ticksPerSeconds = Stopwatch.Frequency;
          double ticksPerMillisecond = Stopwatch.Frequency / 1000.0;
          double ticksPerFrame = ticksPerMillisecond * _encoder.FrameLength;
          uint samplesPerFrame = (uint)_encoder.SamplesPerFrame;
          Stopwatch sw = Stopwatch.StartNew();

          //Packet layout: 12-byte RTP header + payload (+ 16 extra bytes when encrypted)
          if (_isEncrypted)
          {
              nonce = new byte[24];
              voicePacket = new byte[MaxOpusSize + 12 + 16];
          }
          else
              voicePacket = new byte[MaxOpusSize + 12];

          pingPacket = new byte[8];

          int rtpPacketLength = 0;
          voicePacket[0] = 0x80; //Flags;
          voicePacket[1] = 0x78; //Payload Type
          voicePacket[8] = (byte)(_ssrc >> 24);
          voicePacket[9] = (byte)(_ssrc >> 16);
          voicePacket[10] = (byte)(_ssrc >> 8);
          voicePacket[11] = (byte)(_ssrc >> 0);

          //The nonce starts as a copy of the RTP header; bytes 2-7 are refreshed for every frame
          if (_isEncrypted)
              Buffer.BlockCopy(voicePacket, 0, nonce, 0, 12);

          bool hasFrame = false;
          while (!cancelToken.IsCancellationRequested)
          {
              //Prepare the next packet ahead of its send time
              if (!hasFrame && _sendBuffer.Pop(frame))
              {
                  ushort sequence = unchecked(_sequence++);
                  voicePacket[2] = (byte)(sequence >> 8);
                  voicePacket[3] = (byte)(sequence >> 0);
                  voicePacket[4] = (byte)(timestamp >> 24);
                  voicePacket[5] = (byte)(timestamp >> 16);
                  voicePacket[6] = (byte)(timestamp >> 8);
                  voicePacket[7] = (byte)(timestamp >> 0);

                  //Encode
                  int encodedLength = _encoder.EncodeFrame(frame, 0, encodedFrame);

                  //Encrypt
                  if (_isEncrypted)
                  {
                      Buffer.BlockCopy(voicePacket, 2, nonce, 2, 6); //Update nonce
                      int ret = SecretBox.Encrypt(encodedFrame, encodedLength, voicePacket, 12, nonce, _secretKey);
                      if (ret != 0)
                          continue;
                      rtpPacketLength = encodedLength + 12 + 16;
                  }
                  else
                  {
                      Buffer.BlockCopy(encodedFrame, 0, voicePacket, 12, encodedLength);
                      rtpPacketLength = encodedLength + 12;
                  }

                  timestamp = unchecked(timestamp + samplesPerFrame);
                  hasFrame = true;
              }

              long currentTicks = sw.ElapsedTicks;
              double ticksToNextFrame = nextTicks - currentTicks;
              if (ticksToNextFrame <= 0.0)
              {
                  if (hasFrame)
                  {
                      try
                      {
                          await _udp.SendAsync(voicePacket, rtpPacketLength, _endpoint).ConfigureAwait(false);
                      }
                      catch (SocketException ex)
                      {
                          Logger.Error("Failed to send UDP packet.", ex);
                      }
                      hasFrame = false;
                  }
                  nextTicks += ticksPerFrame;

                  //Is it time to send out another ping?
                  if (currentTicks > nextPingTicks)
                  {
                      //Increment in LE
                      for (int i = 0; i < 8; i++)
                      {
                          var b = pingPacket[i];
                          if (b == byte.MaxValue)
                              pingPacket[i] = 0;
                          else
                          {
                              pingPacket[i] = (byte)(b + 1);
                              break;
                          }
                      }

                      //Guarded like the voice send above so a transient socket error does not end the loop
                      try
                      {
                          await _udp.SendAsync(pingPacket, pingPacket.Length, _endpoint).ConfigureAwait(false);
                      }
                      catch (SocketException ex)
                      {
                          Logger.Error("Failed to send UDP packet.", ex);
                      }
                      nextPingTicks = currentTicks + 5 * ticksPerSeconds;
                  }
              }
              else
              {
                  if (hasFrame)
                  {
                      int time = (int)Math.Floor(ticksToNextFrame / ticksPerMillisecond);
                      if (time > 0)
                          await Task.Delay(time).ConfigureAwait(false);
                  }
                  else
                      await Task.Delay(1).ConfigureAwait(false); //Give as much time to the encrypter as possible
              }
          }
      }
      catch (OperationCanceledException) { }
      catch (InvalidOperationException) { } //Includes ObjectDisposedException
  }
  354. #if !NETSTANDARD1_3
  //UdpClient doesn't allow passing a cancel token, so this task waits for CancelToken
  //to trigger and then closes the UDP socket itself.
  private async Task WatcherAsync()
  {
      var cancelled = CancelToken.Wait();
      await cancelled.ConfigureAwait(false);

      _udp.Close();
  }
  361. #endif
  /// <summary>
  /// Handles a message from the voice WebSocket after the base class has processed it:
  /// Ready sets up the UDP endpoint and encryption mode and sends the 70-byte discovery packet,
  /// Heartbeat updates the ping value, SessionDescription stores the secret key and completes
  /// the connection, and Speaking raises the speaking event.
  /// </summary>
  /// <param name="json"> Raw JSON text of the message. </param>
  /// <exception cref="InvalidOperationException">
  /// Encryption is enabled but the server does not offer the expected mode, or the voice host
  /// does not resolve to an IPv4 address.
  /// </exception>
  protected override async Task ProcessMessage(string json)
  {
      await base.ProcessMessage(json).ConfigureAwait(false);

      WebSocketMessage msg;
      using (var reader = new JsonTextReader(new StringReader(json)))
          msg = _serializer.Deserialize(reader, typeof(WebSocketMessage)) as WebSocketMessage;

      var opCode = (OpCodes)msg.Operation;
      switch (opCode)
      {
          case OpCodes.Ready:
              {
                  if (State != ConnectionState.Connected)
                  {
                      var payload = (msg.Payload as JToken).ToObject<ReadyEvent>(_serializer);
                      _heartbeatInterval = payload.HeartbeatInterval;
                      _ssrc = payload.SSRC;

                      string hostname = Host.Replace("wss://", "");
                      var addresses = await Dns.GetHostAddressesAsync(hostname).ConfigureAwait(false);
                      //Run() binds the UDP socket to IPAddress.Any (IPv4), so only an IPv4 address is usable as the target
                      var address = addresses.FirstOrDefault(x => x.AddressFamily == AddressFamily.InterNetwork);
                      if (address == null)
                          throw new InvalidOperationException($"Unable to resolve an IPv4 address for voice host '{hostname}'.");
                      _endpoint = new IPEndPoint(address, payload.Port);

                      if (_audioConfig.EnableEncryption)
                      {
                          if (payload.Modes.Contains(EncryptedMode))
                          {
                              _encryptionMode = EncryptedMode;
                              _isEncrypted = true;
                          }
                          else
                              throw new InvalidOperationException("Unexpected encryption format.");
                      }
                      else
                      {
                          _encryptionMode = UnencryptedMode;
                          _isEncrypted = false;
                      }

                      _sequence = 0;// (ushort)_rand.Next(0, ushort.MaxValue);

                      //SendVoiceAsync does not send anything until State is Connected, so it cannot race with this send
                      //Discovery request: 70 bytes, the first four holding the SSRC in big-endian order
                      byte[] packet = new byte[70];
                      packet[0] = (byte)(_ssrc >> 24);
                      packet[1] = (byte)(_ssrc >> 16);
                      packet[2] = (byte)(_ssrc >> 8);
                      packet[3] = (byte)(_ssrc >> 0);
                      await _udp.SendAsync(packet, 70, _endpoint).ConfigureAwait(false);
                  }
              }
              break;
          case OpCodes.Heartbeat:
              {
                  long time = EpochTime.GetMilliseconds();
                  var payload = (long)msg.Payload;
                  //NOTE(review): if the payload is the timestamp of the heartbeat we sent, this
                  //difference is never positive - confirm whether (time - payload) was intended
                  _ping = (int)(payload - time);
                  //TODO: Use this to estimate latency
              }
              break;
          case OpCodes.SessionDescription:
              {
                  var payload = (msg.Payload as JToken).ToObject<SessionDescriptionEvent>(_serializer);
                  _secretKey = payload.SecretKey;
                  SendSetSpeaking(true);
                  await EndConnect().ConfigureAwait(false);
              }
              break;
          case OpCodes.Speaking:
              {
                  var payload = (msg.Payload as JToken).ToObject<SpeakingEvent>(_serializer);
                  OnUserIsSpeaking(payload.UserId, payload.IsSpeaking);
              }
              break;
          default:
              Logger.Warning($"Unknown Opcode: {opCode}");
              break;
      }
  }
  /// <summary> Pushes the given data range onto the send buffer, passing along the socket's cancel token. </summary>
  /// <param name="data"> Array holding the PCM data. </param>
  /// <param name="offset"> Offset forwarded to the buffer's Push call. </param>
  /// <param name="count"> Count forwarded to the buffer's Push call. </param>
  public void SendPCMFrames(byte[] data, int offset, int count)
      => _sendBuffer.Push(data, offset, count, CancelToken);
  /// <summary> Clears the send buffer, passing along the socket's cancel token. </summary>
  public void ClearPCMFrames()
      => _sendBuffer.Clear(CancelToken);
  /// <summary> Calls Wait on the send buffer, passing along the socket's cancel token. </summary>
  public void WaitForQueue()
      => _sendBuffer.Wait(CancelToken);
  /// <summary> Queues a heartbeat command. </summary>
  public override void SendHeartbeat()
  {
      QueueMessage(new HeartbeatCommand());
  }
  /// <summary> Queues an identify command for the current server using the stored token. </summary>
  /// <param name="id"> Value sent as the command's UserId. </param>
  /// <param name="sessionId"> Value sent as the command's SessionId. </param>
  public void SendIdentify(ulong id, string sessionId)
  {
      var command = new IdentifyCommand
      {
          GuildId = Server.Id,
          UserId = id,
          SessionId = sessionId,
          Token = Token
      };
      QueueMessage(command);
  }
  /// <summary> Queues a select-protocol command for UDP with the currently chosen encryption mode. </summary>
  /// <param name="externalAddress"> Value sent as the command's ExternalAddress. </param>
  /// <param name="externalPort"> Value sent as the command's ExternalPort. </param>
  public void SendSelectProtocol(string externalAddress, int externalPort)
  {
      var command = new SelectProtocolCommand
      {
          Protocol = "udp",
          ExternalAddress = externalAddress,
          ExternalPort = externalPort,
          EncryptionMode = _encryptionMode
      };
      QueueMessage(command);
  }
  /// <summary> Queues a set-speaking command with a delay of zero. </summary>
  /// <param name="value"> Value sent as the command's IsSpeaking flag. </param>
  public void SendSetSpeaking(bool value)
  {
      var command = new SetSpeakingCommand { IsSpeaking = value, Delay = 0 };
      QueueMessage(command);
  }
  466. }
  467. }