You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

338 lines
11 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using POSV.OtherWaiMai;
using POSV.Proxy.ThirdPartyOrder;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace POSV.Proxy.Mqtt
{
public class MqttClientUtils
{
private static object _lock = new object();
private static NLog.Logger logger = NLog.LogManager.GetCurrentClassLogger();
private static MqttClientUtils _instance = null;
private MqttClient mqttClient = null;
private System.Timers.Timer _reconnectTimer;
private Action<MqttMsgPublishEventArgs> _action = null;
public static MqttClientUtils Instance
{
get
{
if (_instance == null)
{
lock (_lock)
{
_instance = new MqttClientUtils();
}
}
return _instance;
}
}
private bool isRunning = false;
private void Reconnect()
{
//Global.Instance.Worker.StoreInfo.Id = "t_AGNTBICRy0 ";
bool isException = false;
try
{
if (isRunning)
{
logger.Info("连接mqtt正在运行忽略本次执行");
isException = true;
return;
}
isRunning = true;
logger.Info("尝试连接外卖消息中心.....");
//2022-12-13 11:06 zhangy Edit 更换IP地址为域名
this.mqttClient = new MqttClient("mqtt.ffcygl.com", 18830, false, null, null, MqttSslProtocols.None);
//this.mqttClient = new MqttClient("csmqtt.ffcygl.com", 18830, false, null, null, MqttSslProtocols.None);
//this.mqttClient = new MqttClient("127.0.0.1", 1883, false, null, null, MqttSslProtocols.None);
this.mqttClient.ConnectionClosed -= OnMqttConnectionClosed;
this.mqttClient.ConnectionClosed += OnMqttConnectionClosed;
this.mqttClient.MqttMsgSubscribed -= OnMqttMsgSubscribed;
this.mqttClient.MqttMsgSubscribed += OnMqttMsgSubscribed;
this.mqttClient.MqttMsgPublishReceived -= OnMqttMsgPublishReceived;
this.mqttClient.MqttMsgPublishReceived += OnMqttMsgPublishReceived;
string clientId = string.Format("{0}-{1}-{2}-{3}", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id, Global.Instance.Authc.PosNo);
//默认的用户名和密码连接为MQTT消息中心启用密码访问准备
this.mqttClient.Connect(clientId, Global.Instance.Authc.PosNo, Global.Instance.Authc.PosId, false, 30);
}
catch (Exception ex)
{
isException = true;
if (this.IsMqttConnected)
{
this.mqttClient.Unsubscribe(this.Topics.Item1);
this.mqttClient.Disconnect();
}
this.mqttClient = null;
logger.Error(ex, "外卖消息中心连接异常");
}
finally
{
//没有异常发生
if (!isException)
{
logger.Info("连接外卖消息中心成功.....");
this.mqttClient.Subscribe(this.Topics.Item1, this.Topics.Item2);
}
isRunning = false;
}
}
private void OnMqttMsgSubscribed(object sender, MqttMsgSubscribedEventArgs e)
{
logger.Info("订阅外卖消息中心<{0}>...", e.MessageId);
}
private void OnMqttConnectionClosed(object sender, EventArgs e)
{
logger.Info("断开外卖消息中心连接...");
}
public bool IsMqttConnected
{
get
{
return this.mqttClient != null && this.mqttClient.IsConnected;
}
}
private void ReconnectCallback(object sender, System.Timers.ElapsedEventArgs e)
{
if (this.IsMqttConnected)
{
logger.Info("外卖消息中心连接状态:<{0}>,{1}", this.mqttClient.IsConnected, this.mqttClient.ClientId);
}
else
{
this.Reconnect();
}
}
/// <summary>
/// 处理收到的消息
/// </summary>
/// <param name="action"></param>
public void ReceiveMessage(Action<MqttMsgPublishEventArgs> action)
{
this._action = action;
}
/// <summary>
/// 外卖接单被pengjd调整到这里作为入口。
/// zhangy 2020-02-19 Add 添加注释
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnMqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
string message = Encoding.UTF8.GetString(e.Message);
string topic = e.Topic;
logger.Info("服务端推送<{0}>消息:{1}", topic, message);
if (topic.StartsWith("program"))
{
logger.Info("收到巨为小程序订单");
JuWeiAppOrderProxy.sendJwWeixinMessage(message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(0, message));
//Task.Factory.StartNew(() =>
//{
// JuWeiAppOrderProxy.sendJwWeixinMessage(message);
// OnMessageReceived?.Invoke(sender, new MessageEventArgs(0, message));
//});
}
if (topic.StartsWith("waimai"))
{
//开启商家版美团外卖和餐道外卖
var enableShopMeiTuan = Global.Instance.GlobalConfigBoolValue(ConfigConstant.CONFIG_CASHIER_ENABLESHOPMEITUAN, false);
if (enableShopMeiTuan)
{
logger.Info("收到美团商家外卖订单");
OtherWaiMaiApi.sendSJWaiMaiMessage(message);
}
else
{
logger.Info("收到外卖订单");
OtherWaiMaiApi.sendWaiMaiMessage(message);
}
OnMessageReceived?.Invoke(sender, new MessageEventArgs(0, message));
}
if (topic.StartsWith("weixin"))
{
logger.Info("微信小程序订单");
OtherWaiMaiApi.sendWeixinMessage(message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(1, message));
}
if (topic.StartsWith("qimai"))
{
logger.Info("收到企迈第三方订单");
OtherWaiMaiApi.sendQiMaiMessage(message);
OnMessageReceived?.Invoke(sender, new MessageEventArgs(1, message));
}
this._action?.Invoke(e);
}
public ushort Subscribe(string topics)
{
return this.mqttClient.Subscribe(new string[] { topics }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
}
public ushort Subscribe(string[] topics, byte[] levels)
{
return this.mqttClient.Subscribe(topics, levels);
}
public ushort Publish(string topic, string message)
{
return this.mqttClient.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
}
public delegate void EventHandler(object sender, MessageEventArgs e);
public event EventHandler OnMessageReceived;
private Tuple<string[], byte[]> Topics
{
get
{
//运维参数
string tenant = string.Format("{0}/{1}/{2}", "devops", "cy2", Global.Instance.Worker.TenantId);
string shop = string.Format("{0}/{1}/{2}/{3}", "devops", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.No);
string pos = string.Format("{0}/{1}/{2}/{3}/{4}", "devops", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.No, Global.Instance.Authc.PosNo);
//业务参数
string waimai = string.Format("{0}/{1}/{2}/{3}", "waimai", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string weixin = string.Format("{0}/{1}/{2}/{3}", "weixin", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string qimai = string.Format("{0}/{1}/{2}/{3}", "qimai", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string program = string.Format("{0}/{1}/{2}/{3}", "program", "cy2", Global.Instance.Worker.TenantId, Global.Instance.Worker.StoreInfo.Id);
string[] topics = { waimai, weixin, qimai, program, tenant, shop, pos };
byte[] levels = { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE };
return new Tuple<string[], byte[]>(topics, levels);
}
}
/// <summary>
/// 启动任务
/// </summary>
public void Startup()
{
try
{
if (this._reconnectTimer == null)
{
this._reconnectTimer = new System.Timers.Timer();
}
//设置定时间隔(毫秒为单位)
this._reconnectTimer.Interval = 55000;
//设置执行一次false还是一直执行(true)
this._reconnectTimer.AutoReset = true;
//设置是否执行System.Timers.Timer.Elapsed事件
this._reconnectTimer.Enabled = true;
//绑定Elapsed事件
this._reconnectTimer.Elapsed += ReconnectCallback;
logger.Info("初始化外卖消息中心......");
}
catch (Exception ex)
{
logger.Info(ex, "外卖消息中心启动异常");
}
finally
{
Reconnect();
}
}
/// <summary>
/// 停止任务
/// </summary>
public void Stop()
{
if (this.IsMqttConnected)
{
//取消订阅
this.mqttClient.Unsubscribe(this.Topics.Item1);
//断开连接
this.mqttClient.Disconnect();
//关闭时间
this._reconnectTimer.Stop();
}
}
}
public class MessageEventArgs : EventArgs
{
private readonly int _type;
private readonly string _message;
public MessageEventArgs(int type, string message)
{
this._type = type;
this._message = message;
}
public string Message
{
get
{
return _message;
}
}
public int Type
{
get
{
return _type;
}
}
}
}