using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Threading; using System.Runtime.InteropServices; namespace Paho.MqttDotnet { /// /// 断开丢失委托 /// /// 触发者 public delegate void ConnectionLostdHandler(object sender); /// /// 收到消息委托 /// /// 触发者 /// 主题 /// 消息 public delegate void MessageArrivedHandler(object sender, string topic, MqttMessage message); /// /// 表示mqtt客户端 /// unsafe public class MqttClient : SafeHandle { /// /// 用于保存最近的连接配置项 /// private ConnectOption connectOpt; /// /// 用于生成任务的id /// private int taskIdValue = 0; /// /// 用于保存对象的引用 /// private readonly HashSet hashSet = new HashSet(); /// /// 用于保存当前正在执行的任务 /// private readonly TaskSetterTable taskSetterTable = new TaskSetterTable(); /// /// 连接断开事件 /// public event ConnectionLostdHandler OnConnectionLost; /// /// 收到消息事件 /// public event MessageArrivedHandler OnMessageArrived; /// /// 获取是否已连接 /// public bool IsConnected { get { return MQTTAsync.MQTTAsync_isConnected(this.handle) > 0; } } /// /// 获取句柄是否无效 /// public sealed override bool IsInvalid { get { return this.handle == IntPtr.Zero; } } /// /// mqtt客户端 /// /// mqtt://mymqtt.com /// 客户端id /// /// /// public MqttClient(string serverUri, string clientId) : this(serverUri, clientId, MqttPersistence.None) { } /// /// mqtt客户端 /// /// mqtt://mymqtt.com /// 客户端id /// 持久化方式 /// /// /// /// public MqttClient(string serverUri, string clientId, MqttPersistence persistence) : base(IntPtr.Zero, true) { if (string.IsNullOrEmpty(serverUri)) { throw new ArgumentNullException("serverUri"); } if (string.IsNullOrEmpty(clientId)) { throw new ArgumentNullException("clientId"); } if (persistence == MqttPersistence.User) { var message = string.Format("不支持的持久化方式:{0}.{1}", typeof(MqttPersistence).Name, persistence); throw new NotSupportedException(message); } var uri = new Uri(serverUri); const int mqttDefaultPort = 1883; var tcpUri = string.Format("tcp://{0}:{1}", uri.Host, uri.Port > 0 ? uri.Port : mqttDefaultPort); var er = MQTTAsync.MQTTAsync_create(ref this.handle, tcpUri, clientId, persistence, IntPtr.Zero); this.EnsureSuccessCode(er); this.InitClientCallbacks(); } /// /// 设置客户端相关回调 /// private void InitClientCallbacks() { var lost = new MQTTAsync_connectionLost((context, cause) => { var e = this.OnConnectionLost; if (e != null) { e.Invoke(this); } }); var arrvied = new MQTTAsync_messageArrived((context, topicName, topicLen, msg) => { var e = this.OnMessageArrived; if (e != null) { var topic = Marshal.PtrToStringAnsi(topicName, topicLen); var message = MqttMessage.From(msg); e.Invoke(this, topic, message); } MQTTAsync.MQTTAsync_free(topicName); MQTTAsync.MQTTAsync_freeMessage(ref msg); return 1; }); var er = MQTTAsync.MQTTAsync_setCallbacks(this.handle, IntPtr.Zero, lost, arrvied, null); this.EnsureSuccessCode(er); this.AutoRef(lost, arrvied); } /// /// 确保mqtt正确 /// /// 客户端错误码 /// private void EnsureSuccessCode(MqttError er) { if (er != MqttError.Success) { throw new MqttException(er); } } /// /// 添加委托引用 /// /// /// private void AutoRef(params Delegate[] d) { foreach (var item in d) { this.hashSet.Add(item); } } /// /// 创建并维护IMqttOptions引用 /// /// /// private TMqttOptions AutoRef() where TMqttOptions : IMqttOptions, new() { var opt = new TMqttOptions(); opt.OnCompleted((taskId, value) => { this.hashSet.Remove(opt); this.taskSetterTable.Remove(taskId).SetResult(value); }); opt.OnException((taskId, ex) => { this.hashSet.Remove(opt); this.taskSetterTable.Remove(taskId).SetException(ex); }); this.hashSet.Add(opt); return opt; } /// /// 连接到服务器 /// /// 选项 /// /// /// public ConnectError Connect(ConnectOption option) { return this.ConnectInternal(option).GetResult(); } /// /// 连接到服务器 /// /// 选项 /// /// /// public Task ConnectAsync(ConnectOption option) { return this.ConnectInternal(option).GetTask(); } /// /// 连接到服务器 /// /// 选项 /// /// /// private ITaskSetter ConnectInternal(ConnectOption option) { if (option == null) { throw new ArgumentNullException(); } this.connectOpt = option; var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var mqttOptions = this.AutoRef(); var opt = mqttOptions.ToStruct(taskId, option.ToStruct()); var er = MQTTAsync.MQTTAsync_connect(this.handle, ref opt); this.EnsureSuccessCode(er); return setter; } /// /// 生成任务id /// /// private int GenerateTaskId() { return Interlocked.Increment(ref this.taskIdValue); } /// /// 重连到服务端 /// /// /// /// public ConnectError ReConnect() { if (this.connectOpt == null) { throw new NotSupportedException(); } return this.Connect(this.connectOpt); } /// /// 重连到服务端 /// /// /// /// public Task ReConnectAsync() { if (this.connectOpt == null) { throw new NotSupportedException(); } return this.ConnectAsync(this.connectOpt); } /// /// 断开连接 /// /// /// public bool Disconnect() { return this.DisconnectInternal().GetResult(); } /// /// 断开连接 /// /// /// public Task DisconnectAsync() { return this.DisconnectInternal().GetTask(); } /// /// 断开连接 /// /// /// private ITaskSetter DisconnectInternal() { var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var mqttOptions = this.AutoRef(); var opt = mqttOptions.ToStruct(taskId); var er = MQTTAsync.MQTTAsync_disconnect(this.handle, ref opt); this.EnsureSuccessCode(er); return setter; } /// /// 订阅 /// /// 主题 /// 质量 /// /// /// public bool Subscribe(string topic, MqttQoS qos) { return this.SubscribeInternal(topic, qos).GetResult(); } /// /// 订阅 /// /// 主题 /// 质量 /// /// /// public Task SubscribeAsync(string topic, MqttQoS qos) { return this.SubscribeInternal(topic, qos).GetTask(); } /// /// 订阅 /// /// 主题 /// 质量 /// /// /// private ITaskSetter SubscribeInternal(string topic, MqttQoS qos) { if (string.IsNullOrEmpty(topic)) { throw new ArgumentNullException("topic"); } var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var opt = this.InitResponseOptions(taskId); var er = MQTTAsync.MQTTAsync_subscribe(this.handle, topic, qos, ref opt); this.EnsureSuccessCode(er); return setter; } /// /// 订阅多个主题 /// /// 主题与消息 /// /// /// /// public bool SubscribeMany(params MqttTopicQoS[] topicQos) { return this.SubscribeManyInternal(topicQos).GetResult(); } /// /// 订阅多个主题 /// /// 主题与消息 /// /// /// /// public Task SubscribeManyAsync(params MqttTopicQoS[] topicQos) { return this.SubscribeManyInternal(topicQos).GetTask(); } /// /// 订阅多个主题 /// /// 主题与消息 /// 任务id /// /// /// /// private ITaskSetter SubscribeManyInternal(MqttTopicQoS[] topicQos) { if (topicQos == null) { throw new ArgumentNullException(); } if (topicQos.Length == 0) { throw new ArgumentOutOfRangeException(); } var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var opt = this.InitResponseOptions(taskId); var qoss = topicQos.Select(item => (int)item.QoS).ToArray(); var topics = topicQos.Select(item => item.Topic.ToUnmanagedPointer()).ToArray(); fixed (IntPtr* ptrtopic = &topics[0]) { fixed (int* ptrqos = &qoss[0]) { var er = MQTTAsync.MQTTAsync_subscribeMany(this.handle, qoss.Length, ptrtopic, ptrqos, ref opt); this.EnsureSuccessCode(er); } } return setter; } /// /// 取消订阅 /// /// 主题 /// /// /// public bool Unsubscribe(string topic) { return this.UnsubscribeInternal(topic).GetResult(); } /// /// 取消订阅 /// /// 主题 /// /// /// public Task UnsubscribeAsync(string topic) { return this.UnsubscribeInternal(topic).GetTask(); } /// /// 取消订阅 /// /// 主题 /// /// /// private ITaskSetter UnsubscribeInternal(string topic) { if (topic == null) { throw new ArgumentNullException(); } var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var opt = this.InitResponseOptions(taskId); var er = MQTTAsync.MQTTAsync_unsubscribe(this.handle, topic, ref opt); this.EnsureSuccessCode(er); return setter; } /// /// 取消订阅多个主题 /// /// 主题 /// /// /// /// public bool UnsubscribeMany(params string[] topic) { return this.UnsubscribeManyInternal(topic).GetResult(); } /// /// 取消订阅多个主题 /// /// 主题 /// /// /// /// public Task UnsubscribeManyAsync(params string[] topic) { return this.UnsubscribeManyInternal(topic).GetTask(); } /// /// 取消订阅多个主题 /// /// 主题 /// /// /// /// private ITaskSetter UnsubscribeManyInternal(string[] topic) { if (topic == null) { throw new ArgumentNullException(); } if (topic.Length == 0) { throw new ArgumentOutOfRangeException(); } var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var topicArray = topic.Select(item => item.ToUnmanagedPointer()).ToArray(); fixed (IntPtr* ptrtopic = &topicArray[0]) { var opt = this.InitResponseOptions(taskId); var er = MQTTAsync.MQTTAsync_unsubscribeMany(this.handle, topic.Length, ptrtopic, ref opt); this.EnsureSuccessCode(er); } return setter; } /// /// 发送消息 /// 不跟踪服务端的消息处理结果 /// /// 主题 /// 消息 /// /// public MqttError SendMessageAsync(string topic, MqttMessage message) { if (string.IsNullOrEmpty(topic)) { throw new ArgumentNullException("topic"); } if (message == null) { throw new ArgumentNullException("message"); } var msg = message.ToStruct(); var opt = new MQTTAsync_responseOptions(); opt.Init(); var er = MQTTAsync.MQTTAsync_sendMessage(this.handle, topic, ref msg, ref opt); msg.Dispose(); return er; } /// /// 发送消息 /// 并跟踪服务端对消息回复结果 /// /// 主题 /// 消息 /// /// /// public Task SendMessageTaskAsync(string topic, MqttMessage message) { if (string.IsNullOrEmpty(topic)) { throw new ArgumentNullException("topic"); } if (message == null) { throw new ArgumentNullException("message"); } var taskId = this.GenerateTaskId(); var setter = this.taskSetterTable.Create(taskId); var msg = message.ToStruct(); var opt = this.InitResponseOptions(taskId); var er = MQTTAsync.MQTTAsync_sendMessage(this.handle, topic, ref msg, ref opt); msg.Dispose(); this.EnsureSuccessCode(er); return setter.GetTask(); } /// /// 初始化回复选项 /// /// 任务id /// private MQTTAsync_responseOptions InitResponseOptions(int taskId) { var mqttOptions = this.AutoRef(); var opt = mqttOptions.ToStruct(taskId); return opt; } /// /// 释放句柄 /// protected sealed override bool ReleaseHandle() { if (this.IsInvalid == true) { return false; } MQTTAsync.MQTTAsync_destroy(ref this.handle); return this.IsInvalid; } /// /// 追踪回调 /// private static MQTTAsync_traceCallback trackCallback; /// /// 设置追踪级别 /// /// 级别 public static void SetTraceLevel(MqttTraceLevels level) { MQTTAsync.MQTTAsync_setTraceLevel(level); } /// /// 设置追踪回调 /// /// 追踪回调,null则清除追踪 public static void SetTraceCallback(Action traceCallback) { if (traceCallback == null) { MQTTAsync.MQTTAsync_setTraceCallback(null); } else { MqttClient.trackCallback = new MQTTAsync_traceCallback((level, msg) => traceCallback(level, msg)); MQTTAsync.MQTTAsync_setTraceCallback(MqttClient.trackCallback); } } } }