|
@@ -1,317 +1,333 @@
|
|
|
-using Aitex.Core.Util;
|
|
|
-using SecsGem.Core;
|
|
|
-using SecsGem.Core.EnumData;
|
|
|
-using SecsGem.Core.ItemModel;
|
|
|
-using SecsGem.Core.Utility;
|
|
|
-using SecsGem.Core.Validator;
|
|
|
-using System;
|
|
|
-using System.Buffers;
|
|
|
-using System.Buffers.Binary;
|
|
|
-using System.Collections.Generic;
|
|
|
-using System.Diagnostics;
|
|
|
-using System.IO.Pipelines;
|
|
|
-using System.Linq;
|
|
|
-using System.Runtime.CompilerServices;
|
|
|
-using System.Runtime.Remoting.Channels;
|
|
|
-using System.Runtime.Remoting.Messaging;
|
|
|
-using System.Text;
|
|
|
-using System.Threading;
|
|
|
-using System.Threading.Channels;
|
|
|
-using System.Threading.Tasks;
|
|
|
-
|
|
|
-namespace SecsGem.Hsms.Core
|
|
|
-{
|
|
|
- public class PipeDecoder
|
|
|
- {
|
|
|
- private readonly PipeReader _reader;
|
|
|
- public PipeWriter Input { get; }
|
|
|
-
|
|
|
- public event EventHandler<SecsMessage> OnSendResponse;
|
|
|
-
|
|
|
- public event DelegateEvent.MessageEventHander OnAnalyseMessageEventHandler;
|
|
|
-
|
|
|
- public event DelegateEvent.MessageEventHander OnSendMessageEventHandler;
|
|
|
-
|
|
|
- public event DelegateEvent.MessageEventHander OnReplyMessageEventHandler;
|
|
|
-
|
|
|
- public event EventHandler<string> OnDisConnection;
|
|
|
-
|
|
|
- public PipeDecoder(PipeReader reader, PipeWriter input,CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- _reader = reader;
|
|
|
- Input = input;
|
|
|
- }
|
|
|
-
|
|
|
- public Task StartAsync(CancellationToken cancellation)
|
|
|
- => DecodeLoopAsync(cancellation);
|
|
|
-
|
|
|
-
|
|
|
- private async Task DecodeLoopAsync(
|
|
|
- CancellationToken cancellation)
|
|
|
- {
|
|
|
- var totalLengthBytes = new byte[4];
|
|
|
- var messageHeaderBytes = new byte[10];
|
|
|
- // PipeReader peek first
|
|
|
- ReadOnlySequence<byte> buffer = new ReadOnlySequence<byte>() ;
|
|
|
- try
|
|
|
- {
|
|
|
- buffer = await PipeReadAsync(_reader, required: 4, cancellation).ConfigureAwait(false);
|
|
|
- }
|
|
|
- catch(Exception ex)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.Error("PipeReadAsync error", ex);
|
|
|
- Dispose();
|
|
|
- return;
|
|
|
- }
|
|
|
- while (true)
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- // 0: get total message length 4 bytes
|
|
|
- if (IsBufferInsufficient(_reader, ref buffer, required: 4))
|
|
|
- {
|
|
|
- buffer = await PipeReadAsync(_reader, required: 4, cancellation).ConfigureAwait(false);
|
|
|
- }
|
|
|
- var totalLengthSeq = buffer.Slice(buffer.Start, 4);
|
|
|
- totalLengthSeq.CopyTo(totalLengthBytes);
|
|
|
- uint messageLength = BinaryPrimitives.ReadUInt32BigEndian(totalLengthBytes);
|
|
|
- buffer = buffer.Slice(totalLengthSeq.End);
|
|
|
- HsmsLogMamager.Instance.WriteReceivedBuffer("length buffer",totalLengthBytes);
|
|
|
-
|
|
|
- // 1: get message header 10 bytes
|
|
|
- if (IsBufferInsufficient(_reader, ref buffer, required: 10))
|
|
|
- {
|
|
|
- buffer = await PipeReadAsync(_reader, required: 10, cancellation).ConfigureAwait(false);
|
|
|
- }
|
|
|
- var messageHaderSeq = buffer.Slice(buffer.Start, 10);
|
|
|
- messageHaderSeq.CopyTo(messageHeaderBytes);
|
|
|
- Singleton<SecsMessageHeaderDecoder>.Instance.DecodeHsmsHeader(messageHeaderBytes, out var header);
|
|
|
- buffer = buffer.Slice(messageHaderSeq.End);
|
|
|
- HsmsLogMamager.Instance.WriteReceivedBuffer("header buffer",messageHeaderBytes);
|
|
|
-
|
|
|
- if (messageLength > 10 && buffer.Length < messageLength - 10)
|
|
|
- {
|
|
|
- if (IsBufferInsufficient(_reader, ref buffer, (int)(messageLength - 10)))
|
|
|
- {
|
|
|
- buffer = await PipeReadAsync(_reader, (int)(messageLength - 10), cancellation).ConfigureAwait(false);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (messageLength == 10) // only message header
|
|
|
- {
|
|
|
- if (header.SType == SType.DataMessage)
|
|
|
- {
|
|
|
- SecsMessage secsMessage = new SecsMessage(header, null);
|
|
|
- AnalyseMessageEvent(secsMessage);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if ((byte)header.SType % 2 == 0)
|
|
|
- {
|
|
|
- SecsMessage replyMessage = new SecsMessage(header, null);
|
|
|
- ReplyMessageEvent(replyMessage);
|
|
|
- }
|
|
|
- else if (header.SType == SType.SelectRequest)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.Info("received host selected request");
|
|
|
- SendSelectResponse(SType.SelectResponse, header.TransactionId);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- //存在Item数据
|
|
|
- else if (buffer.Length >= messageLength - 10)
|
|
|
- {
|
|
|
- var dataBuffer= new byte[messageLength-10];
|
|
|
- var dataSeq = buffer.Slice(buffer.Start, messageLength-10);
|
|
|
- dataSeq.CopyTo(dataBuffer);
|
|
|
-
|
|
|
- HsmsLogMamager.Instance.WriteReceivedBuffer("data item buffer", messageHeaderBytes);
|
|
|
- int startIndex = 0;
|
|
|
- Item item = null;
|
|
|
- try
|
|
|
- {
|
|
|
- item = ItemDecoder.Decode(dataBuffer, ref startIndex);
|
|
|
- if (item == null)
|
|
|
- {
|
|
|
- Dispose();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- catch (Exception ex)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.Error("DecodeLoopAsync decode item error", ex);
|
|
|
- Dispose();
|
|
|
- break;
|
|
|
- }
|
|
|
- SecsMessage secsMessage = new SecsMessage(header, item);
|
|
|
-
|
|
|
- buffer = buffer.Slice(dataSeq.End);
|
|
|
- if (header.SType == SType.DataMessage)
|
|
|
- {
|
|
|
- if(header.Function % 2 == 0)//回复指令
|
|
|
- {
|
|
|
- ReplyMessageEvent(secsMessage);
|
|
|
- }
|
|
|
- else//Host主动指令
|
|
|
- {
|
|
|
- AnalyseMessageEvent(secsMessage);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- catch (Exception ex)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.Error("DecodeLoopAsync error", ex);
|
|
|
- Dispose();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 数组不足
|
|
|
- /// </summary>
|
|
|
- /// <param name="reader"></param>
|
|
|
- /// <param name="remainedBuffer"></param>
|
|
|
- /// <param name="required"></param>
|
|
|
- /// <returns></returns>
|
|
|
- private bool IsBufferInsufficient(PipeReader reader, ref ReadOnlySequence<byte> remainedBuffer, int required)
|
|
|
- {
|
|
|
- if (remainedBuffer.Length >= required)
|
|
|
- {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- reader.AdvanceTo(remainedBuffer.Start);
|
|
|
- return !PipeTryRead(reader, required, ref remainedBuffer);
|
|
|
- }
|
|
|
-
|
|
|
- //管道尝试读取
|
|
|
- private bool PipeTryRead(PipeReader reader, int required, ref ReadOnlySequence<byte> buffer)
|
|
|
- {
|
|
|
- if (reader.TryRead(out var result))
|
|
|
- {
|
|
|
- buffer = result.Buffer;
|
|
|
- if (buffer.Length >= required)
|
|
|
- {
|
|
|
- return true;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- reader.AdvanceTo(consumed: buffer.Start, examined: buffer.End);
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
- //管道读取缓存
|
|
|
- private ValueTask<ReadOnlySequence<byte>> PipeReadAsync(PipeReader reader, int required, CancellationToken cancellation)
|
|
|
- {
|
|
|
- ReadOnlySequence<byte> buffer = ReadOnlySequence<byte>.Empty;
|
|
|
- if (PipeTryRead(reader, required, ref buffer))
|
|
|
- {
|
|
|
- return new ValueTask<ReadOnlySequence<byte>>(buffer);
|
|
|
- }
|
|
|
-
|
|
|
- return SlowPipeReadAsync(reader, required, cancellation);
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 阻塞读取超出长度
|
|
|
- /// </summary>
|
|
|
- /// <param name="reader"></param>
|
|
|
- /// <param name="required"></param>
|
|
|
- /// <param name="cancellation"></param>
|
|
|
- /// <returns></returns>
|
|
|
- async ValueTask<ReadOnlySequence<byte>> SlowPipeReadAsync(PipeReader reader, int required, CancellationToken cancellation)
|
|
|
- {
|
|
|
- while (true)
|
|
|
- {
|
|
|
- var result = await reader.ReadAsync(cancellation).ConfigureAwait(false);
|
|
|
- var buffer = result.Buffer;
|
|
|
-
|
|
|
- if (buffer.Length >= required)
|
|
|
- {
|
|
|
- return buffer;
|
|
|
- }
|
|
|
- reader.AdvanceTo(consumed: buffer.Start, examined: buffer.End);
|
|
|
- }
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 发送状态select回复
|
|
|
- /// </summary>
|
|
|
- /// <param name="msgType"></param>
|
|
|
- /// <param name="id"></param>
|
|
|
- private void SendSelectResponse(SType msgType,int id)
|
|
|
- {
|
|
|
- SecsMessage secsMessage = SecsReplyMessageFactory.CreateSelectedReplyMessage(id);
|
|
|
- if(OnSendResponse!=null)
|
|
|
- {
|
|
|
- OnSendResponse(this,secsMessage);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 接收到回复消息
|
|
|
- /// </summary>
|
|
|
- /// <param name="secsMessage"></param>
|
|
|
- private void ReplyMessageEvent(SecsMessage secsMessage)
|
|
|
- {
|
|
|
- if (OnReplyMessageEventHandler != null)
|
|
|
- {
|
|
|
- OnReplyMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
- }
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 发送消息
|
|
|
- /// </summary>
|
|
|
- /// <param name="secsMessage"></param>
|
|
|
- private void SendMessageEvent(SecsMessage secsMessage)
|
|
|
- {
|
|
|
- if (OnSendMessageEventHandler != null)
|
|
|
- {
|
|
|
- OnSendMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
- }
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 解析消息
|
|
|
- /// </summary>
|
|
|
- /// <param name="secsMessage"></param>
|
|
|
- private void AnalyseMessageEvent(SecsMessage secsMessage)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.WriteHostInfo(SmlSerializationUtil.SerializeItemToString(secsMessage));
|
|
|
- try
|
|
|
- {
|
|
|
- SecsMessageValidator secsMessageValidator = ValidatorFactory.CreateValidator(secsMessage);
|
|
|
- secsMessageValidator.DeviceId = GlobalData.DeviceId;
|
|
|
- //判定消息格式
|
|
|
- ValidateType validateType = secsMessageValidator.Validate(secsMessage);
|
|
|
- if (validateType == ValidateType.OK)
|
|
|
- {
|
|
|
- if (OnAnalyseMessageEventHandler != null)//回调分析事件
|
|
|
- {
|
|
|
- OnAnalyseMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
- }
|
|
|
- }
|
|
|
- else//错误格式
|
|
|
- {
|
|
|
- SecsMessage errorMessage = SecsReplyMessageFactory.CreateErrorMessageReply(validateType, secsMessage.Header);
|
|
|
- SendMessageEvent(errorMessage);
|
|
|
- }
|
|
|
- }
|
|
|
- catch(Exception ex)
|
|
|
- {
|
|
|
- HsmsLogMamager.Instance.Error("AnalyseMessage error", ex);
|
|
|
- Dispose();
|
|
|
- }
|
|
|
- }
|
|
|
- /// <summary>
|
|
|
- /// 释放资源
|
|
|
- /// </summary>
|
|
|
- private void Dispose()
|
|
|
- {
|
|
|
- if(OnDisConnection!=null)
|
|
|
- {
|
|
|
- OnDisConnection(this,"");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+using Aitex.Core.Util;
|
|
|
+using SecsGem.Core;
|
|
|
+using SecsGem.Core.EnumData;
|
|
|
+using SecsGem.Core.ItemModel;
|
|
|
+using SecsGem.Core.Utility;
|
|
|
+using SecsGem.Core.Validator;
|
|
|
+using System;
|
|
|
+using System.Buffers;
|
|
|
+using System.Buffers.Binary;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Diagnostics;
|
|
|
+using System.IO.Pipelines;
|
|
|
+using System.Linq;
|
|
|
+using System.Runtime.CompilerServices;
|
|
|
+using System.Runtime.Remoting.Channels;
|
|
|
+using System.Runtime.Remoting.Messaging;
|
|
|
+using System.Text;
|
|
|
+using System.Threading;
|
|
|
+using System.Threading.Channels;
|
|
|
+using System.Threading.Tasks;
|
|
|
+
|
|
|
+namespace SecsGem.Hsms.Core
|
|
|
+{
|
|
|
+ public class PipeDecoder
|
|
|
+ {
|
|
|
+ private readonly PipeReader _reader;
|
|
|
+ public PipeWriter Input { get; }
|
|
|
+
|
|
|
+ public event EventHandler<SecsMessage> OnSendResponse;
|
|
|
+
|
|
|
+ public event DelegateEvent.MessageEventHander OnAnalyseMessageEventHandler;
|
|
|
+
|
|
|
+ public event DelegateEvent.MessageEventHander OnSendMessageEventHandler;
|
|
|
+
|
|
|
+ public event DelegateEvent.MessageEventHander OnReplyMessageEventHandler;
|
|
|
+
|
|
|
+ public event EventHandler<string> OnDisConnection;
|
|
|
+
|
|
|
+ public PipeDecoder(PipeReader reader, PipeWriter input,CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ _reader = reader;
|
|
|
+ Input = input;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Task StartAsync(CancellationToken cancellation)
|
|
|
+ => DecodeLoopAsync(cancellation);
|
|
|
+
|
|
|
+
|
|
|
+ private async Task DecodeLoopAsync(
|
|
|
+ CancellationToken cancellation)
|
|
|
+ {
|
|
|
+ var totalLengthBytes = new byte[4];
|
|
|
+ var messageHeaderBytes = new byte[10];
|
|
|
+ // PipeReader peek first
|
|
|
+ ReadOnlySequence<byte> buffer = new ReadOnlySequence<byte>() ;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ buffer = await PipeReadAsync(_reader, required: 4, cancellation).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ catch(Exception ex)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Error("PipeReadAsync error", ex);
|
|
|
+ Dispose();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ // 0: get total message length 4 bytes
|
|
|
+ if (IsBufferInsufficient(_reader, ref buffer, required: 4))
|
|
|
+ {
|
|
|
+ buffer = await PipeReadAsync(_reader, required: 4, cancellation).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ var totalLengthSeq = buffer.Slice(buffer.Start, 4);
|
|
|
+ totalLengthSeq.CopyTo(totalLengthBytes);
|
|
|
+ uint messageLength = BinaryPrimitives.ReadUInt32BigEndian(totalLengthBytes);
|
|
|
+ buffer = buffer.Slice(totalLengthSeq.End);
|
|
|
+ HsmsLogMamager.Instance.WriteReceivedBuffer("length buffer",totalLengthBytes);
|
|
|
+
|
|
|
+ // 1: get message header 10 bytes
|
|
|
+ if (IsBufferInsufficient(_reader, ref buffer, required: 10))
|
|
|
+ {
|
|
|
+ buffer = await PipeReadAsync(_reader, required: 10, cancellation).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ var messageHaderSeq = buffer.Slice(buffer.Start, 10);
|
|
|
+ messageHaderSeq.CopyTo(messageHeaderBytes);
|
|
|
+ Singleton<SecsMessageHeaderDecoder>.Instance.DecodeHsmsHeader(messageHeaderBytes, out var header);
|
|
|
+ buffer = buffer.Slice(messageHaderSeq.End);
|
|
|
+ HsmsLogMamager.Instance.WriteReceivedBuffer("header buffer",messageHeaderBytes);
|
|
|
+
|
|
|
+ if (messageLength > 10 && buffer.Length < messageLength - 10)
|
|
|
+ {
|
|
|
+ if (IsBufferInsufficient(_reader, ref buffer, (int)(messageLength - 10)))
|
|
|
+ {
|
|
|
+ buffer = await PipeReadAsync(_reader, (int)(messageLength - 10), cancellation).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (messageLength == 10) // only message header
|
|
|
+ {
|
|
|
+ if (header.SType == SType.DataMessage)
|
|
|
+ {
|
|
|
+ SecsMessage secsMessage = new SecsMessage(header, null);
|
|
|
+ AnalyseMessageEvent(secsMessage);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if ((byte)header.SType % 2 == 0)
|
|
|
+ {
|
|
|
+ SecsMessage replyMessage = new SecsMessage(header, null);
|
|
|
+ ReplyMessageEvent(replyMessage);
|
|
|
+ }
|
|
|
+ else if (header.SType == SType.SelectRequest)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Info("received host selected request");
|
|
|
+ SendSelectResponse(header.TransactionId);
|
|
|
+ }
|
|
|
+ else if (header.SType == SType.LinkTestRequest)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Info("received host linktest request");
|
|
|
+ SendLinkTestResponse(header.TransactionId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //存在Item数据
|
|
|
+ else if (buffer.Length >= messageLength - 10)
|
|
|
+ {
|
|
|
+ var dataBuffer= new byte[messageLength-10];
|
|
|
+ var dataSeq = buffer.Slice(buffer.Start, messageLength-10);
|
|
|
+ dataSeq.CopyTo(dataBuffer);
|
|
|
+
|
|
|
+ HsmsLogMamager.Instance.WriteReceivedBuffer("data item buffer", messageHeaderBytes);
|
|
|
+ int startIndex = 0;
|
|
|
+ Item item = null;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ item = ItemDecoder.Decode(dataBuffer, ref startIndex);
|
|
|
+ if (item == null)
|
|
|
+ {
|
|
|
+ Dispose();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Error("DecodeLoopAsync decode item error", ex);
|
|
|
+ Dispose();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ SecsMessage secsMessage = new SecsMessage(header, item);
|
|
|
+
|
|
|
+ buffer = buffer.Slice(dataSeq.End);
|
|
|
+ if (header.SType == SType.DataMessage)
|
|
|
+ {
|
|
|
+ if(header.Function % 2 == 0)//回复指令
|
|
|
+ {
|
|
|
+ ReplyMessageEvent(secsMessage);
|
|
|
+ }
|
|
|
+ else//Host主动指令
|
|
|
+ {
|
|
|
+ AnalyseMessageEvent(secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Error("DecodeLoopAsync error", ex);
|
|
|
+ Dispose();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 数组不足
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="reader"></param>
|
|
|
+ /// <param name="remainedBuffer"></param>
|
|
|
+ /// <param name="required"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ private bool IsBufferInsufficient(PipeReader reader, ref ReadOnlySequence<byte> remainedBuffer, int required)
|
|
|
+ {
|
|
|
+ if (remainedBuffer.Length >= required)
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ reader.AdvanceTo(remainedBuffer.Start);
|
|
|
+ return !PipeTryRead(reader, required, ref remainedBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ //管道尝试读取
|
|
|
+ private bool PipeTryRead(PipeReader reader, int required, ref ReadOnlySequence<byte> buffer)
|
|
|
+ {
|
|
|
+ if (reader.TryRead(out var result))
|
|
|
+ {
|
|
|
+ buffer = result.Buffer;
|
|
|
+ if (buffer.Length >= required)
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ reader.AdvanceTo(consumed: buffer.Start, examined: buffer.End);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ //管道读取缓存
|
|
|
+ private ValueTask<ReadOnlySequence<byte>> PipeReadAsync(PipeReader reader, int required, CancellationToken cancellation)
|
|
|
+ {
|
|
|
+ ReadOnlySequence<byte> buffer = ReadOnlySequence<byte>.Empty;
|
|
|
+ if (PipeTryRead(reader, required, ref buffer))
|
|
|
+ {
|
|
|
+ return new ValueTask<ReadOnlySequence<byte>>(buffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ return SlowPipeReadAsync(reader, required, cancellation);
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 阻塞读取超出长度
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="reader"></param>
|
|
|
+ /// <param name="required"></param>
|
|
|
+ /// <param name="cancellation"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ async ValueTask<ReadOnlySequence<byte>> SlowPipeReadAsync(PipeReader reader, int required, CancellationToken cancellation)
|
|
|
+ {
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ var result = await reader.ReadAsync(cancellation).ConfigureAwait(false);
|
|
|
+ var buffer = result.Buffer;
|
|
|
+
|
|
|
+ if (buffer.Length >= required)
|
|
|
+ {
|
|
|
+ return buffer;
|
|
|
+ }
|
|
|
+ reader.AdvanceTo(consumed: buffer.Start, examined: buffer.End);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 发送状态select回复
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="msgType"></param>
|
|
|
+ /// <param name="id"></param>
|
|
|
+ private void SendSelectResponse(int id)
|
|
|
+ {
|
|
|
+ SecsMessage secsMessage = SecsReplyMessageFactory.CreateSelectedReplyMessage(id);
|
|
|
+ if(OnSendResponse!=null)
|
|
|
+ {
|
|
|
+ OnSendResponse(this,secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 发送LinkTest回复
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="id"></param>
|
|
|
+ private void SendLinkTestResponse(int id)
|
|
|
+ {
|
|
|
+ SecsMessage secsMessage = SecsReplyMessageFactory.CreateLinkTestReplyMessage(id);
|
|
|
+ if (OnSendResponse != null)
|
|
|
+ {
|
|
|
+ OnSendResponse(this, secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 接收到回复消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="secsMessage"></param>
|
|
|
+ private void ReplyMessageEvent(SecsMessage secsMessage)
|
|
|
+ {
|
|
|
+ if (OnReplyMessageEventHandler != null)
|
|
|
+ {
|
|
|
+ OnReplyMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 发送消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="secsMessage"></param>
|
|
|
+ private void SendMessageEvent(SecsMessage secsMessage)
|
|
|
+ {
|
|
|
+ if (OnSendMessageEventHandler != null)
|
|
|
+ {
|
|
|
+ OnSendMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 解析消息
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="secsMessage"></param>
|
|
|
+ private void AnalyseMessageEvent(SecsMessage secsMessage)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.WriteHostInfo(SmlSerializationUtil.SerializeItemToString(secsMessage));
|
|
|
+ try
|
|
|
+ {
|
|
|
+ SecsMessageValidator secsMessageValidator = ValidatorFactory.CreateValidator(secsMessage);
|
|
|
+ secsMessageValidator.DeviceId = GlobalData.DeviceId;
|
|
|
+ //判定消息格式
|
|
|
+ ValidateType validateType = secsMessageValidator.Validate(secsMessage);
|
|
|
+ if (validateType == ValidateType.OK)
|
|
|
+ {
|
|
|
+ if (OnAnalyseMessageEventHandler != null)//回调分析事件
|
|
|
+ {
|
|
|
+ OnAnalyseMessageEventHandler(secsMessage.Header.TransactionId, secsMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else//错误格式
|
|
|
+ {
|
|
|
+ SecsMessage errorMessage = SecsReplyMessageFactory.CreateErrorMessageReply(validateType, secsMessage.Header);
|
|
|
+ SendMessageEvent(errorMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch(Exception ex)
|
|
|
+ {
|
|
|
+ HsmsLogMamager.Instance.Error("AnalyseMessage error", ex);
|
|
|
+ Dispose();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 释放资源
|
|
|
+ /// </summary>
|
|
|
+ private void Dispose()
|
|
|
+ {
|
|
|
+ if(OnDisConnection!=null)
|
|
|
+ {
|
|
|
+ OnDisConnection(this,"");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|