feat(backend): Quartz框架初步实现

This commit is contained in:
junjie 2021-03-05 14:19:23 +08:00
parent 7bb6a63ab2
commit 4c75b5b410
9 changed files with 221 additions and 67 deletions

View File

@ -18,7 +18,7 @@ public class DataSetTableTaskController {
private DataSetTableTaskService dataSetTableTaskService;
@PostMapping("save")
public DatasetTableTask save(@RequestBody DatasetTableTask datasetTableTask) {
public DatasetTableTask save(@RequestBody DatasetTableTask datasetTableTask) throws Exception {
return dataSetTableTaskService.save(datasetTableTask);
}

View File

@ -0,0 +1,24 @@
package io.dataease.job.sechedule;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public abstract class DeScheduleJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// JobKey jobKey = context.getTrigger().getJobKey();
// JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
// this.resourceId = jobDataMap.getString("resourceId");
// this.userId = jobDataMap.getString("userId");
// this.expression = jobDataMap.getString("expression");
//
// LogUtil.info(jobKey.getGroup() + " Running: " + resourceId);
// LogUtil.info("CronExpression: " + expression);
businessExecute(context);
}
abstract void businessExecute(JobExecutionContext context);
}

View File

@ -1,29 +0,0 @@
package io.dataease.job.sechedule;
import io.dataease.commons.utils.LogUtil;
import org.quartz.*;
public abstract class MsScheduleJob implements Job {
protected String resourceId;
protected String userId;
protected String expression;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobKey jobKey = context.getTrigger().getJobKey();
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
this.resourceId = jobDataMap.getString("resourceId");
this.userId = jobDataMap.getString("userId");
this.expression = jobDataMap.getString("expression");
LogUtil.info(jobKey.getGroup() + " Running: " + resourceId);
LogUtil.info("CronExpression: " + expression);
businessExecute(context);
}
abstract void businessExecute(JobExecutionContext context);
}

View File

