@@ -49,7 +49,7 @@ namespace Discord.Audio | |||||
private DiscordAudioClient _defaultClient; | private DiscordAudioClient _defaultClient; | ||||
private ConcurrentDictionary<ulong, DiscordAudioClient> _voiceClients; | private ConcurrentDictionary<ulong, DiscordAudioClient> _voiceClients; | ||||
private ConcurrentDictionary<User, bool> _talkingUsers; | private ConcurrentDictionary<User, bool> _talkingUsers; | ||||
private int _nextClientId; | |||||
//private int _nextClientId; | |||||
internal DiscordClient Client => _client; | internal DiscordClient Client => _client; | ||||
private DiscordClient _client; | private DiscordClient _client; | ||||
@@ -143,13 +143,14 @@ namespace Discord.Audio | |||||
_defaultClient.SetServerId(server.Id); | _defaultClient.SetServerId(server.Id); | ||||
return Task.FromResult(_defaultClient); | return Task.FromResult(_defaultClient); | ||||
} | } | ||||
else | |||||
throw new InvalidOperationException("Multiserver voice is not currently supported"); | |||||
var client = _voiceClients.GetOrAdd(server.Id, _ => | |||||
/*var client = _voiceClients.GetOrAdd(server.Id, _ => | |||||
{ | { | ||||
int id = unchecked(++_nextClientId); | int id = unchecked(++_nextClientId); | ||||
var logger = Client.Log.CreateLogger($"Voice #{id}"); | var logger = Client.Log.CreateLogger($"Voice #{id}"); | ||||
GatewaySocket gatewaySocket = null; | |||||
var voiceClient = new DiscordAudioClient(this, id, logger, gatewaySocket); | |||||
var voiceClient = new DiscordAudioClient(this, id, logger, Client.GatewaySocket); | |||||
voiceClient.SetServerId(server.Id); | voiceClient.SetServerId(server.Id); | ||||
voiceClient.VoiceSocket.OnPacket += (s, e) => | voiceClient.VoiceSocket.OnPacket += (s, e) => | ||||
@@ -165,7 +166,7 @@ namespace Discord.Audio | |||||
return voiceClient; | return voiceClient; | ||||
}); | }); | ||||
//await client.Connect(gatewaySocket.Host, _client.Token).ConfigureAwait(false); | //await client.Connect(gatewaySocket.Host, _client.Token).ConfigureAwait(false); | ||||
return Task.FromResult(client); | |||||
return Task.FromResult(client);*/ | |||||
} | } | ||||
public async Task<DiscordAudioClient> Join(Channel channel) | public async Task<DiscordAudioClient> Join(Channel channel) | ||||
@@ -1,18 +1,20 @@ | |||||
using Discord.API; | |||||
using Discord.API.Client.GatewaySocket; | |||||
using Discord.API.Client.GatewaySocket; | |||||
using Discord.Logging; | using Discord.Logging; | ||||
using Discord.Net.WebSockets; | using Discord.Net.WebSockets; | ||||
using Newtonsoft.Json; | using Newtonsoft.Json; | ||||
using System; | using System; | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace Discord.Audio | namespace Discord.Audio | ||||
{ | { | ||||
public partial class DiscordAudioClient | public partial class DiscordAudioClient | ||||
{ | |||||
private JsonSerializer _serializer; | |||||
{ | |||||
private readonly Semaphore _connectionLock; | |||||
private readonly JsonSerializer _serializer; | |||||
private CancellationTokenSource _cancelTokenSource; | |||||
internal AudioService Service { get; } | |||||
internal AudioService Service { get; } | |||||
internal Logger Logger { get; } | internal Logger Logger { get; } | ||||
public int Id { get; } | public int Id { get; } | ||||
public GatewaySocket GatewaySocket { get; } | public GatewaySocket GatewaySocket { get; } | ||||
@@ -26,6 +28,9 @@ namespace Discord.Audio | |||||
Service = service; | Service = service; | ||||
Id = id; | Id = id; | ||||
Logger = logger; | Logger = logger; | ||||
GatewaySocket = gatewaySocket; | |||||
_connectionLock = new Semaphore(1, 1); | |||||
_serializer = new JsonSerializer(); | _serializer = new JsonSerializer(); | ||||
_serializer.DateTimeZoneHandling = DateTimeZoneHandling.Utc; | _serializer.DateTimeZoneHandling = DateTimeZoneHandling.Utc; | ||||
@@ -35,7 +40,6 @@ namespace Discord.Audio | |||||
Logger.Error("Serialization Failed", e.ErrorContext.Error); | Logger.Error("Serialization Failed", e.ErrorContext.Error); | ||||
}; | }; | ||||
GatewaySocket = gatewaySocket; | |||||
VoiceSocket = new VoiceWebSocket(service.Client, this, _serializer, logger); | VoiceSocket = new VoiceWebSocket(service.Client, this, _serializer, logger); | ||||
/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected(); | /*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected(); | ||||
@@ -72,33 +76,7 @@ namespace Discord.Audio | |||||
_voiceSocket.ParentCancelToken = _cancelToken; | _voiceSocket.ParentCancelToken = _cancelToken; | ||||
};*/ | };*/ | ||||
GatewaySocket.ReceivedDispatch += async (s, e) => | |||||
{ | |||||
try | |||||
{ | |||||
switch (e.Type) | |||||
{ | |||||
case "VOICE_SERVER_UPDATE": | |||||
{ | |||||
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer); | |||||
var serverId = data.GuildId; | |||||
if (serverId == ServerId) | |||||
{ | |||||
var client = Service.Client; | |||||
VoiceSocket.Token = data.Token; | |||||
VoiceSocket.Host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0]; | |||||
await VoiceSocket.Connect().ConfigureAwait(false); | |||||
} | |||||
} | |||||
break; | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.Error($"Error handling {e.Type} event", ex); | |||||
} | |||||
}; | |||||
GatewaySocket.ReceivedDispatch += OnReceivedDispatch; | |||||
} | } | ||||
@@ -106,22 +84,71 @@ namespace Discord.Audio | |||||
{ | { | ||||
VoiceSocket.ServerId = serverId; | VoiceSocket.ServerId = serverId; | ||||
} | } | ||||
public async Task Join(Channel channel) | |||||
public Task Join(Channel channel) | |||||
{ | { | ||||
if (channel == null) throw new ArgumentNullException(nameof(channel)); | if (channel == null) throw new ArgumentNullException(nameof(channel)); | ||||
ulong? serverId = channel.Server?.Id; | ulong? serverId = channel.Server?.Id; | ||||
if (serverId != ServerId) | if (serverId != ServerId) | ||||
throw new InvalidOperationException("Cannot join a channel on a different server than this voice client."); | throw new InvalidOperationException("Cannot join a channel on a different server than this voice client."); | ||||
//CheckReady(checkVoice: true); | |||||
//CheckReady(checkVoice: true); | |||||
await VoiceSocket.Disconnect().ConfigureAwait(false); | |||||
VoiceSocket.ChannelId = channel.Id; | |||||
GatewaySocket.SendUpdateVoice(channel.Server.Id, channel.Id, | |||||
(Service.Config.Mode | AudioMode.Outgoing) == 0, | |||||
(Service.Config.Mode | AudioMode.Incoming) == 0); | |||||
await VoiceSocket.WaitForConnection(Service.Config.ConnectionTimeout).ConfigureAwait(false); | |||||
return Task.Run(async () => | |||||
{ | |||||
_connectionLock.WaitOne(); | |||||
try | |||||
{ | |||||
await VoiceSocket.Disconnect().ConfigureAwait(false); | |||||
_cancelTokenSource = new CancellationTokenSource(); | |||||
var cancelToken = _cancelTokenSource.Token; | |||||
VoiceSocket.ParentCancelToken = cancelToken; | |||||
VoiceSocket.ChannelId = channel.Id; | |||||
GatewaySocket.SendUpdateVoice(channel.Server.Id, channel.Id, | |||||
(Service.Config.Mode | AudioMode.Outgoing) == 0, | |||||
(Service.Config.Mode | AudioMode.Incoming) == 0); | |||||
VoiceSocket.WaitForConnection(cancelToken); | |||||
} | |||||
finally | |||||
{ | |||||
_connectionLock.Release(); | |||||
} | |||||
}); | |||||
} | |||||
public Task Disconnect() | |||||
{ | |||||
GatewaySocket.ReceivedDispatch -= OnReceivedDispatch; | |||||
return VoiceSocket.Disconnect(); | |||||
} | |||||
private async void OnReceivedDispatch(object sender, WebSocketEventEventArgs e) | |||||
{ | |||||
try | |||||
{ | |||||
switch (e.Type) | |||||
{ | |||||
case "VOICE_SERVER_UPDATE": | |||||
{ | |||||
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer); | |||||
var serverId = data.GuildId; | |||||
if (serverId == ServerId) | |||||
{ | |||||
var client = Service.Client; | |||||
VoiceSocket.Token = data.Token; | |||||
VoiceSocket.Host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0]; | |||||
await VoiceSocket.Connect().ConfigureAwait(false); | |||||
} | |||||
} | |||||
break; | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.Error($"Error handling {e.Type} event", ex); | |||||
} | |||||
} | } | ||||
public Task Disconnect() => VoiceSocket.Disconnect(); | |||||
/// <summary> Sends a PCM frame to the voice server. Will block until space frees up in the outgoing buffer. </summary> | /// <summary> Sends a PCM frame to the voice server. Will block until space frees up in the outgoing buffer. </summary> | ||||
/// <param name="data">PCM frame to send. This must be a single or collection of uncompressed 48Kz monochannel 20ms PCM frames. </param> | /// <param name="data">PCM frame to send. This must be a single or collection of uncompressed 48Kz monochannel 20ms PCM frames. </param> | ||||
@@ -63,10 +63,8 @@ namespace Discord.Net.WebSockets | |||||
_sendBuffer = new VoiceBuffer((int)Math.Ceiling(_config.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize); | _sendBuffer = new VoiceBuffer((int)Math.Ceiling(_config.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize); | ||||
} | } | ||||
public async Task Connect() | |||||
{ | |||||
await BeginConnect().ConfigureAwait(false); | |||||
} | |||||
public Task Connect() | |||||
=> BeginConnect(); | |||||
public async Task Reconnect() | public async Task Reconnect() | ||||
{ | { | ||||
try | try | ||||
@@ -473,21 +471,6 @@ namespace Discord.Net.WebSockets | |||||
{ | { | ||||
_sendBuffer.Wait(CancelToken); | _sendBuffer.Wait(CancelToken); | ||||
} | } | ||||
public Task WaitForConnection(int timeout) | |||||
{ | |||||
return Task.Run(() => | |||||
{ | |||||
try | |||||
{ | |||||
if (!_connectedEvent.Wait(timeout, CancelToken)) | |||||
throw new TimeoutException(); | |||||
} | |||||
catch (OperationCanceledException) | |||||
{ | |||||
_taskManager.ThrowException(); | |||||
} | |||||
}); | |||||
} | |||||
public override void SendHeartbeat() | public override void SendHeartbeat() | ||||
=> QueueMessage(new HeartbeatCommand()); | => QueueMessage(new HeartbeatCommand()); | ||||
@@ -6,7 +6,7 @@ namespace Discord.API.Client.GatewaySocket | |||||
[JsonObject(MemberSerialization.OptIn)] | [JsonObject(MemberSerialization.OptIn)] | ||||
public sealed class UpdateVoiceCommand : IWebSocketMessage | public sealed class UpdateVoiceCommand : IWebSocketMessage | ||||
{ | { | ||||
int IWebSocketMessage.OpCode => (int)OpCodes.StatusUpdate; | |||||
int IWebSocketMessage.OpCode => (int)OpCodes.VoiceStateUpdate; | |||||
object IWebSocketMessage.Payload => this; | object IWebSocketMessage.Payload => this; | ||||
bool IWebSocketMessage.IsPrivate => false; | bool IWebSocketMessage.IsPrivate => false; | ||||
@@ -126,11 +126,6 @@ namespace Discord | |||||
if (Config.UseMessageQueue) | if (Config.UseMessageQueue) | ||||
MessageQueue = new MessageQueue(this, Log.CreateLogger("MessageQueue")); | MessageQueue = new MessageQueue(this, Log.CreateLogger("MessageQueue")); | ||||
Connected += async (s, e) => | |||||
{ | |||||
ClientAPI.CancelToken = CancelToken; | |||||
await SendStatus().ConfigureAwait(false); | |||||
}; | |||||
//Extensibility | //Extensibility | ||||
Services = new ServiceManager(this); | Services = new ServiceManager(this); | ||||
@@ -172,6 +167,10 @@ namespace Discord | |||||
State = ConnectionState.Connecting; | State = ConnectionState.Connecting; | ||||
_disconnectedEvent.Reset(); | _disconnectedEvent.Reset(); | ||||
_cancelTokenSource = new CancellationTokenSource(); | |||||
CancelToken = _cancelTokenSource.Token; | |||||
GatewaySocket.ParentCancelToken = CancelToken; | |||||
await Login(email, password, token).ConfigureAwait(false); | await Login(email, password, token).ConfigureAwait(false); | ||||
await GatewaySocket.Connect().ConfigureAwait(false); | await GatewaySocket.Connect().ConfigureAwait(false); | ||||
@@ -196,10 +195,6 @@ namespace Discord | |||||
} | } | ||||
private async Task Login(string email, string password, string token) | private async Task Login(string email, string password, string token) | ||||
{ | { | ||||
_cancelTokenSource = new CancellationTokenSource(); | |||||
CancelToken = _cancelTokenSource.Token; | |||||
GatewaySocket.ParentCancelToken = CancelToken; | |||||
bool useCache = Config.CacheToken; | bool useCache = Config.CacheToken; | ||||
while (true) | while (true) | ||||
{ | { | ||||
@@ -261,6 +256,9 @@ namespace Discord | |||||
{ | { | ||||
State = ConnectionState.Connected; | State = ConnectionState.Connected; | ||||
_connectedEvent.Set(); | _connectedEvent.Set(); | ||||
ClientAPI.CancelToken = CancelToken; | |||||
SendStatus(); | |||||
OnConnected(); | OnConnected(); | ||||
} | } | ||||
@@ -293,21 +291,19 @@ namespace Discord | |||||
_disconnectedEvent.Set(); | _disconnectedEvent.Set(); | ||||
} | } | ||||
public Task SetStatus(UserStatus status) | |||||
public void SetStatus(UserStatus status) | |||||
{ | { | ||||
if (status == null) throw new ArgumentNullException(nameof(status)); | if (status == null) throw new ArgumentNullException(nameof(status)); | ||||
if (status != UserStatus.Online && status != UserStatus.Idle) | if (status != UserStatus.Online && status != UserStatus.Idle) | ||||
throw new ArgumentException($"Invalid status, must be {UserStatus.Online} or {UserStatus.Idle}", nameof(status)); | throw new ArgumentException($"Invalid status, must be {UserStatus.Online} or {UserStatus.Idle}", nameof(status)); | ||||
Status = status; | Status = status; | ||||
return SendStatus(); | |||||
} | } | ||||
public Task SetGame(string game) | |||||
public void SetGame(string game) | |||||
{ | { | ||||
CurrentGame = game; | CurrentGame = game; | ||||
return SendStatus(); | |||||
} | } | ||||
private Task SendStatus() | |||||
private void SendStatus() | |||||
{ | { | ||||
PrivateUser.Status = Status; | PrivateUser.Status = Status; | ||||
PrivateUser.CurrentGame = CurrentGame; | PrivateUser.CurrentGame = CurrentGame; | ||||
@@ -321,7 +317,6 @@ namespace Discord | |||||
} | } | ||||
} | } | ||||
GatewaySocket.SendUpdateStatus(Status == UserStatus.Idle ? EpochTime.GetMilliseconds() - (10 * 60 * 1000) : (long?)null, CurrentGame); | GatewaySocket.SendUpdateStatus(Status == UserStatus.Idle ? EpochTime.GetMilliseconds() - (10 * 60 * 1000) : (long?)null, CurrentGame); | ||||
return TaskHelper.CompletedTask; | |||||
} | } | ||||
#region Channels | #region Channels | ||||
@@ -5,6 +5,7 @@ using Newtonsoft.Json; | |||||
using Newtonsoft.Json.Linq; | using Newtonsoft.Json.Linq; | ||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace Discord.Net.WebSockets | namespace Discord.Net.WebSockets | ||||
@@ -176,5 +177,9 @@ namespace Discord.Net.WebSockets | |||||
=> QueueMessage(new UpdateVoiceCommand { GuildId = serverId, ChannelId = channelId, IsSelfMuted = isSelfMuted, IsSelfDeafened = isSelfDeafened }); | => QueueMessage(new UpdateVoiceCommand { GuildId = serverId, ChannelId = channelId, IsSelfMuted = isSelfMuted, IsSelfDeafened = isSelfDeafened }); | ||||
public void SendRequestMembers(ulong serverId, string query, int limit) | public void SendRequestMembers(ulong serverId, string query, int limit) | ||||
=> QueueMessage(new RequestMembersCommand { GuildId = serverId, Query = query, Limit = limit }); | => QueueMessage(new RequestMembersCommand { GuildId = serverId, Query = query, Limit = limit }); | ||||
} | |||||
//Cancel if either DiscordClient.Disconnect is called, data socket errors or timeout is reached | |||||
public override void WaitForConnection(CancellationToken cancelToken) | |||||
=> base.WaitForConnection(CancellationTokenSource.CreateLinkedTokenSource(cancelToken, CancelToken).Token); | |||||
} | |||||
} | } |
@@ -173,12 +173,10 @@ namespace Discord.Net.WebSockets | |||||
} | } | ||||
public abstract void SendHeartbeat(); | public abstract void SendHeartbeat(); | ||||
public void WaitForConnection(CancellationToken cancelToken) | |||||
public virtual void WaitForConnection(CancellationToken cancelToken) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
//Cancel if either DiscordClient.Disconnect is called, data socket errors or timeout is reached | |||||
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, CancelToken).Token; | |||||
if (!_connectedEvent.Wait(_client.Config.ConnectionTimeout, cancelToken)) | if (!_connectedEvent.Wait(_client.Config.ConnectionTimeout, cancelToken)) | ||||
throw new TimeoutException(); | throw new TimeoutException(); | ||||
} | } | ||||