You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

ProtocolMessagePipe.cs 4.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. using System;
  2. using System.Buffers;
  3. using System.Diagnostics;
  4. using System.IO.Pipelines;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace Shadowsocks.Protocol
  8. {
  /// <summary>
  /// Reads and writes <see cref="IProtocolMessage"/> instances over the two halves of an
  /// <see cref="IDuplexPipe"/>.
  /// </summary>
  public class ProtocolMessagePipe
  {
      /// <summary>Size hint used for the first serialization attempt in <see cref="WriteAsync"/>.</summary>
      private const int InitialWriteBufferSize = 4096;

      /// <summary>Largest size hint <see cref="WriteAsync"/> will try before giving up.</summary>
      private const int MaxWriteBufferSize = 65536;

      /// <summary>Number of consecutive empty reads tolerated while waiting for required bytes.</summary>
      private const int MaxEmptyReads = 1000;

      private readonly PipeReader _reader;
      private readonly PipeWriter _writer;

      // Cache for MakeFrame: the bounds of the last sequence that was framed and the resulting frame.
      private SequencePosition _lastFrameStart;
      private SequencePosition _lastFrameEnd;
      private ReadOnlyMemory<byte> _lastFrame;

      /// <summary>
      /// Creates a message pipe that reads from <c>pipe.Input</c> and writes to <c>pipe.Output</c>.
      /// </summary>
      /// <param name="pipe">The duplex pipe to wrap.</param>
      /// <exception cref="ArgumentNullException"><paramref name="pipe"/> is <c>null</c>.</exception>
      public ProtocolMessagePipe(IDuplexPipe pipe)
      {
          if (pipe == null)
          {
              throw new ArgumentNullException(nameof(pipe));
          }

          _reader = pipe.Input;
          _writer = pipe.Output;
      }

      /// <summary>
      /// Reads a new <typeparamref name="T"/> from the pipe, cancelling the read after a timeout.
      /// </summary>
      /// <param name="millisecond">Timeout in milliseconds, passed to <see cref="CancellationTokenSource.CancelAfter(int)"/>.</param>
      /// <returns>The loaded message.</returns>
      public async Task<T> ReadAsync<T>(int millisecond) where T : IProtocolMessage, new()
      {
          // Dispose the source so its internal timer is released as soon as the read finishes.
          using (var delay = new CancellationTokenSource())
          {
              delay.CancelAfter(millisecond);
              return await ReadAsync<T>(delay.Token);
          }
      }

      /// <summary>
      /// Loads <paramref name="ret"/> from the pipe, cancelling the read after a timeout.
      /// </summary>
      /// <param name="ret">The message instance to load into.</param>
      /// <param name="millisecond">Timeout in milliseconds, passed to <see cref="CancellationTokenSource.CancelAfter(int)"/>.</param>
      /// <returns><paramref name="ret"/>, after it has been loaded.</returns>
      public async Task<T> ReadAsync<T>(T ret, int millisecond) where T : IProtocolMessage
      {
          // Dispose the source so its internal timer is released as soon as the read finishes.
          using (var delay = new CancellationTokenSource())
          {
              delay.CancelAfter(millisecond);
              return await ReadAsync(ret, delay.Token);
          }
      }

      /// <summary>
      /// Reads a new <typeparamref name="T"/> from the pipe.
      /// </summary>
      /// <param name="token">Token used to cancel the underlying pipe reads.</param>
      /// <returns>The loaded message.</returns>
      public async Task<T> ReadAsync<T>(CancellationToken token = default) where T : IProtocolMessage, new() =>
          await ReadAsync(new T(), token);

      /// <summary>
      /// Loads <paramref name="ret"/> from the pipe, reading more data until the message can be parsed.
      /// </summary>
      /// <remarks>
      /// The result of <c>ret.TryLoad</c> is interpreted as follows: on success the returned length is
      /// the number of bytes consumed from the pipe; on failure a length of zero is treated as a parse
      /// error, and any other length is the minimum number of buffered bytes needed before retrying.
      /// </remarks>
      /// <param name="ret">The message instance to load into.</param>
      /// <param name="token">Token used to cancel the underlying pipe reads.</param>
      /// <returns><paramref name="ret"/>, after it has been loaded.</returns>
      /// <exception cref="FormatException">
      /// The pipe completed or kept returning empty buffers before enough data arrived, or the
      /// message reported a parse error.
      /// </exception>
      public async Task<T> ReadAsync<T>(T ret, CancellationToken token = default) where T : IProtocolMessage
      {
          Debug.WriteLine($"Reading protocol message {typeof(T).Name}");
          var required = 0;
          while (true)
          {
              ReadOnlySequence<byte> seq;
              var emptyReads = 0;
              while (true)
              {
                  var result = await _reader.ReadAsync(token);
                  seq = result.Buffer;
                  if (seq.Length >= required)
                  {
                      break;
                  }

                  // Not enough data yet. Every PipeReader.ReadAsync must be paired with AdvanceTo
                  // before the next read; mark the whole buffer as examined but consume nothing so
                  // the next read completes only when more data (or completion) is available.
                  var available = seq.Length;
                  _reader.AdvanceTo(seq.Start, seq.End);

                  if (available == 0 && ++emptyReads > MaxEmptyReads)
                  {
                      throw new FormatException($"Message {typeof(T)} parse error, maybe EOF");
                  }

                  if (result.IsCompleted)
                  {
                      throw new FormatException(
                          $"Message {typeof(T)} parse error, required {required} byte, {available} byte remain");
                  }
              }

              var frame = MakeFrame(seq);
              var (ok, len) = ret.TryLoad(frame);
              if (ok)
              {
                  _reader.AdvanceTo(seq.GetPosition(len, seq.Start));
                  // Consumed buffers may be recycled by the pipe, so positions seen later can compare
                  // equal to the cached ones while referring to different data; drop the cached frame.
                  InvalidateFrameCache();
                  return ret;
              }

              if (len == 0)
              {
                  var arr = Util.GetArray(frame).Array;
                  if (arr == null)
                  {
                      throw new FormatException($"Message {typeof(T)} parse error");
                  }

                  throw new FormatException(
                      $"Message {typeof(T)} parse error, {Environment.NewLine}{BitConverter.ToString(arr)}");
              }

              // The message needs at least len bytes: release the current read and wait for more.
              required = len;
              _reader.AdvanceTo(seq.Start, seq.End);
          }
      }

      /// <summary>
      /// Serializes <paramref name="msg"/> into the pipe writer and flushes it.
      /// </summary>
      /// <remarks>
      /// An <see cref="ArgumentException"/> thrown by <c>msg.Serialize</c> is taken to mean the buffer
      /// was too small; the size hint is doubled and serialization retried, up to a 65536-byte hint.
      /// </remarks>
      /// <param name="msg">The message to write.</param>
      /// <param name="token">Token used to cancel the flush.</param>
      /// <exception cref="ArgumentException">
      /// The message still could not be serialized with the largest permitted size hint.
      /// </exception>
      public async Task WriteAsync(IProtocolMessage msg, CancellationToken token = default)
      {
          Debug.WriteLine($"Writing protocol message {msg}");
          var estSize = InitialWriteBufferSize;
          while (true)
          {
              var buf = _writer.GetMemory(estSize);
              int size;
              try
              {
                  size = msg.Serialize(buf);
              }
              catch (ArgumentException ex)
              {
                  estSize *= 2;
                  // Enforce the limit while growing, so a message that never fits cannot drive the
                  // size hint (and the allocations behind it) up without bound.
                  if (estSize > MaxWriteBufferSize)
                  {
                      throw new ArgumentException("Protocol message is too large", ex);
                  }

                  continue;
              }

              _writer.Advance(size);
              await _writer.FlushAsync(token);
              return;
          }
      }

      /// <summary>
      /// Returns the content of <paramref name="seq"/> as one contiguous block of memory.
      /// </summary>
      /// <remarks>
      /// A single-segment sequence is returned without copying; a multi-segment sequence is copied
      /// into a new array. The result is cached and returned again when the next call passes a
      /// sequence with the same start and end positions.
      /// </remarks>
      /// <param name="seq">The sequence to flatten.</param>
      /// <returns>A contiguous view (or copy) of the sequence's bytes.</returns>
      public ReadOnlyMemory<byte> MakeFrame(ReadOnlySequence<byte> seq)
      {
          // cached frame
          if (_lastFrameStart.Equals(seq.Start) && _lastFrameEnd.Equals(seq.End))
          {
              Debug.WriteLine("Hit cached frame");
              return _lastFrame;
          }

          _lastFrameStart = seq.Start;
          _lastFrameEnd = seq.End;
          if (seq.IsSingleSegment)
          {
              Debug.WriteLine("Frame is single segement");
              _lastFrame = seq.First;
          }
          else
          {
              Debug.WriteLine("Copy frame data into single Memory");
              _lastFrame = seq.ToArray();
          }

          return _lastFrame;
      }

      /// <summary>Forgets the frame cached by <see cref="MakeFrame"/>.</summary>
      private void InvalidateFrameCache()
      {
          _lastFrameStart = default;
          _lastFrameEnd = default;
          _lastFrame = default;
      }
  }
  129. }