leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package com.fastbee.common.utils.gateway.mq;
 
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.message.DeviceDownMessage;
import com.fastbee.common.enums.ThingsModelType;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.utils.collection.CollectionUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
 
 
import java.util.*;
 
/**
 * topic工具类
 *
 * @author gsb
 * @date 2022/9/15 16:49
 */
@Slf4j
@Component
public class TopicsUtils {
 
    @Value("${server.broker.enabled}")
    private Boolean enabled;
 
    /**
     * 拼接topic
     *
     * @param productId    产品id
     * @param serialNumber 设备编号
     * @param type         主题类型
     * @return topic
     */
    public String buildTopic(Long productId, String serialNumber, TopicType type) {
        /*
         * 订阅属性:
         * 如果启动emq  则为  /+/+/property/post
         * 如果启动netty的mqttBroker 则为   /{productId}/{serialNumber}/property/post
         *
         * 发布都为:/{productId}/{serialNumber}/property/get
         */
        String product = String.valueOf(productId);
        if (null == productId || productId == -1L || productId == 0L) {
            product = "+";
        }
        if (com.fastbee.common.utils.StringUtils.isEmpty(serialNumber)) {
            serialNumber = "+";
        }
        if (type.getType() == 0) {
            return enabled ? "/" + product + "/" + serialNumber + type.getTopicSuffix() : FastBeeConstant.MQTT.PREDIX + type.getTopicSuffix();
        } else {
            return "/" + product + "/" + serialNumber + type.getTopicSuffix();
        }
    }
 
 
    /**
     * 获取所有可订阅的主题
     *
     * @return 订阅主题列表
     */
    public TopicsPost getAllPost() {
        List<Integer> qos = new ArrayList<>();
        List<String> topics = new ArrayList<>();
        TopicsPost post = new TopicsPost();
        for (TopicType topicType : TopicType.values()) {
            if (topicType.getType() == 0) {
                String topic = this.buildTopic(0L, null, topicType);
                topics.add(topic);
                qos.add(1);
            }
        }
        post.setTopics(topics.toArray(new String[0]));
        int[] ints = Arrays.stream(qos.toArray(new Integer[0])).mapToInt(Integer::valueOf).toArray();
        post.setQos(ints);
        return post;
    }
 
    /**
     * 获取所有get topic
     *
     * @param isSimulate 是否是设备模拟
     * @return list
     */
    public static List<Topics> getAllGet(boolean isSimulate) {
        List<Topics> result = new ArrayList<>();
        for (TopicType type : TopicType.values()) {
            if (type.getType() == 4) {
                Topics topics = new Topics();
                topics.setTopicName(type.getTopicSuffix());
                topics.setDesc(type.getMsg());
                topics.setQos(1);
                result.add(topics);
                if (isSimulate && type == TopicType.PROPERTY_GET) {
                    result.remove(topics);
                }
            }
        }
        return result;
    }
 
 
    /**
     * 替换topic中的产品编码和设备编码,唯一作用是在系统收到来自网关设备上报子设备消息时将topic进行替换
     *
     * @param orgTopic     String 原始topic
     * @param productId    String 目标产品编码
     * @param serialNumber String 目标设备编码
     * @return 替换产品编码和设备编码后的新topic
     */
    public String topicSubDevice(String orgTopic, Long productId, String serialNumber) {
        if (com.fastbee.common.utils.StringUtils.isEmpty(orgTopic)) {
            return orgTopic;
        }
        String[] splits = orgTopic.split("/");
        StringBuilder sb = new StringBuilder(splits[0])
                .append("/")
                .append(productId)
                .append("/")
                .append(serialNumber);
        for (int index = 3; index < splits.length; index++) {
            sb.append("/").append(splits[index]);
        }
        return sb.toString();
    }
 
    /**
     * 从topic中获取IMEI号 IMEI即是设备编号
     *
     * @param topic /{productId}/{serialNumber}/property/post
     * @return serialNumber
     */
    @SneakyThrows
    public Long parseProductId(String topic) {
        String[] values = topic.split("/");
        return Long.parseLong(values[1]);
    }
 
 
    /**
     * 从topic中获取IMEI号 IMEI即是设备编号
     *
     * @param topic /{productId}/{serialNumber}/property/post
     * @return serialNumber
     */
    @SneakyThrows
    public String parseSerialNumber(String topic) {
        String[] values = topic.split("/");
        return values[2];
    }
 
    /**
     * 获取topic 判断字段 name
     **/
    public String parseTopicName(String topic) {
        String[] values = topic.split("/");
        return values[3];
    }
 
