leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.fastbee.mqtt.service.impl;
 
import com.fastbee.common.core.mq.DeviceReport;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.mq.message.DeviceData;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.enums.ThingsModelType;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.domain.Device;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.json.JsonProtocolService;
import com.fastbee.mq.model.ReportDataBo;
import com.fastbee.mq.service.IDataHandler;
import com.fastbee.mq.service.IDeviceReportMessageService;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import javax.annotation.Resource;
import java.util.Optional;
 
/**
 * 处理类 处理设备主动上报和设备回调信息
 *
 * @author bill
 */
@Service
@Slf4j
public class DeviceReportMessageServiceImpl implements IDeviceReportMessageService {
 
    @Autowired
    private IDeviceService deviceService;
    @Autowired
    private JsonProtocolService jsonProtocolService;
    @Resource
    private TopicsUtils topicsUtils;
    @Resource
    private IDataHandler dataHandler;
 
 
    /**
     * 处理设备主动上报数据
     */
    @Override
    public void parseReportMsg(DeviceReportBo bo) {
        if (bo.getServerType() == ServerType.MQTT) {
            //构建消息
            Device report = buildReport(bo);
            /*获取协议处理器*/
            DeviceData data = DeviceData.builder()
                    .serialNumber(bo.getSerialNumber())
                    .topicName(bo.getTopicName())
                    .productId(report.getProductId())
                    .data(bo.getData())
                    .prop(bo.getProp())
                    .buf(Unpooled.wrappedBuffer(bo.getData()))
                    .build();
            /*根据协议解析后的数据*/
            DeviceReport reportMessage = jsonProtocolService.decode(data, null);
 
            reportMessage.setSerialNumber(bo.getSerialNumber());
            reportMessage.setProductId(bo.getProductId());
            reportMessage.setPlatformDate(bo.getPlatformDate());
            reportMessage.setServerType(bo.getServerType());
            reportMessage.setUserId(report.getUserId());
            reportMessage.setUserName(report.getUserName());
            reportMessage.setDeviceName(report.getDeviceName());
            processNoSub(reportMessage, bo.getTopicName());
        }
    }
 
    /**
     * 构建消息
     *
     * @param bo
     */
    @Override
    public Device buildReport(DeviceReportBo bo) {
        String serialNumber = topicsUtils.parseSerialNumber(bo.getTopicName());
        Device device = deviceService.selectDeviceBySerialNumber(serialNumber);
        Optional.ofNullable(device).orElseThrow(() -> new ServiceException("设备不存在"));
        //设置物模型
        String thingsModel = topicsUtils.getThingsModel(bo.getTopicName());
        ThingsModelType thingsModelType = ThingsModelType.getType(thingsModel);
        bo.setType(thingsModelType);
        //产品id
        bo.setProductId(device.getProductId());
        //设备编号
        bo.setSerialNumber(serialNumber);
        return device;
    }
 
 
    /**
     * 处理网关设备
     *
     * @param message
     * @param topicName
     */
    private void processNoSub(DeviceReport message, String topicName) {
        //处理设备上报数据
        handlerReportMessage(message, topicName);
    }
 
 
    /**
     * 处理设备主动上报属性
     *
     * @param topicName
     * @param message
     */
    public void handlerReportMessage(DeviceReport message, String topicName) {
 
        if (message.getServerType().equals(ServerType.MQTT)){
            //处理topic以prop结尾上报的数据 (属性)
            if (message.getServerType().equals(ServerType.MQTT)) {
                if (!topicName.endsWith(TopicType.PROPERTY_POST.getTopicSuffix())
                        && !topicName.endsWith(TopicType.PROPERTY_POST_SIMULATE.getTopicSuffix())) {
                    return;
                }
            }
        }
 
        ReportDataBo report = new ReportDataBo();
        report.setSerialNumber(message.getSerialNumber());
        report.setProductId(message.getProductId());
        report.setDataList(message.getValuesInput().getThingsModelValueRemarkItem());
        report.setType(1);
        report.setSlaveId(message.getSlaveId());
        report.setUserId(message.getUserId());
        report.setUserName(message.getUserName());
        report.setDeviceName(message.getDeviceName());
        //属性上报执行规则引擎
        report.setRuleEngine(true);
        dataHandler.reportData(report);
    }
 
}