package com.fastbee.mqtt.manager; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.utils.gateway.mq.TopicsUtils; import com.fastbee.common.utils.DateUtils; import com.fastbee.common.utils.StringUtils; import com.fastbee.base.session.Session; import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 客户端管理 * * @author gsb * @date 2022/9/15 16:02 */ @Slf4j public class ClientManager { /*topic本地缓存*/ public static Map> topicMap = new ConcurrentHashMap<>(); /*客户端最后一次ping时间,设备不正常断开判断*/ private static Map pingMap = new ConcurrentHashMap<>(); /*客户端与topic关联本地缓存*/ public static Map> clientTopicMap = new ConcurrentHashMap<>(); /** * 将client的上下文相关信息添加到映射关系表中 * @param topic 主题 * @param session */ public static void push(String topic, Session session) { try { /*处理topic对应的topic*/ Map clientMap = topicMap.get(topic); if (StringUtils.isEmpty(clientMap)) { clientMap = new ConcurrentHashMap<>(); } clientMap.put(session.getClientId(), session); topicMap.put(topic, clientMap); /*处理client对应的所有topic*/ Map topicsMap = null; if (clientTopicMap.containsKey(session.getClientId())) { topicsMap = clientTopicMap.get(session.getClientId()); if (!topicsMap.containsKey(topic)) { topicsMap.put(topic, true); } } else { topicsMap = new HashMap<>(); topicsMap.put(topic, true); clientTopicMap.put(session.getClientId(), topicsMap); } } catch (Exception e) { log.error("=>clientId映射topic出现异常:{},e=", e.getMessage(), e); } } /** * 清理对应client下的所有数据 * * @param clientId 客户端id */ public static void remove(String clientId) { try { /*移除client对应的topic*/ Map topics = clientTopicMap.get(clientId); if (null != topics) { /*从topic中移除client*/ for (String key : topics.keySet()) { Map clientMap = topicMap.get(key); if (CollectionUtils.isEmpty(clientMap)) { continue; } clientMap.remove(clientId); } clientTopicMap.remove(clientId); } pingMap.remove(clientId); } catch (Exception e) { log.warn("=>移除client[{}]异常", e.getMessage()); } } /** * 客户端取消订阅 * 删除指定topic下的指定client * * @param topic 主题 * @param session 客户端 */ public static void unsubscribe(String topic, Session session) { try { Map clientMap = topicMap.get(topic); if (StringUtils.isEmpty(clientMap)) { return; } Session s = clientMap.get(session.getClientId()); if (null == s) { return; } clientMap.remove(session.getClientId()); } catch (Exception e) { log.error("=>客户端取消订阅异常:{}", e.getMessage()); } } /** * 将消息发送到指定topic下的所有client上去 * * @param msg 推送消息 */ public static void pubTopic(MqttPublishMessage msg) { String topic = msg.variableHeader().topicName(); List topicList = TopicsUtils.searchTopic(topic); for (String itemTopic : topicList) { Map clientMap = topicMap.get(itemTopic); if (StringUtils.isEmpty(clientMap)) { continue; } for (Session session : clientMap.values()) { String clientId = session.getClientId(); if (!validClient(clientId)) { ///*ws的客户端不正常断开连接后,直接移除所有信息*/ //if (session.getClientId().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) { // log.debug("=>移除ws客户端,clientId={}", session); // remove(clientId); //} log.warn("=>{}不在线", clientId); continue; } ResponseManager.publishClients(msg, session); } } } /** * 更新客户端在线时间,给客户端发送消息时用这个看客户端最近是否在线 * 用来判断设备不正常掉线没有应答服务器的情况 * * @param clientId 客户端id */ public static void updatePing(String clientId) { pingMap.put(clientId, DateUtils.getTimestamp()); } /** * 平台判定设备状态 Ping客户端是否在线 * * @param clientId 客户端id * @return 结果 */ public static Boolean validClient(String clientId) { long currTime = DateUtils.getTimestamp(); /*获取客户端连接时,时间*/ Long timestamp = pingMap.get(clientId); if (null == timestamp) { return false; } //当设备缓存的心跳时间大于 平台判断时间 1.5f 表示设备不正常断开了服务器 if (currTime - timestamp > FastBeeConstant.SERVER.DEVICE_PING_EXPIRED) { //pingMap.remove(clientId); //SessionManger.removeClient(clientId); return false; } return true; } }