package com.fastbee.mqtt.manager; import com.fastbee.base.session.Session; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; @Slf4j public class ResponseManager { /** * 发送信息:用于服务端收到消息客户端数据后,向客户端发送响应信息 * * @param session 上下文 * @param msg mqtt消息 * @param flush 是否刷新 */ public static void responseMessage(Session session, MqttMessage msg, boolean flush) { ChannelFuture future = flush ? session.getHandlerContext().writeAndFlush(msg) : session.getHandlerContext().write(msg); future.addListener(f -> { if (!f.isSuccess()) { log.error("=>响应设备[{}],发送消息:{},失败原因:{}", session.getClientId(), msg, f.cause()); }else { //log.debug("=>相应设备:[{}],发送消息:[{}]",session.getClientId(),msg); } }); } /** * 发送信息:用于服务端向客户端通过clientID下发消息(单客户端) * * @param msg mqtt消息 * @param clientId 客户端id * @param flush 是否刷新 */ public static void sendMessage(MqttMessage msg, String clientId, boolean flush) { Session session = SessionManger.getSession(clientId); if (session == null || null == session.getHandlerContext()) { return; } responseMessage(session, msg, flush); } /** * 推送消息给订阅客户端(所有订阅客户端) * * @param msg 推送消息 * @param session 客户端 */ public static void publishClients(MqttPublishMessage msg, Session session) { try { final Channel channel = session.getHandlerContext().channel(); MqttQoS qos = msg.fixedHeader().qosLevel(); ByteBuf sendBuf = msg.content().retainedDuplicate(); sendBuf.resetReaderIndex(); /*配置推送消息类型*/ MqttFixedHeader Header = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, msg.fixedHeader().isRetain(), 0); /*设置topic packetId*/ MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader( msg.variableHeader().topicName(), msg.variableHeader().packetId()); /*推送消息*/ MqttPublishMessage publishMessage = new MqttPublishMessage(Header, publishVariableHeader, sendBuf); channel.writeAndFlush(publishMessage); } catch (Exception e) { log.error("=>发送消息异常 {}", msg, e); } } }