using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NLog;
using uPLibrary.Networking.M2Mqtt.Communication;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Managers;
using uPLibrary.Networking.M2Mqtt.Messages;
using uPLibrary.Networking.M2Mqtt.Session;
using uPLibrary.Networking.M2Mqtt.Utility;

namespace uPLibrary.Networking.M2Mqtt
{
    /// <summary>
    /// MQTT broker that accepts clients over a TCP communication layer and routes their
    /// CONNECT / PUBLISH / SUBSCRIBE / UNSUBSCRIBE / DISCONNECT traffic to the publisher,
    /// subscriber, session and user-access-control managers.
    /// </summary>
    public class MqttBrokerExt
    {
        private static readonly Logger logger = NLog.LogManager.GetCurrentClassLogger();

        // protocol version bytes accepted in a CONNECT message
        internal const byte PROTOCOL_VERSION_V3_1 = 0x03;
        internal const byte PROTOCOL_VERSION_V3_1_1 = 0x04;

        // max length for client id (removed in 3.1.1)
        internal const int CLIENT_ID_MAX_LENGTH = 23;

        // MQTT broker settings
        private readonly MqttSettings settings;

        // clients connected list
        private readonly MqttClientCollection clients;

        // reference to publisher manager
        private readonly MqttPublisherManager publisherManager;

        // reference to subscriber manager
        private readonly MqttSubscriberManager subscriberManager;

        // reference to session manager
        private readonly MqttSessionManager sessionManager;

        // reference to User Access Control manager
        private readonly MqttUacManager uacManager;

        // MQTT communication layer
        private readonly IMqttCommunicationLayer commLayer;

        /// <summary>
        /// User authentication method. Reads from and writes to the UAC manager.
        /// </summary>
        public MqttUserAuthenticationDelegate UserAuth
        {
            get { return this.uacManager.UserAuth; }
            set { this.uacManager.UserAuth = value; }
        }

        /// <summary>
        /// Creates a broker with a TCP communication layer on the given port.
        /// </summary>
        /// <param name="port">Port handed to the TCP communication layer</param>
        public MqttBrokerExt(int port)
        {
            // MQTT broker settings
            this.settings = MqttSettings.Instance;

            // MQTT communication layer
            this.commLayer = new MqttTcpCommunicationLayer(port);
            this.commLayer.ClientConnected += OnClientConnected;

            // create managers (publisher, subscriber, session and UAC)
            this.subscriberManager = new MqttSubscriberManager();
            this.sessionManager = new MqttSessionManager();
            this.publisherManager = new MqttPublisherManager(this.subscriberManager, this.sessionManager);
            this.uacManager = new MqttUacManager();

            // default authenticator; callers can replace it through the UserAuth property
            this.uacManager.UserAuth = UserAuthentication;

            this.clients = new MqttClientCollection();
        }

        /// <summary>
        /// Default user authentication callback installed by the constructor.
        /// </summary>
        /// <param name="username">User name from the CONNECT message</param>
        /// <param name="password">Password from the CONNECT message</param>
        /// <returns>Always <c>true</c></returns>
        public bool UserAuthentication(string username, string password)
        {
            // NOTE(review): every credential is accepted here; assign UserAuth with a real
            // check before exposing the broker to untrusted clients.
            logger.Info("用户认证");
            return true;
        }

        /// <summary>
        /// Start broker
        /// </summary>
        public void Start()
        {
            this.commLayer.Start();
            this.publisherManager.Start();
        }

        /// <summary>
        /// Stop broker
        /// </summary>
        public void Stop()
        {
            this.commLayer.Stop();
            this.publisherManager.Stop();

            // close connection with all clients.
            // Iterate over a snapshot: the disconnect/connection-closed handlers remove
            // clients from this.clients (see CloseClient), which must not happen to the
            // collection that is currently being enumerated.
            foreach (MqttClient client in this.clients.ToList())
            {
                client.Close();
            }
        }

        /// <summary>
        /// Handles a new client reported by the communication layer: hooks its events,
        /// tracks it in the client collection and opens it.
        /// </summary>
        /// <param name="sender">Event source</param>
        /// <param name="e">Event data carrying the new client</param>
        void OnClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            MqttClient newClient = e.Client;

            // register event handlers from client
            newClient.MqttMsgDisconnected += OnClientMqttMsgDisconnected;
            newClient.MqttMsgPublishReceived += OnClientMqttMsgPublishReceived;
            newClient.MqttMsgConnected += OnClientMqttMsgConnected;
            newClient.MqttMsgSubscribeReceived += OnClientMqttMsgSubscribeReceived;
            newClient.MqttMsgUnsubscribeReceived += OnClientMqttMsgUnsubscribeReceived;
            newClient.ConnectionClosed += OnClientConnectionClosed;

            // add client to the collection
            this.clients.Add(newClient);

            // start client threads
            newClient.Open();
        }

