You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

AvailabilityStatistics.cs 19 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Net;
  7. using System.Net.NetworkInformation;
  8. using System.Net.Sockets;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Newtonsoft.Json;
  12. using NLog;
  13. using Shadowsocks.Model;
  14. using Shadowsocks.Util;
  15. namespace Shadowsocks.Controller
  16. {
  17. using Statistics = Dictionary<string, List<StatisticsRecord>>;
  18. public sealed class AvailabilityStatistics : IDisposable
  19. {
  20. private static Logger logger = LogManager.GetCurrentClassLogger();
  21. public const string DateTimePattern = "yyyy-MM-dd HH:mm:ss";
  22. private const string StatisticsFilesName = "shadowsocks.availability.json";
  23. public static string AvailabilityStatisticsFile;
  24. //static constructor to initialize every public static fields before refereced
  25. static AvailabilityStatistics()
  26. {
  27. AvailabilityStatisticsFile = Utils.GetTempPath(StatisticsFilesName);
  28. }
  29. //arguments for ICMP tests
  30. private int Repeat => Config.RepeatTimesNum;
  31. public const int TimeoutMilliseconds = 500;
  32. //records cache for current server in {_monitorInterval} minutes
  33. private readonly ConcurrentDictionary<string, List<int>> _latencyRecords = new ConcurrentDictionary<string, List<int>>();
  34. //speed in KiB/s
  35. private readonly ConcurrentDictionary<string, List<int>> _inboundSpeedRecords = new ConcurrentDictionary<string, List<int>>();
  36. private readonly ConcurrentDictionary<string, List<int>> _outboundSpeedRecords = new ConcurrentDictionary<string, List<int>>();
  37. private readonly ConcurrentDictionary<string, InOutBoundRecord> _inOutBoundRecords = new ConcurrentDictionary<string, InOutBoundRecord>();
  38. private class InOutBoundRecord
  39. {
  40. private long _inbound;
  41. private long _lastInbound;
  42. private long _outbound;
  43. private long _lastOutbound;
  44. public void UpdateInbound(long delta)
  45. {
  46. Interlocked.Add(ref _inbound, delta);
  47. }
  48. public void UpdateOutbound(long delta)
  49. {
  50. Interlocked.Add(ref _outbound, delta);
  51. }
  52. public void GetDelta(out long inboundDelta, out long outboundDelta)
  53. {
  54. var i = Interlocked.Read(ref _inbound);
  55. var il = Interlocked.Exchange(ref _lastInbound, i);
  56. inboundDelta = i - il;
  57. var o = Interlocked.Read(ref _outbound);
  58. var ol = Interlocked.Exchange(ref _lastOutbound, o);
  59. outboundDelta = o - ol;
  60. }
  61. }
  62. //tasks
  63. private readonly TimeSpan _delayBeforeStart = TimeSpan.FromSeconds(1);
  64. private readonly TimeSpan _retryInterval = TimeSpan.FromMinutes(2);
  65. private TimeSpan RecordingInterval => TimeSpan.FromMinutes(Config.DataCollectionMinutes);
  66. private Timer _perSecondTimer; //analyze and save cached records to RawStatistics and filter records
  67. private readonly TimeSpan _monitorInterval = TimeSpan.FromSeconds(1);
  68. //private Timer _writer; //write RawStatistics to file
  69. //private readonly TimeSpan _writingInterval = TimeSpan.FromMinutes(1);
  70. private ShadowsocksController _controller;
  71. private StatisticsStrategyConfiguration Config => _controller.StatisticsConfiguration;
  72. // Static Singleton Initialization
  73. public static AvailabilityStatistics Instance { get; } = new AvailabilityStatistics();
  74. public Statistics RawStatistics { get; private set; }
  75. public Statistics FilteredStatistics { get; private set; }
  76. private AvailabilityStatistics()
  77. {
  78. RawStatistics = new Statistics();
  79. }
  80. internal void UpdateConfiguration(ShadowsocksController controller)
  81. {
  82. _controller = controller;
  83. Reset();
  84. try
  85. {
  86. if (Config.StatisticsEnabled)
  87. {
  88. LoadRawStatistics();
  89. if (_perSecondTimer == null)
  90. {
  91. _perSecondTimer = new Timer(OperationsPerSecond, new Counter(), _delayBeforeStart, TimeSpan.FromSeconds(1));
  92. }
  93. }
  94. else
  95. {
  96. _perSecondTimer?.Dispose();
  97. }
  98. }
  99. catch (Exception e)
  100. {
  101. logger.LogUsefulException(e);
  102. }
  103. }
  104. private void OperationsPerSecond(object state)
  105. {
  106. lock(state)
  107. {
  108. var counter = state as Counter;
  109. if (counter.count % _monitorInterval.TotalSeconds == 0)
  110. {
  111. UpdateSpeed();
  112. }
  113. if (counter.count % RecordingInterval.TotalSeconds == 0)
  114. {
  115. Run();
  116. }
  117. counter.count++;
  118. }
  119. }
  120. private void UpdateSpeed()
  121. {
  122. foreach (var kv in _inOutBoundRecords)
  123. {
  124. var id = kv.Key;
  125. var record = kv.Value;
  126. long inboundDelta, outboundDelta;
  127. record.GetDelta(out inboundDelta, out outboundDelta);
  128. var inboundSpeed = GetSpeedInKiBPerSecond(inboundDelta, _monitorInterval.TotalSeconds);
  129. var outboundSpeed = GetSpeedInKiBPerSecond(outboundDelta, _monitorInterval.TotalSeconds);
  130. // not thread safe
  131. var inR = _inboundSpeedRecords.GetOrAdd(id, (k) => new List<int>());
  132. var outR = _outboundSpeedRecords.GetOrAdd(id, (k) => new List<int>());
  133. inR.Add(inboundSpeed);
  134. outR.Add(outboundSpeed);
  135. logger.Debug(
  136. $"{id}: current/max inbound {inboundSpeed}/{inR.Max()} KiB/s, current/max outbound {outboundSpeed}/{outR.Max()} KiB/s");
  137. }
  138. }
  139. private void Reset()
  140. {
  141. _inboundSpeedRecords.Clear();
  142. _outboundSpeedRecords.Clear();
  143. _latencyRecords.Clear();
  144. }
  145. private void Run()
  146. {
  147. UpdateRecords();
  148. Reset();
  149. }
  150. private void UpdateRecords()
  151. {
  152. var records = new Dictionary<string, StatisticsRecord>();
  153. UpdateRecordsState state = new UpdateRecordsState();
  154. int serverCount = _controller.GetCurrentConfiguration().configs.Count;
  155. state.counter = serverCount;
  156. bool isPing = Config.Ping;
  157. for (int i = 0; i < serverCount; i++)
  158. {
  159. try
  160. {
  161. var server = _controller.GetCurrentConfiguration().configs[i];
  162. var id = server.Identifier();
  163. List<int> inboundSpeedRecords = null;
  164. List<int> outboundSpeedRecords = null;
  165. List<int> latencyRecords = null;
  166. _inboundSpeedRecords.TryGetValue(id, out inboundSpeedRecords);
  167. _outboundSpeedRecords.TryGetValue(id, out outboundSpeedRecords);
  168. _latencyRecords.TryGetValue(id, out latencyRecords);
  169. StatisticsRecord record = new StatisticsRecord(id, inboundSpeedRecords, outboundSpeedRecords, latencyRecords);
  170. /* duplicate server identifier */
  171. if (records.ContainsKey(id))
  172. records[id] = record;
  173. else
  174. records.Add(id, record);
  175. if (isPing)
  176. {
  177. // FIXME: on ping completed, every thing could be asynchrously changed.
  178. // focus on: Config/ RawStatistics
  179. MyPing ping = new MyPing(server, Repeat);
  180. ping.Completed += ping_Completed;
  181. ping.Start(new PingState { state = state, record = record });
  182. }
  183. else if (!record.IsEmptyData())
  184. {
  185. AppendRecord(id, record);
  186. }
  187. }
  188. catch (Exception e)
  189. {
  190. logger.Debug("config changed asynchrously, just ignore this server");
  191. }
  192. }
  193. if (!isPing)
  194. {
  195. Save();
  196. FilterRawStatistics();
  197. }
  198. }
  199. private void ping_Completed(object sender, MyPing.CompletedEventArgs e)
  200. {
  201. PingState pingState = (PingState)e.UserState;
  202. UpdateRecordsState state = pingState.state;
  203. Server server = e.Server;
  204. StatisticsRecord record = pingState.record;
  205. record.SetResponse(e.RoundtripTime);
  206. if (!record.IsEmptyData())
  207. {
  208. AppendRecord(server.Identifier(), record);
  209. }
  210. logger.Debug($"Ping {server.FriendlyName()} {e.RoundtripTime.Count} times, {(100 - record.PackageLoss * 100)}% packages loss, min {record.MinResponse} ms, max {record.MaxResponse} ms, avg {record.AverageResponse} ms");
  211. if (Interlocked.Decrement(ref state.counter) == 0)
  212. {
  213. Save();
  214. FilterRawStatistics();
  215. }
  216. }
  217. private void AppendRecord(string serverIdentifier, StatisticsRecord record)
  218. {
  219. try
  220. {
  221. List<StatisticsRecord> records;
  222. lock (RawStatistics)
  223. {
  224. if (!RawStatistics.TryGetValue(serverIdentifier, out records))
  225. {
  226. records = new List<StatisticsRecord>();
  227. RawStatistics[serverIdentifier] = records;
  228. }
  229. }
  230. records.Add(record);
  231. }
  232. catch (Exception e)
  233. {
  234. logger.LogUsefulException(e);
  235. }
  236. }
  237. private void Save()
  238. {
  239. logger.Debug($"save statistics to {AvailabilityStatisticsFile}");
  240. if (RawStatistics.Count == 0)
  241. {
  242. return;
  243. }
  244. try
  245. {
  246. string content;
  247. #if DEBUG
  248. content = JsonConvert.SerializeObject(RawStatistics, Formatting.Indented);
  249. #else
  250. content = JsonConvert.SerializeObject(RawStatistics, Formatting.None);
  251. #endif
  252. File.WriteAllText(AvailabilityStatisticsFile, content);
  253. }
  254. catch (IOException e)
  255. {
  256. logger.LogUsefulException(e);
  257. }
  258. }
  259. private bool IsValidRecord(StatisticsRecord record)
  260. {
  261. if (Config.ByHourOfDay)
  262. {
  263. if (!record.Timestamp.Hour.Equals(DateTime.Now.Hour)) return false;
  264. }
  265. return true;
  266. }
  267. private void FilterRawStatistics()
  268. {
  269. try
  270. {
  271. logger.Debug("filter raw statistics");
  272. if (RawStatistics == null) return;
  273. var filteredStatistics = new Statistics();
  274. foreach (var serverAndRecords in RawStatistics)
  275. {
  276. var server = serverAndRecords.Key;
  277. var filteredRecords = serverAndRecords.Value.FindAll(IsValidRecord);
  278. filteredStatistics[server] = filteredRecords;
  279. }
  280. FilteredStatistics = filteredStatistics;
  281. }
  282. catch (Exception e)
  283. {
  284. logger.LogUsefulException(e);
  285. }
  286. }
  287. private void LoadRawStatistics()
  288. {
  289. try
  290. {
  291. var path = AvailabilityStatisticsFile;
  292. logger.Debug($"loading statistics from {path}");
  293. if (!File.Exists(path))
  294. {
  295. using (File.Create(path))
  296. {
  297. //do nothing
  298. }
  299. }
  300. var content = File.ReadAllText(path);
  301. RawStatistics = JsonConvert.DeserializeObject<Statistics>(content) ?? RawStatistics;
  302. }
  303. catch (Exception e)
  304. {
  305. logger.LogUsefulException(e);
  306. Console.WriteLine($"failed to load statistics; use runtime statistics, some data may be lost");
  307. }
  308. }
  309. private static int GetSpeedInKiBPerSecond(long bytes, double seconds)
  310. {
  311. var result = (int)(bytes / seconds) / 1024;
  312. return result;
  313. }
  314. public void Dispose()
  315. {
  316. _perSecondTimer.Dispose();
  317. }
  318. public void UpdateLatency(Server server, int latency)
  319. {
  320. _latencyRecords.GetOrAdd(server.Identifier(), (k) =>
  321. {
  322. List<int> records = new List<int>();
  323. records.Add(latency);
  324. return records;
  325. });
  326. }
  327. public void UpdateInboundCounter(Server server, long n)
  328. {
  329. _inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) =>
  330. {
  331. var r = new InOutBoundRecord();
  332. r.UpdateInbound(n);
  333. return r;
  334. }, (k, v) =>
  335. {
  336. v.UpdateInbound(n);
  337. return v;
  338. });
  339. }
  340. public void UpdateOutboundCounter(Server server, long n)
  341. {
  342. _inOutBoundRecords.AddOrUpdate(server.Identifier(), (k) =>
  343. {
  344. var r = new InOutBoundRecord();
  345. r.UpdateOutbound(n);
  346. return r;
  347. }, (k, v) =>
  348. {
  349. v.UpdateOutbound(n);
  350. return v;
  351. });
  352. }
  353. private class Counter
  354. {
  355. public int count = 0;
  356. }
  357. class UpdateRecordsState
  358. {
  359. public int counter;
  360. }
  361. class PingState
  362. {
  363. public UpdateRecordsState state;
  364. public StatisticsRecord record;
  365. }
  366. class MyPing
  367. {
  368. private static Logger logger = LogManager.GetCurrentClassLogger();
  369. //arguments for ICMP tests
  370. public const int TimeoutMilliseconds = 500;
  371. public EventHandler<CompletedEventArgs> Completed;
  372. private Server server;
  373. private int repeat;
  374. private IPAddress ip;
  375. private Ping ping;
  376. private List<int?> RoundtripTime;
  377. public MyPing(Server server, int repeat)
  378. {
  379. this.server = server;
  380. this.repeat = repeat;
  381. RoundtripTime = new List<int?>(repeat);
  382. ping = new Ping();
  383. ping.PingCompleted += Ping_PingCompleted;
  384. }
  385. public void Start(object userstate)
  386. {
  387. if (server.server == "")
  388. {
  389. FireCompleted(new Exception("Invalid Server"), userstate);
  390. return;
  391. }
  392. new Task(() => ICMPTest(0, userstate)).Start();
  393. }
  394. private void ICMPTest(int delay, object userstate)
  395. {
  396. try
  397. {
  398. logger.Debug($"Ping {server.FriendlyName()}");
  399. if (ip == null)
  400. {
  401. ip = Dns.GetHostAddresses(server.server)
  402. .First(
  403. ip =>
  404. ip.AddressFamily == AddressFamily.InterNetwork ||
  405. ip.AddressFamily == AddressFamily.InterNetworkV6);
  406. }
  407. repeat--;
  408. if (delay > 0)
  409. Thread.Sleep(delay);
  410. ping.SendAsync(ip, TimeoutMilliseconds, userstate);
  411. }
  412. catch (Exception e)
  413. {
  414. logger.Error($"An exception occured while eveluating {server.FriendlyName()}");
  415. logger.LogUsefulException(e);
  416. FireCompleted(e, userstate);
  417. }
  418. }
  419. private void Ping_PingCompleted(object sender, PingCompletedEventArgs e)
  420. {
  421. try
  422. {
  423. if (e.Reply.Status == IPStatus.Success)
  424. {
  425. logger.Debug($"Ping {server.FriendlyName()} {e.Reply.RoundtripTime} ms");
  426. RoundtripTime.Add((int?)e.Reply.RoundtripTime);
  427. }
  428. else
  429. {
  430. logger.Debug($"Ping {server.FriendlyName()} timeout");
  431. RoundtripTime.Add(null);
  432. }
  433. TestNext(e.UserState);
  434. }
  435. catch (Exception ex)
  436. {
  437. logger.Error($"An exception occured while eveluating {server.FriendlyName()}");
  438. logger.LogUsefulException(ex);
  439. FireCompleted(ex, e.UserState);
  440. }
  441. }
  442. private void TestNext(object userstate)
  443. {
  444. if (repeat > 0)
  445. {
  446. //Do ICMPTest in a random frequency
  447. int delay = TimeoutMilliseconds + new Random().Next() % TimeoutMilliseconds;
  448. new Task(() => ICMPTest(delay, userstate)).Start();
  449. }
  450. else
  451. {
  452. FireCompleted(null, userstate);
  453. }
  454. }
  455. private void FireCompleted(Exception error, object userstate)
  456. {
  457. Completed?.Invoke(this, new CompletedEventArgs
  458. {
  459. Error = error,
  460. Server = server,
  461. RoundtripTime = RoundtripTime,
  462. UserState = userstate
  463. });
  464. }
  465. public class CompletedEventArgs : EventArgs
  466. {
  467. public Exception Error;
  468. public Server Server;
  469. public List<int?> RoundtripTime;
  470. public object UserState;
  471. }
  472. }
  473. }
  474. }