package com.fastbee.mqtt.manager;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.fastbee.common.enums.DeviceStatus;
|
import com.fastbee.common.enums.TopicType;
|
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
|
import com.fastbee.iot.domain.Device;
|
import com.fastbee.iot.service.IDeviceService;
|
import com.fastbee.mq.mqttClient.PubMqttClient;
|
import com.fastbee.mqtt.model.PushMessageBo;
|
import io.netty.buffer.Unpooled;
|
import io.netty.handler.codec.mqtt.*;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.nio.charset.StandardCharsets;
|
|
|
@Component
|
public class MqttRemoteManager {
|
|
@Resource
|
private TopicsUtils topicsUtils;
|
@Resource
|
private IDeviceService deviceService;
|
/**
|
* true: 使用netty搭建的mqttBroker false: 使用emq
|
*/
|
@Value("${server.broker.enabled}")
|
private Boolean enabled;
|
|
@Resource
|
private PubMqttClient pubMqttClient;
|
|
/**
|
* 推送设备状态
|
* @param serialNumber 设备
|
* @param status 状态
|
*/
|
public void pushDeviceStatus(Long productId, String serialNumber, DeviceStatus status){
|
//兼容emqx推送TCP客户端上线
|
Device device = deviceService.selectDeviceNoModel(serialNumber);
|
String message = "{\"status\":" + status.getType() + ",\"isShadow\":" + device.getIsShadow() + ",\"rssi\":" + device.getRssi() + "}";
|
String topic = topicsUtils.buildTopic(device.getProductId(), serialNumber, TopicType.STATUS_POST);
|
if (enabled){
|
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
|
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
|
new MqttPublishVariableHeader(topic, 0),
|
Unpooled.buffer().writeBytes(message.getBytes(StandardCharsets.UTF_8))
|
);
|
ClientManager.pubTopic(publishMessage);
|
}else {
|
//emqx直接用客户端推送
|
pubMqttClient.publish(1,false,topic,message);
|
}
|
|
}
|
|
/**
|
* 公共推送消息方法
|
* @param bo 消息体
|
*/
|
public void pushCommon(PushMessageBo bo){
|
//netty版本发送
|
if (enabled){
|
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
|
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
|
new MqttPublishVariableHeader(bo.getTopic(), 0),
|
Unpooled.buffer().writeBytes(bo.getMessage().getBytes(StandardCharsets.UTF_8))
|
);
|
ClientManager.pubTopic(publishMessage);
|
}else {
|
pubMqttClient.publish(0,false,bo.getTopic(), bo.getMessage());
|
}
|
}
|
}
|