You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
8.3 KiB
C#

using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using uPLibrary.Networking.M2Mqtt.Utility;
using POSV.Common;
using POSV.Common.Transport;
using POSV.Entity;
using POSV.HttpApi;
namespace POSV.Utils
{
/// <summary>
/// Singleton wrapper around an M2Mqtt <see cref="MqttClient"/>. It connects to the
/// service-center broker, subscribes to the sale-clear topic, dispatches incoming
/// notifications to <c>SaleClearUtils</c>, and runs a 5-second timer that reconnects
/// whenever the client is missing or disconnected.
/// </summary>
public class MqttUtils
{
    /// <summary>Guards creation of the singleton instance.</summary>
    private static readonly object _lock = new object();

    private static readonly Logger logger = NLog.LogManager.GetCurrentClassLogger();

    /// <summary>The singleton; volatile so the unlocked read in <see cref="Instance"/> sees the latest write.</summary>
    private static volatile MqttUtils _instance = null;

    /// <summary>Serializes connection attempts made by the reconnect timer and by <see cref="Startup"/>.</summary>
    private readonly object _connectLock = new object();

    private MqttClient _mqttClient = null;

    /// <summary>Health-check timer; null when monitoring is disabled or after <see cref="Stop"/>.</summary>
    private System.Timers.Timer _reconnect;

    /// <summary>
    /// Gets the shared instance, creating and initializing it on first access.
    /// </summary>
    public static MqttUtils Instance
    {
        get
        {
            if (_instance == null)
            {
                lock (_lock)
                {
                    // Re-check under the lock: a thread that lost the race must not build a
                    // second instance (and a second reconnect timer using the same client id).
                    if (_instance == null)
                    {
                        _instance = new MqttUtils();
                        logger.Info("初始化MQTT服务......");
                        _instance.InitMqtt();
                    }
                }
            }
            return _instance;
        }
    }

    /// <summary>
    /// Starts the reconnect monitor when both the service center and the global
    /// sale-clear push channel are enabled in configuration. Never throws; failures are logged.
    /// </summary>
    private void InitMqtt()
    {
        try
        {
            // Read the message-center configuration.
            var messageCenter = Global.Instance.EnableServiceCenter;
            // Global sale-clear is delivered through the push channel.
            var globalSaleClear = Global.Instance.GlobalConfigBoolValue(ConfigConstant.DEVICE_SALECLEAR_ENABLE, false);
            if (messageCenter.Item1 && globalSaleClear)
            {
                var timer = new System.Timers.Timer(5000);
                timer.AutoReset = true;
                timer.Elapsed += ReconnectCallback;
                // Publish the field before starting so the callback's "stopped" check
                // cannot observe null on the very first tick.
                this._reconnect = timer;
                timer.Start();
                logger.Info("配置启用服务中心mqtt,开启监控");
            }
            else
            {
                logger.Info("未配置启用服务中心mqtt,关闭监控");
            }
        }
        catch (Exception ex)
        {
            logger.Error(ex, "消息中心初始化异常");
        }
    }

    /// <summary>
    /// Gets the MQTT client identifier, built as "{AppSign}_{PosNo}" from the global state.
    /// </summary>
    public string ClientId
    {
        get
        {
            return string.Format("{0}_{1}", Global.Instance.AppSign, Global.Instance.Authc.PosNo);
        }
    }

    /// <summary>
    /// Timer callback: logs the current connection state and reconnects when the client
    /// is missing or disconnected.
    /// </summary>
    /// <param name="sender">The timer raising the event.</param>
    /// <param name="e">Elapsed event data (unused).</param>
    private void ReconnectCallback(object sender, System.Timers.ElapsedEventArgs e)
    {
        // An Elapsed event that was already queued can still run after Stop(); ignore it.
        if (this._reconnect == null)
        {
            return;
        }

        // Elapsed can be raised again while a previous handler is still running (a blocked
        // Connect() can outlast the 5 s interval). Skip this tick instead of stacking attempts.
        if (!System.Threading.Monitor.TryEnter(this._connectLock))
        {
            return;
        }

        try
        {
            // Snapshot the field so a concurrent Stop() cannot null it between the two reads.
            var client = this._mqttClient;
            bool connected = client != null && client.IsConnected;
            logger.Info("检测推送服务健康状况<{0}>....", connected);
            if (!connected)
            {
                Connect();
            }
        }
        catch (Exception ex)
        {
            Global.Instance.BugReport(ex, "MQTT重连推送服务异常");
            logger.Info(ex, "重连推送服务异常");
        }
        finally
        {
            System.Threading.Monitor.Exit(this._connectLock);
        }
    }

