package com.fastbee.mqtt.manager; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.mq.DeviceStatusBo; import com.fastbee.common.enums.DeviceStatus; import com.fastbee.common.exception.ServiceException; import com.fastbee.common.utils.StringUtils; import com.fastbee.common.utils.spring.SpringUtils; import com.fastbee.iot.service.cache.IDeviceCache; import com.fastbee.mq.redischannel.producer.MessageProducer; import com.fastbee.mq.service.IMessagePublishService; import com.fastbee.base.service.ISessionStore; import com.fastbee.base.session.Session; import com.fastbee.base.util.AttributeUtils; import com.fastbee.mqtt.utils.MqttMessageUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessageType; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; /** * 会话管理类 * * @Author guanshubiao * @Date 2022/9/12 20:22 */ @Slf4j public class SessionManger { private static ISessionStore sessionStore = SpringUtils.getBean(ISessionStore.class); private static MqttRemoteManager remoteManager = SpringUtils.getBean(MqttRemoteManager.class); private static IDeviceCache deviceCache = SpringUtils.getBean(IDeviceCache.class); /** * mqtt新客户连接 * * @param clientId 客户端id * @param session 客户端 */ public static void buildSession(String clientId, Session session) { log.debug("=>新客户端连接,clientId={}", clientId); if (StringUtils.isEmpty(clientId) || handleContext(session)) { log.error("=>客户端id为空或者session未注册!"); return; } sessionStore.storeSession(clientId, session); //contextMap.put(session.getHandlerContext(), session); /*更新客户端在平台的最新响应时间*/ ClientManager.updatePing(clientId); /*发送MQ,设备上线*/ DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.ONLINE, session.getIp()); if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) && !statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX) && !statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.FAST_PHONE)) { deviceCache.updateDeviceStatusCache(statusBo); remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus()); } } /** * 根据客户端id移除客户端 * * @param clientId 客户端id */ public static void removeClient(String clientId) { log.debug("=>移除客户端,clientId={}", clientId); try { if (StringUtils.isEmpty(clientId) || !sessionStore.containsKey(clientId) || clientId.endsWith(FastBeeConstant.SERVER.WS_PREFIX) || clientId.endsWith(FastBeeConstant.SERVER.FAST_PHONE)) { return; } Session session = sessionStore.getSession(clientId); if (handleContext(session)) { log.error("移除客户端失败,客户端未注册!"); return; } //关闭通道 session.getHandlerContext().close(); //移除client sessionStore.cleanSession(clientId); session.setMqttMessageType(MqttMessageType.DISCONNECT); //发送至MQ,设备下线 DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.OFFLINE, session.getIp()); if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) && !statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) { deviceCache.updateDeviceStatusCache(statusBo); remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus()); } } catch (Exception e) { throw new ServiceException("移除客户端失败,message=" + e.getMessage()); } } /** * 根据客户通道移除客户端 * * @param ctx 上下文通道 */ public static void removeContextByContext(ChannelHandlerContext ctx) { try { /*获取*/ Session session = AttributeUtils.getSession(ctx.channel()); if (handleContext(session)) { log.error("=>客户端通道不存在!移除失败"); return; } sessionStore.cleanSession(session.getClientId()); session.setMqttMessageType(MqttMessageType.DISCONNECT); //发送至MQ,设备下线 DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.OFFLINE, session.getIp()); if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) && !statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) { deviceCache.updateDeviceStatusCache(statusBo); remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus()); } } catch (Exception e) { log.error("=>移除客户端失败={}", e.getMessage()); } } /** * ping判定时间超时 * * @param clientId 客户id */ public static void pingTimeout(String clientId) { try { removeClient(clientId); } catch (Exception e) { throw new ServiceException("移除超时客户端失败"); } } /** * 根据clientId获取客户通道 * * @param clientId 客户端id * @return session */ public static Session getSession(String clientId) { return sessionStore.getSession(clientId); } /** * 校验Session已经注册通道 * * @param session 客户端 * @return 结果 */ private static boolean handleContext(Session session) { if (null == session || null == session.getHandlerContext()) { return true; } return false; } }