leutu
2024-06-03 3ef35e6cd16bbfa206b26bb3271eac40ad020bcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
package com.fastbee.data.service.impl;
 
import com.fastbee.common.constant.ScheduleConstants;
import com.fastbee.common.exception.job.TaskException;
import com.fastbee.iot.domain.DeviceJob;
import com.fastbee.iot.mapper.DeviceJobMapper;
import com.fastbee.iot.service.IDeviceJobService;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.cache.IDeviceCache;
import com.fastbee.data.quartz.CronUtils;
import com.fastbee.data.quartz.ScheduleUtils;
import com.fastbee.quartz.domain.SysJob;
import com.fastbee.quartz.mapper.SysJobMapper;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
 
/**
 * 定时任务调度信息 服务层
 * 
 * @author kerwincui
 */
@Service
@Slf4j
public class DeviceJobServiceImpl implements IDeviceJobService
{
    @Autowired
    private Scheduler scheduler;
 
    @Autowired
    private DeviceJobMapper jobMapper;
 
    @Autowired
    private SysJobMapper sysJobMapper;
 
 
    /**
     * 项目启动时,初始化定时器 主要是防止手动修改数据库导致未同步到定时任务处理(注:不能手动修改数据库ID和任务组名,否则会导致脏数据)
     */
    @PostConstruct
    public void init() throws SchedulerException, TaskException
    {
        scheduler.clear();
        // 设备定时任务
        List<DeviceJob> jobList = jobMapper.selectJobAll();
        for (DeviceJob deviceJob : jobList)
        {
            ScheduleUtils.createScheduleJob(scheduler, deviceJob);
        }
 
        // 系统定时任务
        List<SysJob> sysJobList = sysJobMapper.selectJobAll();
        for (SysJob job : sysJobList)
        {
            com.fastbee.quartz.util.ScheduleUtils.createScheduleJob(scheduler, job);
        }
    }
 
    /**
     * 获取quartz调度器的计划任务列表
     * 
     * @param job 调度信息
     * @return
     */
    @Override
    public List<DeviceJob> selectJobList(DeviceJob job)
    {
        return jobMapper.selectJobList(job);
    }
 
    /**
     * 通过调度任务ID查询调度信息
     * 
     * @param jobId 调度任务ID
     * @return 调度任务对象信息
     */
    @Override
    public DeviceJob selectJobById(Long jobId)
    {
        return jobMapper.selectJobById(jobId);
    }
 
