You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
9.7 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using uPLibrary.Networking.M2Mqtt.Utility;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using POSV.MessageEvent;
using POSV.MsgCenter;
using POSV.OtherWaiMai;
namespace POSV.Utils
{
/// <summary>
/// zhangy Add 2020-01-16 发现该类没有被使用已经被pengjd迁移到POSV.Proxy包中,
/// </summary>
public class MqttClientUtils
{
private static object _lock = new object();
private static Logger logger = NLog.LogManager.GetCurrentClassLogger();
private static MqttClientUtils _instance = null;
private MqttClient mqttClient = null;
private System.Timers.Timer _reconnectTimer;
private Action<MqttMsgPublishEventArgs> _action = null;
public static MqttClientUtils Instance
{
get
{
if (_instance == null)
{
lock (_lock)
{
_instance = new MqttClientUtils();
}
}
return _instance;
}
}
private bool isRunning = false;
private void Reconnect()
{
bool isException = false;
try
{
if (isRunning)
{
logger.Info("连接mqtt正在运行忽略本次执行");
isException = true;
return;
}
isRunning = true;
logger.Info("尝试连接外卖消息中心.....");
this.mqttClient = new MqttClient("116.62.57.54" , 18830 , false , null , null , MqttSslProtocols.None);
this.mqttClient.ConnectionClosed -= OnMqttConnectionClosed;
this.mqttClient.ConnectionClosed += OnMqttConnectionClosed;
this.mqttClient.MqttMsgSubscribed -= OnMqttMsgSubscribed;
this.mqttClient.MqttMsgSubscribed += OnMqttMsgSubscribed;
this.mqttClient.MqttMsgPublishReceived -= OnMqttMsgPublishReceived;
this.mqttClient.MqttMsgPublishReceived += OnMqttMsgPublishReceived;
string clientId = string.Format("{0}-{1}-{2}-{3}" , "cy2" , Global.Instance.Worker.TenantId , Global.Instance.Worker.StoreInfo.Id , Global.Instance.Authc.PosNo);
//默认的用户名和密码连接为MQTT消息中心启用密码访问准备
this.mqttClient.Connect(clientId , Global.Instance.Authc.PosNo , Global.Instance.Authc.PosId , false , 30);
}
catch (Exception ex)
{
isException = true;
this.mqttClient = null;
logger.Error(ex , "外卖消息中心连接异常");
}
finally
{
//没有异常发生
if (!isException)
{
logger.Info("连接外卖消息中心成功.....");
this.mqttClient.Subscribe(this.Topics.Item1 , this.Topics.Item2);
}
isRunning = false;
}
}
private void OnMqttMsgSubscribed(object sender , MqttMsgSubscribedEventArgs e)
{
logger.Info("订阅外卖消息中心<{0}>..." , e.MessageId);
}
private void OnMqttConnectionClosed(object sender , EventArgs e)
{
logger.Info("断开外卖消息中心连接...");
}
public bool IsMqttConnected
{
get
{
return this.mqttClient != null && this.mqttClient.IsConnected;
}
}
private void ReconnectCallback(object sender, System.Timers.ElapsedEventArgs e)
{
if (this.IsMqttConnected)
{
logger.Info("外卖消息中心连接状态:<{0}>,{1}" , this.mqttClient.IsConnected , this.mqttClient.ClientId);
}
else
{
this.Reconnect();
}
}
////使用方式
//var action = new Action<MqttMsgPublishEventArgs>((args) => {
// LOGGER.Info("收到消息通知:<{0}>,{1}", args.Topic, Encoding.UTF8.GetString(args.Message));
//});
//MqttClientUtils.Instance.ReceiveMessage(action);
/// <summary>
/// 处理收到的消息
/// </summary>
/// <param name="action"></param>
public void ReceiveMessage(Action<MqttMsgPublishEventArgs> action)
{
this._action = action;
}
void OnMqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
logger.Info(Encoding.UTF8.GetString(e.Message));
string message = Encoding.UTF8.GetString(e.Message);
string topic = e.Topic;
if (topic.StartsWith("program"))
{
//JwWeiXinOrderFrom
logger.Info("巨为小程序订单消息来了==========>{0}", message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(1, message));
}
if (topic.StartsWith("waimai"))
{
OtherWaiMaiApi.sendWaiMaiMessage(message);
logger.Info("外卖订单来了==========>{0}", message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(0, message));
}
if (topic.StartsWith("weixin"))
{
OtherWaiMaiApi.sendWeixinMessage(message);
logger.Info("微信H5订单消息来了==========>{0}", message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(1, message));
}
if (topic.StartsWith("qimai"))
{
OtherWaiMaiApi.sendQiMaiMessage(message);
logger.Info("企迈小程序订单消息来了==========>{0}", message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(1, message));
}
this._action?.Invoke(e);
}
public ushort Subscribe(string topics)
{
return this.mqttClient.Subscribe(new string[] { topics }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
}
public ushort Subscribe(string[] topics, byte[] levels)
{
return this.mqttClient.Subscribe(topics, levels);
}
public ushort Publish(string topic, string message)
{
return this.mqttClient.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
}
public delegate void EventHandler(object sender, MessageEventArgs e);
public event EventHandler OnMessageReceived;
private Tuple<string[] , byte[]> Topics
{
get
{
string waimai = string.Format("{0}/{1}/{2}/{3}" , "waimai" , "cy2", Global.Instance.Worker.TenantId , Global.Instance.Worker.StoreInfo.Id);
string weixin = string.Format("{0}/{1}/{2}/{3}" , "weixin" , "cy2", Global.Instance.Worker.TenantId , Global.Instance.Worker.StoreInfo.Id);
string qimai = string.Format("{0}/{1}/{2}/{3}", "qimai", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string program = string.Format("{0}/{1}/{2}/{3}", "program", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string[] topics = { waimai , weixin, qimai, program };
byte[] levels = { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE , MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE };
return new Tuple<string[] , byte[]>(topics , levels);
}
}
/// <summary>
/// 启动任务
/// </summary>
public void Startup()
{
try
{
if (this._reconnectTimer == null)
{
this._reconnectTimer = new System.Timers.Timer();
}
//设置定时间隔(毫秒为单位)
this._reconnectTimer.Interval = 55000;
//设置执行一次false还是一直执行(true)
this._reconnectTimer.AutoReset = true;
//设置是否执行System.Timers.Timer.Elapsed事件
this._reconnectTimer.Enabled = true;
//绑定Elapsed事件
this._reconnectTimer.Elapsed += ReconnectCallback;
logger.Info("初始化外卖消息中心......");
}
catch (Exception ex)
{
logger.Info(ex , "外卖消息中心启动异常");
}
finally
{
Reconnect();
}
}
/// <summary>
/// 停止任务
/// </summary>
public void Stop()
{
if (this.IsMqttConnected)
{
//取消订阅
this.mqttClient.Unsubscribe(this.Topics.Item1);
//断开连接
this.mqttClient.Disconnect();
//关闭时间
this._reconnectTimer.Stop();
}
}
}
public class MessageEventArgs : EventArgs
{
private readonly int _type;
private readonly string _message;
public MessageEventArgs(int type, string message)
{
this._type = type;
this._message = message;
}
public string Message
{
get
{
return _message;
}
}
public int Type
{
get
{
return _type;
}
}
}
}