package com.fastbee.mq.mqttClient; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.mq.DeviceReportBo; import com.fastbee.common.enums.ServerType; import com.fastbee.common.utils.DateUtils; import com.fastbee.common.utils.gateway.mq.TopicsPost; import com.fastbee.common.utils.gateway.mq.TopicsUtils; import com.fastbee.mq.redischannel.producer.MessageProducer; import com.fastbee.mq.service.IDeviceReportMessageService; import com.fastbee.mq.service.IMessagePublishService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; @Component @Slf4j public class MqttService { @Resource private TopicsUtils topicsUtils; @Resource private IDeviceReportMessageService deviceReportMessageService; public void subscribe(MqttAsyncClient client) throws MqttException { TopicsPost allPost = topicsUtils.getAllPost(); client.subscribe(allPost.getTopics(), allPost.getQos()); log.info("mqtt监控主题,{}", Arrays.asList(allPost.getTopics())); } /** * 消息回调方法 * * @param topic 主题 * @param mqttMessage 消息体 */ public void subscribeCallback(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + message); String serialNumber = topicsUtils.parseSerialNumber(topic); Long productId = topicsUtils.parseProductId(topic); String name = topicsUtils.parseTopicName(topic); DeviceReportBo reportBo = DeviceReportBo.builder() .serialNumber(serialNumber) .productId(productId) .data(mqttMessage.getPayload()) .platformDate(DateUtils.getNowDate()) .topicName(topic) .serverType(ServerType.MQTT) .build(); if (name.startsWith("property")) { deviceReportMessageService.parseReportMsg(reportBo); } } }