New approach to record and calculate inbound/outbound speed.tags/3.3
@@ -34,12 +34,77 @@ namespace Shadowsocks.Controller | |||
//records cache for current server in {_monitorInterval} minutes | |||
private readonly ConcurrentDictionary<string, List<int>> _latencyRecords = new ConcurrentDictionary<string, List<int>>(); | |||
//speed in KiB/s | |||
private readonly ConcurrentDictionary<string, long> _inboundCounter = new ConcurrentDictionary<string, long>(); | |||
private readonly ConcurrentDictionary<string, long> _lastInboundCounter = new ConcurrentDictionary<string, long>(); | |||
private readonly ConcurrentDictionary<string, List<int>> _inboundSpeedRecords = new ConcurrentDictionary<string, List<int>>(); | |||
private readonly ConcurrentDictionary<string, long> _outboundCounter = new ConcurrentDictionary<string, long>(); | |||
private readonly ConcurrentDictionary<string, long> _lastOutboundCounter = new ConcurrentDictionary<string, long>(); | |||
private readonly ConcurrentDictionary<string, List<int>> _outboundSpeedRecords = new ConcurrentDictionary<string, List<int>>(); | |||
private readonly ConcurrentDictionary<string, InOutBoundRecord> _inOutBoundRecords = new ConcurrentDictionary<string, InOutBoundRecord>(); | |||
private class InOutBoundRecord | |||
{ | |||
private long _inbound; | |||
private long _lastInbound; | |||
private long _outbound; | |||
private long _lastOutbound; | |||
private SpinLock _lock = new SpinLock(); | |||
public void UpdateInbound(long delta) | |||
{ | |||
bool lockTaken = false; | |||
try | |||
{ | |||
_lock.Enter(ref lockTaken); | |||
Interlocked.Add(ref _inbound, delta); | |||
} | |||
finally | |||
{ | |||
if (lockTaken) | |||
{ | |||
_lock.Exit(false); | |||
} | |||
} | |||
} | |||
public void UpdateOutbound(long delta) | |||
{ | |||
bool lockTaken = false; | |||
try | |||
{ | |||
_lock.Enter(ref lockTaken); | |||
Interlocked.Add(ref _outbound, delta); | |||
} | |||
finally | |||
{ | |||
if (lockTaken) | |||
{ | |||
_lock.Exit(false); | |||
} | |||
} | |||
} | |||
public void GetDelta(out long inboundDelta, out long outboundDelta) | |||
{ | |||
bool lockTaken = false; | |||
try | |||
{ | |||
_lock.Enter(ref lockTaken); | |||
var i = Interlocked.Read(ref _inbound); | |||
var il = Interlocked.Exchange(ref _lastInbound, i); | |||
inboundDelta = i - il; | |||
var o = Interlocked.Read(ref _outbound); | |||
var ol = Interlocked.Exchange(ref _lastOutbound, o); | |||
outboundDelta = o - ol; | |||
} | |||
finally | |||
{ | |||
if (lockTaken) | |||
{ | |||
_lock.Exit(false); | |||
} | |||
} | |||
} | |||
} | |||
//tasks | |||
private readonly TimeSpan _delayBeforeStart = TimeSpan.FromSeconds(1); | |||
@@ -98,36 +163,26 @@ namespace Shadowsocks.Controller | |||
private void UpdateSpeed(object _) | |||
{ | |||
foreach (var kv in _lastInboundCounter) | |||
foreach (var kv in _inOutBoundRecords) | |||
{ | |||
var id = kv.Key; | |||
var record = kv.Value; | |||
var lastInbound = kv.Value; | |||
var inbound = _inboundCounter[id]; | |||
var bytes = inbound - lastInbound; | |||
_lastInboundCounter[id] = inbound; | |||
var inboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds); | |||
_inboundSpeedRecords.GetOrAdd(id, (k) => | |||
{ | |||
List<int> records = new List<int>(); | |||
records.Add(inboundSpeed); | |||
return records; | |||
}); | |||
long inboundDelta, outboundDelta; | |||
var lastOutbound = _lastOutboundCounter[id]; | |||
var outbound = _outboundCounter[id]; | |||
bytes = outbound - lastOutbound; | |||
_lastOutboundCounter[id] = outbound; | |||
var outboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds); | |||
_outboundSpeedRecords.GetOrAdd(id, (k) => | |||
{ | |||
List<int> records = new List<int>(); | |||
records.Add(outboundSpeed); | |||
return records; | |||
}); | |||
record.GetDelta(out inboundDelta, out outboundDelta); | |||
var inboundSpeed = GetSpeedInKiBPerSecond(inboundDelta, _monitorInterval.TotalSeconds); | |||
var outboundSpeed = GetSpeedInKiBPerSecond(outboundDelta, _monitorInterval.TotalSeconds); | |||
var inR = _inboundSpeedRecords.GetOrAdd(id, (k) => new List<int>()); | |||
var outR = _outboundSpeedRecords.GetOrAdd(id, (k) => new List<int>()); | |||
inR.Add(inboundSpeed); | |||
outR.Add(outboundSpeed); | |||
Logging.Debug( | |||
$"{id}: current/max inbound {inboundSpeed}/{_inboundSpeedRecords[id].Max()} KiB/s, current/max outbound {outboundSpeed}/{_outboundSpeedRecords[id].Max()} KiB/s"); | |||
$"{id}: current/max inbound {inboundSpeed}/{inR.Max()} KiB/s, current/max outbound {outboundSpeed}/{outR.Max()} KiB/s"); | |||
} | |||
} | |||
@@ -327,20 +382,32 @@ namespace Shadowsocks.Controller | |||
public void UpdateInboundCounter(Server server, long n) | |||
{ | |||
_inboundCounter.AddOrUpdate(server.Identifier(), (k) => | |||
_inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) => | |||
{ | |||
_lastInboundCounter.GetOrAdd(server.Identifier(), 0); | |||
return n; | |||
}, (k, v) => (v + n)); | |||
var r = new InOutBoundRecord(); | |||
r.UpdateInbound(n); | |||
return r; | |||
}, (k, v) => | |||
{ | |||
v.UpdateInbound(n); | |||
return v; | |||
}); | |||
} | |||
public void UpdateOutboundCounter(Server server, long n) | |||
{ | |||
_outboundCounter.AddOrUpdate(server.Identifier(), (k) => | |||
_inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) => | |||
{ | |||
_lastOutboundCounter.GetOrAdd(server.Identifier(), 0); | |||
return n; | |||
}, (k, v) => (v + n)); | |||
var r = new InOutBoundRecord(); | |||
r.UpdateOutbound(n); | |||
return r; | |||
}, (k, v) => | |||
{ | |||
v.UpdateOutbound(n); | |||
return v; | |||
}); | |||
} | |||
class UpdateRecordsState | |||
@@ -1,5 +1,6 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Net; | |||
using System.Net.NetworkInformation; | |||
using System.Net.Sockets; | |||
@@ -10,13 +11,23 @@ namespace Shadowsocks.Controller | |||
{ | |||
public class Listener | |||
{ | |||
public interface Service | |||
public interface IService | |||
{ | |||
bool Handle(byte[] firstPacket, int length, Socket socket, object state); | |||
void Stop(); | |||
} | |||
public abstract class Service : IService | |||
{ | |||
public abstract bool Handle(byte[] firstPacket, int length, Socket socket, object state); | |||
public virtual void Stop() { } | |||
} | |||
public class UDPState | |||
{ | |||
public Socket socket; | |||
public byte[] buffer = new byte[4096]; | |||
public EndPoint remoteEndPoint = new IPEndPoint(IPAddress.Any, 0); | |||
} | |||
@@ -25,9 +36,9 @@ namespace Shadowsocks.Controller | |||
bool _shareOverLAN; | |||
Socket _tcpSocket; | |||
Socket _udpSocket; | |||
IList<Service> _services; | |||
List<IService> _services; | |||
public Listener(IList<Service> services) | |||
public Listener(List<IService> services) | |||
{ | |||
this._services = services; | |||
} | |||
@@ -76,6 +87,7 @@ namespace Shadowsocks.Controller | |||
Logging.Info("Shadowsocks started"); | |||
_tcpSocket.BeginAccept(new AsyncCallback(AcceptCallback), _tcpSocket); | |||
UDPState udpState = new UDPState(); | |||
udpState.socket = _udpSocket; | |||
_udpSocket.BeginReceiveFrom(udpState.buffer, 0, udpState.buffer.Length, 0, ref udpState.remoteEndPoint, new AsyncCallback(RecvFromCallback), udpState); | |||
} | |||
catch (SocketException) | |||
@@ -97,17 +109,20 @@ namespace Shadowsocks.Controller | |||
_udpSocket.Close(); | |||
_udpSocket = null; | |||
} | |||
_services.ForEach(s=>s.Stop()); | |||
} | |||
public void RecvFromCallback(IAsyncResult ar) | |||
{ | |||
UDPState state = (UDPState)ar.AsyncState; | |||
var socket = state.socket; | |||
try | |||
{ | |||
int bytesRead = _udpSocket.EndReceiveFrom(ar, ref state.remoteEndPoint); | |||
foreach (Service service in _services) | |||
int bytesRead = socket.EndReceiveFrom(ar, ref state.remoteEndPoint); | |||
foreach (IService service in _services) | |||
{ | |||
if (service.Handle(state.buffer, bytesRead, _udpSocket, state)) | |||
if (service.Handle(state.buffer, bytesRead, socket, state)) | |||
{ | |||
break; | |||
} | |||
@@ -124,7 +139,7 @@ namespace Shadowsocks.Controller | |||
{ | |||
try | |||
{ | |||
_udpSocket.BeginReceiveFrom(state.buffer, 0, state.buffer.Length, 0, ref state.remoteEndPoint, new AsyncCallback(RecvFromCallback), state); | |||
socket.BeginReceiveFrom(state.buffer, 0, state.buffer.Length, 0, ref state.remoteEndPoint, new AsyncCallback(RecvFromCallback), state); | |||
} | |||
catch (ObjectDisposedException) | |||
{ | |||
@@ -187,7 +202,7 @@ namespace Shadowsocks.Controller | |||
try | |||
{ | |||
int bytesRead = conn.EndReceive(ar); | |||
foreach (Service service in _services) | |||
foreach (IService service in _services) | |||
{ | |||
if (service.Handle(buf, bytesRead, conn, null)) | |||
{ | |||
@@ -35,7 +35,7 @@ namespace Shadowsocks.Controller | |||
this._config = config; | |||
} | |||
public bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
{ | |||
if (socket.ProtocolType != ProtocolType.Tcp) | |||
{ | |||
@@ -14,7 +14,7 @@ namespace Shadowsocks.Controller | |||
this._targetPort = targetPort; | |||
} | |||
public bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
{ | |||
if (socket.ProtocolType != ProtocolType.Tcp) | |||
{ | |||
@@ -29,7 +29,7 @@ namespace Shadowsocks.Controller | |||
_lastSweepTime = DateTime.Now; | |||
} | |||
public bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
{ | |||
if (socket.ProtocolType != ProtocolType.Tcp | |||
|| (length < 2 || firstPacket[0] != 5)) | |||
@@ -62,6 +62,16 @@ namespace Shadowsocks.Controller | |||
return true; | |||
} | |||
public override void Stop() | |||
{ | |||
List<TCPHandler> handlersToClose = new List<TCPHandler>(); | |||
lock (Handlers) | |||
{ | |||
handlersToClose.AddRange(Handlers); | |||
} | |||
handlersToClose.ForEach(h=>h.Close()); | |||
} | |||
public void UpdateInboundCounter(Server server, long n) | |||
{ | |||
_controller.UpdateInboundCounter(server, n); | |||
@@ -24,7 +24,7 @@ namespace Shadowsocks.Controller | |||
this._cache = new LRUCache<IPEndPoint, UDPHandler>(512); // todo: choose a smart number | |||
} | |||
public bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) | |||
{ | |||
if (socket.ProtocolType != ProtocolType.Udp) | |||
{ | |||
@@ -33,8 +33,10 @@ namespace Shadowsocks.Controller | |||
public AvailabilityStatistics availabilityStatistics = AvailabilityStatistics.Instance; | |||
public StatisticsStrategyConfiguration StatisticsConfiguration { get; private set; } | |||
public long inboundCounter = 0; | |||
public long outboundCounter = 0; | |||
private long _inboundCounter = 0; | |||
private long _outboundCounter = 0; | |||
public long InboundCounter => Interlocked.Read(ref _inboundCounter); | |||
public long OutboundCounter => Interlocked.Read(ref _outboundCounter); | |||
public QueueLast<TrafficPerSecond> traffic; | |||
private bool stopped = false; | |||
@@ -364,7 +366,7 @@ namespace Shadowsocks.Controller | |||
public void UpdateInboundCounter(Server server, long n) | |||
{ | |||
Interlocked.Add(ref inboundCounter, n); | |||
Interlocked.Add(ref _inboundCounter, n); | |||
if (_config.availabilityStatistics) | |||
{ | |||
availabilityStatistics.UpdateInboundCounter(server, n); | |||
@@ -373,7 +375,7 @@ namespace Shadowsocks.Controller | |||
public void UpdateOutboundCounter(Server server, long n) | |||
{ | |||
Interlocked.Add(ref outboundCounter, n); | |||
Interlocked.Add(ref _outboundCounter, n); | |||
if (_config.availabilityStatistics) | |||
{ | |||
availabilityStatistics.UpdateOutboundCounter(server, n); | |||
@@ -427,7 +429,7 @@ namespace Shadowsocks.Controller | |||
TCPRelay tcpRelay = new TCPRelay(this, _config); | |||
UDPRelay udpRelay = new UDPRelay(this); | |||
List<Listener.Service> services = new List<Listener.Service>(); | |||
List<Listener.IService> services = new List<Listener.IService>(); | |||
services.Add(tcpRelay); | |||
services.Add(udpRelay); | |||
services.Add(_pacServer); | |||
@@ -579,19 +581,17 @@ namespace Shadowsocks.Controller | |||
{ | |||
TrafficPerSecond previous = traffic.Last; | |||
TrafficPerSecond current = new TrafficPerSecond(); | |||
current.inboundCounter = inboundCounter; | |||
current.outboundCounter = outboundCounter; | |||
current.inboundIncreasement = inboundCounter - previous.inboundCounter; | |||
current.outboundIncreasement = outboundCounter - previous.outboundCounter; | |||
var inbound = current.inboundCounter = InboundCounter; | |||
var outbound = current.outboundCounter = OutboundCounter; | |||
current.inboundIncreasement = inbound - previous.inboundCounter; | |||
current.outboundIncreasement = outbound - previous.outboundCounter; | |||
traffic.Enqueue(current); | |||
if (traffic.Count > queueMaxSize) | |||
traffic.Dequeue(); | |||
if (TrafficChanged != null) | |||
{ | |||
TrafficChanged(this, new EventArgs()); | |||
} | |||
TrafficChanged?.Invoke(this, new EventArgs()); | |||
Thread.Sleep(1000); | |||
} | |||
@@ -171,7 +171,7 @@ namespace Shadowsocks.View | |||
} | |||
this.Text = I18N.GetString("Log Viewer") + | |||
$" [in: {Utils.FormatBandwidth(controller.inboundCounter)}, out: {Utils.FormatBandwidth(controller.outboundCounter)}]"; | |||
$" [in: {Utils.FormatBandwidth(controller.InboundCounter)}, out: {Utils.FormatBandwidth(controller.OutboundCounter)}]"; | |||
} | |||
private void LogForm_Load(object sender, EventArgs e) | |||