using System;
using System.Collections;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using NLog;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Internal;
using uPLibrary.Networking.M2Mqtt.Messages;
using uPLibrary.Networking.M2Mqtt.Session;
using uPLibrary.Networking.M2Mqtt.Utility;

namespace uPLibrary.Networking.M2Mqtt
{
    /// <summary>
    /// MQTT Client
    /// </summary>
    public class MqttClient
    {
        private static Logger logger = NLog.LogManager.GetCurrentClassLogger();

        #region Constants ...

        // thread names
        private const string RECEIVE_THREAD_NAME = "ReceiveThread";
        private const string RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread";
        private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread";
        private const string KEEP_ALIVE_THREAD = "KeepAliveThread";

        #endregion

        /// <summary>
        /// Delegate that defines event handler for PUBLISH message received
        /// </summary>
        public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for published message
        /// </summary>
        public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for subscribed topic
        /// </summary>
        public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for unsubscribed topic
        /// </summary>
        public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for SUBSCRIBE message received
        /// </summary>
        public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for UNSUBSCRIBE message received
        /// </summary>
        public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for CONNECT message received
        /// </summary>
        public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);

        /// <summary>
        /// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
        /// </summary>
        public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);

        /// <summary>
        /// Delegate that defines event handler for client/peer disconnection
        /// </summary>
        public delegate void ConnectionClosedEventHandler(object sender, EventArgs e);

        // broker hostname (or ip address) and port
        private string brokerHostName;
        private int brokerPort;

        // running status of threads
        private bool isRunning;
        // event for raising received message event
        private AutoResetEvent receiveEventWaitHandle;
        private AutoResetEvent closeEventWaitHandle;

        // event for starting process inflight queue asynchronously
        private AutoResetEvent inflightWaitHandle;

        // event for signaling synchronous receive
        AutoResetEvent syncEndReceiving;
        // message received
        MqttMsgBase msgReceived;

        // exception thrown during receiving
        Exception exReceiving;

        // Connection timeout for ssl authentication
        private int connectTimeout;

        // keep alive period (in ms)
        private int keepAlivePeriod;
        // events for signaling on keep alive thread
        private AutoResetEvent keepAliveEvent;
        private AutoResetEvent keepAliveEventEnd;
        // last communication time in ticks (assigned from Environment.TickCount)
        private int lastCommTime;

        // event for PUBLISH message received
        public event MqttMsgPublishEventHandler MqttMsgPublishReceived;
        // event for published message
        public event MqttMsgPublishedEventHandler MqttMsgPublished;
        // event for subscribed topic
        public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;
        // event for unsubscribed topic
        public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;
        // event for SUBSCRIBE message received
        public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;
        // event for UNSUBSCRIBE message received
        public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;
        // event for CONNECT message received
        public event MqttMsgConnectEventHandler MqttMsgConnected;
        // event for DISCONNECT message received
        public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;

        // event for peer/client disconnection
        public event ConnectionClosedEventHandler ConnectionClosed;

        // channel to communicate over the network
        private IMqttNetworkChannel channel;

        // inflight messages queue
        private Queue inflightQueue;
        // internal queue for received messages about inflight messages
        private Queue internalQueue;
        // internal queue for dispatching events
        private Queue eventQueue;
        // session
        private MqttClientSession session;

        // reference to avoid access to singleton via property
        private MqttSettings settings;

        // current message identifier generated
        private ushort messageIdCounter = 0;

        // connection is closing due to peer
        private bool isConnectionClosing;

        /// <summary>
        /// Connection status between client and broker
        /// </summary>
        public bool IsConnected { get; private set; }

        /// <summary>
        /// Client identifier
        /// </summary>
        public string ClientId { get; private set; }

        /// <summary>
        /// Clean session flag
        /// </summary>
        public bool CleanSession { get; private set; }

        /// <summary>
        /// Will flag
        /// </summary>
        public bool WillFlag { get; private set; }

        /// <summary>
        /// Will QOS level
        /// </summary>
        public byte WillQosLevel { get; private set; }

        /// <summary>
        /// Will topic
        /// </summary>
        public string WillTopic { get; private set; }

        /// <summary>
        /// Will message
        /// </summary>
        public string WillMessage { get; private set; }

        /// <summary>
        /// MQTT protocol version
        /// </summary>
        public MqttProtocolVersion ProtocolVersion { get; set; }

        /// <summary>
        /// MQTT Client Session
        /// </summary>
        public MqttClientSession Session
        {
            get { return this.session; }
            set { this.session = value; }
        }

        /// <summary>
        /// MQTT client settings
        /// </summary>
        public MqttSettings Settings
        {
            get { return this.settings; }
        }

        /// <summary>
        /// Constructor; forwards to the IP-address overload with the default broker port,
        /// no secure connection, no certificates and no SSL/TLS protocol
        /// </summary>
        /// <param name="brokerIpAddress">Broker IP address</param>
        [Obsolete("Use this ctor MqttClient(string brokerHostName) insted")]
        public MqttClient(IPAddress brokerIpAddress) :
            this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
        {
        }

        /// <summary>
        /// Constructor
        /// </summary>