    /// <summary>
    /// Subscribes the current client to the default topics (sale-clear change
    /// notifications) at QoS level "exactly once".
    /// </summary>
    private void DefaultSubscribe()
    {
        // Subscribe to sale-clear change notifications.
        this._mqttClient.Subscribe(new string[] { MqttTopic.TOPIC_SALE_CLEAR }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
    }

    /// <summary>
    /// Publishes a UTF-8 encoded payload to the proxy-center topic. Does nothing when
    /// there is no connected client.
    /// </summary>
    /// <param name="payload">Message text to send.</param>
    public void Publish(string payload)
    {
        // Snapshot the field so a concurrent Stop()/reconnect cannot null it mid-call.
        var client = this._mqttClient;
        if (client != null && client.IsConnected)
        {
            client.Publish(MqttTopic.TOPIC_PROXY_CENTER, Encoding.UTF8.GetBytes(payload));
        }
    }

    /// <summary>
    /// Handles a message pushed by the broker: decodes the JSON payload, reads its
    /// optional "type" field and applies sale-clear updates. Exceptions are logged rather
    /// than rethrown so one malformed message cannot fault the library's callback thread.
    /// </summary>
    /// <param name="sender">The client that received the message.</param>
    /// <param name="e">Event data carrying the raw message bytes.</param>
    private void OnMqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        try
        {
            var json = Encoding.UTF8.GetString(e.Message);
            MessageType messageType = MessageType.None;
            var parse = DynamicJson.Parse(json);
            if (parse.IsDefined("type"))
            {
                messageType = (MessageType)(Convert.ToInt32(parse["type"]));
            }
            logger.Info("收到消息中心通知:<{0}>", messageType.ToString());
            switch (messageType)
            {
                case MessageType.SaleClear:
                    {
                        var saleClear = ReadSaleClear(json);
                        if (saleClear != null)
                        {
                            SaleClearUtils.Instance.Maps[saleClear.ProductId] = saleClear;
                        }
                    }
                    break;
                case MessageType.ProxySaleClear:
                    {
                        var saleClear = ReadSaleClear(json);
                        if (saleClear != null)
                        {
                            logger.Info($"收到消息中心通知:<{messageType.ToString()}>,Data:{JsonUtils.Serialize(saleClear)}");
                            SaleClearUtils.Instance.SaveOrUpdateNoChanged(saleClear);
                        }
                    }
                    break;
                default:
                    // Other message types are not handled here.
                    break;
            }
        }
        catch (Exception ex)
        {
            logger.Error(ex, "处理消息中心通知异常");
        }
    }

    /// <summary>
    /// Deserializes a transport envelope and returns its sale-clear payload.
    /// </summary>
    /// <param name="json">Raw JSON text of the notification.</param>
    /// <returns>The payload, or null (after logging a warning) when the envelope or its data is missing.</returns>
    private static SaleClear ReadSaleClear(string json)
    {
        var transport = JsonUtils.Deserialize<TransportMessage<SaleClear>>(json);
        var saleClear = transport == null ? null : transport.Data;
        if (saleClear == null)
        {
            logger.Warn("沽清通知缺少数据,已忽略");
        }
        return saleClear;
    }

