package com.fastbee.mqtt.handler; import com.fastbee.mqtt.annotation.Process; import com.fastbee.mqtt.handler.adapter.MqttHandler; import com.fastbee.mqtt.manager.ClientManager; import com.fastbee.mqtt.service.IMessageStore; import com.fastbee.base.session.Session; import com.fastbee.base.util.AttributeUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; /** * 消息等级=Qos1,收到发布消息确认 * * @author bill */ @Process(type = MqttMessageType.PUBACK) @Slf4j public class MqttPubAck implements MqttHandler { @Autowired private IMessageStore messageStore; @Override public void handler(ChannelHandlerContext ctx, MqttMessage message) { MqttPubAckMessage ackMessage = (MqttPubAckMessage) message; // PacketId int packetId = ackMessage.variableHeader().messageId(); Session session = AttributeUtils.getSession(ctx.channel()); // Qos1 的存储消息释放 messageStore.removePubMsg(packetId); /*更新平台ping*/ ClientManager.updatePing(session.getClientId()); } }