package com.fastbee.mqtt.handler.adapter; import com.fastbee.common.exception.iot.MqttAuthorizationException; import com.fastbee.common.exception.iot.MqttClientUserNameOrPassException; import com.fastbee.mqtt.manager.SessionManger; import com.fastbee.base.util.AttributeUtils; import com.fastbee.mqtt.utils.MqttMessageUtils; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.mqtt.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Optional; /** * @author gsb * @date 2022/9/15 10:34 */ @Slf4j @Component @ChannelHandler.Sharable public class MqttMessageAdapter extends SimpleChannelInboundHandler { // @Autowired private MqttMessageDelegate messageDelegate; public MqttMessageAdapter(MqttMessageDelegate delegate) { this.messageDelegate = delegate; } /** * 客户端上报消息处理 * * @param context 上下文 * @param message 消息 */ @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") protected void channelRead0(ChannelHandlerContext context, MqttMessage message) { try { /*校验消息*/ if (message.decoderResult().isFailure()) { exceptionCaught(context, message.decoderResult().cause()); return; } messageDelegate.process(context, message); }catch (Exception e){ log.error("=>数据进栈异常",e); } } /** * 客户端心跳处理 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String host = socketAddress.getAddress().getHostAddress(); int port = socketAddress.getPort(); String clientId = AttributeUtils.getClientId(ctx.channel()); if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; IdleState state = idleStateEvent.state(); if (state == IdleState.ALL_IDLE || state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) { log.error("客户端id[{}] 客户端[{}]port:[{}]心跳超时!",clientId, host, port); /*关闭通道*/ ctx.close(); } } else { super.userEventTriggered(ctx, evt); } } /** * 处理消息异常情况 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("=>mqtt连接异常",cause); } }