        /// <summary>
        /// Handles a PUBLISH received from a client by re-publishing it through the
        /// publisher manager.
        /// </summary>
        /// <param name="sender">Client that sent the PUBLISH</param>
        /// <param name="e">Topic, payload, QoS level and retain flag of the message</param>
        void OnClientMqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            // create PUBLISH message to publish
            // [v3.1.1] DUP flag from an incoming PUBLISH message is not propagated to subscribers
            // It should be set in the outgoing PUBLISH message based on transmission for each subscriber
            MqttMsgPublish publish = new MqttMsgPublish(e.Topic, e.Message, false, e.QosLevel, e.Retain);

            // publish message through publisher manager
            this.publisherManager.Publish(publish);
        }

        /// <summary>
        /// Handles a CONNECT received from a client: verifies it, closes any client already
        /// registered with the same client id, sends CONNACK and restores or clears the session.
        /// </summary>
        /// <param name="sender">Client that sent the CONNECT</param>
        /// <param name="e">Event data carrying the CONNECT message</param>
        void OnClientMqttMsgConnected(object sender, MqttMsgConnectEventArgs e)
        {
            // [v3.1.1] session present flag
            bool sessionPresent = false;

            MqttClient client = (MqttClient)sender;

            // verify message to determine CONNACK message return code to the client
            byte returnCode = this.MqttConnectVerify(e.Message);
            bool accepted = returnCode == MqttMsgConnack.CONN_ACCEPTED;

            // [v3.1.1] if client id is zero length, the broker assigns a unique identifier to it
            string clientId = (e.Message.ClientId.Length != 0)
                ? e.Message.ClientId
                : Guid.NewGuid().ToString();

            logger.Info("客户端准备接入<{0}>", clientId);

            // connection accepted: a client already connected with the same id must be closed first
            if (accepted)
            {
                // check if there is a client already connected with same client Id
                MqttClient clientConnected = this.GetClient(clientId);

                // force connection close to the existing client (MQTT protocol)
                if (clientConnected != null)
                {
                    this.CloseClient(clientConnected);
                }
            }

            try
            {
                if (!accepted)
                {
                    // connection refused: send CONNACK carrying the refusal code
                    client.Connack(e.Message, returnCode, clientId, sessionPresent);
                    return;
                }

                logger.Info("收到清理会话CleanSession标志为<{0}>的连接", e.Message.CleanSession);

                if (e.Message.CleanSession)
                {
                    // requested clean session: acknowledge, then drop any stored session
                    client.Connack(e.Message, returnCode, clientId, sessionPresent);
                    this.sessionManager.ClearSession(clientId);
                    return;
                }

                // persistent session requested

                // create session for the client
                MqttClientSession clientSession = new MqttClientSession(clientId);

                // get session for the connected client
                MqttBrokerSession session = this.sessionManager.GetSession(clientId);

                // set inflight queue into the client session
                if (session != null)
                {
                    clientSession.InflightMessages = session.InflightMessages;

                    // [v3.1.1] session present flag
                    if (client.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
                    {
                        sessionPresent = true;
                    }
                }

                // send CONNACK message to the client
                client.Connack(e.Message, returnCode, clientId, sessionPresent);

                // load/inject session to the client
                client.LoadSession(clientSession);

                if (session == null)
                {
                    return;
                }

                // set reference to connected client into the session
                session.Client = client;

                // there are saved subscriptions
                if (session.Subscriptions != null)
                {
                    // register all subscriptions for the connected client
                    foreach (MqttSubscription subscription in session.Subscriptions)
                    {
                        this.subscriberManager.Subscribe(subscription.Topic, subscription.QosLevel, client);

                        // publish retained message on the current subscription
                        this.publisherManager.PublishRetaind(subscription.Topic, clientId);
                    }
                }

                // there are saved outgoing messages
                if (session.OutgoingMessages.Count > 0)
                {
                    // publish outgoing messages for the session
                    this.publisherManager.PublishSession(session.ClientId);
                }
            }
            catch (MqttCommunicationException ex)
            {
                this.CloseClient(client);
                logger.Error(ex, "请求连接发生异常");
            }
        }

        /// <summary>
        /// Close a client
        /// </summary>
        /// <param name="client">Client to close</param>
        private void CloseClient(MqttClient client)
        {
            // only clients still tracked by the broker are processed, so a second call
            // for the same client is a no-op
            if (!this.clients.Contains(client))
            {
                return;
            }

            logger.Info("准备断开设备接入<{0}>", client.ClientId);

            // NOTE(review): the will is published when IsConnected is false, although the
            // original comment read "if client is connected and it has a will message".
            // Presumably IsConnected is false when the link dropped without a clean
            // DISCONNECT - confirm against MqttClient before changing this condition.
            if (!client.IsConnected && client.WillFlag)
            {
                // create the will PUBLISH message
                MqttMsgPublish publish = new MqttMsgPublish(
                    client.WillTopic,
                    Encoding.UTF8.GetBytes(client.WillMessage),
                    false,
                    client.WillQosLevel,
                    false);

                // publish message through publisher manager
                this.publisherManager.Publish(publish);
            }

            // if not clean session
            if (!client.CleanSession)
            {
                List<MqttSubscription> subscriptions =
                    this.subscriberManager.GetSubscriptionsByClient(client.ClientId);

                // the session is saved only when the client has at least one subscription
                if ((subscriptions != null) && (subscriptions.Count > 0))
                {
                    this.sessionManager.SaveSession(client.ClientId, client.Session, subscriptions);

                    // TODO : persist client session if broker close
                }
            }

            // delete client from runtime subscription
            this.subscriberManager.Unsubscribe(client);

            // close the client
            client.Close();

            // remove client from the collection
            this.clients.Remove(client);

            logger.Info("断开设备接入成功<{0}>", client.ClientId);
        }

        /// <summary>
        /// Handles a DISCONNECT received from a client by closing that client.
        /// </summary>
        /// <param name="sender">Client that sent the DISCONNECT</param>
        /// <param name="e">Unused event data</param>
        void OnClientMqttMsgDisconnected(object sender, EventArgs e)
        {
            this.CloseClient((MqttClient)sender);
        }

        /// <summary>
        /// Handles the connection-closed event of a client by closing that client.
        /// </summary>
        /// <param name="sender">Client whose connection was closed</param>
        /// <param name="e">Unused event data</param>
        void OnClientConnectionClosed(object sender, EventArgs e)
        {
            this.CloseClient((MqttClient)sender);
        }

        /// <summary>
        /// Handles a SUBSCRIBE received from a client: registers every requested topic,
        /// sends SUBACK and then publishes the retained messages for those topics.
        /// </summary>
        /// <param name="sender">Client that sent the SUBSCRIBE</param>
        /// <param name="e">Message id, topics and requested QoS levels</param>
        void OnClientMqttMsgSubscribeReceived(object sender, MqttMsgSubscribeEventArgs e)
        {
            MqttClient client = (MqttClient)sender;

            for (int i = 0; i < e.Topics.Length; i++)
            {
                // TODO : business logic to grant QoS levels based on some conditions ?
                //        now the broker granted the QoS levels requested by client

                // subscribe client for each topic and QoS level requested
                this.subscriberManager.Subscribe(e.Topics[i], e.QoSLevels[i], client);
            }

            try
            {
                // send SUBACK message to the client
                client.Suback(e.MessageId, e.QoSLevels);

                for (int i = 0; i < e.Topics.Length; i++)
                {
                    // publish retained message on the current subscription
                    this.publisherManager.PublishRetaind(e.Topics[i], client.ClientId);
                }
            }
            catch (MqttCommunicationException)
            {
                this.CloseClient(client);
            }
        }

        /// <summary>
        /// Handles an UNSUBSCRIBE received from a client: removes every requested topic
        /// and sends UNSUBACK.
        /// </summary>
        /// <param name="sender">Client that sent the UNSUBSCRIBE</param>
        /// <param name="e">Message id and topics to remove</param>
        void OnClientMqttMsgUnsubscribeReceived(object sender, MqttMsgUnsubscribeEventArgs e)
        {
            MqttClient client = (MqttClient)sender;

            for (int i = 0; i < e.Topics.Length; i++)
            {
                // unsubscribe client for each topic requested
                this.subscriberManager.Unsubscribe(e.Topics[i], client);
            }

            try
            {
                // send UNSUBACK message to the client
                client.Unsuback(e.MessageId);
            }
            catch (MqttCommunicationException)
            {
                this.CloseClient(client);
            }
        }

        /// <summary>
        /// Check CONNECT message to accept or not the connection request
        /// </summary>
        /// <param name="connect">CONNECT message received from client</param>
        /// <returns>Return code for CONNACK message</returns>
        private byte MqttConnectVerify(MqttMsgConnect connect)
        {
            bool isV31 = connect.ProtocolVersion == PROTOCOL_VERSION_V3_1;
            bool isV311 = connect.ProtocolVersion == PROTOCOL_VERSION_V3_1_1;

            // connection refused: unsupported protocol version
            if (!isV31 && !isV311)
            {
                return MqttMsgConnack.CONN_REFUSED_PROT_VERS;
            }

            // connection refused: client identifier too long (only for old MQTT 3.1)
            if (isV31 && (connect.ClientId.Length > CLIENT_ID_MAX_LENGTH))
            {
                return MqttMsgConnack.CONN_REFUSED_IDENT_REJECTED;
            }

            // [v.3.1.1] client id zero length is allowed but clean session must be true
            if ((connect.ClientId.Length == 0) && (!connect.CleanSession))
            {
                return MqttMsgConnack.CONN_REFUSED_IDENT_REJECTED;
            }

            // check user identity; connection refused: bad user name or password
            if (!this.uacManager.UserAuthentication(connect.Username, connect.Password))
            {
                return MqttMsgConnack.CONN_REFUSED_USERNAME_PASSWORD;
            }

            // server unavailable and not authorized ?
            // TODO : other checks on CONNECT message
            return MqttMsgConnack.CONN_ACCEPTED;
        }

        /// <summary>
        /// Return reference to a client with a specified Id is already connected
        /// </summary>
        /// <param name="clientId">Client Id to verify</param>
        /// <returns>Reference to client, or <c>null</c> when no tracked client has that id</returns>
        public MqttClient GetClient(string clientId)
        {
            return this.clients.FirstOrDefault(c => c.ClientId == clientId);
        }
    }
}