leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.fastbee.mqtt.manager;
 
import com.fastbee.base.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
 
 
@Slf4j
public class ResponseManager {
 
 
    /**
     * 发送信息:用于服务端收到消息客户端数据后,向客户端发送响应信息
     *
     * @param session 上下文
     * @param msg     mqtt消息
     * @param flush   是否刷新
     */
    public static void responseMessage(Session session, MqttMessage msg, boolean flush) {
        ChannelFuture future = flush ? session.getHandlerContext().writeAndFlush(msg) : session.getHandlerContext().write(msg);
        future.addListener(f -> {
            if (!f.isSuccess()) {
                log.error("=>响应设备[{}],发送消息:{},失败原因:{}", session.getClientId(), msg, f.cause());
            }else {
                //log.debug("=>相应设备:[{}],发送消息:[{}]",session.getClientId(),msg);
            }
        });
    }
 
    /**
     * 发送信息:用于服务端向客户端通过clientID下发消息(单客户端)
     *
     * @param msg      mqtt消息
     * @param clientId 客户端id
     * @param flush    是否刷新
     */
    public static void sendMessage(MqttMessage msg, String clientId, boolean flush) {
        Session session = SessionManger.getSession(clientId);
        if (session == null || null == session.getHandlerContext()) {
            return;
        }
        responseMessage(session, msg, flush);
    }
 
    /**
     * 推送消息给订阅客户端(所有订阅客户端)
     *
     * @param msg     推送消息
     * @param session 客户端
     */
    public static void publishClients(MqttPublishMessage msg, Session session) {
        try {
            final Channel channel = session.getHandlerContext().channel();
            MqttQoS qos = msg.fixedHeader().qosLevel();
            ByteBuf sendBuf = msg.content().retainedDuplicate();
            sendBuf.resetReaderIndex();
            /*配置推送消息类型*/
            MqttFixedHeader Header = new MqttFixedHeader(MqttMessageType.PUBLISH,
                    false, qos, msg.fixedHeader().isRetain(), 0);
            /*设置topic packetId*/
            MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader(
                    msg.variableHeader().topicName(), msg.variableHeader().packetId());
            /*推送消息*/
            MqttPublishMessage publishMessage = new MqttPublishMessage(Header,
                    publishVariableHeader, sendBuf);
            channel.writeAndFlush(publishMessage);
 
        } catch (Exception e) {
            log.error("=>发送消息异常 {}", msg, e);
        }
    }
 
}