package com.fastbee.mqtt.handler;
|
|
import com.fastbee.common.constant.FastBeeConstant;
|
import com.fastbee.common.core.mq.DeviceReportBo;
|
import com.fastbee.common.core.redis.RedisCache;
|
import com.fastbee.common.enums.ServerType;
|
import com.fastbee.common.utils.DateUtils;
|
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
|
import com.fastbee.mq.redischannel.producer.MessageProducer;
|
import com.fastbee.mq.service.IDeviceReportMessageService;
|
import com.fastbee.mqtt.annotation.Process;
|
import com.fastbee.mqtt.handler.adapter.MqttHandler;
|
import com.fastbee.mqtt.manager.ClientManager;
|
import com.fastbee.mqtt.manager.ResponseManager;
|
import com.fastbee.mqtt.manager.RetainMsgManager;
|
import com.fastbee.mqtt.model.ClientMessage;
|
import com.fastbee.mqtt.service.IMessageStore;
|
import com.fastbee.base.session.Session;
|
import com.fastbee.base.util.AttributeUtils;
|
import io.netty.buffer.ByteBufUtil;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.mqtt.*;
|
import lombok.SneakyThrows;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import javax.annotation.Resource;
|
|
/**
|
* 客户端消息推送处理类
|
*
|
* @author bill
|
*/
|
@Slf4j
|
@Process(type = MqttMessageType.PUBLISH)
|
public class MqttPublish implements MqttHandler {
|
|
@Autowired
|
private IMessageStore messageStore;
|
@Resource
|
private TopicsUtils topicsUtils;
|
@Resource
|
private RedisCache redisCache;
|
@Resource
|
private IDeviceReportMessageService deviceReportMessageService;
|
|
@Override
|
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
|
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
|
/*获取客户端id*/
|
String clientId = AttributeUtils.getClientId(ctx.channel());
|
String topicName = publishMessage.variableHeader().topicName();
|
log.debug("=>***客户端[{}],主题[{}],推送消息[{}]", clientId, topicName,
|
ByteBufUtil.hexDump(publishMessage.content()));
|
// 以get结尾是模拟客户端数据,只转发消息
|
if (topicName.endsWith(FastBeeConstant.MQTT.PROPERTY_GET_SIMULATE)) {
|
sendTestToMQ(publishMessage);
|
} else {
|
/*获取客户端session*/
|
Session session = AttributeUtils.getSession(ctx.channel());
|
/*推送保留信息*/
|
pubRetain(publishMessage);
|
/*响应客户端消息到达Broker*/
|
callBack(session, publishMessage, clientId);
|
/*推送到订阅的客户端*/
|
sendMessageToClients(publishMessage);
|
/*推送到MQ处理*/
|
sendToMQ(publishMessage);
|
/*累计接收消息数*/
|
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TOTAL, -1L);
|
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TODAY, 60 * 60 * 24);
|
}
|
}
|
|
/**
|
* 消息推送
|
*
|
* @param message 推送消息
|
*/
|
@SneakyThrows
|
public void sendToMQ(MqttPublishMessage message) {
|
/*获取topic*/
|
String topicName = message.variableHeader().topicName();
|
/*只处理上报数据*/
|
if (!topicName.endsWith(FastBeeConstant.MQTT.UP_TOPIC_SUFFIX)) {
|
return;
|
}
|
DeviceReportBo reportBo = DeviceReportBo.builder()
|
.serialNumber(topicsUtils.parseSerialNumber(topicName))
|
.topicName(topicName)
|
.packetId((long) message.variableHeader().packetId())
|
.platformDate(DateUtils.getNowDate())
|
.data(ByteBufUtil.getBytes(message.content()))
|
.serverType(ServerType.MQTT)
|
.build();
|
if (topicName.endsWith(FastBeeConstant.TOPIC.MSG_REPLY) ||
|
topicName.endsWith(FastBeeConstant.TOPIC.SUB_UPGRADE_REPLY) ||
|
topicName.endsWith(FastBeeConstant.TOPIC.UPGRADE_REPLY)) {
|
/*设备应答服务器回调数据*/
|
reportBo.setReportType(2);
|
} else {
|
/*设备上报数据*/
|
reportBo.setReportType(1);
|
}
|
if (topicName.contains("property")) {
|
deviceReportMessageService.parseReportMsg(reportBo);
|
}
|
|
}
|
|
/**
|
* 发送模拟数据进行处理
|
* @param message
|
*/
|
public void sendTestToMQ(MqttPublishMessage message) {
|
/*获取topic*/
|
String topicName = message.variableHeader().topicName();
|
DeviceReportBo reportBo = DeviceReportBo.builder()
|
.serialNumber(topicsUtils.parseSerialNumber(topicName))
|
.topicName(topicName)
|
.packetId((long) message.variableHeader().packetId())
|
.platformDate(DateUtils.getNowDate())
|
.data(ByteBufUtil.getBytes(message.content()))
|
.build();
|
MessageProducer.sendOtherMsg(reportBo);
|
}
|
|
|
/**
|
* 推送消息到订阅客户端
|
*
|
* @param message 消息
|
*/
|
public void sendMessageToClients(MqttPublishMessage message) {
|
ClientManager.pubTopic(message);
|
}
|
|
|
/**
|
* 应答客户端,消息到达Broker
|
*
|
* @param session 客户端
|
* @param message 消息
|
*/
|
private void callBack(Session session, MqttPublishMessage message, String clientId) {
|
/*获取消息等级*/
|
MqttQoS mqttQoS = message.fixedHeader().qosLevel();
|
int packetId = message.variableHeader().packetId();
|
MqttFixedHeader header;
|
switch (mqttQoS.value()) {
|
/*0,1消息等级,直接回复*/
|
case 0:
|
case 1:
|
header = new MqttFixedHeader(MqttMessageType.PUBACK, false, mqttQoS, false, 0);
|
break;
|
case 2:
|
// 处理Qos2的消息确认
|
if (!messageStore.outRelContains(packetId)) {
|
messageStore.saveRelInMsg(packetId);
|
}
|
header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
break;
|
default:
|
header = null;
|
}
|
/*处理消息等级*/
|
handleMqttQos(packetId, mqttQoS, true, clientId);
|
/*响应客户端*/
|
MqttMessageIdVariableHeader variableHeader = null;
|
if (packetId > 0) {
|
variableHeader = MqttMessageIdVariableHeader.from(packetId);
|
}
|
MqttPubAckMessage ackMessage = new MqttPubAckMessage(header, variableHeader);
|
if (mqttQoS.value() >= 1) {
|
ResponseManager.responseMessage(session, ackMessage, true);
|
}
|
/*更新客户端ping时间*/
|
ClientManager.updatePing(session.getClientId());
|
|
}
|
|
/**
|
* Qos不同消息处理
|
*/
|
private void handleMqttQos(int packetId, MqttQoS qoS, boolean clearSession, String clientId) {
|
if (qoS == MqttQoS.AT_LEAST_ONCE || qoS == MqttQoS.EXACTLY_ONCE) {
|
ClientMessage clientMessage = ClientMessage.of(clientId, qoS, null, false);
|
messageStore.savePubMsg(packetId, clientMessage);
|
}
|
}
|
|
|
/**
|
* 推送保留信息
|
*/
|
@SneakyThrows
|
private void pubRetain(MqttPublishMessage message) {
|
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RETAIN_TOTAL, -1L);
|
/*根据message.fixedHeader().isRetain() 判断是否有保留信息*/
|
RetainMsgManager.pushMessage(message);
|
}
|
|
|
}
|