using System;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using Aitex.Core.RT.Log;
using MECF.Framework.Common.Communications.Tcp.Buffer;

namespace MECF.Framework.Common.Communications.Tcp.Socket.Server.APM
{
    /// <summary>
    /// One accepted client connection of a <see cref="TcpSocketServer"/>, driven with the
    /// Begin/End (APM) stream API.
    /// </summary>
    /// <remarks>
    /// The session moves through the states none -> connecting -> connected -> disposed using
    /// interlocked operations on <c>_state</c>. <see cref="Close"/> is idempotent.
    /// </remarks>
    public sealed class TcpSocketSession
    {
        #region Fields

        private TcpClient _tcpClient;
        private readonly TcpSocketServerConfiguration _configuration;
        private readonly ISegmentBufferManager _bufferManager;
        private readonly TcpSocketServer _server;
        private readonly string _sessionKey;
        private Stream _stream;
        private ArraySegment<byte> _receiveBuffer = default(ArraySegment<byte>);
        private int _receiveBufferOffset = 0;

        // End points captured at construction so they can still be reported after the socket closes.
        private readonly IPEndPoint _remoteEndPoint;
        private readonly IPEndPoint _localEndPoint;

        private int _state;
        private const int _none = 0;
        private const int _connecting = 1;
        private const int _connected = 2;
        private const int _disposed = 5;

        #endregion

        #region Constructors

        /// <summary>
        /// Creates a session around an accepted client and applies the configured socket options.
        /// </summary>
        /// <param name="tcpClient">The accepted client connection.</param>
        /// <param name="configuration">Server configuration (timeouts, SSL settings, frame builder).</param>
        /// <param name="bufferManager">Manager the receive buffer is borrowed from and returned to.</param>
        /// <param name="server">The owning server whose events are raised by this session.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public TcpSocketSession(
            TcpClient tcpClient,
            TcpSocketServerConfiguration configuration,
            ISegmentBufferManager bufferManager,
            TcpSocketServer server)
        {
            if (tcpClient == null)
            {
                throw new ArgumentNullException("tcpClient");
            }
            if (configuration == null)
            {
                throw new ArgumentNullException("configuration");
            }
            if (bufferManager == null)
            {
                throw new ArgumentNullException("bufferManager");
            }
            if (server == null)
            {
                throw new ArgumentNullException("server");
            }

            _tcpClient = tcpClient;
            _configuration = configuration;
            _bufferManager = bufferManager;
            _server = server;

            _sessionKey = Guid.NewGuid().ToString();
            this.StartTime = DateTime.UtcNow;

            SetSocketOptions();

            _remoteEndPoint = this.RemoteEndPoint;
            _localEndPoint = this.LocalEndPoint;
        }

        #endregion

        #region Properties

        /// <summary>Unique key (a GUID string) generated for this session.</summary>
        public string SessionKey { get { return _sessionKey; } }

        /// <summary>UTC time at which the session object was created.</summary>
        public DateTime StartTime { get; private set; }

        /// <summary>Connect timeout from the configuration; also bounds the SSL handshake.</summary>
        public TimeSpan ConnectTimeout { get { return _configuration.ConnectTimeout; } }

        private bool Connected
        {
            get
            {
                // Snapshot: Clean() may null the field on another thread.
                TcpClient client = _tcpClient;
                return client != null && client.Connected;
            }
        }

        /// <summary>Remote end point; falls back to the value captured at construction once disconnected.</summary>
        public IPEndPoint RemoteEndPoint
        {
            get
            {
                TcpClient client = _tcpClient;
                return (client != null && client.Connected)
                    ? (IPEndPoint)client.Client.RemoteEndPoint
                    : _remoteEndPoint;
            }
        }

        /// <summary>Local end point; falls back to the value captured at construction once disconnected.</summary>
        public IPEndPoint LocalEndPoint
        {
            get
            {
                TcpClient client = _tcpClient;
                return (client != null && client.Connected)
                    ? (IPEndPoint)client.Client.LocalEndPoint
                    : _localEndPoint;
            }
        }

        /// <summary>The underlying socket, or <c>null</c> when the session is not connected.</summary>
        public System.Net.Sockets.Socket Socket
        {
            get
            {
                TcpClient client = _tcpClient;
                return (client != null && client.Connected) ? client.Client : null;
            }
        }

        /// <summary>The (possibly SSL-wrapped) stream, or <c>null</c> before start / after close.</summary>
        public Stream Stream { get { return _stream; } }

