// TcpSocketSession.cs 22 KB
  1. using System;
  2. using System.IO;
  3. using System.Net;
  4. using System.Net.Security;
  5. using System.Net.Sockets;
  6. using System.Security.Cryptography.X509Certificates;
  7. using System.Threading;
  8. using Aitex.Core.RT.Log;
  9. using MECF.Framework.Common.Communications.Tcp.Buffer;
  10. namespace MECF.Framework.Common.Communications.Tcp.Socket.Server.APM
  11. {
  12. public sealed class TcpSocketSession
  13. {
  14. #region Fields
  15. //private static readonly ILogger _log = LoggerManager.GetLogger(Assembly.GetExecutingAssembly(), "Tcp");//. .Get<TcpSocketClient>();
  // Connection resources; both are reset to null by Clean() when the session is torn down.
  private TcpClient _tcpClient;
  private Stream _stream;

  // Collaborators handed in through the constructor.
  private readonly TcpSocketServerConfiguration _configuration;
  private readonly ISegmentBufferManager _bufferManager;
  private readonly TcpSocketServer _server;

  // Identifier generated once per session in the constructor.
  private readonly string _sessionKey;

  // Receive buffer borrowed in Start() and the offset inside it at which the next read is stored.
  // Both begin at their default values (empty segment, zero).
  private ArraySegment<byte> _receiveBuffer;
  private int _receiveBufferOffset;

  // End points cached at construction; the public end point properties fall back to
  // these once the client is no longer connected.
  private IPEndPoint _remoteEndPoint;
  private IPEndPoint _localEndPoint;

  // Lifecycle state: always one of the constants below, updated through Interlocked.
  private int _state;
  private const int _none = 0;
  private const int _connecting = 1;
  private const int _connected = 2;
  private const int _disposed = 5;
  31. #endregion
  32. #region Constructors
  /// <summary>
  /// Creates a session for an accepted client: stores the collaborators, generates the
  /// session key, records the start time, applies the configured socket options and
  /// caches the current remote/local end points.
  /// </summary>
  /// <param name="tcpClient">The accepted client connection.</param>
  /// <param name="configuration">Server configuration used for socket options, SSL and framing.</param>
  /// <param name="bufferManager">Buffer manager the receive buffer is borrowed from.</param>
  /// <param name="server">Owning server; receives the connected/data/disconnected notifications.</param>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public TcpSocketSession(
      TcpClient tcpClient,
      TcpSocketServerConfiguration configuration,
      ISegmentBufferManager bufferManager,
      TcpSocketServer server)
  {
      if (tcpClient == null)
      {
          throw new ArgumentNullException("tcpClient");
      }

      if (configuration == null)
      {
          throw new ArgumentNullException("configuration");
      }

      if (bufferManager == null)
      {
          throw new ArgumentNullException("bufferManager");
      }

      if (server == null)
      {
          throw new ArgumentNullException("server");
      }

      _tcpClient = tcpClient;
      _configuration = configuration;
      _bufferManager = bufferManager;
      _server = server;

      _sessionKey = Guid.NewGuid().ToString();
      StartTime = DateTime.UtcNow;

      SetSocketOptions();

      // Cache the end points so they stay available after the socket is gone.
      _remoteEndPoint = RemoteEndPoint;
      _localEndPoint = LocalEndPoint;
  }
  57. #endregion
  58. #region Properties
  /// <summary>Key generated for this session at construction.</summary>
  public string SessionKey
  {
      get { return _sessionKey; }
  }

  /// <summary>UTC time at which the session object was created.</summary>
  public DateTime StartTime { get; private set; }

  /// <summary>Connect timeout taken from the server configuration.</summary>
  public TimeSpan ConnectTimeout
  {
      get { return _configuration.ConnectTimeout; }
  }

  // True while a client is still held and it reports itself as connected.
  private bool Connected
  {
      get
      {
          if (_tcpClient == null)
          {
              return false;
          }

          return _tcpClient.Connected;
      }
  }

  /// <summary>Remote end point of the live socket, or the value cached at construction once disconnected.</summary>
  public IPEndPoint RemoteEndPoint
  {
      get
      {
          if (!Connected)
          {
              return _remoteEndPoint;
          }

          return (IPEndPoint)_tcpClient.Client.RemoteEndPoint;
      }
  }

  /// <summary>Local end point of the live socket, or the value cached at construction once disconnected.</summary>
  public IPEndPoint LocalEndPoint
  {
      get
      {
          if (!Connected)
          {
              return _localEndPoint;
          }

          return (IPEndPoint)_tcpClient.Client.LocalEndPoint;
      }
  }

  /// <summary>Underlying socket while connected; null otherwise.</summary>
  public System.Net.Sockets.Socket Socket
  {
      get
      {
          if (!Connected)
          {
              return null;
          }

          return _tcpClient.Client;
      }
  }

  /// <summary>Stream produced by Start(); null before start and after the session is cleaned up.</summary>
  public Stream Stream
  {
      get { return _stream; }
  }

  /// <summary>Server that owns this session.</summary>
  public TcpSocketServer Server
  {
      get { return _server; }
  }
  /// <summary>
  /// Maps the internal state value onto <c>TcpSocketConnectionState</c>. The disposed state
  /// and any unrecognised value are both reported as Closed.
  /// </summary>
  public TcpSocketConnectionState State
  {
      get
      {
          // Read the field once so every comparison sees the same value.
          int current = _state;

          if (current == _none)
          {
              return TcpSocketConnectionState.None;
          }

          if (current == _connecting)
          {
              return TcpSocketConnectionState.Connecting;
          }

          if (current == _connected)
          {
              return TcpSocketConnectionState.Connected;
          }

          return TcpSocketConnectionState.Closed;
      }
  }
  /// <summary>Describes the session by its key and its remote and local end points.</summary>
  public override string ToString()
  {
      const string format = "SessionKey[{0}], RemoteEndPoint[{1}], LocalEndPoint[{2}]";
      return string.Format(format, SessionKey, RemoteEndPoint, LocalEndPoint);
  }
  92. #endregion
  93. #region Process
  /// <summary>
  /// Moves the session from "none" to "connecting", negotiates the stream (SSL when enabled),
  /// borrows the receive buffer, switches to "connected", notifies the server and starts the
  /// first asynchronous read.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The session was already closed when Start was called.</exception>
  /// <exception cref="InvalidOperationException">The session was already started.</exception>
  /// <remarks>
  /// Failures that occur after the state check are logged and close the session instead of
  /// being propagated to the caller.
  /// </remarks>
  internal void Start()
  {
      int origin = Interlocked.CompareExchange(ref _state, _connecting, _none);
      if (origin == _disposed)
      {
          // ObjectDisposedException(string) treats its argument as the object name, so the
          // two-argument constructor is used to carry the text as the actual message.
          throw new ObjectDisposedException(
              typeof(TcpSocketSession).FullName,
              "This tcp socket session has been disposed when connecting.");
      }
      else if (origin != _none)
      {
          throw new InvalidOperationException("This tcp socket session is in invalid state when connecting.");
      }

      try
      {
          _stream = NegotiateStream(_tcpClient.GetStream());

          if (_receiveBuffer == default(ArraySegment<byte>))
              _receiveBuffer = _bufferManager.BorrowBuffer();
          _receiveBufferOffset = 0;

          // The state can only differ from "connecting" here if Close() ran in the meantime.
          if (Interlocked.CompareExchange(ref _state, _connected, _connecting) != _connecting)
          {
              Close();
              throw new ObjectDisposedException(
                  typeof(TcpSocketSession).FullName,
                  "This tcp socket session has been disposed after connected.");
          }

          try
          {
              _server.RaiseClientConnected(this);
          }
          catch (Exception ex)
          {
              // A failing connected-handler ends the session without starting to read.
              HandleUserSideError(ex);
              Close();
              return;
          }

          ContinueReadBuffer();
      }
      catch (Exception ex)
      {
          LOG.Error(ex.Message, ex);
          Close();
      }
  }
  /// <summary>
  /// Marks the session as disposed, releases its resources and notifies the server.
  /// Only the first call does any work; later calls return immediately.
  /// </summary>
  public void Close()
  {
      int previous = Interlocked.Exchange(ref _state, _disposed);
      if (previous != _disposed)
      {
          Clean();

          try
          {
              _server.RaiseClientDisconnected(this);
          }
          catch (Exception ex)
          {
              // Errors thrown by the disconnected-handler are logged, not propagated.
              HandleUserSideError(ex);
          }
      }
  }
  /// <summary>
  /// Releases the session's resources: disposes the stream, closes the client, clears both
  /// references and hands the receive buffer back to the buffer manager it was borrowed from.
  /// </summary>
  private void Clean()
  {
      try
      {
          // Teardown is best effort: a failure while disposing one resource must not
          // prevent the next one from being released.
          try
          {
              Stream stream = _stream;
              if (stream != null)
              {
                  stream.Dispose();
              }
          }
          catch { }

          try
          {
              TcpClient client = _tcpClient;
              if (client != null)
              {
                  client.Close();
              }
          }
          catch { }
      }
      finally
      {
          _stream = null;
          _tcpClient = null;
      }

      // Detach the buffer from the session first so the fields are reset even if returning
      // it throws, then return it to _bufferManager - the manager Start() borrowed it from
      // (the previous code returned it to _configuration.BufferManager instead).
      ArraySegment<byte> buffer = _receiveBuffer;
      _receiveBuffer = default(ArraySegment<byte>);
      _receiveBufferOffset = 0;

      if (buffer != default(ArraySegment<byte>))
      {
          _bufferManager.ReturnBuffer(buffer);
      }
  }
  /// <summary>
  /// Applies the configured buffer sizes, timeouts, no-delay, linger, keep-alive and
  /// reuse-address settings to the client socket.
  /// </summary>
  private void SetSocketOptions()
  {
      _tcpClient.ReceiveBufferSize = _configuration.ReceiveBufferSize;
      _tcpClient.SendBufferSize = _configuration.SendBufferSize;
      _tcpClient.ReceiveTimeout = (int)_configuration.ReceiveTimeout.TotalMilliseconds;
      _tcpClient.SendTimeout = (int)_configuration.SendTimeout.TotalMilliseconds;
      _tcpClient.NoDelay = _configuration.NoDelay;
      _tcpClient.LingerState = _configuration.LingerState;

      if (_configuration.KeepAlive)
      {
          // SocketOptionName.KeepAlive is an on/off switch. The previous code passed the
          // keep-alive interval in milliseconds as the option value, which left keep-alive
          // disabled whenever the interval truncated to 0.
          // NOTE(review): _configuration.KeepAliveInterval is not applied by this option;
          // if a custom interval is required it has to be configured separately - confirm.
          _tcpClient.Client.SetSocketOption(
              SocketOptionLevel.Socket,
              SocketOptionName.KeepAlive,
              true);
      }

      _tcpClient.Client.SetSocketOption(
          SocketOptionLevel.Socket,
          SocketOptionName.ReuseAddress,
          _configuration.ReuseAddress);
  }
  /// <summary>
  /// Returns the stream the session should use: the given stream itself when SSL is
  /// disabled, otherwise an <see cref="SslStream"/> wrapped around it after a completed
  /// server-side handshake.
  /// </summary>
  /// <param name="stream">The raw stream of the client connection.</param>
  /// <returns>The raw stream, or the authenticated SSL stream.</returns>
  /// <exception cref="TimeoutException">
  /// The handshake did not finish within <see cref="ConnectTimeout"/>; the session is closed first.
  /// </exception>
  /// <remarks>
  /// Handshake failures reported by EndAuthenticateAsServer propagate to the caller.
  /// On any failure the SslStream created here is disposed before the exception leaves.
  /// </remarks>
  private Stream NegotiateStream(Stream stream)
  {
      if (!_configuration.SslEnabled)
          return stream;

      var validateRemoteCertificate = new RemoteCertificateValidationCallback(
          (object sender,
          X509Certificate certificate,
          X509Chain chain,
          SslPolicyErrors sslPolicyErrors)
          =>
          {
              if (sslPolicyErrors == SslPolicyErrors.None)
                  return true;

              // Policy errors are accepted when the configuration says to bypass them.
              if (_configuration.SslPolicyErrorsBypassed)
                  return true;

              LOG.Error(String.Format("Session [{0}] error occurred when validating remote certificate: [{1}], [{2}].",
                  this, this.RemoteEndPoint, sslPolicyErrors));
              return false;
          });

      var sslStream = new SslStream(
          stream,
          false,
          validateRemoteCertificate,
          null,
          _configuration.SslEncryptionPolicy);

      try
      {
          IAsyncResult ar;
          if (!_configuration.SslClientCertificateRequired)
          {
              // NOTE(review): this overload does not receive SslEnabledProtocols or
              // SslCheckCertificateRevocation - confirm that is intended.
              ar = sslStream.BeginAuthenticateAsServer(
                  _configuration.SslServerCertificate, // The X509Certificate used to authenticate the server.
                  null, _tcpClient);
          }
          else
          {
              ar = sslStream.BeginAuthenticateAsServer(
                  _configuration.SslServerCertificate, // The X509Certificate used to authenticate the server.
                  _configuration.SslClientCertificateRequired, // Whether the client must supply a certificate.
                  _configuration.SslEnabledProtocols, // The protocols allowed for authentication.
                  _configuration.SslCheckCertificateRevocation, // Whether the revocation list is checked.
                  null, _tcpClient);
          }

          if (!ar.AsyncWaitHandle.WaitOne(ConnectTimeout))
          {
              Close();
              throw new TimeoutException(string.Format(
                  "Negotiate SSL/TSL with remote [{0}] timeout [{1}].", this.RemoteEndPoint, ConnectTimeout));
          }

          // Complete the asynchronous operation. Without this call a failed handshake is
          // never observed and an unauthenticated stream would be handed to the session.
          sslStream.EndAuthenticateAsServer(ar);

          // Log which security services the negotiated stream actually provides.
          LOG.Info(string.Format(
              "Ssl Stream: SslProtocol[{0}], IsServer[{1}], IsAuthenticated[{2}], IsEncrypted[{3}], IsSigned[{4}], IsMutuallyAuthenticated[{5}], "
              + "HashAlgorithm[{6}], HashStrength[{7}], KeyExchangeAlgorithm[{8}], KeyExchangeStrength[{9}], CipherAlgorithm[{10}], CipherStrength[{11}].",
              sslStream.SslProtocol,
              sslStream.IsServer,
              sslStream.IsAuthenticated,
              sslStream.IsEncrypted,
              sslStream.IsSigned,
              sslStream.IsMutuallyAuthenticated,
              sslStream.HashAlgorithm,
              sslStream.HashStrength,
              sslStream.KeyExchangeAlgorithm,
              sslStream.KeyExchangeStrength,
              sslStream.CipherAlgorithm,
              sslStream.CipherStrength));

          return sslStream;
      }
      catch
      {
          // The stream never reaches _stream on failure, so Clean() cannot release it;
          // dispose it here (best effort) and let the original exception continue.
          try
          {
              sslStream.Dispose();
          }
          catch { }

          throw;
      }
  }
  /// <summary>
  /// Starts the next asynchronous read into the unused tail of the receive buffer;
  /// completion is delivered to <c>HandleDataReceived</c>.
  /// </summary>
  private void ContinueReadBuffer()
  {
      try
      {
          int writeIndex = _receiveBuffer.Offset + _receiveBufferOffset;
          int freeSpace = _receiveBuffer.Count - _receiveBufferOffset;

          _stream.BeginRead(_receiveBuffer.Array, writeIndex, freeSpace, HandleDataReceived, _stream);
      }
      catch (Exception ex)
      {
          // Connection-level failures close the session; anything else is rethrown.
          if (CloseIfShould(ex))
          {
              return;
          }

          throw;
      }
  }
  /// <summary>
  /// Read-completion callback: ends the pending read, closes the session when nothing was
  /// read (or the read failed), otherwise processes the new bytes and starts the next read.
  /// </summary>
  /// <param name="ar">Result of the read started by <c>ContinueReadBuffer</c>.</param>
  private void HandleDataReceived(IAsyncResult ar)
  {
      if (State != TcpSocketConnectionState.Connected)
      {
          return;
      }

      try
      {
          // The stream may already have been cleared by the time the callback runs.
          if (_stream == null)
          {
              return;
          }

          int bytesRead;
          try
          {
              bytesRead = _stream.EndRead(ar);
          }
          catch (Exception)
          {
              // A failed read is treated exactly like a closed connection.
              bytesRead = 0;
          }

          if (bytesRead != 0)
          {
              ReceiveBuffer(bytesRead);
              ContinueReadBuffer();
          }
          else
          {
              // Zero bytes: the connection has been closed.
              Close();
          }
      }
      catch (Exception ex)
      {
          if (CloseIfShould(ex))
          {
              return;
          }

          throw;
      }
  }
  /// <summary>
  /// Processes newly received bytes: decodes as many complete frames as the configured
  /// decoder finds in the receive buffer, raises a data-received notification for each,
  /// then shifts the consumed bytes out of the buffer.
  /// </summary>
  /// <param name="receiveCount">Number of bytes delivered by the read that just completed.</param>
  private void ReceiveBuffer(int receiveCount)
  {
      // TCP delivers an ordered byte stream, not messages: one read may hold part of a
      // frame, exactly one frame, or several frames. The frame decoder is what finds the
      // message boundaries, so it is called repeatedly until it reports no complete frame.
      int frameLength;
      byte[] payload;
      int payloadOffset;
      int payloadCount;
      int consumed = 0;

      // NOTE(review): presumably accounts for the received bytes and swaps in a larger
      // buffer when needed - confirm against SegmentBufferDeflector.
      SegmentBufferDeflector.ReplaceBuffer(_bufferManager, ref _receiveBuffer, ref _receiveBufferOffset, receiveCount);

      while (_configuration.FrameBuilder.Decoder.TryDecodeFrame(
          _receiveBuffer.Array,
          _receiveBuffer.Offset + consumed,
          _receiveBufferOffset - consumed,
          out frameLength, out payload, out payloadOffset, out payloadCount))
      {
          try
          {
              _server.RaiseClientDataReceived(this, payload, payloadOffset, payloadCount);
          }
          catch (Exception ex)
          {
              HandleUserSideError(ex);
          }
          finally
          {
              // The frame counts as consumed even when the handler failed.
              consumed += frameLength;
          }
      }

      try
      {
          SegmentBufferDeflector.ShiftBuffer(_bufferManager, consumed, ref _receiveBuffer, ref _receiveBufferOffset);
      }
      catch (ArgumentOutOfRangeException) { }
  }
  390. #endregion
  391. #region Exception Handler
  /// <summary>
  /// Tells whether <paramref name="ex"/> is an <see cref="IOException"/> whose inner
  /// exception is a <see cref="SocketException"/> with error code TimedOut.
  /// </summary>
  private bool IsSocketTimeOut(Exception ex)
  {
      if (!(ex is IOException))
      {
          return false;
      }

      var socketError = ex.InnerException as SocketException;
      return socketError != null
          && socketError.SocketErrorCode == SocketError.TimedOut;
  }
  /// <summary>
  /// Closes the session when <paramref name="ex"/> is one of the exception types treated
  /// as a broken connection. Only socket exceptions are logged.
  /// </summary>
  /// <returns>True when the session was closed because of the exception; otherwise false.</returns>
  private bool CloseIfShould(Exception ex)
  {
      bool isConnectionFailure =
          ex is ObjectDisposedException
          || ex is InvalidOperationException
          || ex is SocketException
          || ex is IOException
          || ex is NullReferenceException;

      if (!isConnectionFailure)
      {
          return false;
      }

      if (ex is SocketException)
      {
          LOG.Error(string.Format("Session [{0}] exception occurred, [{1}].", this, ex.Message), ex);
      }

      // The connection is considered gone.
      Close();
      return true;
  }
  /// <summary>Logs an exception thrown by a server-side notification handler.</summary>
  private void HandleUserSideError(Exception ex)
  {
      string message = string.Format("Session [{0}] error occurred in user side [{1}].", this, ex.Message);
      LOG.Error(message, ex);
  }
  420. #endregion
  421. #region Send
  /// <summary>Synchronously sends the whole array as one frame.</summary>
  /// <param name="data">Payload to send.</param>
  /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
  public void Send(byte[] data)
  {
      if (data == null)
      {
          throw new ArgumentNullException("data");
      }

      int length = data.Length;
      Send(data, 0, length);
  }
  /// <summary>
  /// Encodes the given range with the configured frame encoder and writes the frame
  /// synchronously to the session stream.
  /// </summary>
  /// <param name="data">Buffer holding the payload.</param>
  /// <param name="offset">Index of the first payload byte.</param>
  /// <param name="count">Number of payload bytes.</param>
  /// <exception cref="InvalidProgramException">The session is not in the connected state.</exception>
  /// <remarks>
  /// A socket timeout is logged and swallowed. Connection-level failures close the session
  /// and are swallowed as well; any other exception is rethrown.
  /// </remarks>
  public void Send(byte[] data, int offset, int count)
  {
      BufferValidator.ValidateBuffer(data, offset, count, "data");

      bool isConnected = State == TcpSocketConnectionState.Connected;
      if (!isConnected)
      {
          throw new InvalidProgramException("This session has been closed.");
      }

      try
      {
          byte[] frame;
          int frameOffset;
          int frameLength;
          _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frame, out frameOffset, out frameLength);

          _stream.Write(frame, frameOffset, frameLength);
      }
      catch (Exception ex)
      {
          if (IsSocketTimeOut(ex))
          {
              LOG.Error(ex.Message, ex);
              return;
          }

          if (!CloseIfShould(ex))
          {
              throw;
          }
      }
  }
  /// <summary>Starts an asynchronous send of the whole array as one frame.</summary>
  /// <param name="data">Payload to send.</param>
  /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
  public void BeginSend(byte[] data)
  {
      if (data == null)
      {
          throw new ArgumentNullException("data");
      }

      int length = data.Length;
      BeginSend(data, 0, length);
  }
  /// <summary>
  /// Encodes the given range with the configured frame encoder and starts an asynchronous
  /// write of the frame; completion is handled internally by <c>HandleDataWritten</c>.
  /// </summary>
  /// <param name="data">Buffer holding the payload.</param>
  /// <param name="offset">Index of the first payload byte.</param>
  /// <param name="count">Number of payload bytes.</param>
  /// <exception cref="InvalidProgramException">The session is not in the connected state.</exception>
  /// <remarks>
  /// A socket timeout is logged and swallowed. Connection-level failures close the session
  /// and are swallowed as well; any other exception is rethrown.
  /// </remarks>
  public void BeginSend(byte[] data, int offset, int count)
  {
      BufferValidator.ValidateBuffer(data, offset, count, "data");

      bool isConnected = State == TcpSocketConnectionState.Connected;
      if (!isConnected)
      {
          throw new InvalidProgramException("This session has been closed.");
      }

      try
      {
          byte[] frame;
          int frameOffset;
          int frameLength;
          _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frame, out frameOffset, out frameLength);

          _stream.BeginWrite(frame, frameOffset, frameLength, HandleDataWritten, _stream);
      }
      catch (Exception ex)
      {
          if (IsSocketTimeOut(ex))
          {
              LOG.Error(ex.Message, ex);
              return;
          }

          if (!CloseIfShould(ex))
          {
              throw;
          }
      }
  }
  /// <summary>
  /// Ends a pending asynchronous write on the session stream. Connection-level failures
  /// close the session; any other exception is rethrown.
  /// </summary>
  /// <param name="ar">Result of the write being completed.</param>
  private void HandleDataWritten(IAsyncResult ar)
  {
      try
      {
          _stream.EndWrite(ar);
      }
      catch (Exception ex)
      {
          bool sessionClosed = CloseIfShould(ex);
          if (!sessionClosed)
          {
              throw;
          }
      }
  }
  /// <summary>
  /// Starts an asynchronous send of the whole array as one frame, reporting completion
  /// through the supplied callback.
  /// </summary>
  /// <param name="data">Payload to send.</param>
  /// <param name="callback">Callback invoked when the write completes.</param>
  /// <param name="state">Caller state passed through to the asynchronous operation.</param>
  /// <returns>The result object of the pending write.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
  public IAsyncResult BeginSend(byte[] data, AsyncCallback callback, object state)
  {
      if (data == null)
      {
          throw new ArgumentNullException("data");
      }

      int length = data.Length;
      return BeginSend(data, 0, length, callback, state);
  }
  /// <summary>
  /// Encodes the given range with the configured frame encoder and starts an asynchronous
  /// write of the frame, reporting completion through the supplied callback.
  /// </summary>
  /// <param name="data">Buffer holding the payload.</param>
  /// <param name="offset">Index of the first payload byte.</param>
  /// <param name="count">Number of payload bytes.</param>
  /// <param name="callback">Callback invoked when the write completes.</param>
  /// <param name="state">Caller state passed through to the asynchronous operation.</param>
  /// <returns>The result object of the pending write.</returns>
  /// <exception cref="InvalidProgramException">The session is not in the connected state.</exception>
  /// <remarks>
  /// Every exception raised while starting the write is rethrown to the caller. Before
  /// that, a socket timeout is logged and a connection-level failure closes the session.
  /// </remarks>
  public IAsyncResult BeginSend(byte[] data, int offset, int count, AsyncCallback callback, object state)
  {
      BufferValidator.ValidateBuffer(data, offset, count, "data");

      bool isConnected = State == TcpSocketConnectionState.Connected;
      if (!isConnected)
      {
          throw new InvalidProgramException("This session has been closed.");
      }

      try
      {
          byte[] frame;
          int frameOffset;
          int frameLength;
          _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frame, out frameOffset, out frameLength);

          return _stream.BeginWrite(frame, frameOffset, frameLength, callback, state);
      }
      catch (Exception ex)
      {
          if (IsSocketTimeOut(ex))
          {
              LOG.Error(ex.Message, ex);
          }
          else
          {
              // Closes the session for connection-level failures; the result is not
              // needed because the exception is rethrown either way.
              CloseIfShould(ex);
          }

          throw;
      }
  }
  /// <summary>
  /// Completes a send started with one of the callback-based BeginSend overloads by
  /// delegating to <c>HandleDataWritten</c>, which ends the write on the session stream.
  /// </summary>
  /// <param name="asyncResult">The result object returned by BeginSend.</param>
  public void EndSend(IAsyncResult asyncResult)
  {
      IAsyncResult pendingWrite = asyncResult;
      HandleDataWritten(pendingWrite);
  }
  541. #endregion
  542. }
  543. }