    /**
     * 获取topic 判断字段 name
     **/
    public String parseTopicName4(String topic) {
        String[] values = topic.split("/");
        return values[4];
    }
 
    /**
     * 从topic解析物模型类型
     *
     * @param topic /{productId}/{serialNumber}/property/post
     * @return 物模型类型
     */
    @SneakyThrows
    public String getThingsModel(String topic) {
        String[] split = topic.split("/");
        return split[2].toUpperCase();
    }
 
    /**
     * 检查topic的合法性
     *
     * @param topicNameList 主题list
     * @return 验证结果
     */
    public static boolean validTopicFilter(List<String> topicNameList) {
        for (String topicName : topicNameList) {
            if (com.fastbee.common.utils.StringUtils.isEmpty(topicName)) {
                return false;
            }
            /*以#或+符号开头的、以/符号结尾的及不存在/符号的订阅按非法订阅处理*/
            if (StringUtils.startsWithIgnoreCase(topicName, "#") || StringUtils.startsWithIgnoreCase(topicName, "+") || StringUtils.endsWithIgnoreCase(topicName, "/") || !topicName.contains("/")) {
                return false;
            }
            if (topicName.contains("#")) {
                /*不是以/#字符串结尾的订阅按非法订阅处理*/
                if (!StringUtils.endsWithIgnoreCase(topicName, "/#")) {
                    return false;
                }
                /*如果出现多个#符号的订阅按非法订阅处理*/
                if (StringUtils.countOccurrencesOf(topicName, "#") > 1) {
                    return false;
                }
            }
            if (topicName.contains("+")) {
                /*如果+符号和/+字符串出现的次数不等的情况按非法订阅处理*/
                if (StringUtils.countOccurrencesOf(topicName, "+") != StringUtils.countOccurrencesOf(topicName, "/+")) {
                    return false;
                }
            }
        }
        return true;
    }
 
    /**
     * 判断topic与topicFilter是否匹配,topic与topicFilter需要符合协议规范
     *
     * @param topic:       主题
     * @param topicFilter: 主题过滤器
     * @return boolean
     * @author ZhangJun
     * @date 23:57 2021/2/27
     */
    public static boolean matchTopic(String topic, String topicFilter) {
        if (topic.contains("+") || topic.contains("#")) {
 
            String[] topicSpilts = topic.split("/");
            String[] filterSpilts = topicFilter.split("/");
 
            if (!topic.contains("#") && topicSpilts.length < filterSpilts.length) {
                return false;
            }
 
            String level;
            for (int i = 0; i < topicSpilts.length; i++) {
                level = topicSpilts[i];
                if (!level.equals(filterSpilts[i]) && !level.equals("+") && !level.equals("#")) {
                    return false;
                }
            }
        } else {
            return topic.equals(topicFilter);
        }
        return true;
    }
 
    /**
     * 根据指定topic搜索所有订阅的topic
     * 指定的topic没有通配符,但是订阅的时候可能会存在通配符,所以有个查找的过程
     *
     * @param topic 主题
     * @return 返回的所有主题
     */
    public static List<String> searchTopic(String topic) {
        try {
            List<String> topicList = new ArrayList<>();
            topicList.add(topic);
            /*先处理#通配符*/
            String[] filterDup = topic.split("/");
            int[] source = new int[filterDup.length];
            String itemTopic = "";
            for (int i = 0; i < filterDup.length; i++) {
                String item = itemTopic.concat("#");
                topicList.add(item);
                itemTopic = itemTopic.concat(filterDup[i]).concat("/");
                source[i] = i;
                continue;
            }
            /*处理+通配符*/
            Map<List<Integer>, Boolean> map = TopicsUtils.handle(source);
            for (List<Integer> key : map.keySet()) {
                String[] arr = CollectionUtils.copy(filterDup);
                for (Integer index : key) {
                    arr[index] = "+";
                }
                String newTopic = CollectionUtils.concat(arr, "/");
                topicList.add(newTopic);
            }
            return topicList;
        } catch (Exception e) {
            log.error("=>查询topic异常", e);
            return null;
        }
    }
 
 
    public static Map<List<Integer>, Boolean> handle(int[] src) {
        int nCnt = src.length;
        int nBit = (0xFFFFFFFF >>> (32 - nCnt));
        Map<List<Integer>, Boolean> map = new HashMap<>();
        for (int i = 1; i <= nBit; i++) {
            List<Integer> list = new ArrayList<>();
            for (int j = 0; j < nCnt; j++) {
                if ((i << (31 - j)) >> 31 == -1) {
                    list.add(j);
                }
            }
            map.put(list, true);
        }
        return map;
    }
}