leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.fastbee.mqtt.handler;
 
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.auth.AuthService;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.mqtt.manager.WillMessageManager;
import com.fastbee.mqtt.model.WillMessage;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
 
import javax.annotation.Resource;
import java.net.InetSocketAddress;
 
 
@Slf4j
@Process(type = MqttMessageType.CONNECT)
public class MqttConnect implements MqttHandler {
 
    @Autowired
    private AuthService authService;
    @Resource
    private RedisCache redisCache;
 
    @Override
    public void handler(ChannelHandlerContext ctx, MqttMessage message) {
        MqttConnectMessage connectMessage = (MqttConnectMessage) message;
        /*获取客户端Id*/
        String clientId = connectMessage.payload().clientIdentifier();
        if (clientId.contains("&")){
            clientId =  clientId.split("&")[1];
        }
        log.debug("=>客户端:{} 连接:{}", clientId, message);
        /*获取session*/
        Session session = new Session();
        /*mqtt版本*/
        MqttVersion version = MqttVersion.fromProtocolNameAndLevel(connectMessage.variableHeader().name(), (byte) connectMessage.variableHeader().version());
        boolean cleanSession = connectMessage.variableHeader().isCleanSession();
        session.setHandlerContext(ctx);
        session.setVersion(version);
        session.setClientId(clientId);
        session.setCleanSession(cleanSession);
        session.setUsername(connectMessage.payload().userName());
        session.setConnected_at(DateUtils.getNowDate());
        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        session.setIp(socketAddress.getAddress().getHostAddress());
        session.setServerType(ServerType.MQTT);
        /*设置客户端ping时间*/
        MqttConnectVariableHeader header = connectMessage.variableHeader();
        if (header.keepAliveTimeSeconds() > 0 && session.getKeepAliveMax() >= header.keepAliveTimeSeconds()) {
            session.setKeepAlive(header.keepAliveTimeSeconds());
        }
        /*mqtt客户端登录校验*/
        if (!check(session, connectMessage)) {
            log.error("=>客户端:{},连接异常", clientId);
            session.getHandlerContext().close();
            return;
        }
 
        /*保存ClientId 和 session 到Attribute*/
        AttributeUtils.setClientId(ctx.channel(), clientId);
        AttributeUtils.setSession(ctx.channel(), session);
        SessionManger.removeClient(clientId);
        session.setConnected(true);
 
        SessionManger.buildSession(clientId, session);
        handleWill(connectMessage);
        ResponseManager.responseMessage(session,
                MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02),
                        new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
                        null),
                true
        );
    }
 
    /**
     * 遗嘱消息处理
     *
     * @param message 连接消息
     */
    private void handleWill(MqttConnectMessage message) {
        /*如果没有设置处理遗嘱消息,返回*/
        if (!message.variableHeader().isWillFlag()) {
            return;
        }
        /*生成客户端model*/
        MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH, false,
                        MqttQoS.valueOf(message.variableHeader().willQos()),
                        message.variableHeader().isWillRetain(), 0),
                new MqttPublishVariableHeader(message.payload().willTopic(), 0),
                Unpooled.buffer().writeBytes(message.payload().willMessageInBytes()));
 
        WillMessage msg = new WillMessage(message.payload().clientIdentifier(),
                message.variableHeader().isCleanSession(), message.payload().willTopic(), publishMessage);
        WillMessageManager.push(msg);
    }
 
    /**
     * 客户端连接校验
     *
     * @param session 客户端
     * @param message 连接消息
     * @return 结果
     */
    private boolean check(Session session, MqttConnectMessage message) {
        /*获取客户端连接地址*/
        InetSocketAddress address = (InetSocketAddress) session.getHandlerContext().channel().remoteAddress();
        String host = address.getAddress().getHostAddress();
        /*webSocket客户端 系统内部客户端不校验*/
        String clientId = message.payload().clientIdentifier();
        /*根据用户名,密码校验*/
        String username = message.payload().userName();
        String password = message.payload().passwordInBytes() == null ? null : new String(message.payload().passwordInBytes(), CharsetUtil.UTF_8);
        /*验证失败,应答客户端*/
        if (!authService.auth(clientId, username, password)) {
            MqttConnAckMessage connAckMessage = MqttMessageUtils.buildConntAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false);
            ResponseManager.responseMessage(session, connAckMessage, true);
            return false;
        }
        return true;
    }
 
}