        /// <summary>The server that owns this session.</summary>
        public TcpSocketServer Server { get { return _server; } }

        /// <summary>Current connection state; any unknown internal value is reported as closed.</summary>
        public TcpSocketConnectionState State
        {
            get
            {
                switch (_state)
                {
                    case _none:
                        return TcpSocketConnectionState.None;
                    case _connecting:
                        return TcpSocketConnectionState.Connecting;
                    case _connected:
                        return TcpSocketConnectionState.Connected;
                    case _disposed:
                        return TcpSocketConnectionState.Closed;
                    default:
                        return TcpSocketConnectionState.Closed;
                }
            }
        }

        public override string ToString()
        {
            return string.Format("SessionKey[{0}], RemoteEndPoint[{1}], LocalEndPoint[{2}]",
                this.SessionKey, this.RemoteEndPoint, this.LocalEndPoint);
        }

        #endregion

        #region Process

        /// <summary>
        /// Negotiates the stream, borrows the receive buffer, raises the server's connected event
        /// and starts the asynchronous read loop.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The session was already closed.</exception>
        /// <exception cref="InvalidOperationException">The session was already started.</exception>
        /// <remarks>
        /// Failures after the state check are logged and close the session instead of propagating.
        /// </remarks>
        internal void Start()
        {
            int origin = Interlocked.CompareExchange(ref _state, _connecting, _none);
            if (origin == _disposed)
            {
                throw new ObjectDisposedException("This tcp socket session has been disposed when connecting.");
            }
            else if (origin != _none)
            {
                throw new InvalidOperationException("This tcp socket session is in invalid state when connecting.");
            }

            try
            {
                _stream = NegotiateStream(_tcpClient.GetStream());

                if (_receiveBuffer == default(ArraySegment<byte>))
                {
                    _receiveBuffer = _bufferManager.BorrowBuffer();
                }
                _receiveBufferOffset = 0;

                if (Interlocked.CompareExchange(ref _state, _connected, _connecting) != _connecting)
                {
                    // The session was closed while negotiating. Close() would be a no-op because the
                    // state is already disposed, so release the stream/buffer acquired above directly.
                    Clean();
                    throw new ObjectDisposedException("This tcp socket session has been disposed after connected.");
                }

                bool isErrorOccurredInUserSide = false;
                try
                {
                    _server.RaiseClientConnected(this);
                }
                catch (Exception ex)
                {
                    isErrorOccurredInUserSide = true;
                    HandleUserSideError(ex);
                }

                if (!isErrorOccurredInUserSide)
                {
                    ContinueReadBuffer();
                }
                else
                {
                    Close();
                }
            }
            catch (Exception ex)
            {
                LOG.WriteExeption(ex);
                Close();
            }
        }

        /// <summary>
        /// Closes the session, releases its resources and raises the server's disconnected event.
        /// Only the first call has any effect.
        /// </summary>
        public void Close()
        {
            if (Interlocked.Exchange(ref _state, _disposed) == _disposed)
            {
                return;
            }

            Clean();

            try
            {
                _server.RaiseClientDisconnected(this);
            }
            catch (Exception ex)
            {
                HandleUserSideError(ex);
            }
        }

        /// <summary>
        /// Disposes the stream and client (best effort) and returns the receive buffer to the
        /// manager it was borrowed from.
        /// </summary>
        private void Clean()
        {
            // Teardown is best effort: a failure disposing one resource must not prevent
            // releasing the others, so each step swallows its own exception.
            try
            {
                if (_stream != null)
                {
                    _stream.Dispose();
                }
            }
            catch { }

            try
            {
                if (_tcpClient != null)
                {
                    _tcpClient.Close();
                }
            }
            catch { }

            _stream = null;
            _tcpClient = null;

            try
            {
                if (_receiveBuffer != default(ArraySegment<byte>))
                {
                    // Return to the same manager BorrowBuffer() was called on.
                    _bufferManager.ReturnBuffer(_receiveBuffer);
                }
            }
            finally
            {
                _receiveBuffer = default(ArraySegment<byte>);
                _receiveBufferOffset = 0;
            }
        }

