package com.fastbee.mqtt.service.impl; import com.fastbee.common.core.mq.DeviceReport; import com.fastbee.common.core.mq.DeviceReportBo; import com.fastbee.common.core.mq.message.DeviceData; import com.fastbee.common.enums.ServerType; import com.fastbee.common.enums.ThingsModelType; import com.fastbee.common.enums.TopicType; import com.fastbee.common.exception.ServiceException; import com.fastbee.common.utils.gateway.mq.TopicsUtils; import com.fastbee.iot.domain.Device; import com.fastbee.iot.service.IDeviceService; import com.fastbee.json.JsonProtocolService; import com.fastbee.mq.model.ReportDataBo; import com.fastbee.mq.service.IDataHandler; import com.fastbee.mq.service.IDeviceReportMessageService; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Optional; /** * 处理类 处理设备主动上报和设备回调信息 * * @author bill */ @Service @Slf4j public class DeviceReportMessageServiceImpl implements IDeviceReportMessageService { @Autowired private IDeviceService deviceService; @Autowired private JsonProtocolService jsonProtocolService; @Resource private TopicsUtils topicsUtils; @Resource private IDataHandler dataHandler; /** * 处理设备主动上报数据 */ @Override public void parseReportMsg(DeviceReportBo bo) { if (bo.getServerType() == ServerType.MQTT) { //构建消息 Device report = buildReport(bo); /*获取协议处理器*/ DeviceData data = DeviceData.builder() .serialNumber(bo.getSerialNumber()) .topicName(bo.getTopicName()) .productId(report.getProductId()) .data(bo.getData()) .prop(bo.getProp()) .buf(Unpooled.wrappedBuffer(bo.getData())) .build(); /*根据协议解析后的数据*/ DeviceReport reportMessage = jsonProtocolService.decode(data, null); reportMessage.setSerialNumber(bo.getSerialNumber()); reportMessage.setProductId(bo.getProductId()); reportMessage.setPlatformDate(bo.getPlatformDate()); reportMessage.setServerType(bo.getServerType()); reportMessage.setUserId(report.getUserId()); reportMessage.setUserName(report.getUserName()); reportMessage.setDeviceName(report.getDeviceName()); processNoSub(reportMessage, bo.getTopicName()); } } /** * 构建消息 * * @param bo */ @Override public Device buildReport(DeviceReportBo bo) { String serialNumber = topicsUtils.parseSerialNumber(bo.getTopicName()); Device device = deviceService.selectDeviceBySerialNumber(serialNumber); Optional.ofNullable(device).orElseThrow(() -> new ServiceException("设备不存在")); //设置物模型 String thingsModel = topicsUtils.getThingsModel(bo.getTopicName()); ThingsModelType thingsModelType = ThingsModelType.getType(thingsModel); bo.setType(thingsModelType); //产品id bo.setProductId(device.getProductId()); //设备编号 bo.setSerialNumber(serialNumber); return device; } /** * 处理网关设备 * * @param message * @param topicName */ private void processNoSub(DeviceReport message, String topicName) { //处理设备上报数据 handlerReportMessage(message, topicName); } /** * 处理设备主动上报属性 * * @param topicName * @param message */ public void handlerReportMessage(DeviceReport message, String topicName) { if (message.getServerType().equals(ServerType.MQTT)){ //处理topic以prop结尾上报的数据 (属性) if (message.getServerType().equals(ServerType.MQTT)) { if (!topicName.endsWith(TopicType.PROPERTY_POST.getTopicSuffix()) && !topicName.endsWith(TopicType.PROPERTY_POST_SIMULATE.getTopicSuffix())) { return; } } } ReportDataBo report = new ReportDataBo(); report.setSerialNumber(message.getSerialNumber()); report.setProductId(message.getProductId()); report.setDataList(message.getValuesInput().getThingsModelValueRemarkItem()); report.setType(1); report.setSlaveId(message.getSlaveId()); report.setUserId(message.getUserId()); report.setUserName(message.getUserName()); report.setDeviceName(message.getDeviceName()); //属性上报执行规则引擎 report.setRuleEngine(true); dataHandler.reportData(report); } }