leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package com.fastbee.mqtt.manager;
 
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.base.session.Session;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 客户端管理
 *
 * @author gsb
 * @date 2022/9/15 16:02
 */
@Slf4j
public class ClientManager {
 
    /*topic本地缓存*/
    public static Map<String, Map<String, Session>> topicMap = new ConcurrentHashMap<>();
    /*客户端最后一次ping时间,设备不正常断开判断*/
    private static Map<String, Long> pingMap = new ConcurrentHashMap<>();
    /*客户端与topic关联本地缓存*/
    public static Map<String, Map<String, Boolean>> clientTopicMap = new ConcurrentHashMap<>();
 
    /**
     * 将client的上下文相关信息添加到映射关系表中
     * @param topic 主题
     * @param session
     */
    public static void push(String topic, Session session) {
        try {
            /*处理topic对应的topic*/
            Map<String, Session> clientMap = topicMap.get(topic);
            if (StringUtils.isEmpty(clientMap)) {
                clientMap = new ConcurrentHashMap<>();
            }
            clientMap.put(session.getClientId(), session);
            topicMap.put(topic, clientMap);
 
            /*处理client对应的所有topic*/
            Map<String, Boolean> topicsMap = null;
            if (clientTopicMap.containsKey(session.getClientId())) {
                topicsMap = clientTopicMap.get(session.getClientId());
                if (!topicsMap.containsKey(topic)) {
                    topicsMap.put(topic, true);
                }
            } else {
                topicsMap = new HashMap<>();
                topicsMap.put(topic, true);
                clientTopicMap.put(session.getClientId(), topicsMap);
            }
        } catch (Exception e) {
            log.error("=>clientId映射topic出现异常:{},e=", e.getMessage(), e);
        }
    }
 
    /**
     * 清理对应client下的所有数据
     *
     * @param clientId 客户端id
     */
    public static void remove(String clientId) {
        try {
            /*移除client对应的topic*/
            Map<String, Boolean> topics = clientTopicMap.get(clientId);
            if (null != topics) {
                /*从topic中移除client*/
                for (String key : topics.keySet()) {
                    Map<String, Session> clientMap = topicMap.get(key);
                    if (CollectionUtils.isEmpty(clientMap)) {
                        continue;
                    }
                    clientMap.remove(clientId);
                }
                clientTopicMap.remove(clientId);
            }
            pingMap.remove(clientId);
        } catch (Exception e) {
            log.warn("=>移除client[{}]异常", e.getMessage());
        }
    }
 
    /**
     * 客户端取消订阅
     * 删除指定topic下的指定client
     *
     * @param topic   主题
     * @param session 客户端
     */
    public static void unsubscribe(String topic, Session session) {
        try {
            Map<String, Session> clientMap = topicMap.get(topic);
            if (StringUtils.isEmpty(clientMap)) {
                return;
            }
            Session s = clientMap.get(session.getClientId());
            if (null == s) {
                return;
            }
            clientMap.remove(session.getClientId());
        } catch (Exception e) {
            log.error("=>客户端取消订阅异常:{}", e.getMessage());
        }
    }
 
    /**
     * 将消息发送到指定topic下的所有client上去
     *
     * @param msg 推送消息
     */
    public static void pubTopic(MqttPublishMessage msg) {
        String topic = msg.variableHeader().topicName();
        List<String> topicList = TopicsUtils.searchTopic(topic);
        for (String itemTopic : topicList) {
            Map<String, Session> clientMap = topicMap.get(itemTopic);
            if (StringUtils.isEmpty(clientMap)) {
                continue;
            }
            for (Session session : clientMap.values()) {
                String clientId = session.getClientId();
                if (!validClient(clientId)) {
                    ///*ws的客户端不正常断开连接后,直接移除所有信息*/
                    //if (session.getClientId().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) {
                    //    log.debug("=>移除ws客户端,clientId={}", session);
                    //    remove(clientId);
                    //}
                    log.warn("=>{}不在线", clientId);
                    continue;
                }
                ResponseManager.publishClients(msg, session);
            }
        }
    }
 
    /**
     * 更新客户端在线时间,给客户端发送消息时用这个看客户端最近是否在线
     * 用来判断设备不正常掉线没有应答服务器的情况
     *
     * @param clientId 客户端id
     */
    public static void updatePing(String clientId) {
        pingMap.put(clientId, DateUtils.getTimestamp());
    }
 
    /**
     * 平台判定设备状态 Ping客户端是否在线
     *
     * @param clientId 客户端id
     * @return 结果
     */
    public static Boolean validClient(String clientId) {
        long currTime = DateUtils.getTimestamp();
        /*获取客户端连接时,时间*/
        Long timestamp = pingMap.get(clientId);
        if (null == timestamp) {
            return false;
        }
        //当设备缓存的心跳时间大于 平台判断时间 1.5f  表示设备不正常断开了服务器
        if (currTime - timestamp > FastBeeConstant.SERVER.DEVICE_PING_EXPIRED) {
            //pingMap.remove(clientId);
            //SessionManger.removeClient(clientId);
            return false;
        }
        return true;
    }
}