        /// <summary>Applies buffer sizes, timeouts and socket options from the configuration.</summary>
        private void SetSocketOptions()
        {
            _tcpClient.ReceiveBufferSize = _configuration.ReceiveBufferSize;
            _tcpClient.SendBufferSize = _configuration.SendBufferSize;
            _tcpClient.ReceiveTimeout = (int)_configuration.ReceiveTimeout.TotalMilliseconds;
            _tcpClient.SendTimeout = (int)_configuration.SendTimeout.TotalMilliseconds;
            _tcpClient.NoDelay = _configuration.NoDelay;
            _tcpClient.LingerState = _configuration.LingerState;

            if (_configuration.KeepAlive)
            {
                _tcpClient.Client.SetSocketOption(
                    SocketOptionLevel.Socket,
                    SocketOptionName.KeepAlive,
                    (int)_configuration.KeepAliveInterval.TotalMilliseconds);
            }

            _tcpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, _configuration.ReuseAddress);
        }

        /// <summary>
        /// Returns <paramref name="stream"/> unchanged when SSL is disabled; otherwise wraps it in an
        /// <see cref="SslStream"/> and performs the server-side handshake within <see cref="ConnectTimeout"/>.
        /// </summary>
        /// <param name="stream">The raw network stream of the client.</param>
        /// <returns>The stream to use for all further reads and writes.</returns>
        /// <exception cref="TimeoutException">The handshake did not complete in time (the session is closed first).</exception>
        /// <remarks>Handshake failures reported by <c>EndAuthenticateAsServer</c> propagate to the caller.</remarks>
        private Stream NegotiateStream(Stream stream)
        {
            if (!_configuration.SslEnabled)
            {
                return stream;
            }

            var validateRemoteCertificate = new RemoteCertificateValidationCallback(
                (object sender,
                X509Certificate certificate,
                X509Chain chain,
                SslPolicyErrors sslPolicyErrors)
                =>
                {
                    // Accept a clean certificate, or any certificate when the bypass is configured.
                    return sslPolicyErrors == SslPolicyErrors.None
                        || _configuration.SslPolicyErrorsBypassed;
                });

            var sslStream = new SslStream(
                stream,
                false,
                validateRemoteCertificate,
                null,
                _configuration.SslEncryptionPolicy);

            try
            {
                IAsyncResult ar = null;
                if (!_configuration.SslClientCertificateRequired)
                {
                    ar = sslStream.BeginAuthenticateAsServer(
                        _configuration.SslServerCertificate, // The X509Certificate used to authenticate the server.
                        null, _tcpClient);
                }
                else
                {
                    ar = sslStream.BeginAuthenticateAsServer(
                        _configuration.SslServerCertificate, // The X509Certificate used to authenticate the server.
                        _configuration.SslClientCertificateRequired, // Whether the client must supply a certificate.
                        _configuration.SslEnabledProtocols, // The protocols allowed for authentication.
                        _configuration.SslCheckCertificateRevocation, // Whether the revocation list is checked.
                        null, _tcpClient);
                }

                if (!ar.AsyncWaitHandle.WaitOne(ConnectTimeout))
                {
                    Close();
                    throw new TimeoutException(string.Format(
                        "Negotiate SSL/TSL with remote [{0}] timeout [{1}].", this.RemoteEndPoint, ConnectTimeout));
                }

                // Completing the APM call is what surfaces handshake failures; without it a failed
                // authentication would go unnoticed and an unusable stream would be returned.
                sslStream.EndAuthenticateAsServer(ar);

                // When authentication succeeds, IsEncrypted / IsSigned / IsMutuallyAuthenticated on
                // the SslStream tell which security services are actually in use.
                return sslStream;
            }
            catch
            {
                // Do not leak the wrapper on timeout or handshake failure; the original error wins.
                try
                {
                    sslStream.Dispose();
                }
                catch { }
                throw;
            }
        }

        /// <summary>Posts the next asynchronous read into the free tail of the receive buffer.</summary>
        private void ContinueReadBuffer()
        {
            try
            {
                _stream.BeginRead(
                    _receiveBuffer.Array,
                    _receiveBuffer.Offset + _receiveBufferOffset,
                    _receiveBuffer.Count - _receiveBufferOffset,
                    HandleDataReceived,
                    _stream);
            }
            catch (Exception ex)
            {
                if (!CloseIfShould(ex))
                {
                    throw;
                }
            }
        }

        /// <summary>
        /// Read-completion callback: finishes the read, dispatches decoded frames and re-arms the read.
        /// A zero-byte read or a failing <c>EndRead</c> closes the session.
        /// </summary>
        private void HandleDataReceived(IAsyncResult ar)
        {
            if (State != TcpSocketConnectionState.Connected)
            {
                return;
            }

            try
            {
                // When the callback runs the stream may already have been closed; take a snapshot
                // so a concurrent Clean() cannot null it between the check and the use.
                Stream stream = _stream;
                if (stream == null)
                {
                    return;
                }

                int numberOfReadBytes = 0;
                try
                {
                    // EndRead returns zero when the remote host shut the connection down and all
                    // available data has been received.
                    numberOfReadBytes = stream.EndRead(ar);
                }
                catch (Exception)
                {
                    // Unable to read from the transport (e.g. connection forcibly closed by the
                    // remote host): treat it like an orderly shutdown.
                    numberOfReadBytes = 0;
                }

                if (numberOfReadBytes == 0)
                {
                    // connection has been closed
                    Close();
                    return;
                }

                ReceiveBuffer(numberOfReadBytes);

                ContinueReadBuffer();
            }
            catch (Exception ex)
            {
                if (!CloseIfShould(ex))
                {
                    throw;
                }
            }
        }

        /// <summary>
        /// Accounts for <paramref name="receiveCount"/> newly read bytes, decodes as many complete
        /// frames as the buffer holds, raises a data-received event per frame, and shifts the
        /// consumed bytes out of the buffer.
        /// </summary>
        /// <param name="receiveCount">Number of bytes the last read appended to the buffer.</param>
        private void ReceiveBuffer(int receiveCount)
        {
            // TCP is a byte stream, not a message queue: one write on the sender may arrive as
            // several reads, and several writes may arrive as one read. The configured frame
            // decoder (fixed length, length-prefixed header, delimiter, ...) is what splits the
            // byte stream back into messages, so a read may contain zero, one or many frames.

            int frameLength;
            byte[] payload;
            int payloadOffset;
            int payloadCount;
            int consumedLength = 0;

            SegmentBufferDeflector.ReplaceBuffer(_bufferManager, ref _receiveBuffer, ref _receiveBufferOffset, receiveCount);

            while (true)
            {
                frameLength = 0;
                payload = null;
                payloadOffset = 0;
                payloadCount = 0;

                if (_configuration.FrameBuilder.Decoder.TryDecodeFrame(
                    _receiveBuffer.Array,
                    _receiveBuffer.Offset + consumedLength,
                    _receiveBufferOffset - consumedLength,
                    out frameLength, out payload, out payloadOffset, out payloadCount))
                {
                    try
                    {
                        _server.RaiseClientDataReceived(this, payload, payloadOffset, payloadCount);
                    }
                    catch (Exception ex)
                    {
                        HandleUserSideError(ex);
                    }
                    finally
                    {
                        // The frame is consumed even when the user handler failed, so one bad
                        // handler call cannot stall the stream on the same frame.
                        consumedLength += frameLength;
                    }
                }
                else
                {
                    break;
                }
            }

            try
            {
                SegmentBufferDeflector.ShiftBuffer(_bufferManager, consumedLength, ref _receiveBuffer, ref _receiveBufferOffset);
            }
            catch (ArgumentOutOfRangeException) { }
        }

        #endregion

        #region Exception Handler

        /// <summary>True when <paramref name="ex"/> is an <see cref="IOException"/> wrapping a socket timeout.</summary>
        private bool IsSocketTimeOut(Exception ex)
        {
            var socketException = ex.InnerException as SocketException;
            return ex is IOException
                && socketException != null
                && socketException.SocketErrorCode == SocketError.TimedOut;
        }

        /// <summary>
        /// Closes the session when <paramref name="ex"/> is one of the exception types that indicate
        /// a dead or torn-down connection.
        /// </summary>
        /// <returns><c>true</c> if the exception was handled by closing; <c>false</c> if the caller should rethrow.</returns>
        private bool CloseIfShould(Exception ex)
        {
            if (ex is ObjectDisposedException
                || ex is InvalidOperationException
                || ex is SocketException
                || ex is IOException
                || ex is NullReferenceException
                )
            {
                if (ex is SocketException)
                {
                    LOG.WriteExeption(ex);
                }

                // The connection is unusable for every type listed above, so always close.
                Close();

                return true;
            }

            return false;
        }

        /// <summary>Logs an exception thrown by a user-supplied event handler; it is never propagated.</summary>
        private void HandleUserSideError(Exception ex)
        {
            LOG.WriteExeption(ex);
        }

        #endregion

        #region Send

        /// <summary>Frames and synchronously writes the whole of <paramref name="data"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        public void Send(byte[] data)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            Send(data, 0, data.Length);
        }

        /// <summary>
        /// Frames and synchronously writes <paramref name="count"/> bytes of <paramref name="data"/>
        /// starting at <paramref name="offset"/>.
        /// </summary>
        /// <exception cref="InvalidProgramException">The session is not connected.</exception>
        /// <remarks>
        /// A send timeout is logged and swallowed. Connection-level failures close the session and
        /// are swallowed; any other exception propagates.
        /// </remarks>
        public void Send(byte[] data, int offset, int count)
        {
            BufferValidator.ValidateBuffer(data, offset, count, "data");

            if (State != TcpSocketConnectionState.Connected)
            {
                throw new InvalidProgramException("This session has been closed.");
            }

            try
            {
                byte[] frameBuffer;
                int frameBufferOffset;
                int frameBufferLength;
                _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frameBuffer, out frameBufferOffset, out frameBufferLength);

                _stream.Write(frameBuffer, frameBufferOffset, frameBufferLength);
            }
            catch (Exception ex)
            {
                if (IsSocketTimeOut(ex))
                {
                    LOG.WriteExeption(ex);
                }
                else if (!CloseIfShould(ex))
                {
                    throw;
                }
            }
        }

        /// <summary>Frames and asynchronously writes the whole of <paramref name="data"/> (fire and forget).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        public void BeginSend(byte[] data)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            BeginSend(data, 0, data.Length);
        }

        /// <summary>
        /// Frames and asynchronously writes a slice of <paramref name="data"/>; completion is handled
        /// internally.
        /// </summary>
        /// <exception cref="InvalidProgramException">The session is not connected.</exception>
        /// <remarks>Error handling matches the synchronous <c>Send</c> overload.</remarks>
        public void BeginSend(byte[] data, int offset, int count)
        {
            BufferValidator.ValidateBuffer(data, offset, count, "data");

            if (State != TcpSocketConnectionState.Connected)
            {
                throw new InvalidProgramException("This session has been closed.");
            }

            try
            {
                byte[] frameBuffer;
                int frameBufferOffset;
                int frameBufferLength;
                _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frameBuffer, out frameBufferOffset, out frameBufferLength);

                _stream.BeginWrite(frameBuffer, frameBufferOffset, frameBufferLength, HandleDataWritten, _stream);
            }
            catch (Exception ex)
            {
                if (IsSocketTimeOut(ex))
                {
                    LOG.WriteExeption(ex);
                }
                else if (!CloseIfShould(ex))
                {
                    throw;
                }
            }
        }

        /// <summary>Write-completion callback; connection-level failures close the session.</summary>
        private void HandleDataWritten(IAsyncResult ar)
        {
            try
            {
                _stream.EndWrite(ar);
            }
            catch (Exception ex)
            {
                if (!CloseIfShould(ex))
                {
                    throw;
                }
            }
        }

        /// <summary>
        /// Frames and asynchronously writes the whole of <paramref name="data"/>; the caller completes
        /// the operation with <see cref="EndSend"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        public IAsyncResult BeginSend(byte[] data, AsyncCallback callback, object state)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            return BeginSend(data, 0, data.Length, callback, state);
        }

        /// <summary>
        /// Frames and asynchronously writes a slice of <paramref name="data"/>; the caller completes
        /// the operation with <see cref="EndSend"/>.
        /// </summary>
        /// <exception cref="InvalidProgramException">The session is not connected.</exception>
        /// <remarks>
        /// Because an <see cref="IAsyncResult"/> must be returned, every failure is rethrown;
        /// connection-level failures additionally close the session first.
        /// </remarks>
        public IAsyncResult BeginSend(byte[] data, int offset, int count, AsyncCallback callback, object state)
        {
            BufferValidator.ValidateBuffer(data, offset, count, "data");

            if (State != TcpSocketConnectionState.Connected)
            {
                throw new InvalidProgramException("This session has been closed.");
            }

            try
            {
                byte[] frameBuffer;
                int frameBufferOffset;
                int frameBufferLength;
                _configuration.FrameBuilder.Encoder.EncodeFrame(data, offset, count, out frameBuffer, out frameBufferOffset, out frameBufferLength);

                return _stream.BeginWrite(frameBuffer, frameBufferOffset, frameBufferLength, callback, state);
            }
            catch (Exception ex)
            {
                if (IsSocketTimeOut(ex))
                {
                    LOG.WriteExeption(ex);
                }
                else
                {
                    CloseIfShould(ex);
                }

                throw;
            }
        }

        /// <summary>Completes a write started by one of the callback-taking <c>BeginSend</c> overloads.</summary>
        public void EndSend(IAsyncResult asyncResult)
        {
            HandleDataWritten(asyncResult);
        }

        #endregion
    }
}