You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
279 lines
8.3 KiB
C#
279 lines
8.3 KiB
C#
using NLog;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using uPLibrary.Networking.M2Mqtt;
|
|
using uPLibrary.Networking.M2Mqtt.Messages;
|
|
using uPLibrary.Networking.M2Mqtt.Utility;
|
|
using POSV.Common;
|
|
using POSV.Common.Transport;
|
|
using POSV.Entity;
|
|
using POSV.HttpApi;
|
|
|
|
namespace POSV.Utils
|
|
{
|
|
/// <summary>
/// Process-wide helper that owns the MQTT client used for service-center push
/// notifications (sale-clear updates), plus a timer that re-establishes the
/// connection when it is found to be down.
/// </summary>
public class MqttUtils
{
    /// <summary>Interval, in milliseconds, between connection health checks.</summary>
    private const double ReconnectIntervalMs = 5000;

    /// <summary>Guards creation of the singleton instance.</summary>
    private static readonly object _lock = new object();

    private static readonly Logger logger = NLog.LogManager.GetCurrentClassLogger();

    /// <summary>Singleton instance; volatile so the unlocked read in <see cref="Instance"/> is safe.</summary>
    private static volatile MqttUtils _instance = null;

    /// <summary>Current MQTT client, or null when never connected, stopped, or the last connect attempt threw.</summary>
    private MqttClient _mqttClient = null;

    /// <summary>Health-check timer; null when monitoring is not enabled or after <see cref="Stop"/>.</summary>
    private System.Timers.Timer _reconnect;

    /// <summary>1 while a connect attempt is running, otherwise 0. Accessed only through Interlocked.</summary>
    private int _connecting;

    /// <summary>
    /// Returns the shared instance, creating and initializing it on first access.
    /// </summary>
    public static MqttUtils Instance
    {
        get
        {
            if (_instance == null)
            {
                lock (_lock)
                {
                    // Second check: another thread may have created the instance
                    // while this one was waiting for the lock.
                    if (_instance == null)
                    {
                        // Assign first so that any re-entrant access to Instance during
                        // initialization on this thread gets this object instead of
                        // constructing a second one.
                        _instance = new MqttUtils();

                        logger.Info("初始化MQTT服务......");

                        _instance.InitMqtt();
                    }
                }
            }

            return _instance;
        }
    }

    /// <summary>
    /// Starts the periodic connection health check when both the service center and
    /// the global sale-clear push channel are enabled in configuration. Any exception
    /// is logged and swallowed.
    /// </summary>
    private void InitMqtt()
    {
        try
        {
            // Read the message-center (service center) configuration.
            var messageCenter = Global.Instance.EnableServiceCenter;
            // Global sale-clear goes through the push channel.
            var globalSaleClear = Global.Instance.GlobalConfigBoolValue(ConfigConstant.DEVICE_SALECLEAR_ENABLE, false);

            if (messageCenter.Item1 && globalSaleClear)
            {
                var timer = new System.Timers.Timer(ReconnectIntervalMs);
                timer.AutoReset = true;
                // Attach the handler and publish the field before the timer starts ticking.
                timer.Elapsed += ReconnectCallback;
                this._reconnect = timer;
                timer.Start();

                logger.Info("配置启用服务中心mqtt,开启监控");
            }
            else
            {
                logger.Info("未配置启用服务中心mqtt,关闭监控");
            }
        }
        catch (Exception ex)
        {
            logger.Error(ex, "消息中心初始化异常");
        }
    }

    /// <summary>
    /// Client identifier passed to the broker on connect: the application sign and
    /// the POS number joined by an underscore.
    /// </summary>
    public string ClientId
    {
        get
        {
            return $"{Global.Instance.AppSign}_{Global.Instance.Authc.PosNo}";
        }
    }

    /// <summary>
    /// Timer callback: logs the current connection state and reconnects when the
    /// client is missing or disconnected. Exceptions are reported and logged, never rethrown.
    /// </summary>
    private void ReconnectCallback(object sender, System.Timers.ElapsedEventArgs e)
    {
        try
        {
            // A tick that was already queued when Stop() ran must not reconnect.
            if (this._reconnect == null)
            {
                return;
            }

            // Read the field once; another thread may set it to null.
            var client = this._mqttClient;
            bool connected = client != null && client.IsConnected;

            logger.Info("检测推送服务健康状况<{0}>....", connected);

            if (!connected)
            {
                Connect();
            }
        }
        catch (Exception ex)
        {
            Global.Instance.BugReport(ex, "MQTT重连推送服务异常");
            logger.Error(ex, "重连推送服务异常");
        }
    }

    /// <summary>
    /// Subscribes the current client to the default topics (sale-clear change notifications).
    /// Does nothing when there is no client.
    /// </summary>
    private void DefaultSubscribe()
    {
        var client = this._mqttClient;
        if (client == null)
        {
            return;
        }

        // Subscribe to sale-clear change notifications.
        client.Subscribe(new string[] { MqttTopic.TOPIC_SALE_CLEAR }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
    }

    /// <summary>
    /// Publishes a UTF-8 encoded payload to the proxy-center topic. The call is
    /// silently ignored when the client is missing or not connected.
    /// </summary>
    /// <param name="payload">Text to publish.</param>
    public void Publish(string payload)
    {
        // Read the field once; the reconnect path may set it to null concurrently.
        var client = this._mqttClient;
        if (client != null && client.IsConnected)
        {
            client.Publish(MqttTopic.TOPIC_PROXY_CENTER, Encoding.UTF8.GetBytes(payload));
        }
    }

    /// <summary>
    /// Handles a message received from the broker: reads the optional "type" field
    /// of the JSON payload and applies sale-clear updates. Failures are reported and
    /// logged so that a malformed message cannot propagate an exception into the
    /// MQTT library thread that raised the event.
    /// </summary>
    private void OnMqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        try
        {
            var json = Encoding.UTF8.GetString(e.Message);

            MessageType messageType = MessageType.None;
            var parse = DynamicJson.Parse(json);
            if (parse.IsDefined("type"))
            {
                messageType = (MessageType)(Convert.ToInt32(parse["type"]));
            }

            logger.Info("收到消息中心通知:<{0}>", messageType.ToString());

            switch (messageType)
            {
                case MessageType.SaleClear:
                    {
                        var saleClear = ReadSaleClear(json);
                        if (saleClear == null)
                        {
                            logger.Warn("收到消息中心通知:<{0}>,但消息数据为空,已忽略", messageType.ToString());
                            break;
                        }

                        SaleClearUtils.Instance.Maps[saleClear.ProductId] = saleClear;
                    }
                    break;
                case MessageType.ProxySaleClear:
                    {
                        var saleClear = ReadSaleClear(json);
                        if (saleClear == null)
                        {
                            logger.Warn("收到消息中心通知:<{0}>,但消息数据为空,已忽略", messageType.ToString());
                            break;
                        }

                        logger.Info($"收到消息中心通知:<{messageType.ToString()}>,Data:{JsonUtils.Serialize(saleClear)}");
                        SaleClearUtils.Instance.SaveOrUpdateNoChanged(saleClear);
                    }
                    break;
                default:
                    // Other message types are ignored.
                    break;
            }
        }
        catch (Exception ex)
        {
            Global.Instance.BugReport(ex, "MQTT处理消息中心通知异常");
            logger.Error(ex, "处理消息中心通知异常");
        }
    }

    /// <summary>
    /// Deserializes a transport envelope and returns its sale-clear data, or null
    /// when the envelope or its data is missing.
    /// </summary>
    private static SaleClear ReadSaleClear(string json)
    {
        var transport = JsonUtils.Deserialize<TransportMessage<SaleClear>>(json);
        return transport?.Data;
    }

    /// <summary>
    /// Opens a new broker connection and, on success, subscribes to the default topics.
    /// </summary>
    /// <returns>
    /// True when the new client reports that it is connected; false when the push
    /// channel is disabled, the attempt failed, or another attempt is already running.
    /// </returns>
    private bool Connect()
    {
        // The timer raises Elapsed on pool threads and does not wait for a previous
        // handler to finish, and Startup() may run at the same time. Allow only one
        // attempt at a time so two clients are never opened with the same ClientId.
        if (System.Threading.Interlocked.CompareExchange(ref this._connecting, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            bool isSuccess = this.OpenConnection();

            if (isSuccess)
            {
                // Load the default topic subscriptions. An exception here propagates to the caller.
                this.DefaultSubscribe();
            }

            return isSuccess;
        }
        finally
        {
            System.Threading.Interlocked.Exchange(ref this._connecting, 0);
        }
    }

    /// <summary>
    /// Creates the client from configuration and connects it. Exceptions are reported,
    /// logged, and converted into a false result with the client field cleared.
    /// </summary>
    private bool OpenConnection()
    {
        try
        {
            // Read the message-center (service center) configuration.
            var messageCenter = Global.Instance.EnableServiceCenter;
            // Global sale-clear goes through the push channel.
            var globalSaleClear = Global.Instance.GlobalConfigBoolValue(ConfigConstant.DEVICE_SALECLEAR_ENABLE, false);

            if (!(messageCenter.Item1 && globalSaleClear))
            {
                // Item2 is logged as-is; presumably it explains why the service center is disabled.
                logger.Info(messageCenter.Item2);
                return false;
            }

            string host = Global.Instance.GlobalConfigStringValue(ConfigConstant.DEVICE_SERVICECENTER_IP);
            int port = Global.Instance.GlobalConfigIntValue(ConfigConstant.DEVICE_SERVICECENTER_PORT);

            // Detach our handlers from the client being replaced so it can no longer
            // call back into this object.
            var previous = this._mqttClient;
            if (previous != null)
            {
                previous.MqttMsgPublishReceived -= OnMqttMsgPublishReceived;
                previous.ConnectionClosed -= OnConnectionClosed;
            }

            // The MQTT port is the configured base port + 2.
            var client = new MqttClient(host, port + 2, false, null, null, MqttSslProtocols.None);

            // Register for received messages and connection-closed notifications.
            client.MqttMsgPublishReceived += OnMqttMsgPublishReceived;
            client.ConnectionClosed += OnConnectionClosed;

            this._mqttClient = client;

            client.Connect(this.ClientId);

            return client.IsConnected;
        }
        catch (Exception ex)
        {
            this._mqttClient = null;

            Global.Instance.BugReport(ex, "MQTT连接失败");
            logger.Error(ex, "连接失败");

            return false;
        }
    }

    /// <summary>Logs that the connection to the broker was closed.</summary>
    private void OnConnectionClosed(object sender, EventArgs e)
    {
        logger.Info("收银机断开连接......");
    }

    /// <summary>
    /// Starts the service by attempting a connection. Exceptions are reported and
    /// logged, never rethrown.
    /// </summary>
    public void Startup()
    {
        try
        {
            logger.Info("启动连接MQTT服务......");

            var connected = Connect();
            if (connected)
            {
                logger.Info("连接MQTT服务成功");
            }
        }
        catch (Exception ex)
        {
            Global.Instance.BugReport(ex, "MQTT服务连接失败");
            logger.Error(ex, "连接MQTT服务异常");
        }
    }

    /// <summary>
    /// Stops the service: halts and disposes the health-check timer, then disconnects
    /// the client. Safe to call when monitoring was never enabled or no client exists.
    /// An exception thrown by the client's Disconnect propagates to the caller after
    /// the fields have been cleared.
    /// </summary>
    public void Stop()
    {
        // Stop the timer first, regardless of client state, so it cannot reconnect
        // right after the client is disconnected.
        var timer = this._reconnect;
        this._reconnect = null;
        if (timer != null)
        {
            timer.Stop();
            timer.Elapsed -= ReconnectCallback;
            timer.Dispose();
        }

        var client = this._mqttClient;
        this._mqttClient = null;
        if (client != null)
        {
            client.Disconnect();
        }
    }

    /// <summary>
    /// Full reload. Currently a no-op: the Stop-then-Startup sequence is disabled.
    /// </summary>
    public void Restart()
    {
        // Intentionally empty; restart logic (Stop() followed by Startup()) is disabled.
    }
}
|
|
}
|