Broker IP address /// Broker port /// Using secure connection /// CA certificate for secure connection /// Client certificate /// SSL/TLS protocol version [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert) insted")] public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol) { this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, 0, null, null); } /// /// Constructor /// /// Broker Host Name or IP Address public MqttClient(string brokerHostName) : this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None, 0) { } /// /// Constructor /// /// Broker Host Name or IP Address /// Broker port /// Using secure connection /// SSL/TLS protocol version /// CA certificate for secure connection /// Client certificate public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout) { this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, null, null); } /// /// Constructor /// /// Broker Host Name or IP Address /// Broker port /// Using secure connection /// CA certificate for secure connection /// Client certificate /// SSL/TLS protocol version /// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout, RemoteCertificateValidationCallback userCertificateValidationCallback) : this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, null) { } /// /// Constructor /// /// Broker Host Name or IP Address /// 
Broker port /// Using secure connection /// SSL/TLS protocol version /// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol, int connectTimeout, RemoteCertificateValidationCallback userCertificateValidationCallback, LocalCertificateSelectionCallback userCertificateSelectionCallback) : this(brokerHostName, brokerPort, secure, null, null, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback) { } /// /// Constructor /// /// Broker Host Name or IP Address /// Broker port /// Using secure connection /// CA certificate for secure connection /// Client certificate /// SSL/TLS protocol version /// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout, RemoteCertificateValidationCallback userCertificateValidationCallback, LocalCertificateSelectionCallback userCertificateSelectionCallback) { this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback); } /// /// Constructor /// /// Network channel for communication public MqttClient(IMqttNetworkChannel channel) { // set default MQTT protocol version (default is 3.1.1) this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1; this.channel = channel; // reference to MQTT settings this.settings = MqttSettings.Instance; // client not connected yet (CONNACK not 
send from client), some default values this.IsConnected = false; this.ClientId = null; this.CleanSession = true; this.keepAliveEvent = new AutoResetEvent(false); // queue for handling inflight messages (publishing and acknowledge) this.inflightWaitHandle = new AutoResetEvent(false); this.inflightQueue = new Queue(); // queue for received message this.receiveEventWaitHandle = new AutoResetEvent(false); this.closeEventWaitHandle = new AutoResetEvent(false); this.eventQueue = new Queue(); this.internalQueue = new Queue(); // session this.session = null; } /// /// MqttClient initialization /// /// Broker Host Name or IP Address /// Broker port /// >Using secure connection /// CA certificate for secure connection /// Client certificate /// SSL/TLS protocol version /// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party /// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol, int connectTimeout, RemoteCertificateValidationCallback userCertificateValidationCallback, LocalCertificateSelectionCallback userCertificateSelectionCallback) { // set default MQTT protocol version (default is 3.1.1) this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1; this.brokerHostName = brokerHostName; this.brokerPort = brokerPort; // reference to MQTT settings this.settings = MqttSettings.Instance; // set settings port based on secure connection or not if (!secure) this.settings.Port = this.brokerPort; else this.settings.SslPort = this.brokerPort; this.connectTimeout = connectTimeout; this.syncEndReceiving = new AutoResetEvent(false); this.keepAliveEvent = new AutoResetEvent(false); // queue for handling inflight messages (publishing and acknowledge) this.inflightWaitHandle = new AutoResetEvent(false); 
this.inflightQueue = new Queue(); // queue for received message this.receiveEventWaitHandle = new AutoResetEvent(false); this.closeEventWaitHandle = new AutoResetEvent(false); this.eventQueue = new Queue(); this.internalQueue = new Queue(); // session this.session = null; // create network channel this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol, connectTimeout, userCertificateValidationCallback, userCertificateSelectionCallback); } /// /// Connect to broker /// /// Client identifier /// Return code of CONNACK message from broker public byte Connect(string clientId) { return this.Connect(clientId, null, null, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT); } /// /// Connect to broker /// /// Client identifier /// Username /// Password /// Return code of CONNACK message from broker public byte Connect(string clientId, string username, string password) { return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT); } /// /// Connect to broker /// /// Client identifier /// Username /// Password /// Clean sessione flag /// Keep alive period /// Return code of CONNACK message from broker public byte Connect(string clientId, string username, string password, bool cleanSession, ushort keepAlivePeriod) { return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, cleanSession, keepAlivePeriod); } /// /// Connect to broker /// /// Client identifier /// Username /// Password /// Will retain flag /// Will QOS level /// Will flag /// Will topic /// Will message /// Clean sessione flag /// Keep alive period /// Return code of CONNACK message from broker public byte Connect(string clientId, string username, string password, bool willRetain, byte willQosLevel, bool willFlag, string willTopic, 
string willMessage, bool cleanSession, ushort keepAlivePeriod) { // create CONNECT message MqttMsgConnect connect = new MqttMsgConnect(clientId, username, password, willRetain, willQosLevel, willFlag, willTopic, willMessage, cleanSession, keepAlivePeriod, (byte)this.ProtocolVersion); try { // connect to the broker this.channel.Connect(); } catch (Exception ex) { throw new MqttConnectionException("Exception connecting to the broker", ex); } this.lastCommTime = 0; this.isRunning = true; this.isConnectionClosing = false; // start thread for receiving messages from broker Fx.StartThread(this.ReceiveThread); MqttMsgConnack connack = null; try { connack = (MqttMsgConnack)this.SendReceive(connect); } catch (MqttCommunicationException) { this.isRunning = false; throw; } // if connection accepted, start keep alive timer and if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED) { // set all client properties this.ClientId = clientId; this.CleanSession = cleanSession; this.WillFlag = willFlag; this.WillTopic = willTopic; this.WillMessage = willMessage; this.WillQosLevel = willQosLevel; this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms // restore previous session this.RestoreSession(); // keep alive period equals zero means turning off keep alive mechanism if (this.keepAlivePeriod != 0) { // start thread for sending keep alive message to the broker Fx.StartThread(this.KeepAliveThread); } // start thread for raising received message event from broker Fx.StartThread(this.DispatchEventThread); // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge) Fx.StartThread(this.ProcessInflightThread); this.IsConnected = true; } return connack.ReturnCode; } /// /// Disconnect from broker /// public void Disconnect() { MqttMsgDisconnect disconnect = new MqttMsgDisconnect(); this.Send(disconnect); // close client this.OnConnectionClosing(); } /// /// Open client communication /// public void Open() { this.isRunning = true; // 
start thread for receiving messages from client Fx.StartThread(this.ReceiveThread); // start thread for raising received message event from client Fx.StartThread(this.DispatchEventThread); // start thread for handling inflight messages queue to client asynchronously (publish and acknowledge) Fx.StartThread(this.ProcessInflightThread); } /// /// Close client /// public void Close() { // stop receiving thread this.isRunning = false; // wait end receive event thread if (this.receiveEventWaitHandle != null) this.receiveEventWaitHandle.Set(); if (this.closeEventWaitHandle != null) this.closeEventWaitHandle.Set(); // wait end process inflight thread if (this.inflightWaitHandle != null) this.inflightWaitHandle.Set(); // unlock keep alive thread this.keepAliveEvent.Set(); // clear all queues this.inflightQueue.Clear(); this.internalQueue.Clear(); this.eventQueue.Clear(); // close network channel this.channel.Close(); this.IsConnected = false; } /// /// Execute ping to broker for keep alive /// /// PINGRESP message from broker private MqttMsgPingResp Ping() { MqttMsgPingReq pingreq = new MqttMsgPingReq(); try { // broker must send PINGRESP within timeout equal to keep alive period return (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod); } catch (Exception e) { logger.Debug("Exception occurred: {0}" , e.ToString()); // client must close connection this.OnConnectionClosing(); this.closeEventWaitHandle.WaitOne(); return null; } } /// /// Send CONNACK message to the client (connection accepted or not) /// /// CONNECT message with all client information /// Return code for CONNACK message /// If not null, client id assigned by broker /// Session present on the broker public void Connack(MqttMsgConnect connect, byte returnCode, string clientId, bool sessionPresent) { this.lastCommTime = 0; // create CONNACK message and ... 
MqttMsgConnack connack = new MqttMsgConnack(); connack.ReturnCode = returnCode; // [v3.1.1] session present flag if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1_1) connack.SessionPresent = sessionPresent; // ... send it to the client this.Send(connack); // connection accepted, start keep alive thread checking if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED) { // [v3.1.1] if client id isn't null, the CONNECT message has a cliend id with zero bytes length // and broker assigned a unique identifier to the client this.ClientId = (clientId == null) ? connect.ClientId : clientId; this.CleanSession = connect.CleanSession; this.WillFlag = connect.WillFlag; this.WillTopic = connect.WillTopic; this.WillMessage = connect.WillMessage; this.WillQosLevel = connect.WillQosLevel; this.keepAlivePeriod = connect.KeepAlivePeriod * 1000; // convert in ms // broker has a tolerance of 1.5 specified keep alive period this.keepAlivePeriod += (this.keepAlivePeriod / 2); // start thread for checking keep alive period timeout Fx.StartThread(this.KeepAliveThread); this.isConnectionClosing = false; this.IsConnected = true; } // connection refused, close TCP/IP channel else { this.Close(); } } /// /// Send SUBACK message to the client /// /// Message Id for the SUBSCRIBE message that is being acknowledged /// Granted QoS Levels public void Suback(ushort messageId, byte[] grantedQosLevels) { MqttMsgSuback suback = new MqttMsgSuback(); suback.MessageId = messageId; suback.GrantedQoSLevels = grantedQosLevels; this.Send(suback); } /// /// Send UNSUBACK message to the client /// /// Message Id for the UNSUBSCRIBE message that is being acknowledged public void Unsuback(ushort messageId) { MqttMsgUnsuback unsuback = new MqttMsgUnsuback(); unsuback.MessageId = messageId; this.Send(unsuback); } /// /// Subscribe for message topics /// /// List of topics to subscribe /// QOS levels related to topics /// Message Id related to SUBSCRIBE message public ushort Subscribe(string[] topics, 
byte[] qosLevels) { MqttMsgSubscribe subscribe = new MqttMsgSubscribe(topics, qosLevels); subscribe.MessageId = this.GetMessageId(); // enqueue subscribe request into the inflight queue this.EnqueueInflight(subscribe, MqttMsgFlow.ToPublish); return subscribe.MessageId; } /// /// Unsubscribe for message topics /// /// List of topics to unsubscribe /// Message Id in UNSUBACK message from broker public ushort Unsubscribe(string[] topics) { MqttMsgUnsubscribe unsubscribe = new MqttMsgUnsubscribe(topics); unsubscribe.MessageId = this.GetMessageId(); // enqueue unsubscribe request into the inflight queue this.EnqueueInflight(unsubscribe, MqttMsgFlow.ToPublish); return unsubscribe.MessageId; } /// /// Publish a message asynchronously (QoS Level 0 and not retained) /// /// Message topic /// Message data (payload) /// Message Id related to PUBLISH message public ushort Publish(string topic, byte[] message) { return this.Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false); } /// /// Publish a message asynchronously /// /// Message topic /// Message data (payload) /// QoS Level /// Retain flag /// Message Id related to PUBLISH message public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain) { MqttMsgPublish publish = new MqttMsgPublish(topic, message, false, qosLevel, retain); publish.MessageId = this.GetMessageId(); // enqueue message to publish into the inflight queue bool enqueue = this.EnqueueInflight(publish, MqttMsgFlow.ToPublish); // message enqueued if (enqueue) return publish.MessageId; // infligh queue full, message not enqueued else throw new MqttClientException(MqttClientErrorCode.InflightQueueFull); } /// /// Wrapper method for raising events /// /// Internal event private void OnInternalEvent(InternalEvent internalEvent) { lock (this.eventQueue) { this.eventQueue.Enqueue(internalEvent); } this.receiveEventWaitHandle.Set(); } /// /// Wrapper method for raising closing connection event /// private void OnConnectionClosing() { 
if (!this.isConnectionClosing) { this.isConnectionClosing = true; this.receiveEventWaitHandle.Set(); } } /// /// Wrapper method for raising PUBLISH message received event /// /// PUBLISH message received private void OnMqttMsgPublishReceived(MqttMsgPublish publish) { if (this.MqttMsgPublishReceived != null) { this.MqttMsgPublishReceived(this, new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain)); } } /// /// Wrapper method for raising published message event /// /// Message identifier for published message /// Publish flag private void OnMqttMsgPublished(ushort messageId, bool isPublished) { if (this.MqttMsgPublished != null) { this.MqttMsgPublished(this, new MqttMsgPublishedEventArgs(messageId, isPublished)); } } /// /// Wrapper method for raising subscribed topic event /// /// SUBACK message received private void OnMqttMsgSubscribed(MqttMsgSuback suback) { if (this.MqttMsgSubscribed != null) { this.MqttMsgSubscribed(this, new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels)); } } /// /// Wrapper method for raising unsubscribed topic event /// /// Message identifier for unsubscribed topic private void OnMqttMsgUnsubscribed(ushort messageId) { if (this.MqttMsgUnsubscribed != null) { this.MqttMsgUnsubscribed(this, new MqttMsgUnsubscribedEventArgs(messageId)); } } /// /// Wrapper method for raising SUBSCRIBE message event /// /// Message identifier for subscribe topics request /// Topics requested to subscribe /// List of QOS Levels requested private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels) { if (this.MqttMsgSubscribeReceived != null) { this.MqttMsgSubscribeReceived(this, new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels)); } } /// /// Wrapper method for raising UNSUBSCRIBE message event /// /// Message identifier for unsubscribe topics request /// Topics requested to unsubscribe private void OnMqttMsgUnsubscribeReceived(ushort 
messageId, string[] topics) { if (this.MqttMsgUnsubscribeReceived != null) { this.MqttMsgUnsubscribeReceived(this, new MqttMsgUnsubscribeEventArgs(messageId, topics)); } } /// /// Wrapper method for raising CONNECT message event /// private void OnMqttMsgConnected(MqttMsgConnect connect) { if (this.MqttMsgConnected != null) { this.ProtocolVersion = (MqttProtocolVersion)connect.ProtocolVersion; this.MqttMsgConnected(this, new MqttMsgConnectEventArgs(connect)); } } /// /// Wrapper method for raising DISCONNECT message event /// private void OnMqttMsgDisconnected() { if (this.MqttMsgDisconnected != null) { this.MqttMsgDisconnected(this, EventArgs.Empty); } } /// /// Wrapper method for peer/client disconnection /// private void OnConnectionClosed() { if (this.ConnectionClosed != null) { this.ConnectionClosed(this, EventArgs.Empty); } } /// /// Send a message /// /// Message bytes private void Send(byte[] msgBytes) { try { // send message this.channel.Send(msgBytes); } catch (Exception e) { logger.Error("Exception occurred: {0}" , e.ToString()); throw new MqttCommunicationException(e); } } /// /// Send a message /// /// Message private void Send(MqttMsgBase msg) { logger.Debug("SEND {0}" , msg); this.Send(msg.GetBytes((byte)this.ProtocolVersion)); } /// /// Send a message to the broker and wait answer /// /// Message bytes /// MQTT message response private MqttMsgBase SendReceive(byte[] msgBytes) { return this.SendReceive(msgBytes, MqttSettings.MQTT_DEFAULT_TIMEOUT); } /// /// Send a message to the broker and wait answer /// /// Message bytes /// Timeout for receiving answer /// MQTT message response private MqttMsgBase SendReceive(byte[] msgBytes, int timeout) { // reset handle before sending this.syncEndReceiving.Reset(); try { // send message this.channel.Send(msgBytes); // update last message sent ticks this.lastCommTime = Environment.TickCount; } catch (Exception e) { if (typeof(SocketException) == e.GetType()) { // connection reset by broker if 
(((SocketException)e).SocketErrorCode == SocketError.ConnectionReset) this.IsConnected = false; } logger.Error("Exception occurred: {0}" , e.ToString()); throw new MqttCommunicationException(e); } // wait for answer from broker if (this.syncEndReceiving.WaitOne(timeout)) { // message received without exception if (this.exReceiving == null) return this.msgReceived; // receiving thread catched exception else throw this.exReceiving; } else { // throw timeout exception throw new MqttCommunicationException(); } } /// /// Send a message to the broker and wait answer /// /// Message /// MQTT message response private MqttMsgBase SendReceive(MqttMsgBase msg) { return this.SendReceive(msg, MqttSettings.MQTT_DEFAULT_TIMEOUT); } /// /// Send a message to the broker and wait answer /// /// Message /// Timeout for receiving answer /// MQTT message response private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout) { logger.Debug("SEND {0}" , msg); return this.SendReceive(msg.GetBytes((byte)this.ProtocolVersion), timeout); } /// /// Enqueue a message into the inflight queue /// /// Message to enqueue /// Message flow (publish, acknowledge) /// Message enqueued or not private bool EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow) { // enqueue is needed (or not) bool enqueue = true; // if it is a PUBLISH message with QoS Level 2 if ((msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)) { lock (this.inflightQueue) { // if it is a PUBLISH message already received (it is in the inflight queue), the publisher // re-sent it because it didn't received the PUBREC. 
                    // In this case, we have to re-send PUBREC

                    // NOTE : I need to find on message id and flow because the broker could be publish/received
                    //        to/from client and message id could be the same (one tracked by broker and the other by client)
                    MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
                    MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);

                    // the PUBLISH message is already in the inflight queue, we don't need to re-enqueue but we need
                    // to change state to re-send PUBREC
                    if (msgCtx != null)
                    {
                        msgCtx.State = MqttMsgState.QueuedQos2;
                        msgCtx.Flow = MqttMsgFlow.ToAcknowledge;
                        enqueue = false;
                    }
                }
            }

            if (enqueue)
            {
                // set a default state
                MqttMsgState state = MqttMsgState.QueuedQos0;

                // based on QoS level, the messages flow between broker and client changes
                switch (msg.QosLevel)
                {
                    // QoS Level 0
                    case MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE:

                        state = MqttMsgState.QueuedQos0;
                        break;

                    // QoS Level 1
                    case MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE:

                        state = MqttMsgState.QueuedQos1;
                        break;

                    // QoS Level 2
                    case MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE:

                        state = MqttMsgState.QueuedQos2;
                        break;
                }

                // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
                //          so QueuedQos1 state isn't valid for them
                if (msg.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
                    state = MqttMsgState.SendSubscribe;
                else if (msg.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
                    state = MqttMsgState.SendUnsubscribe;

                // queue message context
                MqttMsgContext msgContext = new MqttMsgContext()
                {
                    Message = msg,
                    State = state,
                    Flow = flow,
                    Attempt = 0
                };

                lock (this.inflightQueue)
                {
                    // check number of messages inside inflight queue; when the configured size is
                    // reached the message is not enqueued and false is returned to the caller
                    enqueue = (this.inflightQueue.Count < this.settings.InflightQueueSize);

                    if (enqueue)
                    {
                        // enqueue message and unlock send thread
                        this.inflightQueue.Enqueue(msgContext);
                        logger.Debug("enqueued {0}" , msg);

                        // PUBLISH message
                        if (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
                        {
                            // to publish and QoS level 1 or 2: also tracked in the session, if there is one
                            if ((msgContext.Flow == MqttMsgFlow.ToPublish) &&
                                ((msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) ||
                                 (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)))
                            {
                                if (this.session != null)
                                    this.session.InflightMessages.Add(msgContext.Key, msgContext);
                            }
                            // to acknowledge and QoS level 2: also tracked in the session, if there is one
                            else if ((msgContext.Flow == MqttMsgFlow.ToAcknowledge) &&
                                     (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
                            {
                                if (this.session != null)
                                    this.session.InflightMessages.Add(msgContext.Key, msgContext);
                            }
                        }
                    }
                }
            }

            // the inflight wait handle is signaled on every call, enqueued or not
            this.inflightWaitHandle.Set();

            return enqueue;
        }

        /// <summary>
        /// Enqueue a message into the internal queue
        /// </summary>
        /// <param name="msg">Message to enqueue</param>
        private void EnqueueInternal(MqttMsgBase msg)
        {
            // enqueue is needed (or not)
            bool enqueue = true;

            // if it is a PUBREL message (for QoS Level 2)
            if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE)
            {
                lock (this.inflightQueue)
                {
                    // if it is a PUBREL but the corresponding PUBLISH isn't in the inflight queue,
                    // it means that we processed PUBLISH message and received PUBREL and we sent PUBCOMP
                    // but publisher didn't receive PUBCOMP so it re-sent PUBREL. We need only to re-send PUBCOMP.
                    // NOTE : I need to find on message id and flow because the broker could be publish/received
                    //        to/from client and message id could be the same (one tracked by broker and the other by client)
                    MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
                    MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);

                    // the PUBLISH message isn't in the inflight queue, it was already processed so
                    // we need to re-send PUBCOMP only
                    if (msgCtx == null)
                    {
                        MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
                        pubcomp.MessageId = msg.MessageId;

                        // NOTE(review): the PUBCOMP is sent while the inflight queue lock is held
                        this.Send(pubcomp);

                        enqueue = false;
                    }
                }
            }
            // if it is a PUBCOMP message (for QoS Level 2)
            else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE)
            {
                lock (this.inflightQueue)
                {
                    // if it is a PUBCOMP but the corresponding PUBLISH isn't in the inflight queue,
                    // it means that we sent PUBLISH message, sent PUBREL (after receiving PUBREC) and already received PUBCOMP
                    // but publisher didn't receive PUBREL so it re-sent PUBCOMP. We need only to ignore this PUBCOMP.

                    // NOTE : I need to find on message id and flow because the broker could be publish/received
                    //        to/from client and message id could be the same (one tracked by broker and the other by client)
                    MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
                    MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);

                    // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBCOMP
                    if (msgCtx == null)
                    {
                        enqueue = false;
                    }
                }
            }
            // if it is a PUBREC message (for QoS Level 2)
            else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE)
            {
                lock (this.inflightQueue)
                {
                    // if it is a PUBREC but the corresponding PUBLISH isn't in the inflight queue,
                    // it means that we sent PUBLISH message more times (retries) but broker didn't send PUBREC in time
                    // the publish is failed and we need only to ignore this PUBREC.

                    // NOTE : I need to find on message id and flow because the broker could be publish/received
                    //        to/from client and message id could be the same (one tracked by broker and the other by client)
                    MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
                    MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);

                    // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBREC
                    if (msgCtx == null)
                    {
                        enqueue = false;
                    }
                }
            }

            if (enqueue)
            {
                lock (this.internalQueue)
                {
                    this.internalQueue.Enqueue(msg);
                    logger.Debug("enqueued {0}" , msg);
                    // signal the inflight wait handle after queuing the message
                    this.inflightWaitHandle.Set();
                }
            }
        }

        /// <summary>
        /// Thread for receiving messages
        /// </summary>
        private void ReceiveThread()
        {
            int readBytes = 0;
            byte[] fixedHeaderFirstByte = new byte[1];
            byte msgType;

            while (this.isRunning)
            {
                try
                {
                    // read first byte (fixed header)
                    readBytes = this.channel.Receive(fixedHeaderFirstByte);

                    if (readBytes > 0)
                    {
                        // update last message received ticks
                        this.lastCommTime = Environment.TickCount;

                        // extract message type from received byte
                        msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);

                        switch (msgType)
                        {
                            // CONNECT message received
                            case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:

                                MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , connect);

                                // raise message received event
                                this.OnInternalEvent(new MsgInternalEvent(connect));
                                break;

                            // CONNACK message received
                            case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:

                                throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);

                            // PINGREQ message received: answered directly with a PINGRESP
                            case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:

                                this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , this.msgReceived);

                                MqttMsgPingResp pingresp = new MqttMsgPingResp();
                                this.Send(pingresp);

                                break;

                            // PINGRESP message received
                            case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:

                                throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);

                            // SUBSCRIBE message received
                            case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:

                                MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , subscribe);

                                // raise message received event
                                this.OnInternalEvent(new MsgInternalEvent(subscribe));

                                break;

                            // SUBACK message received
                            case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:

                                throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);

                            // PUBLISH message received
                            case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:

                                MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , publish);

                                // enqueue PUBLISH message to acknowledge into the inflight queue
                                this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);

                                break;

                            // PUBACK message received
                            case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:

                                // enqueue PUBACK message received (for QoS Level 1) into the internal queue
                                MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , puback);

                                // enqueue PUBACK message into the internal queue
                                this.EnqueueInternal(puback);

                                break;

                            // PUBREC message received
                            case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:

                                // enqueue PUBREC message received (for QoS Level 2) into the internal queue
                                MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , pubrec);

                                // enqueue PUBREC message into the internal queue
                                this.EnqueueInternal(pubrec);

                                break;

                            // PUBREL message received
                            case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:

                                // enqueue PUBREL message received (for QoS Level 2) into the internal queue
                                MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
                                logger.Debug("RECV {0}" , pubrel);

                                // enqueue PUBREL message into the
