package com.fastbee.mqtt.handler; import com.alibaba.fastjson2.JSON; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.redis.RedisCache; import com.fastbee.mqtt.annotation.Process; import com.fastbee.mqtt.model.ClientMessage; import com.fastbee.base.session.Session; import com.fastbee.base.util.AttributeUtils; import com.fastbee.mqtt.utils.MqttMessageUtils; import com.fastbee.mqtt.model.RetainMessage; import com.fastbee.mqtt.handler.adapter.MqttHandler; import com.fastbee.mqtt.manager.ClientManager; import com.fastbee.mqtt.manager.ResponseManager; import com.fastbee.mqtt.manager.RetainMsgManager; import com.fastbee.common.utils.gateway.mq.TopicsUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.*; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Slf4j @Process(type = MqttMessageType.SUBSCRIBE) public class MqttSubscribe implements MqttHandler { @Resource private RedisCache redisCache; @Override public void handler(ChannelHandlerContext ctx, MqttMessage message) { subscribe(ctx, (MqttSubscribeMessage) message); } public void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage message) { /*获取session*/ Session session = AttributeUtils.getSession(ctx.channel()); /*获取客户端订阅的topic列表*/ List topList = message.payload().topicSubscriptions(); /*获取topicName列表*/ List topicNameList = topList.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList()); log.debug("=>客户端:{},订阅主题:{}", session.getClientId(), JSON.toJSONString(topicNameList)); if (!TopicsUtils.validTopicFilter(topicNameList)) { log.error("=>订阅主题不合法:{}", JSON.toJSONString(topicNameList)); return; } /*存储到本地topic缓存*/ topicNameList.forEach(topicName -> { ClientManager.push(topicName, session); /*累计订阅数*/ redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SUBSCRIBE_TOTAL,-1L); }); /*更新客户端ping*/ ClientManager.updatePing(session.getClientId()); /*应答客户端订阅成功*/ MqttSubAckMessage subAckMessage = MqttMessageUtils.buildSubAckMessage(message); ResponseManager.responseMessage(session, subAckMessage, true); /*客户端订阅了遗留消息主题后,推送遗留消息给客户端*/ topList.forEach(topic -> { retain(topic.topicName(), session, topic.qualityOfService()); }); } /** * 推送遗留消息 * * @param topicName 主题 * @param session 客户端 * @param mqttQoS 消息质量 */ @SneakyThrows private void retain(String topicName, Session session, MqttQoS mqttQoS) { RetainMessage message = RetainMsgManager.getRetain(topicName); if (null == message) { return; } MqttQoS qos = message.getQos() > mqttQoS.value() ? mqttQoS : MqttQoS.valueOf(message.getQos()); switch (qos.value()) { case 0: buildMessage(qos, topicName, 0, message.getMessage(), session); break; case 1: case 2: /*使用实时时间戳充当 packId*/ buildMessage(qos, topicName, (int) System.currentTimeMillis(), message.getMessage(), session); break; } } /*组装推送数据*/ private void buildMessage(MqttQoS qos, String topicName, int packetId, byte[] message, Session session) { /*生成客户端model*/ ClientMessage clientMessage = ClientMessage.of(qos, topicName, false, message); /*组建推送消息*/ MqttPublishMessage publishMessage = MqttMessageUtils.buildPublishMessage(clientMessage, packetId); /*推送消息*/ ResponseManager.publishClients(publishMessage, session); } }