leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package com.fastbee.mqtt.handler;
 
import com.alibaba.fastjson2.JSON;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import com.fastbee.mqtt.model.RetainMessage;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.RetainMsgManager;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
 
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
 
 
@Slf4j
@Process(type = MqttMessageType.SUBSCRIBE)
public class MqttSubscribe implements MqttHandler {
 
    @Resource
    private RedisCache redisCache;
 
    @Override
    public void handler(ChannelHandlerContext ctx, MqttMessage message) {
        subscribe(ctx, (MqttSubscribeMessage) message);
    }
 
 
    public void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage message) {
        /*获取session*/
        Session session = AttributeUtils.getSession(ctx.channel());
        /*获取客户端订阅的topic列表*/
        List<MqttTopicSubscription> topList = message.payload().topicSubscriptions();
        /*获取topicName列表*/
        List<String> topicNameList = topList.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList());
        log.debug("=>客户端:{},订阅主题:{}", session.getClientId(), JSON.toJSONString(topicNameList));
        if (!TopicsUtils.validTopicFilter(topicNameList)) {
            log.error("=>订阅主题不合法:{}", JSON.toJSONString(topicNameList));
            return;
        }
        /*存储到本地topic缓存*/
        topicNameList.forEach(topicName -> {
            ClientManager.push(topicName, session);
            /*累计订阅数*/
            redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SUBSCRIBE_TOTAL,-1L);
        });
        /*更新客户端ping*/
        ClientManager.updatePing(session.getClientId());
        /*应答客户端订阅成功*/
        MqttSubAckMessage subAckMessage = MqttMessageUtils.buildSubAckMessage(message);
        ResponseManager.responseMessage(session, subAckMessage, true);
        /*客户端订阅了遗留消息主题后,推送遗留消息给客户端*/
        topList.forEach(topic -> {
            retain(topic.topicName(), session, topic.qualityOfService());
        });
    }
 
 
    /**
     * 推送遗留消息
     *
     * @param topicName 主题
     * @param session   客户端
     * @param mqttQoS   消息质量
     */
    @SneakyThrows
    private void retain(String topicName, Session session, MqttQoS mqttQoS) {
        RetainMessage message = RetainMsgManager.getRetain(topicName);
        if (null == message) {
            return;
        }
        MqttQoS qos = message.getQos() > mqttQoS.value() ? mqttQoS : MqttQoS.valueOf(message.getQos());
        switch (qos.value()) {
            case 0:
                buildMessage(qos, topicName, 0, message.getMessage(), session);
                break;
            case 1:
            case 2:
                /*使用实时时间戳充当 packId*/
                buildMessage(qos, topicName, (int) System.currentTimeMillis(), message.getMessage(), session);
                break;
        }
    }
 
    /*组装推送数据*/
    private void buildMessage(MqttQoS qos, String topicName, int packetId, byte[] message, Session session) {
        /*生成客户端model*/
        ClientMessage clientMessage = ClientMessage.of(qos, topicName, false, message);
        /*组建推送消息*/
        MqttPublishMessage publishMessage = MqttMessageUtils.buildPublishMessage(clientMessage, packetId);
        /*推送消息*/
        ResponseManager.publishClients(publishMessage, session);
    }
 
 
}