package com.fastbee.mqtt.handler;
|
|
import com.fastbee.mqtt.annotation.Process;
|
import com.fastbee.mqtt.handler.adapter.MqttHandler;
|
import com.fastbee.mqtt.manager.ClientManager;
|
import com.fastbee.mqtt.manager.ResponseManager;
|
import com.fastbee.mqtt.service.IMessageStore;
|
import com.fastbee.base.session.Session;
|
import com.fastbee.base.util.AttributeUtils;
|
import com.fastbee.mqtt.utils.MqttMessageUtils;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.mqtt.MqttMessage;
|
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
|
import io.netty.handler.codec.mqtt.MqttMessageType;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
/**
|
* 消息等级=Qos2 发布消息释放 PUBREL
|
* @author bill
|
*/
|
@Slf4j
|
@Process(type = MqttMessageType.PUBREL)
|
public class MqttPubRel implements MqttHandler {
|
|
@Autowired
|
private IMessageStore messageStore;
|
|
@Override
|
public void handler(ChannelHandlerContext ctx, MqttMessage message){
|
MqttMessageIdVariableHeader variableHeader = MqttMessageUtils.getIdVariableHeader(message);
|
Session session = AttributeUtils.getSession(ctx.channel());
|
//获取packetId
|
int messageId = variableHeader.messageId();
|
messageStore.removeRelInMsg(messageId);
|
MqttMessage mqttMessage = MqttMessageUtils.buildPubCompMessage(message);
|
ResponseManager.responseMessage(session,mqttMessage,true);
|
/*更新本地ping时间*/
|
ClientManager.updatePing(session.getClientId());
|
}
|
}
|