package com.fastbee.mqtt.utils; import com.fastbee.common.core.mq.DeviceStatusBo; import com.fastbee.common.enums.DeviceStatus; import com.fastbee.common.utils.DateUtils; import com.fastbee.mqtt.model.ClientMessage; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.*; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; /** * 服务器应答信息构建 * @author gsb * @date 2022/10/7 14:17 */ public class MqttMessageUtils { /** * 服务器确认连接应答消息 CONNACK */ public static MqttConnAckMessage buildConntAckMessage(MqttConnectReturnCode code, boolean sessionPresent) { MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.CONNACK); MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(code, sessionPresent); return new MqttConnAckMessage(fixedHeader, variableHeader); } /** * 设备ping(心跳信息)应答 PINGRESP */ public static MqttMessage buildPingResp() { MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PINGRESP); return new MqttMessage(fixedHeader); } /** * 取消订阅消息应答 UNSUBACK */ public static MqttUnsubAckMessage buildUnsubAckMessage(MqttMessage message) { /*构建固定报文*/ MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.UNSUBACK); return new MqttUnsubAckMessage(fixedHeader, getIdVariableHeader(message)); } /** * 订阅确认应答 SUBACK */ public static MqttSubAckMessage buildSubAckMessage(MqttMessage message) { /*构建固定报文*/ MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.SUBACK); /*构建可变报文*/ MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message; /*获取订阅topic的Qos*/ Set topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet()); List grantedQos = new ArrayList<>(topics.size()); for (int i = 0; i < topics.size(); i++) { grantedQos.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); } /*负载*/ MqttSubAckPayload payload = new MqttSubAckPayload(grantedQos); return new MqttSubAckMessage(fixedHeader, getIdVariableHeader(message), payload); } /** * 构建推送应答消息 PUBLISH */ public static MqttPublishMessage buildPublishMessage(ClientMessage msg, int packageId) { /*报文固定头*/ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, msg.isDup(), msg.getQos(), false, 0); /*报文可变头*/ MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(msg.getTopicName(), packageId); /*负载*/ ByteBuf payload = msg.getPayload() == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(msg.getPayload()); /*完整报文,固定头+可变头+payload*/ return new MqttPublishMessage(fixedHeader, variableHeader, payload); } /** * Qos1 收到发布消息确认 无负载 PUBACK */ public static MqttPubAckMessage buildAckMessage(MqttMessage message) { MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBACK); return new MqttPubAckMessage(fixedHeader, getIdVariableHeader(message)); } /** * Qos2 发到消息收到 无负载 PUBREC */ public static MqttMessage buildPubRecMessage(MqttMessage message){ MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREC); return new MqttMessage(fixedHeader, getIdVariableHeader(message)); } /** * Qos2 发布消息释放 PUBREL */ public static MqttMessage buildPubRelMessage(MqttMessage message){ MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREL); return new MqttMessage(fixedHeader, getIdVariableHeader(message)); } /** * Qos2 发布消息完成 PUBCOMP */ public static MqttMessage buildPubCompMessage(MqttMessage message){ MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBCOMP); return new MqttMessage(fixedHeader, getIdVariableHeader(message)); } /** * 固定头定制 */ public static MqttFixedHeader buildFixedHeader(MqttMessageType messageType) { return new MqttFixedHeader(messageType, false, MqttQoS.AT_MOST_ONCE, false, 0); } /** * 构造MqttMessageIdVariableHeader */ public static MqttMessageIdVariableHeader getIdVariableHeader(MqttMessage mqttMessage) { MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); return MqttMessageIdVariableHeader.from(idVariableHeader.messageId()); } /*构造返回MQ的设备状态model*/ public static DeviceStatusBo buildStatusMsg(ChannelHandlerContext ctx, String clientId,DeviceStatus status,String ip){ return DeviceStatusBo.builder() .serialNumber(clientId) .status(status) .ip(ip) .hostName(ip) .timestamp(DateUtils.getNowDate()).build(); } }