leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package com.fastbee.mqtt.handler;
 
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.service.IDeviceReportMessageService;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.RetainMsgManager;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
 
import javax.annotation.Resource;
 
/**
 * 客户端消息推送处理类
 *
 * @author bill
 */
@Slf4j
@Process(type = MqttMessageType.PUBLISH)
public class MqttPublish implements MqttHandler {
 
    @Autowired
    private IMessageStore messageStore;
    @Resource
    private TopicsUtils topicsUtils;
    @Resource
    private RedisCache redisCache;
    @Resource
    private IDeviceReportMessageService deviceReportMessageService;
 
    @Override
    public void handler(ChannelHandlerContext ctx, MqttMessage message) {
        MqttPublishMessage publishMessage = (MqttPublishMessage) message;
        /*获取客户端id*/
        String clientId = AttributeUtils.getClientId(ctx.channel());
        String topicName = publishMessage.variableHeader().topicName();
        log.debug("=>***客户端[{}],主题[{}],推送消息[{}]", clientId, topicName,
                ByteBufUtil.hexDump(publishMessage.content()));
        // 以get结尾是模拟客户端数据,只转发消息
        if (topicName.endsWith(FastBeeConstant.MQTT.PROPERTY_GET_SIMULATE)) {
            sendTestToMQ(publishMessage);
        } else {
            /*获取客户端session*/
            Session session = AttributeUtils.getSession(ctx.channel());
            /*推送保留信息*/
            pubRetain(publishMessage);
            /*响应客户端消息到达Broker*/
            callBack(session, publishMessage, clientId);
            /*推送到订阅的客户端*/
            sendMessageToClients(publishMessage);
            /*推送到MQ处理*/
            sendToMQ(publishMessage);
            /*累计接收消息数*/
            redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TOTAL, -1L);
            redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TODAY, 60 * 60 * 24);
        }
    }
 
    /**
     * 消息推送
     *
     * @param message 推送消息
     */
    @SneakyThrows
    public void sendToMQ(MqttPublishMessage message) {
        /*获取topic*/
        String topicName = message.variableHeader().topicName();
        /*只处理上报数据*/
        if (!topicName.endsWith(FastBeeConstant.MQTT.UP_TOPIC_SUFFIX)) {
            return;
        }
        DeviceReportBo reportBo = DeviceReportBo.builder()
                .serialNumber(topicsUtils.parseSerialNumber(topicName))
                .topicName(topicName)
                .packetId((long) message.variableHeader().packetId())
                .platformDate(DateUtils.getNowDate())
                .data(ByteBufUtil.getBytes(message.content()))
                .serverType(ServerType.MQTT)
                .build();
        if (topicName.endsWith(FastBeeConstant.TOPIC.MSG_REPLY) ||
                topicName.endsWith(FastBeeConstant.TOPIC.SUB_UPGRADE_REPLY) ||
                topicName.endsWith(FastBeeConstant.TOPIC.UPGRADE_REPLY)) {
            /*设备应答服务器回调数据*/
            reportBo.setReportType(2);
        } else {
            /*设备上报数据*/
            reportBo.setReportType(1);
        }
        if (topicName.contains("property")) {
            deviceReportMessageService.parseReportMsg(reportBo);
        }
 
    }
 
    /**
     * 发送模拟数据进行处理
     * @param message
     */
    public void sendTestToMQ(MqttPublishMessage message) {
        /*获取topic*/
        String topicName = message.variableHeader().topicName();
        DeviceReportBo reportBo = DeviceReportBo.builder()
                .serialNumber(topicsUtils.parseSerialNumber(topicName))
                .topicName(topicName)
                .packetId((long) message.variableHeader().packetId())
                .platformDate(DateUtils.getNowDate())
                .data(ByteBufUtil.getBytes(message.content()))
                .build();
        MessageProducer.sendOtherMsg(reportBo);
    }
 
 
    /**
     * 推送消息到订阅客户端
     *
     * @param message 消息
     */
    public void sendMessageToClients(MqttPublishMessage message) {
        ClientManager.pubTopic(message);
    }
 
 
    /**
     * 应答客户端,消息到达Broker
     *
     * @param session 客户端
     * @param message 消息
     */
    private void callBack(Session session, MqttPublishMessage message, String clientId) {
        /*获取消息等级*/
        MqttQoS mqttQoS = message.fixedHeader().qosLevel();
        int packetId = message.variableHeader().packetId();
        MqttFixedHeader header;
        switch (mqttQoS.value()) {
            /*0,1消息等级,直接回复*/
            case 0:
            case 1:
                header = new MqttFixedHeader(MqttMessageType.PUBACK, false, mqttQoS, false, 0);
                break;
            case 2:
                // 处理Qos2的消息确认
                if (!messageStore.outRelContains(packetId)) {
                    messageStore.saveRelInMsg(packetId);
                }
                header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
                break;
            default:
                header = null;
        }
        /*处理消息等级*/
        handleMqttQos(packetId, mqttQoS, true, clientId);
        /*响应客户端*/
        MqttMessageIdVariableHeader variableHeader = null;
        if (packetId > 0) {
            variableHeader = MqttMessageIdVariableHeader.from(packetId);
        }
        MqttPubAckMessage ackMessage = new MqttPubAckMessage(header, variableHeader);
        if (mqttQoS.value() >= 1) {
            ResponseManager.responseMessage(session, ackMessage, true);
        }
        /*更新客户端ping时间*/
        ClientManager.updatePing(session.getClientId());
 
    }
 
    /**
     * Qos不同消息处理
     */
    private void handleMqttQos(int packetId, MqttQoS qoS, boolean clearSession, String clientId) {
        if (qoS == MqttQoS.AT_LEAST_ONCE || qoS == MqttQoS.EXACTLY_ONCE) {
            ClientMessage clientMessage = ClientMessage.of(clientId, qoS, null, false);
            messageStore.savePubMsg(packetId, clientMessage);
        }
    }
 
 
    /**
     * 推送保留信息
     */
    @SneakyThrows
    private void pubRetain(MqttPublishMessage message) {
        redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RETAIN_TOTAL, -1L);
        /*根据message.fixedHeader().isRetain() 判断是否有保留信息*/
        RetainMsgManager.pushMessage(message);
    }
 
 
}