leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.fastbee.mqtt.utils;
 
import com.fastbee.common.core.mq.DeviceStatusBo;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.mqtt.model.ClientMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
 
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
 
/**
 * 服务器应答信息构建
 * @author gsb
 * @date 2022/10/7 14:17
 */
public class MqttMessageUtils {
 
 
    /**
     * 服务器确认连接应答消息 CONNACK
     */
    public static MqttConnAckMessage buildConntAckMessage(MqttConnectReturnCode code, boolean sessionPresent) {
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.CONNACK);
        MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(code, sessionPresent);
        return new MqttConnAckMessage(fixedHeader, variableHeader);
    }
 
 
    /**
     * 设备ping(心跳信息)应答 PINGRESP
     */
    public static MqttMessage buildPingResp() {
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PINGRESP);
        return new MqttMessage(fixedHeader);
    }
 
    /**
     * 取消订阅消息应答 UNSUBACK
     */
    public static MqttUnsubAckMessage buildUnsubAckMessage(MqttMessage message) {
        /*构建固定报文*/
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.UNSUBACK);
        return new MqttUnsubAckMessage(fixedHeader, getIdVariableHeader(message));
    }
 
    /**
     * 订阅确认应答 SUBACK
     */
    public static MqttSubAckMessage buildSubAckMessage(MqttMessage message) {
        /*构建固定报文*/
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.SUBACK);
        /*构建可变报文*/
        MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
 
        /*获取订阅topic的Qos*/
        Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
        List<Integer> grantedQos = new ArrayList<>(topics.size());
        for (int i = 0; i < topics.size(); i++) {
            grantedQos.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
        }
        /*负载*/
        MqttSubAckPayload payload = new MqttSubAckPayload(grantedQos);
        return new MqttSubAckMessage(fixedHeader, getIdVariableHeader(message), payload);
    }
 
    /**
     * 构建推送应答消息 PUBLISH
     */
    public static MqttPublishMessage buildPublishMessage(ClientMessage msg, int packageId) {
        /*报文固定头*/
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, msg.isDup(), msg.getQos(), false, 0);
        /*报文可变头*/
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(msg.getTopicName(), packageId);
        /*负载*/
        ByteBuf payload = msg.getPayload() == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(msg.getPayload());
        /*完整报文,固定头+可变头+payload*/
        return new MqttPublishMessage(fixedHeader, variableHeader, payload);
    }
 
    /**
     * Qos1 收到发布消息确认 无负载 PUBACK
     */
    public static MqttPubAckMessage buildAckMessage(MqttMessage message) {
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBACK);
        return new MqttPubAckMessage(fixedHeader, getIdVariableHeader(message));
    }
 
    /**
     * Qos2 发到消息收到 无负载 PUBREC
     */
    public static MqttMessage buildPubRecMessage(MqttMessage message){
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREC);
        return new MqttMessage(fixedHeader, getIdVariableHeader(message));
    }
 
    /**
     * Qos2 发布消息释放 PUBREL
     */
    public static MqttMessage buildPubRelMessage(MqttMessage message){
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREL);
        return new MqttMessage(fixedHeader, getIdVariableHeader(message));
 
    }
 
    /**
     * Qos2 发布消息完成 PUBCOMP
     */
    public static MqttMessage buildPubCompMessage(MqttMessage message){
        MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBCOMP);
        return new MqttMessage(fixedHeader, getIdVariableHeader(message));
    }
 
    /**
     * 固定头定制
     */
    public static MqttFixedHeader buildFixedHeader(MqttMessageType messageType) {
        return new MqttFixedHeader(messageType, false, MqttQoS.AT_MOST_ONCE, false, 0);
    }
 
    /**
     * 构造MqttMessageIdVariableHeader
     */
    public static MqttMessageIdVariableHeader getIdVariableHeader(MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        return MqttMessageIdVariableHeader.from(idVariableHeader.messageId());
    }
 
    /*构造返回MQ的设备状态model*/
    public static DeviceStatusBo buildStatusMsg(ChannelHandlerContext ctx, String clientId,DeviceStatus status,String ip){
        return DeviceStatusBo.builder()
                .serialNumber(clientId)
                .status(status)
                .ip(ip)
                .hostName(ip)
                .timestamp(DateUtils.getNowDate()).build();
    }
}