From a7b0ce5bcb89e19d2f6365e86227cb85e97ea418 Mon Sep 17 00:00:00 2001 From: noisyfox Date: Fri, 26 Aug 2016 12:22:11 +1000 Subject: [PATCH 1/4] Use atomic long read. Signed-off-by: noisyfox --- .../Controller/ShadowsocksController.cs | 24 +++++++++++----------- shadowsocks-csharp/View/LogForm.cs | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/shadowsocks-csharp/Controller/ShadowsocksController.cs b/shadowsocks-csharp/Controller/ShadowsocksController.cs index 5b6cd588..a2a34eee 100644 --- a/shadowsocks-csharp/Controller/ShadowsocksController.cs +++ b/shadowsocks-csharp/Controller/ShadowsocksController.cs @@ -33,8 +33,10 @@ namespace Shadowsocks.Controller public AvailabilityStatistics availabilityStatistics = AvailabilityStatistics.Instance; public StatisticsStrategyConfiguration StatisticsConfiguration { get; private set; } - public long inboundCounter = 0; - public long outboundCounter = 0; + private long _inboundCounter = 0; + private long _outboundCounter = 0; + public long InboundCounter => Interlocked.Read(ref _inboundCounter); + public long OutboundCounter => Interlocked.Read(ref _outboundCounter); public QueueLast traffic; private bool stopped = false; @@ -364,7 +366,7 @@ namespace Shadowsocks.Controller public void UpdateInboundCounter(Server server, long n) { - Interlocked.Add(ref inboundCounter, n); + Interlocked.Add(ref _inboundCounter, n); if (_config.availabilityStatistics) { availabilityStatistics.UpdateInboundCounter(server, n); @@ -373,7 +375,7 @@ namespace Shadowsocks.Controller public void UpdateOutboundCounter(Server server, long n) { - Interlocked.Add(ref outboundCounter, n); + Interlocked.Add(ref _outboundCounter, n); if (_config.availabilityStatistics) { availabilityStatistics.UpdateOutboundCounter(server, n); @@ -579,19 +581,17 @@ namespace Shadowsocks.Controller { TrafficPerSecond previous = traffic.Last; TrafficPerSecond current = new TrafficPerSecond(); - current.inboundCounter = inboundCounter; - current.outboundCounter = outboundCounter; - current.inboundIncreasement = inboundCounter - previous.inboundCounter; - current.outboundIncreasement = outboundCounter - previous.outboundCounter; + + var inbound = current.inboundCounter = InboundCounter; + var outbound = current.outboundCounter = OutboundCounter; + current.inboundIncreasement = inbound - previous.inboundCounter; + current.outboundIncreasement = outbound - previous.outboundCounter; traffic.Enqueue(current); if (traffic.Count > queueMaxSize) traffic.Dequeue(); - if (TrafficChanged != null) - { - TrafficChanged(this, new EventArgs()); - } + TrafficChanged?.Invoke(this, new EventArgs()); Thread.Sleep(1000); } diff --git a/shadowsocks-csharp/View/LogForm.cs b/shadowsocks-csharp/View/LogForm.cs index 481c341a..7aab4e8e 100644 --- a/shadowsocks-csharp/View/LogForm.cs +++ b/shadowsocks-csharp/View/LogForm.cs @@ -171,7 +171,7 @@ namespace Shadowsocks.View } this.Text = I18N.GetString("Log Viewer") + - $" [in: {Utils.FormatBandwidth(controller.inboundCounter)}, out: {Utils.FormatBandwidth(controller.outboundCounter)}]"; + $" [in: {Utils.FormatBandwidth(controller.InboundCounter)}, out: {Utils.FormatBandwidth(controller.OutboundCounter)}]"; } private void LogForm_Load(object sender, EventArgs e) From 65d72fdfed146afba2b27508307e51d96e2293a2 Mon Sep 17 00:00:00 2001 From: noisyfox Date: Fri, 26 Aug 2016 12:22:11 +1000 Subject: [PATCH 2/4] New approach to record and calculate inbound/outbound speed. Use lock to sync between threads.This should fix issue #679 and other same issue. Fix a bug in UpdateSpeed() that only save the first xxxSpeedRecord. --- .../Controller/Service/AvailabilityStatistics.cs | 139 +++++++++++++++------ 1 file changed, 103 insertions(+), 36 deletions(-) diff --git a/shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs b/shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs index a43c4780..a691d0e8 100644 --- a/shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs +++ b/shadowsocks-csharp/Controller/Service/AvailabilityStatistics.cs @@ -34,12 +34,77 @@ namespace Shadowsocks.Controller //records cache for current server in {_monitorInterval} minutes private readonly ConcurrentDictionary> _latencyRecords = new ConcurrentDictionary>(); //speed in KiB/s - private readonly ConcurrentDictionary _inboundCounter = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _lastInboundCounter = new ConcurrentDictionary(); private readonly ConcurrentDictionary> _inboundSpeedRecords = new ConcurrentDictionary>(); - private readonly ConcurrentDictionary _outboundCounter = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _lastOutboundCounter = new ConcurrentDictionary(); private readonly ConcurrentDictionary> _outboundSpeedRecords = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary _inOutBoundRecords = new ConcurrentDictionary(); + private class InOutBoundRecord + { + private long _inbound; + private long _lastInbound; + private long _outbound; + private long _lastOutbound; + + private SpinLock _lock = new SpinLock(); + + public void UpdateInbound(long delta) + { + bool lockTaken = false; + try + { + _lock.Enter(ref lockTaken); + Interlocked.Add(ref _inbound, delta); + } + finally + { + if (lockTaken) + { + _lock.Exit(false); + } + } + } + + public void UpdateOutbound(long delta) + { + bool lockTaken = false; + try + { + _lock.Enter(ref lockTaken); + Interlocked.Add(ref _outbound, delta); + } + finally + { + if (lockTaken) + { + _lock.Exit(false); + } + } + } + + public void GetDelta(out long inboundDelta, out long outboundDelta) + { + bool lockTaken = false; + try + { + _lock.Enter(ref lockTaken); + + var i = Interlocked.Read(ref _inbound); + var il = Interlocked.Exchange(ref _lastInbound, i); + inboundDelta = i - il; + + + var o = Interlocked.Read(ref _outbound); + var ol = Interlocked.Exchange(ref _lastOutbound, o); + outboundDelta = o - ol; + } + finally + { + if (lockTaken) + { + _lock.Exit(false); + } + } + } + } //tasks private readonly TimeSpan _delayBeforeStart = TimeSpan.FromSeconds(1); @@ -98,36 +163,26 @@ namespace Shadowsocks.Controller private void UpdateSpeed(object _) { - foreach (var kv in _lastInboundCounter) + foreach (var kv in _inOutBoundRecords) { var id = kv.Key; + var record = kv.Value; - var lastInbound = kv.Value; - var inbound = _inboundCounter[id]; - var bytes = inbound - lastInbound; - _lastInboundCounter[id] = inbound; - var inboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds); - _inboundSpeedRecords.GetOrAdd(id, (k) => - { - List records = new List(); - records.Add(inboundSpeed); - return records; - }); + long inboundDelta, outboundDelta; - var lastOutbound = _lastOutboundCounter[id]; - var outbound = _outboundCounter[id]; - bytes = outbound - lastOutbound; - _lastOutboundCounter[id] = outbound; - var outboundSpeed = GetSpeedInKiBPerSecond(bytes, _monitorInterval.TotalSeconds); - _outboundSpeedRecords.GetOrAdd(id, (k) => - { - List records = new List(); - records.Add(outboundSpeed); - return records; - }); + record.GetDelta(out inboundDelta, out outboundDelta); + + var inboundSpeed = GetSpeedInKiBPerSecond(inboundDelta, _monitorInterval.TotalSeconds); + var outboundSpeed = GetSpeedInKiBPerSecond(outboundDelta, _monitorInterval.TotalSeconds); + + var inR = _inboundSpeedRecords.GetOrAdd(id, (k) => new List()); + var outR = _outboundSpeedRecords.GetOrAdd(id, (k) => new List()); + + inR.Add(inboundSpeed); + outR.Add(outboundSpeed); Logging.Debug( - $"{id}: current/max inbound {inboundSpeed}/{_inboundSpeedRecords[id].Max()} KiB/s, current/max outbound {outboundSpeed}/{_outboundSpeedRecords[id].Max()} KiB/s"); + $"{id}: current/max inbound {inboundSpeed}/{inR.Max()} KiB/s, current/max outbound {outboundSpeed}/{outR.Max()} KiB/s"); } } @@ -327,20 +382,32 @@ namespace Shadowsocks.Controller public void UpdateInboundCounter(Server server, long n) { - _inboundCounter.AddOrUpdate(server.Identifier(), (k) => + _inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) => { - _lastInboundCounter.GetOrAdd(server.Identifier(), 0); - return n; - }, (k, v) => (v + n)); + var r = new InOutBoundRecord(); + r.UpdateInbound(n); + + return r; + }, (k, v) => + { + v.UpdateInbound(n); + return v; + }); } public void UpdateOutboundCounter(Server server, long n) { - _outboundCounter.AddOrUpdate(server.Identifier(), (k) => + _inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) => { - _lastOutboundCounter.GetOrAdd(server.Identifier(), 0); - return n; - }, (k, v) => (v + n)); + var r = new InOutBoundRecord(); + r.UpdateOutbound(n); + + return r; + }, (k, v) => + { + v.UpdateOutbound(n); + return v; + }); } class UpdateRecordsState From ed54b249db0e3710c265436bfffa815b5e00abce Mon Sep 17 00:00:00 2001 From: noisyfox Date: Fri, 26 Aug 2016 21:08:04 +1000 Subject: [PATCH 3/4] Close services while closing listener. Now it will close all existing TCP relay connections while reloading the listener. Signed-off-by: noisyfox --- shadowsocks-csharp/Controller/Service/Listener.cs | 22 +++++++++++++++++----- shadowsocks-csharp/Controller/Service/PACServer.cs | 2 +- .../Controller/Service/PortForwarder.cs | 2 +- shadowsocks-csharp/Controller/Service/TCPRelay.cs | 12 +++++++++++- shadowsocks-csharp/Controller/Service/UDPRelay.cs | 2 +- .../Controller/ShadowsocksController.cs | 2 +- 6 files changed, 32 insertions(+), 10 deletions(-) diff --git a/shadowsocks-csharp/Controller/Service/Listener.cs b/shadowsocks-csharp/Controller/Service/Listener.cs index 9fed2cb7..b2c33262 100644 --- a/shadowsocks-csharp/Controller/Service/Listener.cs +++ b/shadowsocks-csharp/Controller/Service/Listener.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Net; using System.Net.NetworkInformation; using System.Net.Sockets; @@ -10,9 +11,18 @@ namespace Shadowsocks.Controller { public class Listener { - public interface Service + public interface IService { bool Handle(byte[] firstPacket, int length, Socket socket, object state); + + void Stop(); + } + + public abstract class Service : IService + { + public abstract bool Handle(byte[] firstPacket, int length, Socket socket, object state); + + public virtual void Stop() { } } public class UDPState @@ -25,9 +35,9 @@ namespace Shadowsocks.Controller bool _shareOverLAN; Socket _tcpSocket; Socket _udpSocket; - IList _services; + List _services; - public Listener(IList services) + public Listener(List services) { this._services = services; } @@ -97,6 +107,8 @@ namespace Shadowsocks.Controller _udpSocket.Close(); _udpSocket = null; } + + _services.ForEach(s=>s.Stop()); } public void RecvFromCallback(IAsyncResult ar) @@ -105,7 +117,7 @@ namespace Shadowsocks.Controller try { int bytesRead = _udpSocket.EndReceiveFrom(ar, ref state.remoteEndPoint); - foreach (Service service in _services) + foreach (IService service in _services) { if (service.Handle(state.buffer, bytesRead, _udpSocket, state)) { @@ -187,7 +199,7 @@ namespace Shadowsocks.Controller try { int bytesRead = conn.EndReceive(ar); - foreach (Service service in _services) + foreach (IService service in _services) { if (service.Handle(buf, bytesRead, conn, null)) { diff --git a/shadowsocks-csharp/Controller/Service/PACServer.cs b/shadowsocks-csharp/Controller/Service/PACServer.cs index c6853331..3fb0461a 100644 --- a/shadowsocks-csharp/Controller/Service/PACServer.cs +++ b/shadowsocks-csharp/Controller/Service/PACServer.cs @@ -35,7 +35,7 @@ namespace Shadowsocks.Controller this._config = config; } - public bool Handle(byte[] firstPacket, int length, Socket socket, object state) + public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) { if (socket.ProtocolType != ProtocolType.Tcp) { diff --git a/shadowsocks-csharp/Controller/Service/PortForwarder.cs b/shadowsocks-csharp/Controller/Service/PortForwarder.cs index dcac75bf..6be4dbef 100644 --- a/shadowsocks-csharp/Controller/Service/PortForwarder.cs +++ b/shadowsocks-csharp/Controller/Service/PortForwarder.cs @@ -14,7 +14,7 @@ namespace Shadowsocks.Controller this._targetPort = targetPort; } - public bool Handle(byte[] firstPacket, int length, Socket socket, object state) + public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) { if (socket.ProtocolType != ProtocolType.Tcp) { diff --git a/shadowsocks-csharp/Controller/Service/TCPRelay.cs b/shadowsocks-csharp/Controller/Service/TCPRelay.cs index 2d86de25..7a80fefb 100644 --- a/shadowsocks-csharp/Controller/Service/TCPRelay.cs +++ b/shadowsocks-csharp/Controller/Service/TCPRelay.cs @@ -29,7 +29,7 @@ namespace Shadowsocks.Controller _lastSweepTime = DateTime.Now; } - public bool Handle(byte[] firstPacket, int length, Socket socket, object state) + public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) { if (socket.ProtocolType != ProtocolType.Tcp || (length < 2 || firstPacket[0] != 5)) @@ -62,6 +62,16 @@ namespace Shadowsocks.Controller return true; } + public override void Stop() + { + List handlersToClose = new List(); + lock (Handlers) + { + handlersToClose.AddRange(Handlers); + } + handlersToClose.ForEach(h=>h.Close()); + } + public void UpdateInboundCounter(Server server, long n) { _controller.UpdateInboundCounter(server, n); diff --git a/shadowsocks-csharp/Controller/Service/UDPRelay.cs b/shadowsocks-csharp/Controller/Service/UDPRelay.cs index 5f0d2363..e7beb46c 100644 --- a/shadowsocks-csharp/Controller/Service/UDPRelay.cs +++ b/shadowsocks-csharp/Controller/Service/UDPRelay.cs @@ -24,7 +24,7 @@ namespace Shadowsocks.Controller this._cache = new LRUCache(512); // todo: choose a smart number } - public bool Handle(byte[] firstPacket, int length, Socket socket, object state) + public override bool Handle(byte[] firstPacket, int length, Socket socket, object state) { if (socket.ProtocolType != ProtocolType.Udp) { diff --git a/shadowsocks-csharp/Controller/ShadowsocksController.cs b/shadowsocks-csharp/Controller/ShadowsocksController.cs index a2a34eee..089fbaee 100644 --- a/shadowsocks-csharp/Controller/ShadowsocksController.cs +++ b/shadowsocks-csharp/Controller/ShadowsocksController.cs @@ -429,7 +429,7 @@ namespace Shadowsocks.Controller TCPRelay tcpRelay = new TCPRelay(this, _config); UDPRelay udpRelay = new UDPRelay(this); - List services = new List(); + List services = new List(); services.Add(tcpRelay); services.Add(udpRelay); services.Add(_pacServer); From 5b2327776ac56a7d842b1fe623c0f2a6a203e019 Mon Sep 17 00:00:00 2001 From: noisyfox Date: Sat, 27 Aug 2016 03:35:33 +1000 Subject: [PATCH 4/4] Fix a null ref exception. --- shadowsocks-csharp/Controller/Service/Listener.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/shadowsocks-csharp/Controller/Service/Listener.cs b/shadowsocks-csharp/Controller/Service/Listener.cs index b2c33262..56222d7f 100644 --- a/shadowsocks-csharp/Controller/Service/Listener.cs +++ b/shadowsocks-csharp/Controller/Service/Listener.cs @@ -27,6 +27,7 @@ namespace Shadowsocks.Controller public class UDPState { + public Socket socket; public byte[] buffer = new byte[4096]; public EndPoint remoteEndPoint = new IPEndPoint(IPAddress.Any, 0); } @@ -86,6 +87,7 @@ namespace Shadowsocks.Controller Logging.Info("Shadowsocks started"); _tcpSocket.BeginAccept(new AsyncCallback(AcceptCallback), _tcpSocket); UDPState udpState = new UDPState(); + udpState.socket = _udpSocket; _udpSocket.BeginReceiveFrom(udpState.buffer, 0, udpState.buffer.Length, 0, ref udpState.remoteEndPoint, new AsyncCallback(RecvFromCallback), udpState); } catch (SocketException) @@ -114,12 +116,13 @@ namespace Shadowsocks.Controller public void RecvFromCallback(IAsyncResult ar) { UDPState state = (UDPState)ar.AsyncState; + var socket = state.socket; try { - int bytesRead = _udpSocket.EndReceiveFrom(ar, ref state.remoteEndPoint); + int bytesRead = socket.EndReceiveFrom(ar, ref state.remoteEndPoint); foreach (IService service in _services) { - if (service.Handle(state.buffer, bytesRead, _udpSocket, state)) + if (service.Handle(state.buffer, bytesRead, socket, state)) { break; } @@ -136,7 +139,7 @@ namespace Shadowsocks.Controller { try { - _udpSocket.BeginReceiveFrom(state.buffer, 0, state.buffer.Length, 0, ref state.remoteEndPoint, new AsyncCallback(RecvFromCallback), state); + socket.BeginReceiveFrom(state.buffer, 0, state.buffer.Length, 0, ref state.remoteEndPoint, new AsyncCallback(RecvFromCallback), state); } catch (ObjectDisposedException) {