package com.fastbee.mqtt.handler; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.redis.RedisCache; import com.fastbee.common.enums.ServerType; import com.fastbee.common.utils.DateUtils; import com.fastbee.mqtt.annotation.Process; import com.fastbee.mqtt.auth.AuthService; import com.fastbee.mqtt.handler.adapter.MqttHandler; import com.fastbee.mqtt.manager.ResponseManager; import com.fastbee.mqtt.manager.SessionManger; import com.fastbee.mqtt.manager.WillMessageManager; import com.fastbee.mqtt.model.WillMessage; import com.fastbee.base.session.Session; import com.fastbee.base.util.AttributeUtils; import com.fastbee.mqtt.utils.MqttMessageUtils; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.*; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.Resource; import java.net.InetSocketAddress; @Slf4j @Process(type = MqttMessageType.CONNECT) public class MqttConnect implements MqttHandler { @Autowired private AuthService authService; @Resource private RedisCache redisCache; @Override public void handler(ChannelHandlerContext ctx, MqttMessage message) { MqttConnectMessage connectMessage = (MqttConnectMessage) message; /*获取客户端Id*/ String clientId = connectMessage.payload().clientIdentifier(); if (clientId.contains("&")){ clientId = clientId.split("&")[1]; } log.debug("=>客户端:{} 连接:{}", clientId, message); /*获取session*/ Session session = new Session(); /*mqtt版本*/ MqttVersion version = MqttVersion.fromProtocolNameAndLevel(connectMessage.variableHeader().name(), (byte) connectMessage.variableHeader().version()); boolean cleanSession = connectMessage.variableHeader().isCleanSession(); session.setHandlerContext(ctx); session.setVersion(version); session.setClientId(clientId); session.setCleanSession(cleanSession); session.setUsername(connectMessage.payload().userName()); session.setConnected_at(DateUtils.getNowDate()); InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); session.setIp(socketAddress.getAddress().getHostAddress()); session.setServerType(ServerType.MQTT); /*设置客户端ping时间*/ MqttConnectVariableHeader header = connectMessage.variableHeader(); if (header.keepAliveTimeSeconds() > 0 && session.getKeepAliveMax() >= header.keepAliveTimeSeconds()) { session.setKeepAlive(header.keepAliveTimeSeconds()); } /*mqtt客户端登录校验*/ if (!check(session, connectMessage)) { log.error("=>客户端:{},连接异常", clientId); session.getHandlerContext().close(); return; } /*保存ClientId 和 session 到Attribute*/ AttributeUtils.setClientId(ctx.channel(), clientId); AttributeUtils.setSession(ctx.channel(), session); SessionManger.removeClient(clientId); session.setConnected(true); SessionManger.buildSession(clientId, session); handleWill(connectMessage); ResponseManager.responseMessage(session, MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null), true ); } /** * 遗嘱消息处理 * * @param message 连接消息 */ private void handleWill(MqttConnectMessage message) { /*如果没有设置处理遗嘱消息,返回*/ if (!message.variableHeader().isWillFlag()) { return; } /*生成客户端model*/ MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(message.variableHeader().willQos()), message.variableHeader().isWillRetain(), 0), new MqttPublishVariableHeader(message.payload().willTopic(), 0), Unpooled.buffer().writeBytes(message.payload().willMessageInBytes())); WillMessage msg = new WillMessage(message.payload().clientIdentifier(), message.variableHeader().isCleanSession(), message.payload().willTopic(), publishMessage); WillMessageManager.push(msg); } /** * 客户端连接校验 * * @param session 客户端 * @param message 连接消息 * @return 结果 */ private boolean check(Session session, MqttConnectMessage message) { /*获取客户端连接地址*/ InetSocketAddress address = (InetSocketAddress) session.getHandlerContext().channel().remoteAddress(); String host = address.getAddress().getHostAddress(); /*webSocket客户端 系统内部客户端不校验*/ String clientId = message.payload().clientIdentifier(); /*根据用户名,密码校验*/ String username = message.payload().userName(); String password = message.payload().passwordInBytes() == null ? null : new String(message.payload().passwordInBytes(), CharsetUtil.UTF_8); /*验证失败,应答客户端*/ if (!authService.auth(clientId, username, password)) { MqttConnAckMessage connAckMessage = MqttMessageUtils.buildConntAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false); ResponseManager.responseMessage(session, connAckMessage, true); return false; } return true; } }