leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package com.fastbee.mqtt.service.impl;
 
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.mqtt.service.IMessageStore;
import org.springframework.stereotype.Service;
 
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * Retain will Qos12消息存储接口 -TODO 后续Redis处理
 *
 * @author gsb
 * @date 2022/10/14 14:35
 */
@Service
public class MessageStoreImpl implements IMessageStore {
 
 
    /**
     * 存储消息,保留消息,遗留消息
     */
    private final Map<String, ClientMessage> willOrRetainMap = new ConcurrentHashMap<>();
 
    /**
     * Qos2 Pub消息
     */
    private final Map<Integer, ClientMessage> publishMap = new ConcurrentHashMap<>();
    /**
     * Qos2 REL IN消息
      */
    private final Set<Integer> outRelSet = new HashSet<>();
 
    /**
     * Qos2 REL out
     */
    private final Set<Integer> inRelSet = new HashSet<>();
 
    /**
     * 存储控制包
     *
     * @param topic:         控制包所属主题
     * @param clientMessage: 需要存储的消息
     */
    @Override
    public void storeMessage(String topic, ClientMessage clientMessage) {
        willOrRetainMap.put(topic, clientMessage);
    }
 
    /**
     * 清除topic下的所有消息
     *
     * @param topic: 主题
     */
    @Override
    public void cleanTopic(String topic) {
        willOrRetainMap.remove(topic);
    }
 
    /**
     * 根据clientId清除消息
     *
     * @param clientId: 客户端唯一标识
     */
    @Override
    public void removeMessage(String clientId) {
        for (Map.Entry<String, ClientMessage> entry : willOrRetainMap.entrySet()) {
            if (entry.getValue().getClientId().equals(clientId)) {
                willOrRetainMap.remove(entry.getKey());
            }
        }
    }
 
    /**
     * 匹配主题过滤器,匹配消息
     *
     * @param topicFilter: 主题过滤器
     */
    @Override
    public List<ClientMessage> searchMessages(String topicFilter) {
        List<ClientMessage> messageList = new ArrayList<>();
        for (String topic : willOrRetainMap.keySet()) {
            if (TopicsUtils.matchTopic(topic, topicFilter)) {
                messageList.add(willOrRetainMap.get(topic));
            }
        }
        return messageList;
    }
 
    /**
     * 保存 clientMessage
     *
     * @param messageId 消息id
     */
    @Override
    public void savePubMsg(Integer messageId, ClientMessage clientMessage){
        publishMap.put(messageId,clientMessage);
    }
 
    /**
     * 移除
     *
     * @param messageId 消息id
     */
    @Override
    public void removePubMsg(int messageId){
        publishMap.remove(messageId);
    }
 
    /**
     * 保存 REL IN
     *
     * @param messageId 消息id
     */
    @Override
    public void saveRelInMsg(int messageId){
        inRelSet.add(messageId);
    }
 
    /**
     * 保存 REL OUT
     *
     * @param messageId 消息id
     */
    @Override
    public void saveRelOutMsg(int messageId){
        outRelSet.add(messageId);
    }
 
    /**
     * 移除
     *
     * @param messageId 消息id
     */
    @Override
    public void removeRelInMsg(int messageId){
        inRelSet.remove(messageId);
    }
 
    /**
     * 移除
     *
     * @param messageId 消息id
     */
    @Override
    public void removeRelOutMsg(int messageId){
        outRelSet.remove(messageId);
    }
 
    /**
     * 判断Rel out是否包含消息id
     */
    @Override
    public boolean outRelContains(int messageId){
       return outRelSet.contains(messageId);
    }
 
}