package com.fastbee.base.session; import com.fastbee.common.core.protocol.Message; import com.fastbee.common.enums.ServerType; import com.fastbee.common.exception.ServiceException; import com.fastbee.common.utils.DateUtils; import com.fastbee.common.utils.gateway.mq.Topics; import com.fastbee.base.core.model.Response; import com.fasterxml.jackson.annotation.JsonFormat; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; /** * 会话 * * @Author guanshubiao * @Date 2022/9/12 20:22 */ @Data @Slf4j public class Session { private boolean udp; private Function remover; protected Channel channel; /** * 客户端id */ private String clientId; private String productId; private SessionManager sessionManager; private InetSocketAddress remoteAddress; private String remoteAddressStr; private long creationTime; private long lastAccessTime; private Map attributes; private String sessionId; //原子计数器,报文没有消息流水号的,充当流水号 private AtomicInteger serialNo = new AtomicInteger(0); private int keepAlive = 60; /*mqtt版本号*/ private MqttVersion version; /*是否清楚会话*/ private Boolean cleanSession = false; /*mqtt账号*/ private String username; /*是否链接*/ private Boolean connected = false; /*mqtt消息类型*/ private MqttMessageType mqttMessageType; private int keepAliveMax = 120; /*主题*/ private String topicName; /*Channel处理类上下文*/ private ChannelHandlerContext handlerContext; private List topics; private Integer topicCount; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date connected_at; private String ip; /*服务协议类型 MQTT,TCP UDP COAP*/ private ServerType serverType; public Session(){ } public Session(SessionManager sessionManager, Channel channel, InetSocketAddress remoteAddress, Function remover, boolean udp, ServerType serverType) { this.channel = channel; this.creationTime = DateUtils.getTimestamp(); this.lastAccessTime = creationTime; this.sessionManager = sessionManager; this.remoteAddress = remoteAddress; this.remoteAddressStr = remoteAddress.toString(); this.remover = remover; this.udp = udp; this.serverType = serverType; this.ip = remoteAddress.getHostName(); this.connected = true; this.connected_at = DateUtils.getNowDate(); if (sessionManager != null && sessionManager.getSessionKeys() != null) { this.attributes = new EnumMap(sessionManager.getSessionKeys()); } else { this.attributes = new TreeMap<>(); } } /** * 判断设备是否已经注册上线 */ public boolean isRegistered() { return clientId != null; } /*设备端注册*/ public void register(Message message) { register(message.getClientId(), message); } public void register(String clientId, Message message) { //已经注册,不再发送注册包数据 if (isRegistered()){ return; } if (clientId == null) { throw new ServiceException("客户端注册clientId不能为空"); } this.clientId = clientId.toUpperCase(); if (sessionManager != null) { sessionManager.add(this); } } public Object getAttribute(Object name) { return attributes.get(name); } public void setAttribute(Object name, Object value) { attributes.put(name, value); } /** * 获取最后上线时间 */ public long access() { return lastAccessTime = DateUtils.getTimestamp(); } /** * 获取远程端口 */ public InetSocketAddress remoteAddress() { return remoteAddress; } /** * 获取流水号 */ public int nextSerialNO() { int current; int next; do { current = serialNo.get(); next = current > 0xffff ? 0 : current; } while (!serialNo.compareAndSet(current, next + 1)); return next; } /** * 处理离线方法 */ public void invalidate() { if (isRegistered() && sessionManager != null) { sessionManager.remove(this); } remover.apply(this); } public boolean isUdp() { return udp; } private final Map topicSubscribers = new HashMap<>(); private static final Mono rejected = Mono.error(new RejectedExecutionException("设备端暂无响应")); /** * 异步发送通知类消息 * 同步发送 mono.block() * 订阅回调 mono.doOnSuccess(处理成功).doOnError(处理异常).subscribe(开始订阅) */ public Mono notify(Message message) { return mono(channel.writeAndFlush(Packet.of(this, message))); } public Mono notify(ByteBuf message) { return mono(channel.writeAndFlush(Packet.of(this, message))); } public static Mono mono(ChannelFuture channelFuture) { return Mono.create(sink -> channelFuture.addListener(future -> { if (future.isSuccess()) { sink.success(); } else { sink.error(future.cause()); } })); } /** * 异步发送消息,接收响应 * 同步接收 mono.block() * 订阅回调 mono.doOnSuccess(处理成功).doOnError(处理异常).subscribe(开始订阅) */ public Mono request(Message message, Class responseClass) { String key = requestKey(message, responseClass); Mono subscribe = this.subscribe(key); if (subscribe == null) { return rejected; } ChannelFuture channelFuture = channel.writeAndFlush(Packet.of(this, message)); return Mono.create(sink -> channelFuture.addListener(future -> { if (future.isSuccess()) { sink.success(future); } else { sink.error(future.cause()); } })).then(subscribe).doFinally(signal -> unsubscribe(key)); } /** * 消息响应 */ public boolean response(Message message){ MonoSink monoSink = topicSubscribers.get(responseKey(message)); if (monoSink != null){ monoSink.success(message); return true; } return false; } /** * 订阅 */ private Mono subscribe(String key) { synchronized (topicSubscribers) { if (!topicSubscribers.containsKey(key)) { return Mono.create(sink -> topicSubscribers.put(key, sink)); } } return null; } /** * 取消订阅 */ private void unsubscribe(String key) { topicSubscribers.remove(key); } /*生成流水号*/ private static String requestKey(Message request, Class responseClass) { String className = responseClass.getName(); if (Response.class.isAssignableFrom(responseClass)) { String serNo = request.getSerNo(); return new StringBuffer(34).append(className).append('.').append(serNo).toString(); } return className; } /*返回流水号*/ private static String responseKey(Object response) { String className = response.getClass().getName(); if (response instanceof Response) { int serialNo = ((Response) response).getResponseSerialNo(); return new StringBuffer(34).append(className).append('.').append(serialNo).toString(); } return className; } }