leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.fastbee.mqtt.manager;
 
import com.fastbee.mqtt.model.RetainMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
 
@Slf4j
@Component
public class RetainMsgManager {
 
    /*保存topic的retain消息*/
    private static Map<String, RetainMessage> retainMap = new ConcurrentHashMap<>();
 
    /**
     * 推送保留信息到订阅客户端
     *
     * @param message 推送消息
     */
    public static void pushMessage(MqttPublishMessage message) {
        if (null == message || !message.fixedHeader().isRetain()) {
            return;
        }
        byte[] bytes = new byte[message.payload().readableBytes()];
        if (bytes.length > 0) {
            RetainMessage retainMsg = RetainMessage.builder()
                    .topic(message.variableHeader().topicName())
                    .qos(message.fixedHeader().qosLevel().value()).message(bytes).build();
            retainMap.put(message.variableHeader().topicName(), retainMsg);
        } else {
            retainMap.remove(message.variableHeader().topicName());
        }
    }
 
    public static Integer getSize() {
        return retainMap.size();
    }
 
    /**
     * 获取消息
     *
     * @param topic 主题
     * @return 消息
     */
    public static RetainMessage getRetain(String topic) {
        return retainMap.get(topic);
    }
 
}