    /**
     * 暂停任务
     * 
     * @param job 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int pauseJob(DeviceJob job) throws SchedulerException
    {
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
        int rows = jobMapper.updateJob(job);
        if (rows > 0)
        {
            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
        return rows;
    }
 
    /**
     * 恢复任务
     * 
     * @param job 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int resumeJob(DeviceJob job) throws SchedulerException
    {
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        job.setStatus(ScheduleConstants.Status.NORMAL.getValue());
        int rows = jobMapper.updateJob(job);
        if (rows > 0)
        {
            scheduler.resumeJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
        return rows;
    }
 
    /**
     * 删除任务后,所对应的trigger也将被删除
     * 
     * @param job 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int deleteJob(DeviceJob job) throws SchedulerException
    {
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        int rows = jobMapper.deleteJobById(jobId);
        if (rows > 0)
        {
            scheduler.deleteJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
        return rows;
    }
 
    /**
     * 批量删除调度信息
     * 
     * @param jobIds 需要删除的任务ID
     * @return 结果
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void deleteJobByIds(Long[] jobIds) throws SchedulerException
    {
        for (Long jobId : jobIds)
        {
            DeviceJob job = jobMapper.selectJobById(jobId);
            deleteJob(job);
        }
    }
 
    /**
     * 根据设备Ids批量删除调度信息
     *
     * @param deviceIds 需要删除数据的设备Ids
     * @return 结果
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void deleteJobByDeviceIds(Long[] deviceIds) throws SchedulerException
    {
        // 查出所有job
        List<DeviceJob> deviceJobs=jobMapper.selectShortJobListByDeviceIds(deviceIds);
        // 批量删除job
        int rows=jobMapper.deleteJobByDeviceIds(deviceIds);
        // 批量删除调度器
        for(DeviceJob job:deviceJobs){
            scheduler.deleteJob(ScheduleUtils.getJobKey(job.getJobId(), job.getJobGroup()));
        }
    }
 
    /**
     * 根据告警Ids批量删除调度信息
     *
     * @param alertIds 需要删除数据的告警Ids
     * @return 结果
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void deleteJobByAlertIds(Long[] alertIds) throws SchedulerException
    {
        // 查出所有job
        List<DeviceJob> deviceJobs=jobMapper.selectShortJobListByAlertIds(alertIds);
        // 批量删除job
        int rows=jobMapper.deleteJobByAlertIds(alertIds);
        // 批量删除调度器
        for(DeviceJob job:deviceJobs){
            scheduler.deleteJob(ScheduleUtils.getJobKey(job.getJobId(), job.getJobGroup()));
        }
    }
 
    /**
     * 根据场景联动Ids批量删除调度信息
     *
     * @param sceneIds 需要删除数据的场景Ids
     * @return 结果
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void deleteJobBySceneIds(Long[] sceneIds) throws SchedulerException
    {
        // 查出所有job
        List<DeviceJob> deviceJobs=jobMapper.selectShortJobListBySceneIds(sceneIds);
        // 批量删除job
        int rows=jobMapper.deleteJobBySceneIds(sceneIds);
        // 批量删除调度器
        for(DeviceJob job:deviceJobs){
            scheduler.deleteJob(ScheduleUtils.getJobKey(job.getJobId(), job.getJobGroup()));
        }
    }
 
    /**
     * 任务调度状态修改
     * 
     * @param job 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int changeStatus(DeviceJob job) throws SchedulerException
    {
        int rows = 0;
        String status = job.getStatus();
        if (ScheduleConstants.Status.NORMAL.getValue().equals(status))
        {
            rows = resumeJob(job);
        }
        else if (ScheduleConstants.Status.PAUSE.getValue().equals(status))
        {
            rows = pauseJob(job);
        }
        return rows;
    }
 
    /**
     * 立即运行任务
     * 
     * @param job 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void run(DeviceJob job) throws SchedulerException
    {
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        DeviceJob properties = selectJobById(job.getJobId());
        // 参数
        JobDataMap dataMap = new JobDataMap();
        dataMap.put(ScheduleConstants.TASK_PROPERTIES, properties);
        scheduler.triggerJob(ScheduleUtils.getJobKey(jobId, jobGroup), dataMap);
    }
 
    /**
     * 新增任务
     * 
     * @param deviceJob 调度信息 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int insertJob(DeviceJob deviceJob) throws SchedulerException, TaskException
    {
        int rows = jobMapper.insertJob(deviceJob);
        if (rows > 0)
        {
            ScheduleUtils.createScheduleJob(scheduler, deviceJob);
        }
        return rows;
    }
 
    /**
     * 更新任务的时间表达式
     * 
     * @param deviceJob 调度信息
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int updateJob(DeviceJob deviceJob) throws SchedulerException, TaskException
    {
        DeviceJob properties = selectJobById(deviceJob.getJobId());
        int rows = jobMapper.updateJob(deviceJob);
        if (rows > 0)
        {
            updateSchedulerJob(deviceJob, properties.getJobGroup());
        }
        return rows;
    }
 
    /**
     * 更新任务
     * 
     * @param deviceJob 任务对象
     * @param jobGroup 任务组名
     */
    public void updateSchedulerJob(DeviceJob deviceJob, String jobGroup) throws SchedulerException, TaskException
    {
        Long jobId = deviceJob.getJobId();
        // 判断是否存在
        JobKey jobKey = ScheduleUtils.getJobKey(jobId, jobGroup);
        if (scheduler.checkExists(jobKey))
        {
            // 防止创建时存在数据问题 先移除,然后在执行创建操作
            scheduler.deleteJob(jobKey);
        }
        ScheduleUtils.createScheduleJob(scheduler, deviceJob);
    }
 
    /**
     * 校验cron表达式是否有效
     * 
     * @param cronExpression 表达式
     * @return 结果
     */
    @Override
    public boolean checkCronExpressionIsValid(String cronExpression)
    {
        return CronUtils.isValid(cronExpression);
    }
}