TcppClient.cs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. using Azylee.Core.DataUtils.CollectionUtils;
  2. using Azylee.Core.ThreadUtils.SleepUtils;
  3. using Azylee.Core.WindowsUtils.ConsoleUtils;
  4. using Azylee.Jsons;
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Net.Sockets;
  10. using System.Text;
  11. using System.Threading.Tasks;
  12. namespace Azylee.YeahWeb.SocketUtils.TcpUtils
  13. {
  /// <summary>
  /// TCP client that connects to a server, answers heartbeat frames, and hands
  /// every other received <see cref="TcpDataModel"/> to the caller's callbacks.
  /// </summary>
  /// <remarks>
  /// All three lifecycle callbacks are optional (may be null). Receive callbacks
  /// run on the background receive task, not on the caller's thread.
  /// </remarks>
  public class TcppClient
  {
      /// <summary>Maximum time, in milliseconds, that <see cref="Connect"/> waits for the connection attempt.</summary>
      private const int ConnectTimeoutMilliseconds = 1000;

      /// <summary>Message type that marks a heartbeat frame; it is echoed back to the server.</summary>
      private const int HeartbeatType = int.MaxValue;

      private readonly string _IP;
      private readonly int _Port;
      private readonly Action OnConnectAction;
      private readonly Action OnDisconnectAction;
      private readonly Action<TcpDataModel> OnReceiveAction;

      /// <summary>
      /// One-shot callbacks registered through <see cref="Write"/>. The int is the
      /// caller-supplied action type; it is stored but not used for matching.
      /// </summary>
      private readonly ConcurrentQueue<Tuple<int, Action<TcpDataModel>>> SyncFunction =
          new ConcurrentQueue<Tuple<int, Action<TcpDataModel>>>();

      /// <summary>The most recent client created by <see cref="Connect"/>; null before the first attempt.</summary>
      private TcpClient Client;

      /// <summary>
      /// Creates a client for the given endpoint. No connection is made until
      /// <see cref="Connect"/> is called.
      /// </summary>
      /// <param name="ip">Server host name or IP address.</param>
      /// <param name="port">Server TCP port.</param>
      /// <param name="onConnect">Invoked once a connection has been established.</param>
      /// <param name="onDisconnect">Invoked after the connection has ended and the client was closed.</param>
      /// <param name="onReceive">Invoked for every received non-heartbeat message.</param>
      public TcppClient(string ip, int port, Action onConnect, Action onDisconnect, Action<TcpDataModel> onReceive)
      {
          _IP = ip;
          _Port = port;
          OnConnectAction = onConnect;
          OnDisconnectAction = onDisconnect;
          OnReceiveAction = onReceive;
      }

      #region Connect and disconnect

      /// <summary>
      /// Connects (or reconnects) to the server, waiting at most
      /// <see cref="ConnectTimeoutMilliseconds"/> for the attempt to finish.
      /// </summary>
      /// <returns>
      /// <c>true</c> only when a new connection was established by this call;
      /// <c>false</c> when the client is already connected, the attempt failed,
      /// or it timed out. This method never throws.
      /// </returns>
      public bool Connect()
      {
          if (IsConnected(Client))
          {
              // Already connected: nothing to do. Reported as false, as before.
              return false;
          }

          TcpClient client = null;
          try
          {
              client = new TcpClient();
              Client = client;

              IAsyncResult pending = client.BeginConnect(_IP, _Port, OnConnectCompleted, client);
              bool completed = pending.AsyncWaitHandle.WaitOne(ConnectTimeoutMilliseconds);

              // The wait handle is also signalled when the attempt FAILS (e.g. connection
              // refused), so completion alone does not mean the socket is connected.
              if (completed && client.Connected)
              {
                  return true;
              }
          }
          catch (Exception)
          {
              // Best effort: any failure to start or observe the attempt is reported as false.
          }

          // Failed or timed out: close the client so the abandoned attempt cannot finish
          // later in the background and leave an orphaned second connection behind.
          client?.Close();
          return false;
      }

      /// <summary>
      /// Closes the current connection, if any. The receive loop notices the
      /// closed client and then invokes the disconnect callback.
      /// </summary>
      public void Disconnect()
      {
          Client?.Close();
      }

      #endregion

      #region Reading and writing after connecting

      /// <summary>
      /// Sends a message. When the send succeeds and both <paramref name="actionType"/>
      /// and <paramref name="action"/> are non-null, <paramref name="action"/> is queued
      /// and invoked once, on its own task, with the next non-heartbeat message received.
      /// </summary>
      /// <param name="model">Message to send.</param>
      /// <param name="actionType">Type tag stored with the callback (not used for matching).</param>
      /// <param name="action">
      /// One-shot callback for the next received message. It must not block the
      /// receive path; start a new task for long-running work.
      /// </param>
      /// <returns><c>true</c> when the client was connected and the helper reported a successful write.</returns>
      public bool Write(TcpDataModel model, int? actionType = 0, Action<TcpDataModel> action = null)
      {
          TcpClient client = Client;
          if (!IsConnected(client))
          {
              return false;
          }

          bool sent = TcpStreamHelper.Write(client, model);
          if (!sent)
          {
              return false;
          }

          if (actionType.HasValue && action != null)
          {
              SyncFunction.Enqueue(Tuple.Create(actionType.Value, action));
          }
          return true;
      }

      /// <summary>
      /// Completion callback for <c>BeginConnect</c>: finishes the attempt and,
      /// on success, starts the receive loop for the connected client.
      /// </summary>
      /// <param name="state">Async result whose <c>AsyncState</c> is the connecting <see cref="TcpClient"/>.</param>
      private void OnConnectCompleted(IAsyncResult state)
      {
          try
          {
              var client = (TcpClient)state.AsyncState;

              // Throws when the attempt failed or the client was closed after a timeout.
              // Troubleshooting note kept from the original author: if this fails with
              // "The system detected an invalid pointer address in attempting to use a
              // pointer argument in a call", run `netsh winsock reset` from an elevated
              // command prompt and reboot.
              client.EndConnect(state);

              Client = client;
              ConnectTask(client);
          }
          catch (Exception)
          {
              // Connection was not established; Connect() reports the failure and
              // closes the client.
          }
      }

      /// <summary>
      /// Starts the background receive loop for <paramref name="client"/>. The loop
      /// runs until the client is no longer connected, then closes it and invokes
      /// the disconnect callback.
      /// </summary>
      /// <param name="client">The connected client to read from.</param>
      private void ConnectTask(TcpClient client)
      {
          // LongRunning: the loop lives as long as the connection, so it should not
          // occupy a thread-pool worker.
          Task.Factory.StartNew(() =>
          {
              OnConnectAction?.Invoke();

              while (IsConnected(client))
              {
                  try
                  {
                      // NOTE(review): presumably blocks until a full message arrives and
                      // returns null when nothing could be read; confirm in TcpStreamHelper.
                      TcpDataModel model = TcpStreamHelper.Read(client);
                      if (model == null)
                      {
                          continue;
                      }

                      if (model.Type == HeartbeatType)
                      {
                          // Echo the heartbeat on the connection it arrived on.
                          TcpStreamHelper.Write(client, new TcpDataModel(HeartbeatType));
                          continue;
                      }

                      // The default receive handler runs first, then the one-shot callbacks.
                      OnReceiveAction?.Invoke(model);
                      DispatchPendingCallbacks(model);
                  }
                  catch (Exception)
                  {
                      // Best effort: a failed read or a throwing handler must not end the
                      // loop; a broken connection is detected by the loop condition.
                  }
              }

              client.Close();
              OnDisconnectAction?.Invoke();
          }, TaskCreationOptions.LongRunning);
      }

      /// <summary>
      /// Invokes every callback that was queued when this method was entered,
      /// each on its own task, passing the received message.
      /// </summary>
      /// <param name="model">The received message handed to each callback.</param>
      private void DispatchPendingCallbacks(TcpDataModel model)
      {
          // Take the count once: comparing a growing index against the shrinking live
          // Count would stop after roughly half of the queued callbacks.
          int pending = SyncFunction.Count;
          for (int i = 0; i < pending; i++)
          {
              if (!SyncFunction.TryDequeue(out Tuple<int, Action<TcpDataModel>> entry))
              {
                  break;
              }

              Action<TcpDataModel> callback = entry.Item2;
              Task.Factory.StartNew(() => callback?.Invoke(model));
          }
      }

      /// <summary>
      /// Reports whether <paramref name="client"/> is non-null and connected,
      /// treating any exception from the check as "not connected".
      /// </summary>
      /// <param name="client">Client to inspect; may be null or already closed.</param>
      private static bool IsConnected(TcpClient client)
      {
          try
          {
              return client != null && client.Connected;
          }
          catch (Exception)
          {
              // NOTE(review): guards against runtimes where reading Connected on a
              // closed client throws instead of returning false.
              return false;
          }
      }

      #endregion
  }
  156. }