package com.fastbee.mqtt.handler; import com.fastbee.mqtt.annotation.Process; import com.fastbee.mqtt.handler.adapter.MqttHandler; import com.fastbee.mqtt.manager.ClientManager; import com.fastbee.mqtt.manager.ResponseManager; import com.fastbee.mqtt.service.IMessageStore; import com.fastbee.base.session.Session; import com.fastbee.base.util.AttributeUtils; import com.fastbee.mqtt.utils.MqttMessageUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; /** * 消息等级=Qos2 发布消息收到 交付第一步 * * @author bill */ @Process(type = MqttMessageType.PUBREC) @Slf4j public class MqttPubRec implements MqttHandler { @Autowired private IMessageStore messageStore; @Override public void handler(ChannelHandlerContext ctx, MqttMessage message) { MqttMessageIdVariableHeader variableHeader = MqttMessageUtils.getIdVariableHeader(message); //clientId String clientId = AttributeUtils.getClientId(ctx.channel()); Session session = AttributeUtils.getSession(ctx.channel()); //获取packetId int messageId = variableHeader.messageId(); /*释放消息*/ messageStore.removePubMsg(messageId); messageStore.saveRelOutMsg(messageId); // 回复REL 进入第二阶段 MqttMessage mqttMessage = MqttMessageUtils.buildPubRelMessage(message); ResponseManager.responseMessage(session, mqttMessage, true); /*更新平台ping*/ ClientManager.updatePing(session.getClientId()); } }