@ -5,6 +5,7 @@ import org.quartz.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -57,7 +58,7 @@ public class ScheduleManager {
* @param cron
* @param jobDataMap
*/
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, JobDataMap jobDataMap) {
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime, JobDataMap jobDataMap) {
try {
LogUtil.info("addCronJob: " + triggerKey.getName() + "," + triggerKey.getGroup());
@ -72,7 +73,11 @@ public class ScheduleManager {
triggerBuilder.withIdentity(triggerKey);
triggerBuilder.startNow();
triggerBuilder.startAt(startTime);
if (endTime != null) {
triggerBuilder.endAt(endTime);
}
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
@ -86,8 +91,38 @@ public class ScheduleManager {
}
}
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron) {
addCronJob(jobKey, triggerKey, jobClass, cron, null);
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime) {
addCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
public void addSingleJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, Date date, JobDataMap jobDataMap) {
try {
LogUtil.info("addSingleJob: " + triggerKey.getName() + "," + triggerKey.getGroup());
JobBuilder jobBuilder = JobBuilder.newJob(jobClass).withIdentity(jobKey);
if (jobDataMap != null) {
jobBuilder.usingJobData(jobDataMap);
}
JobDetail jobDetail = jobBuilder.build();
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
triggerBuilder.withIdentity(triggerKey);
triggerBuilder.startAt(date);
Trigger trigger = triggerBuilder.build();
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
public void addSingleJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, Date date) {
addSingleJob(jobKey, triggerKey, jobClass, date, null);
}
/**
@ -97,7 +132,7 @@ public class ScheduleManager {
* @param cron
* @throws SchedulerException
*/
public void modifyCronJobTime(TriggerKey triggerKey, String cron) throws SchedulerException {
public void modifyCronJobTime(TriggerKey triggerKey, String cron, Date startTime, Date endTime) throws SchedulerException {
LogUtil.info("modifyCronJobTime: " + triggerKey.getName() + "," + triggerKey.getGroup());
@ -108,31 +143,31 @@ public class ScheduleManager {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(cron)) {
/** 方式一 :调用 rescheduleJob 开始 */
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();// 触发器
/** 方式一 :调用 rescheduleJob 开始 */
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();// 触发器
triggerBuilder.withIdentity(triggerKey);// 触发器名,触发器组
triggerBuilder.withIdentity(triggerKey);// 触发器名,触发器组
triggerBuilder.startAt(startTime);
triggerBuilder.startNow();// 立即执行
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));// 触发器时间设定
trigger = (CronTrigger) triggerBuilder.build();// 创建Trigger对象
scheduler.rescheduleJob(triggerKey, trigger);// 修改一个任务的触发时间
/** 方式一 :调用 rescheduleJob 结束 */
/** 方式二先删除然后在创建一个新的Job */
// JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
// Class<? extends Job> jobClass = jobDetail.getJobClass();
// removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
// addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
/** 方式二 先删除然后在创建一个新的Job */
if (endTime != null) {
triggerBuilder.endAt(endTime);
}
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));// 触发器时间设定
trigger = (CronTrigger) triggerBuilder.build();// 创建Trigger对象
scheduler.rescheduleJob(triggerKey, trigger);// 修改一个任务的触发时间
/** 方式一 :调用 rescheduleJob 结束 */
/** 方式二先删除然后在创建一个新的Job */
// JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
// Class<? extends Job> jobClass = jobDetail.getJobClass();
// removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
// addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
/** 方式二 先删除然后在创建一个新的Job */
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -190,6 +225,38 @@ public class ScheduleManager {
}
}
public void modifySingleJobTime(TriggerKey triggerKey, Date date) throws SchedulerException {
try {
LogUtil.info("modifySingleJobTime: " + triggerKey.getName() + "," + triggerKey.getGroup());
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger == null) {
return;
}
Date oldTime = trigger.getStartTime();
if (oldTime.getTime() != date.getTime()) {
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();// 触发器builder
triggerBuilder.withIdentity(triggerKey);// 触发器名,触发器组
triggerBuilder.startAt(date);
trigger = triggerBuilder.build();// 创建Trigger对象
scheduler.rescheduleJob(triggerKey, trigger);// 修改一个任务的触发时间
}
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
/**
* @param jobKey
* @param triggerKey
@ -257,6 +324,21 @@ public class ScheduleManager {
}
public void addOrUpdateSingleJob(JobKey jobKey, TriggerKey triggerKey, Class clz,
Date date, JobDataMap jobDataMap) throws SchedulerException {
if (scheduler.checkExists(triggerKey)) {
modifySingleJobTime(triggerKey, date);
} else {
addSingleJob(jobKey, triggerKey, clz, date, jobDataMap);
}
}
public void addOrUpdateSingleJob(JobKey jobKey, TriggerKey triggerKey, Class clz,
Date date) throws SchedulerException {
addOrUpdateSingleJob(jobKey, triggerKey, clz, date, null);
}
public void addOrUpdateSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class clz, int intervalTime) throws SchedulerException {
addOrUpdateSimpleJob(jobKey, triggerKey, clz, intervalTime, null);
}
@ -272,19 +354,19 @@ public class ScheduleManager {
* @param jobDataMap
* @throws SchedulerException
*/
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, JobDataMap jobDataMap) throws SchedulerException {
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime, JobDataMap jobDataMap) throws SchedulerException {
LogUtil.info("AddOrUpdateCronJob: " + jobKey.getName() + "," + triggerKey.getGroup());
if (scheduler.checkExists(triggerKey)) {
modifyCronJobTime(triggerKey, cron);
modifyCronJobTime(triggerKey, cron, startTime, endTime);
} else {
addCronJob(jobKey, triggerKey, jobClass, cron, jobDataMap);
addCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, jobDataMap);
}
}
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron) throws SchedulerException {
addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, null);
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime) throws SchedulerException {
addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String userId) {
@ -295,7 +377,7 @@ public class ScheduleManager {
return jobDataMap;
}
public Object getCurrentlyExecutingJobs(){
public Object getCurrentlyExecutingJobs() {
Map<String, String> returnMap = new HashMap<>();
try {
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
@ -306,7 +388,7 @@ public class ScheduleManager {
returnMap.put("jobName", jobName);
returnMap.put("groupName", groupName);
}
}catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -0,0 +1,17 @@
package io.dataease.job.sechedule;
import org.quartz.JobExecutionContext;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Author gin
* @Date 2021/3/5 11:37 上午
*/
public class TestJob extends DeScheduleJob {
@Override
void businessExecute(JobExecutionContext context) {
System.out.println("Test Job -- " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}

View File

@ -0,0 +1,26 @@
package io.dataease.listener;
import io.dataease.job.sechedule.ScheduleManager;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class AppStartListener implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private ScheduleManager scheduleManager;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
System.out.println("================= 应用启动 =================");
/* cron schedule */
// scheduleManager.addCronJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, "*/10 * * * * ?");
/* single schedule*/
// long timestamp = System.currentTimeMillis() + 90 * 1000;
// Date date = new Date(timestamp);
// scheduleManager.addSingleJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, date);
// TODO 项目启动从数据库读取任务加入到Quartz
}
}

View File

@ -3,10 +3,15 @@ package io.dataease.service.dataset;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.base.domain.DatasetTableTaskExample;
import io.dataease.base.mapper.DatasetTableTaskMapper;
import io.dataease.job.sechedule.ScheduleManager;
import io.dataease.job.sechedule.TestJob;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobKey;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.UUID;
@ -19,7 +24,10 @@ public class DataSetTableTaskService {
@Resource
private DatasetTableTaskMapper datasetTableTaskMapper;
public DatasetTableTask save(DatasetTableTask datasetTableTask) {
@Resource
private ScheduleManager scheduleManager;
public DatasetTableTask save(DatasetTableTask datasetTableTask) throws Exception {
if (StringUtils.isEmpty(datasetTableTask.getId())) {
datasetTableTask.setId(UUID.randomUUID().toString());
datasetTableTask.setCreateTime(System.currentTimeMillis());
@ -27,11 +35,32 @@ public class DataSetTableTaskService {
} else {
datasetTableTaskMapper.updateByPrimaryKey(datasetTableTask);
}
if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "0")) {
scheduleManager.addOrUpdateSingleJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
TestJob.class,
new Date(datasetTableTask.getStartTime()));
} else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "1")) {
Date endTime;
if (datasetTableTask.getEndTime() == null || datasetTableTask.getEndTime() == 0) {
endTime = null;
} else {
endTime = new Date(datasetTableTask.getEndTime());
}
scheduleManager.addOrUpdateCronJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
TestJob.class,
datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime);
}
return datasetTableTask;
}
public void delete(String id) {
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(id);
datasetTableTaskMapper.deleteByPrimaryKey(id);
scheduleManager.removeJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()), new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()));
}
public List<DatasetTableTask> list(DatasetTableTask datasetTableTask) {

View File

@ -40,9 +40,9 @@ CREATE TABLE IF NOT EXISTS `dataset_table_task`
`name` varchar(255) NOT NULL COMMENT '任务名称',
`type` varchar(50) NOT NULL COMMENT '更新方式0-全量更新 1-增量更新',
`start_time` bigint(13) COMMENT '开始时间',
`rate` varchar(50) NOT NULL COMMENT '执行频率',
`rate` varchar(50) NOT NULL COMMENT '执行频率0 一次性 1 cron',
`cron` varchar(255) COMMENT 'cron表达式',
`end` varchar(50) NOT NULL COMMENT '结束限制',
`end` varchar(50) NOT NULL COMMENT '结束限制 0 无限制 1 设定结束时间',
`end_time` bigint(13) COMMENT '结束时间',
`create_time` bigint(13) COMMENT '创建时间',
PRIMARY KEY (`id`),

View File

@ -78,7 +78,7 @@
/>
</el-form-item>
<el-form-item :label="$t('dataset.execute_rate')" prop="rate">
<el-select v-model="taskForm.rate" size="mini">
<el-select v-model="taskForm.rate" size="mini" @change="onRateChange">
<el-option
:label="$t('dataset.execute_once')"
value="0"
@ -93,7 +93,7 @@
<el-input v-model="taskForm.cron" size="mini" style="width: 50%"/>
</el-form-item>
<el-form-item :label="$t('dataset.end_time')" prop="end">
<el-select v-model="taskForm.end" size="mini">
<el-select v-model="taskForm.end" size="mini" :disabled="taskForm.rate === '0'">
<el-option
:label="$t('dataset.no_limit')"
value="0"
@ -307,6 +307,11 @@ export default {
this.update_task = false
this.resetTaskForm()
},
onRateChange() {
if (this.taskForm.rate === '0') {
this.taskForm.end = '0'
}
},
resetTaskForm() {
this.taskForm = {
name: '',