You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

429 lines
16 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NLog;
using uPLibrary.Networking.M2Mqtt.Communication;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Managers;
using uPLibrary.Networking.M2Mqtt.Messages;
using uPLibrary.Networking.M2Mqtt.Session;
using uPLibrary.Networking.M2Mqtt.Utility;
namespace uPLibrary.Networking.M2Mqtt
{
public class MqttBrokerExt
{
private static Logger logger = NLog.LogManager.GetCurrentClassLogger();
// MQTT broker settings
private MqttSettings settings;
// clients connected list
private MqttClientCollection clients;
// reference to publisher manager
private MqttPublisherManager publisherManager;
// reference to subscriber manager
private MqttSubscriberManager subscriberManager;
// reference to session manager
private MqttSessionManager sessionManager;
// reference to User Access Control manager
private MqttUacManager uacManager;
// MQTT communication layer
private IMqttCommunicationLayer commLayer;
/// <summary>
/// User authentication method
/// </summary>
public MqttUserAuthenticationDelegate UserAuth
{
get { return this.uacManager.UserAuth; }
set { this.uacManager.UserAuth = value; }
}
public MqttBrokerExt(int port)
{
// MQTT broker settings
this.settings = MqttSettings.Instance;
// MQTT communication layer
this.commLayer = new MqttTcpCommunicationLayer(port);
this.commLayer.ClientConnected += OnClientConnected;
// create managers (publisher, subscriber, session and UAC)
this.subscriberManager = new MqttSubscriberManager();
this.sessionManager = new MqttSessionManager();
this.publisherManager = new MqttPublisherManager(this.subscriberManager , this.sessionManager);
this.uacManager = new MqttUacManager();
this.uacManager.UserAuth = UserAuthentication;
this.clients = new MqttClientCollection();
}
public bool UserAuthentication(string username , string password)
{
logger.Info("用户认证");
return true;
}
/// <summary>
/// Start broker
/// </summary>
public void Start()
{
this.commLayer.Start();
this.publisherManager.Start();
}
/// <summary>
/// Stop broker
/// </summary>
public void Stop()
{
this.commLayer.Stop();
this.publisherManager.Stop();
// close connection with all clients
foreach (MqttClient client in this.clients)
{
client.Close();
}
}
void OnClientConnected(object sender , MqttClientConnectedEventArgs e)
{
// register event handlers from client
e.Client.MqttMsgDisconnected += OnClientMqttMsgDisconnected;
e.Client.MqttMsgPublishReceived += OnClientMqttMsgPublishReceived;
e.Client.MqttMsgConnected += OnClientMqttMsgConnected;
e.Client.MqttMsgSubscribeReceived += OnClientMqttMsgSubscribeReceived;
e.Client.MqttMsgUnsubscribeReceived += OnClientMqttMsgUnsubscribeReceived;
e.Client.ConnectionClosed += OnClientConnectionClosed;
// add client to the collection
this.clients.Add(e.Client);
// start client threads
e.Client.Open();
}
void OnClientMqttMsgPublishReceived(object sender , MqttMsgPublishEventArgs e)
{
MqttClient client = (MqttClient)sender;
// create PUBLISH message to publish
// [v3.1.1] DUP flag from an incoming PUBLISH message is not propagated to subscribers
// It should be set in the outgoing PUBLISH message based on transmission for each subscriber
MqttMsgPublish publish = new MqttMsgPublish(e.Topic , e.Message , false , e.QosLevel , e.Retain);
// publish message through publisher manager
this.publisherManager.Publish(publish);
}
void OnClientMqttMsgConnected(object sender , MqttMsgConnectEventArgs e)
{
// [v3.1.1] session present flag
bool sessionPresent = false;
// [v3.1.1] generated client id for client who provides client id zero bytes length
string clientId = null;
MqttClient client = (MqttClient)sender;
// verify message to determine CONNACK message return code to the client
byte returnCode = this.MqttConnectVerify(e.Message);
// [v3.1.1] if client id is zero length, the broker assigns a unique identifier to it
clientId = (e.Message.ClientId.Length != 0) ? e.Message.ClientId : Guid.NewGuid().ToString();
logger.Info("客户端准备接入<{0}>" , clientId);
//连接已被服务端接受,必须先断开
if (returnCode == MqttMsgConnack.CONN_ACCEPTED)
{
// check if there is a client already connected with same client Id
MqttClient clientConnected = this.GetClient(clientId);
// force connection close to the existing client (MQTT protocol)
if (clientConnected != null)
{
this.CloseClient(clientConnected);
}
}
try
{
// connection accepted, load (if exists) client session
if (returnCode == MqttMsgConnack.CONN_ACCEPTED)
{
logger.Info("收到清理会话CleanSession标志为<{0}>的连接" , e.Message.CleanSession);
//持久化Session
if (!e.Message.CleanSession)
{
// create session for the client
MqttClientSession clientSession = new MqttClientSession(clientId);
// get session for the connected client
MqttBrokerSession session = this.sessionManager.GetSession(clientId);
// set inflight queue into the client session
if (session != null)
{
clientSession.InflightMessages = session.InflightMessages;
// [v3.1.1] session present flag
if (client.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
sessionPresent = true;
}
// send CONNACK message to the client
client.Connack(e.Message , returnCode , clientId , sessionPresent);
// load/inject session to the client
client.LoadSession(clientSession);
if (session != null)
{
// set reference to connected client into the session
session.Client = client;
// there are saved subscriptions
if (session.Subscriptions != null)
{
// register all subscriptions for the connected client
foreach (MqttSubscription subscription in session.Subscriptions)
{
this.subscriberManager.Subscribe(subscription.Topic , subscription.QosLevel , client);
// publish retained message on the current subscription
this.publisherManager.PublishRetaind(subscription.Topic , clientId);
}
}
// there are saved outgoing messages
if (session.OutgoingMessages.Count > 0)
{
// publish outgoing messages for the session
this.publisherManager.PublishSession(session.ClientId);
}
}
}
// requested clean session
else
{
// send CONNACK message to the client
client.Connack(e.Message , returnCode , clientId , sessionPresent);
this.sessionManager.ClearSession(clientId);
}
}
else
{
// send CONNACK message to the client
client.Connack(e.Message , returnCode , clientId , sessionPresent);
}
}
catch (MqttCommunicationException ex)
{
this.CloseClient(client);
logger.Error(ex , "请求连接发生异常");
}
}
/// <summary>
/// Close a client
/// </summary>
/// <param name="client">Client to close</param>
private void CloseClient(MqttClient client)
{
if (this.clients.Contains(client))
{
logger.Info("准备断开设备接入<{0}>" , client.ClientId);
// if client is connected and it has a will message
if (!client.IsConnected && client.WillFlag)
{
// create the will PUBLISH message
MqttMsgPublish publish =
new MqttMsgPublish(client.WillTopic , Encoding.UTF8.GetBytes(client.WillMessage) , false , client.WillQosLevel , false);
// publish message through publisher manager
this.publisherManager.Publish(publish);
}
// if not clean session
if (!client.CleanSession)
{
List<MqttSubscription> subscriptions = this.subscriberManager.GetSubscriptionsByClient(client.ClientId);
if ((subscriptions != null) && (subscriptions.Count > 0))
{
this.sessionManager.SaveSession(client.ClientId , client.Session , subscriptions);
// TODO : persist client session if broker close
}
}
// delete client from runtime subscription
this.subscriberManager.Unsubscribe(client);
// close the client
client.Close();
// remove client from the collection
this.clients.Remove(client);
logger.Info("断开设备接入成功<{0}>" , client.ClientId);
}
}
void OnClientMqttMsgDisconnected(object sender , EventArgs e)
{
MqttClient client = (MqttClient)sender;
// close the client
this.CloseClient(client);
}
void OnClientConnectionClosed(object sender , EventArgs e)
{
MqttClient client = (MqttClient)sender;
// close the client
this.CloseClient(client);
}
void OnClientMqttMsgSubscribeReceived(object sender , MqttMsgSubscribeEventArgs e)
{
MqttClient client = (MqttClient)sender;
for (int i = 0; i < e.Topics.Length; i++)
{
// TODO : business logic to grant QoS levels based on some conditions ?
// now the broker granted the QoS levels requested by client
// subscribe client for each topic and QoS level requested
this.subscriberManager.Subscribe(e.Topics[i] , e.QoSLevels[i] , client);
}
try
{
// send SUBACK message to the client
client.Suback(e.MessageId , e.QoSLevels);
for (int i = 0; i < e.Topics.Length; i++)
{
// publish retained message on the current subscription
this.publisherManager.PublishRetaind(e.Topics[i] , client.ClientId);
}
}
catch (MqttCommunicationException)
{
this.CloseClient(client);
}
}
void OnClientMqttMsgUnsubscribeReceived(object sender , MqttMsgUnsubscribeEventArgs e)
{
MqttClient client = (MqttClient)sender;
for (int i = 0; i < e.Topics.Length; i++)
{
// unsubscribe client for each topic requested
this.subscriberManager.Unsubscribe(e.Topics[i] , client);
}
try
{
// send UNSUBACK message to the client
client.Unsuback(e.MessageId);
}
catch (MqttCommunicationException)
{
this.CloseClient(client);
}
}
internal const byte PROTOCOL_VERSION_V3_1 = 0x03;
internal const byte PROTOCOL_VERSION_V3_1_1 = 0x04;
// max length for client id (removed in 3.1.1)
internal const int CLIENT_ID_MAX_LENGTH = 23;
/// <summary>
/// Check CONNECT message to accept or not the connection request
/// </summary>
/// <param name="connect">CONNECT message received from client</param>
/// <returns>Return code for CONNACK message</returns>
private byte MqttConnectVerify(MqttMsgConnect connect)
{
byte returnCode = MqttMsgConnack.CONN_ACCEPTED;
//0x01连接已拒绝不支持的协议版本
if ((connect.ProtocolVersion != PROTOCOL_VERSION_V3_1) &&
(connect.ProtocolVersion != PROTOCOL_VERSION_V3_1_1))
{
returnCode = MqttMsgConnack.CONN_REFUSED_PROT_VERS;
}
else
{
//0x02连接已拒绝不合格的客户端标识符 (only for old MQTT 3.1)
if ((connect.ProtocolVersion == PROTOCOL_VERSION_V3_1) &&
(connect.ClientId.Length > CLIENT_ID_MAX_LENGTH))
{
returnCode = MqttMsgConnack.CONN_REFUSED_IDENT_REJECTED;
}
else
{
// [v.3.1.1] client id zero length is allowed but clean session must be true
if ((connect.ClientId.Length == 0) && (!connect.CleanSession))
{
returnCode = MqttMsgConnack.CONN_REFUSED_IDENT_REJECTED;
}
else
{
// 校验用户身份,0x04连接已拒绝无效的用户名或密码
if (!this.uacManager.UserAuthentication(connect.Username , connect.Password))
{
returnCode = MqttMsgConnack.CONN_REFUSED_USERNAME_PASSWORD;
}
// server unavailable and not authorized ?
else
{
// TODO : other checks on CONNECT message
}
}
}
}
return returnCode;
}
/// <summary>
/// Return reference to a client with a specified Id is already connected
/// </summary>
/// <param name="clientId">Client Id to verify</param>
/// <returns>Reference to client</returns>
public MqttClient GetClient(string clientId)
{
var query = from c in this.clients
where c.ClientId == clientId
select c;
return query.FirstOrDefault();
}
}
}