    /// <summary>
    /// Creates a new client, connects it with <see cref="ClientId"/> and subscribes to the
    /// default topics. Any previously held client is released first. Does nothing (beyond
    /// logging) when the feature is disabled in configuration.
    /// </summary>
    /// <returns>True when the client is connected and subscribed; otherwise false.</returns>
    private bool Connect()
    {
        // Monitor is re-entrant, so the timer callback (which already holds the lock) can call in.
        lock (this._connectLock)
        {
            MqttClient client = null;
            try
            {
                // Read the message-center configuration.
                var messageCenter = Global.Instance.EnableServiceCenter;
                // Global sale-clear is delivered through the push channel.
                var globalSaleClear = Global.Instance.GlobalConfigBoolValue(ConfigConstant.DEVICE_SALECLEAR_ENABLE, false);
                if (!(messageCenter.Item1 && globalSaleClear))
                {
                    logger.Info(messageCenter.Item2);
                    return false;
                }

                string host = Global.Instance.GlobalConfigStringValue(ConfigConstant.DEVICE_SERVICECENTER_IP);
                int port = Global.Instance.GlobalConfigIntValue(ConfigConstant.DEVICE_SERVICECENTER_PORT);

                // Drop the client being replaced so it cannot keep delivering messages
                // or hold a connection open.
                var previous = this._mqttClient;
                this._mqttClient = null;
                ReleaseClient(previous);

                // The MQTT port is the configured base port + 2.
                client = new MqttClient(host, port + 2, false, null, null, MqttSslProtocols.None);
                client.MqttMsgPublishReceived += OnMqttMsgPublishReceived;
                client.ConnectionClosed += OnConnectionClosed;
                this._mqttClient = client;

                client.Connect(this.ClientId);
                if (!client.IsConnected)
                {
                    return false;
                }

                // Load the default subscriptions. Done inside the try so a subscribe
                // failure goes through the same cleanup/reporting path as a connect failure.
                this.DefaultSubscribe();
                return true;
            }
            catch (Exception ex)
            {
                // Release both the half-initialized client and whatever the field holds
                // (they may be the same object; ReleaseClient tolerates repeats and null).
                ReleaseClient(client);
                ReleaseClient(this._mqttClient);
                this._mqttClient = null;
                Global.Instance.BugReport(ex, "MQTT连接失败");
                logger.Error(ex, "连接失败");
                return false;
            }
        }
    }

    /// <summary>
    /// Detaches the message handler from a client and disconnects it if it is still
    /// connected. Safe to call with null or more than once; never throws.
    /// </summary>
    /// <param name="client">The client to release, or null.</param>
    private void ReleaseClient(MqttClient client)
    {
        if (client == null)
        {
            return;
        }

        client.MqttMsgPublishReceived -= OnMqttMsgPublishReceived;
        try
        {
            // ConnectionClosed stays attached so the disconnect is still logged.
            if (client.IsConnected)
            {
                client.Disconnect();
            }
        }
        catch (Exception ex)
        {
            logger.Warn(ex, "断开MQTT连接异常");
        }
    }

    /// <summary>
    /// Logs that the connection to the broker was closed.
    /// </summary>
    private void OnConnectionClosed(object sender, EventArgs e)
    {
        logger.Info("收银机断开连接......");
    }

    /// <summary>
    /// Starts the service by attempting an immediate connection. Failures are reported
    /// and logged, not thrown.
    /// </summary>
    public void Startup()
    {
        try
        {
            logger.Info("启动连接MQTT服务......");
            var connected = Connect();
            if (connected)
            {
                logger.Info("连接MQTT服务成功");
            }
        }
        catch (Exception ex)
        {
            Global.Instance.BugReport(ex, "MQTT服务连接失败");
            logger.Error(ex, "连接MQTT服务异常");
        }
    }

    /// <summary>
    /// Stops the service: halts the reconnect monitor and disconnects the client.
    /// The monitor is stopped even when no client exists, so a failed connection
    /// cannot leave the timer reconnecting in the background.
    /// </summary>
    public void Stop()
    {
        // Stop monitoring first so the timer does not reconnect behind our back.
        var timer = this._reconnect;
        this._reconnect = null;
        if (timer != null)
        {
            timer.Stop();
            timer.Elapsed -= ReconnectCallback;
            timer.Dispose();
        }

        var client = this._mqttClient;
        this._mqttClient = null;
        ReleaseClient(client);
    }

    /// <summary>
    /// Full reload. Currently a deliberate no-op: the earlier implementation
    /// (Stop followed by Startup) was disabled in the source.
    /// </summary>
    public void Restart()
    {
    }
}
}