internal queue this.EnqueueInternal(pubrel); break; // PUBCOMP message received case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE: // enqueue PUBCOMP message received (for QoS Level 2) into the internal queue MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel); logger.Debug("RECV {0}" , pubcomp); // enqueue PUBCOMP message into the internal queue this.EnqueueInternal(pubcomp); break; // UNSUBSCRIBE message received case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE: MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel); logger.Debug("RECV {0}" , unsubscribe); // raise message received event this.OnInternalEvent(new MsgInternalEvent(unsubscribe)); break; // UNSUBACK message received case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE: throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage); // DISCONNECT message received case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE: MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel); logger.Debug("RECV {0}" , disconnect); // raise message received event this.OnInternalEvent(new MsgInternalEvent(disconnect)); break; default: throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage); } this.exReceiving = null; } // zero bytes read, peer gracefully closed socket else { // wake up thread that will notify connection is closing this.OnConnectionClosing(); } } catch (Exception e) { #if TRACE //MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString()); #endif this.exReceiving = new MqttCommunicationException(e); bool close = false; if (e.GetType() == typeof(MqttClientException)) { // [v3.1.1] scenarios the receiver MUST close the network connection MqttClientException ex = e as MqttClientException; close = ((ex.ErrorCode == MqttClientErrorCode.InvalidFlagBits) || (ex.ErrorCode == 
MqttClientErrorCode.InvalidProtocolName) || (ex.ErrorCode == MqttClientErrorCode.InvalidConnectFlags)); } else if ((e.GetType() == typeof(IOException)) || (e.GetType() == typeof(SocketException)) || ((e.InnerException != null) && (e.InnerException.GetType() == typeof(SocketException)))) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException { close = true; } if (close) { // wake up thread that will notify connection is closing this.OnConnectionClosing(); } } } } /// /// Thread for handling keep alive message /// private void KeepAliveThread() { int delta = 0; int wait = this.keepAlivePeriod; // create event to signal that current thread is end this.keepAliveEventEnd = new AutoResetEvent(false); while (this.isRunning) { // waiting... this.keepAliveEvent.WaitOne(wait); if (this.isRunning) { delta = Environment.TickCount - this.lastCommTime; // if timeout exceeded ... if (delta >= this.keepAlivePeriod) { // client must close connection this.OnConnectionClosing(); } else { // update waiting time wait = this.keepAlivePeriod - delta; } } } // signal thread end this.keepAliveEventEnd.Set(); } /// /// Thread for raising event /// private void DispatchEventThread() { while (this.isRunning) { if ((this.eventQueue.Count == 0) && !this.isConnectionClosing) { // broker need to receive the first message (CONNECT) // within a reasonable amount of time after TCP/IP connection if (!this.IsConnected) { // wait on receiving message from client with a connection timeout if (!this.receiveEventWaitHandle.WaitOne(this.settings.TimeoutOnConnection)) { // client must close connection this.Close(); // client raw disconnection this.OnConnectionClosed(); } } else { // wait on receiving message from client this.receiveEventWaitHandle.WaitOne(); } } // check if it is running or we are closing client if (this.isRunning) { // get event from queue InternalEvent internalEvent = null; lock (this.eventQueue) { if (this.eventQueue.Count > 0) internalEvent = 
(InternalEvent)this.eventQueue.Dequeue(); } // it's an event with a message inside if (internalEvent != null) { MqttMsgBase msg = ((MsgInternalEvent)internalEvent).Message; if (msg != null) { switch (msg.Type) { // CONNECT message received case MqttMsgBase.MQTT_MSG_CONNECT_TYPE: // raise connected client event (CONNECT message received) this.OnMqttMsgConnected((MqttMsgConnect)msg); break; // SUBSCRIBE message received case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE: MqttMsgSubscribe subscribe = (MqttMsgSubscribe)msg; // raise subscribe topic event (SUBSCRIBE message received) this.OnMqttMsgSubscribeReceived(subscribe.MessageId, subscribe.Topics, subscribe.QoSLevels); break; // SUBACK message received case MqttMsgBase.MQTT_MSG_SUBACK_TYPE: // raise subscribed topic event (SUBACK message received) this.OnMqttMsgSubscribed((MqttMsgSuback)msg); break; // PUBLISH message received case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE: // PUBLISH message received in a published internal event, no publish succeeded if (internalEvent.GetType() == typeof(MsgPublishedInternalEvent)) this.OnMqttMsgPublished(msg.MessageId, false); else // raise PUBLISH message received event this.OnMqttMsgPublishReceived((MqttMsgPublish)msg); break; // PUBACK message received case MqttMsgBase.MQTT_MSG_PUBACK_TYPE: // raise published message event // (PUBACK received for QoS Level 1) this.OnMqttMsgPublished(msg.MessageId, true); break; // PUBREL message received case MqttMsgBase.MQTT_MSG_PUBREL_TYPE: // raise message received event // (PUBREL received for QoS Level 2) this.OnMqttMsgPublishReceived((MqttMsgPublish)msg); break; // PUBCOMP message received case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE: // raise published message event // (PUBCOMP received for QoS Level 2) this.OnMqttMsgPublished(msg.MessageId, true); break; // UNSUBSCRIBE message received from client case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE: MqttMsgUnsubscribe unsubscribe = (MqttMsgUnsubscribe)msg; // raise unsubscribe topic event (UNSUBSCRIBE message 
received) this.OnMqttMsgUnsubscribeReceived(unsubscribe.MessageId, unsubscribe.Topics); break; // UNSUBACK message received case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE: // raise unsubscribed topic event this.OnMqttMsgUnsubscribed(msg.MessageId); break; // DISCONNECT message received from client case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE: // raise disconnected client event (DISCONNECT message received) this.OnMqttMsgDisconnected(); break; } } } // all events for received messages dispatched, check if there is closing connection if ((this.eventQueue.Count == 0) && this.isConnectionClosing) { // client must close connection this.Close(); // client raw disconnection this.OnConnectionClosed(); } } } } /// /// Process inflight messages queue /// private void ProcessInflightThread() { MqttMsgContext msgContext = null; MqttMsgBase msgInflight = null; MqttMsgBase msgReceived = null; InternalEvent internalEvent = null; bool acknowledge = false; int timeout = Timeout.Infinite; int delta; bool msgReceivedProcessed = false; try { while (this.isRunning) { // wait on message queueud to inflight this.inflightWaitHandle.WaitOne(timeout); // it could be unblocked because Close() method is joining if (this.isRunning) { lock (this.inflightQueue) { // message received and peeked from internal queue is processed // NOTE : it has the corresponding message in inflight queue based on messageId // (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...) 
// if it's orphan we need to remove from internal queue msgReceivedProcessed = false; acknowledge = false; msgReceived = null; // set timeout tu MaxValue instead of Infinte (-1) to perform // compare with calcultad current msgTimeout timeout = Int32.MaxValue; // a message inflight could be re-enqueued but we have to // analyze it only just one time for cycle int count = this.inflightQueue.Count; // process all inflight queued messages while (count > 0) { count--; acknowledge = false; msgReceived = null; // check to be sure that client isn't closing and all queues are now empty ! if (!this.isRunning) break; // dequeue message context from queue msgContext = (MqttMsgContext)this.inflightQueue.Dequeue(); // get inflight message msgInflight = (MqttMsgBase)msgContext.Message; switch (msgContext.State) { case MqttMsgState.QueuedQos0: // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge if (msgContext.Flow == MqttMsgFlow.ToPublish) { this.Send(msgInflight); } // QoS 0, no need acknowledge else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge) { internalEvent = new MsgInternalEvent(msgInflight); // notify published message from broker (no need acknowledged) this.OnInternalEvent(internalEvent); } logger.Debug("processed {0}" , msgInflight); break; case MqttMsgState.QueuedQos1: // [v3.1.1] SUBSCRIBE and UNSIBSCRIBE aren't "officially" QOS = 1 case MqttMsgState.SendSubscribe: case MqttMsgState.SendUnsubscribe: // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK if (msgContext.Flow == MqttMsgFlow.ToPublish) { msgContext.Timestamp = Environment.TickCount; msgContext.Attempt++; if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) { // PUBLISH message to send, wait for PUBACK msgContext.State = MqttMsgState.WaitForPuback; // retry ? 
set dup flag [v3.1.1] only for PUBLISH message if (msgContext.Attempt > 1) msgInflight.DupFlag = true; } else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) // SUBSCRIBE message to send, wait for SUBACK msgContext.State = MqttMsgState.WaitForSuback; else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) // UNSUBSCRIBE message to send, wait for UNSUBACK msgContext.State = MqttMsgState.WaitForUnsuback; this.Send(msgInflight); // update timeout : minimum between delay (based on current message sent) or current timeout timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout; // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK) this.inflightQueue.Enqueue(msgContext); } // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge) { MqttMsgPuback puback = new MqttMsgPuback(); puback.MessageId = msgInflight.MessageId; this.Send(puback); internalEvent = new MsgInternalEvent(msgInflight); // notify published message from broker and acknowledged this.OnInternalEvent(internalEvent); logger.Debug("processed {0}" , internalEvent); } break; case MqttMsgState.QueuedQos2: // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC if (msgContext.Flow == MqttMsgFlow.ToPublish) { msgContext.Timestamp = Environment.TickCount; msgContext.Attempt++; msgContext.State = MqttMsgState.WaitForPubrec; // retry ? set dup flag if (msgContext.Attempt > 1) msgInflight.DupFlag = true; this.Send(msgInflight); // update timeout : minimum between delay (based on current message sent) or current timeout timeout = (this.settings.DelayOnRetry < timeout) ? 
this.settings.DelayOnRetry : timeout; // re-enqueue message (I have to re-analyze for receiving PUBREC) this.inflightQueue.Enqueue(msgContext); } // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge) { MqttMsgPubrec pubrec = new MqttMsgPubrec(); pubrec.MessageId = msgInflight.MessageId; msgContext.State = MqttMsgState.WaitForPubrel; this.Send(pubrec); // re-enqueue message (I have to re-analyze for receiving PUBREL) this.inflightQueue.Enqueue(msgContext); } break; case MqttMsgState.WaitForPuback: case MqttMsgState.WaitForSuback: case MqttMsgState.WaitForUnsuback: // QoS 1, waiting for PUBACK of a PUBLISH message sent or // waiting for SUBACK of a SUBSCRIBE message sent or // waiting for UNSUBACK of a UNSUBSCRIBE message sent or if (msgContext.Flow == MqttMsgFlow.ToPublish) { acknowledge = false; lock (this.internalQueue) { if (this.internalQueue.Count > 0) msgReceived = (MqttMsgBase)this.internalQueue.Peek(); } // it is a PUBACK message or a SUBACK/UNSUBACK message if (msgReceived != null) { // PUBACK message or SUBACK/UNSUBACK message for the current message if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) || ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) || ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId))) { lock (this.internalQueue) { // received message processed this.internalQueue.Dequeue(); acknowledge = true; msgReceivedProcessed = true; logger.Debug("dequeued {0}" , msgReceived); } // if PUBACK received, confirm published with flag if (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) 
internalEvent = new MsgPublishedInternalEvent(msgReceived, true); else internalEvent = new MsgInternalEvent(msgReceived); // notify received acknowledge from broker of a published message or subscribe/unsubscribe message this.OnInternalEvent(internalEvent); // PUBACK received for PUBLISH message with QoS Level 1, remove from session state if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } logger.Debug("processed {0}" , msgInflight); } } // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid if (!acknowledge) { delta = Environment.TickCount - msgContext.Timestamp; // check timeout for receiving PUBACK since PUBLISH was sent or // for receiving SUBACK since SUBSCRIBE was sent or // for receiving UNSUBACK since UNSUBSCRIBE was sent if (delta >= this.settings.DelayOnRetry) { // max retry not reached, resend if (msgContext.Attempt < this.settings.AttemptsOnRetry) { msgContext.State = MqttMsgState.QueuedQos1; // re-enqueue message this.inflightQueue.Enqueue(msgContext); // update timeout (0 -> reanalyze queue immediately) timeout = 0; } else { // if PUBACK for a PUBLISH message not received after retries, raise event for not published if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) { // PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too if ((this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } internalEvent = new MsgPublishedInternalEvent(msgInflight, false); // notify not received acknowledge from broker and message not published this.OnInternalEvent(internalEvent); } // NOTE : not raise events for SUBACK or UNSUBACK not received // for the user no event raised means subscribe/unsubscribe failed } } else { // re-enqueue message (I 
have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK) this.inflightQueue.Enqueue(msgContext); // update timeout int msgTimeout = (this.settings.DelayOnRetry - delta); timeout = (msgTimeout < timeout) ? msgTimeout : timeout; } } } break; case MqttMsgState.WaitForPubrec: // QoS 2, waiting for PUBREC of a PUBLISH message sent if (msgContext.Flow == MqttMsgFlow.ToPublish) { acknowledge = false; lock (this.internalQueue) { if (this.internalQueue.Count > 0) msgReceived = (MqttMsgBase)this.internalQueue.Peek(); } // it is a PUBREC message if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE)) { // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP if (msgReceived.MessageId == msgInflight.MessageId) { lock (this.internalQueue) { // received message processed this.internalQueue.Dequeue(); acknowledge = true; msgReceivedProcessed = true; logger.Debug("dequeued {0}" , msgReceived); } MqttMsgPubrel pubrel = new MqttMsgPubrel(); pubrel.MessageId = msgInflight.MessageId; msgContext.State = MqttMsgState.WaitForPubcomp; msgContext.Timestamp = Environment.TickCount; msgContext.Attempt = 1; this.Send(pubrel); // update timeout : minimum between delay (based on current message sent) or current timeout timeout = (this.settings.DelayOnRetry < timeout) ? 
this.settings.DelayOnRetry : timeout; // re-enqueue message this.inflightQueue.Enqueue(msgContext); } } // current message not acknowledged if (!acknowledge) { delta = Environment.TickCount - msgContext.Timestamp; // check timeout for receiving PUBREC since PUBLISH was sent if (delta >= this.settings.DelayOnRetry) { // max retry not reached, resend if (msgContext.Attempt < this.settings.AttemptsOnRetry) { msgContext.State = MqttMsgState.QueuedQos2; // re-enqueue message this.inflightQueue.Enqueue(msgContext); // update timeout (0 -> reanalyze queue immediately) timeout = 0; } else { // PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too if ((this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } // if PUBREC for a PUBLISH message not received after retries, raise event for not published internalEvent = new MsgPublishedInternalEvent(msgInflight, false); // notify not received acknowledge from broker and message not published this.OnInternalEvent(internalEvent); } } else { // re-enqueue message this.inflightQueue.Enqueue(msgContext); // update timeout int msgTimeout = (this.settings.DelayOnRetry - delta); timeout = (msgTimeout < timeout) ? 
msgTimeout : timeout; } } } break; case MqttMsgState.WaitForPubrel: // QoS 2, waiting for PUBREL of a PUBREC message sent if (msgContext.Flow == MqttMsgFlow.ToAcknowledge) { lock (this.internalQueue) { if (this.internalQueue.Count > 0) msgReceived = (MqttMsgBase)this.internalQueue.Peek(); } // it is a PUBREL message if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE)) { // PUBREL message for the current message, send PUBCOMP if (msgReceived.MessageId == msgInflight.MessageId) { lock (this.internalQueue) { // received message processed this.internalQueue.Dequeue(); msgReceivedProcessed = true; logger.Debug("dequeued {0}" , msgReceived); } MqttMsgPubcomp pubcomp = new MqttMsgPubcomp(); pubcomp.MessageId = msgInflight.MessageId; this.Send(pubcomp); internalEvent = new MsgInternalEvent(msgInflight); // notify published message from broker and acknowledged this.OnInternalEvent(internalEvent); // PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } logger.Debug("processed {0}" , msgInflight); } else { // re-enqueue message this.inflightQueue.Enqueue(msgContext); } } else { // re-enqueue message this.inflightQueue.Enqueue(msgContext); } } break; case MqttMsgState.WaitForPubcomp: // QoS 2, waiting for PUBCOMP of a PUBREL message sent if (msgContext.Flow == MqttMsgFlow.ToPublish) { acknowledge = false; lock (this.internalQueue) { if (this.internalQueue.Count > 0) msgReceived = (MqttMsgBase)this.internalQueue.Peek(); } // it is a PUBCOMP message if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE)) { // PUBCOMP message for the current message if (msgReceived.MessageId == msgInflight.MessageId) { lock (this.internalQueue) { // received message processed 
this.internalQueue.Dequeue(); acknowledge = true; msgReceivedProcessed = true; logger.Debug("dequeued {0}" , msgReceived); } internalEvent = new MsgPublishedInternalEvent(msgReceived, true); // notify received acknowledge from broker of a published message this.OnInternalEvent(internalEvent); // PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } logger.Debug("processed {0}" , msgInflight); } } // it is a PUBREC message else if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE)) { // another PUBREC message for the current message due to a retransmitted PUBLISH // I'm in waiting for PUBCOMP, so I can discard this PUBREC if (msgReceived.MessageId == msgInflight.MessageId) { lock (this.internalQueue) { // received message processed this.internalQueue.Dequeue(); acknowledge = true; msgReceivedProcessed = true; logger.Debug("dequeued {0}" , msgReceived); // re-enqueue message this.inflightQueue.Enqueue(msgContext); } } } // current message not acknowledged if (!acknowledge) { delta = Environment.TickCount - msgContext.Timestamp; // check timeout for receiving PUBCOMP since PUBREL was sent if (delta >= this.settings.DelayOnRetry) { // max retry not reached, resend if (msgContext.Attempt < this.settings.AttemptsOnRetry) { msgContext.State = MqttMsgState.SendPubrel; // re-enqueue message this.inflightQueue.Enqueue(msgContext); // update timeout (0 -> reanalyze queue immediately) timeout = 0; } else { // PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too if ((this.session != null) && (this.session.InflightMessages.ContainsKey(msgContext.Key))) { this.session.InflightMessages.Remove(msgContext.Key); } // if PUBCOMP for a PUBLISH message not received after retries, raise 
event for not published internalEvent = new MsgPublishedInternalEvent(msgInflight, false); // notify not received acknowledge from broker and message not published this.OnInternalEvent(internalEvent); } } else { // re-enqueue message this.inflightQueue.Enqueue(msgContext); // update timeout int msgTimeout = (this.settings.DelayOnRetry - delta); timeout = (msgTimeout < timeout) ? msgTimeout : timeout; } } } break; case MqttMsgState.SendPubrec: // TODO : impossible ? --> QueuedQos2 ToAcknowledge break; case MqttMsgState.SendPubrel: // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP if (msgContext.Flow == MqttMsgFlow.ToPublish) { MqttMsgPubrel pubrel = new MqttMsgPubrel(); pubrel.MessageId = msgInflight.MessageId; msgContext.State = MqttMsgState.WaitForPubcomp; msgContext.Timestamp = Environment.TickCount; msgContext.Attempt++; // retry ? set dup flag [v3.1.1] no needed if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1) { if (msgContext.Attempt > 1) pubrel.DupFlag = true; } this.Send(pubrel); // update timeout : minimum between delay (based on current message sent) or current timeout timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout; // re-enqueue message this.inflightQueue.Enqueue(msgContext); } break; case MqttMsgState.SendPubcomp: // TODO : impossible ? break; case MqttMsgState.SendPuback: // TODO : impossible ? 
--> QueuedQos1 ToAcknowledge break; default: break; } } // if calculated timeout is MaxValue, it means that must be Infinite (-1) if (timeout == Int32.MaxValue) timeout = Timeout.Infinite; // if message received is orphan, no corresponding message in inflight queue // based on messageId, we need to remove from the queue if ((msgReceived != null) && !msgReceivedProcessed) { this.internalQueue.Dequeue(); logger.Debug("dequeued {0}" , msgReceived); } } } } } catch (MqttCommunicationException e) { // possible exception on Send, I need to re-enqueue not sent message if (msgContext != null) // re-enqueue message this.inflightQueue.Enqueue(msgContext); logger.Debug("Exception occurred: {0}" , e.ToString()); // raise disconnection client event this.OnConnectionClosing(); } } /// /// Restore session /// private void RestoreSession() { // if not clean session if (!this.CleanSession) { // there is a previous session if (this.session != null) { lock (this.inflightQueue) { foreach (MqttMsgContext msgContext in this.session.InflightMessages.Values) { this.inflightQueue.Enqueue(msgContext); // if it is a PUBLISH message to publish if ((msgContext.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgContext.Flow == MqttMsgFlow.ToPublish)) { // it's QoS 1 and we haven't received PUBACK if ((msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) && (msgContext.State == MqttMsgState.WaitForPuback)) { // we haven't received PUBACK, we need to resend PUBLISH message msgContext.State = MqttMsgState.QueuedQos1; } // it's QoS 2 else if (msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE) { // we haven't received PUBREC, we need to resend PUBLISH message if (msgContext.State == MqttMsgState.WaitForPubrec) { msgContext.State = MqttMsgState.QueuedQos2; } // we haven't received PUBCOMP, we need to resend PUBREL for it else if (msgContext.State == MqttMsgState.WaitForPubcomp) { msgContext.State = MqttMsgState.SendPubrel; } } } } } // unlock process inflight 
queue this.inflightWaitHandle.Set(); } else { // create new session this.session = new MqttClientSession(this.ClientId); } } // clean any previous session else { if (this.session != null) this.session.Clear(); } } /// /// Load a given session /// /// MQTT Client session to load public void LoadSession(MqttClientSession session) { // if not clean session if (!this.CleanSession) { // set the session ... this.session = session; // ... and restore it this.RestoreSession(); } } /// /// Generate the next message identifier /// /// Message identifier private ushort GetMessageId() { // if 0 or max UInt16, it becomes 1 (first valid messageId) this.messageIdCounter = ((this.messageIdCounter % UInt16.MaxValue) != 0) ? (ushort)(this.messageIdCounter + 1) : (ushort)1; return this.messageIdCounter; } /// /// Finder class for PUBLISH message inside a queue /// internal class MqttMsgContextFinder { // PUBLISH message id internal ushort MessageId { get; set; } // message flow into inflight queue internal MqttMsgFlow Flow { get; set; } /// /// Constructor /// /// Message Id /// Message flow inside inflight queue internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow) { this.MessageId = messageId; this.Flow = flow; } internal bool Find(object item) { MqttMsgContext msgCtx = (MqttMsgContext)item; return ((msgCtx.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgCtx.Message.MessageId == this.MessageId) && msgCtx.Flow == this.Flow); } } } /// /// MQTT protocol version /// public enum MqttProtocolVersion { Version_3_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1, Version_3_1_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1_1 } }