Merge pull request #13321 from dataease/pr@dev-v2@fixds

fix(数据源): api数据源定时任务无法停止
This commit is contained in:
taojinlong 2024-11-14 15:42:21 +08:00 committed by GitHub
commit 5bbb02a30b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 47 additions and 65 deletions

View File

@ -3,11 +3,12 @@ package io.dataease.datasource.dao.auto.entity;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable; import java.io.Serializable;
/** /**
* <p> * <p>
* *
* </p> * </p>
* *
* @author fit2cloud * @author fit2cloud
@ -64,11 +65,6 @@ public class CoreDatasourceTask implements Serializable {
*/ */
private String simpleCronType; private String simpleCronType;
/**
* 结束限制 0 无限制 1 设定结束时间
*/
private String endLimit;
/** /**
* 结束时间 * 结束时间
*/ */
@ -168,14 +164,6 @@ public class CoreDatasourceTask implements Serializable {
this.simpleCronType = simpleCronType; this.simpleCronType = simpleCronType;
} }
public String getEndLimit() {
return endLimit;
}
public void setEndLimit(String endLimit) {
this.endLimit = endLimit;
}
public Long getEndTime() { public Long getEndTime() {
return endTime; return endTime;
} }
@ -227,22 +215,21 @@ public class CoreDatasourceTask implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "CoreDatasourceTask{" + return "CoreDatasourceTask{" +
"id = " + id + "id = " + id +
", dsId = " + dsId + ", dsId = " + dsId +
", name = " + name + ", name = " + name +
", updateType = " + updateType + ", updateType = " + updateType +
", startTime = " + startTime + ", startTime = " + startTime +
", syncRate = " + syncRate + ", syncRate = " + syncRate +
", cron = " + cron + ", cron = " + cron +
", simpleCronValue = " + simpleCronValue + ", simpleCronValue = " + simpleCronValue +
", simpleCronType = " + simpleCronType + ", simpleCronType = " + simpleCronType +
", endLimit = " + endLimit + ", endTime = " + endTime +
", endTime = " + endTime + ", createTime = " + createTime +
", createTime = " + createTime + ", lastExecTime = " + lastExecTime +
", lastExecTime = " + lastExecTime + ", lastExecStatus = " + lastExecStatus +
", lastExecStatus = " + lastExecStatus + ", extraData = " + extraData +
", extraData = " + extraData + ", taskStatus = " + taskStatus +
", taskStatus = " + taskStatus + "}";
"}";
} }
} }

View File

@ -324,18 +324,14 @@ public class DatasourceSyncManage {
scheduleManager.getDefaultJobDataMap(datasourceTask.getDsId().toString(), datasourceTask.getCron(), datasourceTask.getId().toString(), datasourceTask.getUpdateType())); scheduleManager.getDefaultJobDataMap(datasourceTask.getDsId().toString(), datasourceTask.getCron(), datasourceTask.getId().toString(), datasourceTask.getUpdateType()));
} else { } else {
Date endTime; Date endTime;
if (StringUtils.equalsIgnoreCase(datasourceTask.getEndLimit().toString(), "1")) { if (datasourceTask.getEndTime() == null || datasourceTask.getEndTime() == 0) {
if (datasourceTask.getEndTime() == null || datasourceTask.getEndTime() == 0) {
endTime = null;
} else {
endTime = new Date(datasourceTask.getEndTime());
if (endTime.before(new Date())) {
deleteSchedule(datasourceTask);
return;
}
}
} else {
endTime = null; endTime = null;
} else {
endTime = new Date(datasourceTask.getEndTime());
if (endTime.before(new Date())) {
deleteSchedule(datasourceTask);
return;
}
} }
scheduleManager.addOrUpdateCronJob(new JobKey(datasourceTask.getId().toString(), datasourceTask.getDsId().toString()), scheduleManager.addOrUpdateCronJob(new JobKey(datasourceTask.getId().toString(), datasourceTask.getDsId().toString()),

View File

@ -288,7 +288,7 @@ public class DatasourceServer implements DatasourceApi {
if (StringUtils.equalsIgnoreCase(coreDatasourceTask.getSyncRate(), RIGHTNOW.toString())) { if (StringUtils.equalsIgnoreCase(coreDatasourceTask.getSyncRate(), RIGHTNOW.toString())) {
coreDatasourceTask.setCron(null); coreDatasourceTask.setCron(null);
} else { } else {
if (StringUtils.equalsIgnoreCase(coreDatasourceTask.getEndLimit(), "1") && coreDatasourceTask.getStartTime() > coreDatasourceTask.getEndTime()) { if (coreDatasourceTask.getEndTime() != null && coreDatasourceTask.getEndTime() > 0 && coreDatasourceTask.getStartTime() > coreDatasourceTask.getEndTime()) {
DEException.throwException("结束时间不能小于开始时间!"); DEException.throwException("结束时间不能小于开始时间!");
} }
} }
@ -377,7 +377,7 @@ public class DatasourceServer implements DatasourceApi {
coreDatasourceTask.setStartTime(System.currentTimeMillis() - 20 * 1000); coreDatasourceTask.setStartTime(System.currentTimeMillis() - 20 * 1000);
coreDatasourceTask.setCron(null); coreDatasourceTask.setCron(null);
} else { } else {
if (StringUtils.equalsIgnoreCase(coreDatasourceTask.getEndLimit(), "1") && coreDatasourceTask.getStartTime() > coreDatasourceTask.getEndTime()) { if (coreDatasourceTask.getEndTime() != null && coreDatasourceTask.getEndTime() > 0 && coreDatasourceTask.getStartTime() > coreDatasourceTask.getEndTime()) {
DEException.throwException("结束时间不能小于开始时间!"); DEException.throwException("结束时间不能小于开始时间!");
} }
} }
@ -1149,10 +1149,10 @@ public class DatasourceServer implements DatasourceApi {
params.add(apiDefinition); params.add(apiDefinition);
} }
} }
if(CollectionUtils.isNotEmpty(params)){ if (CollectionUtils.isNotEmpty(params)) {
datasourceDTO.setParamsStr(RsaUtils.symmetricEncrypt(JsonUtil.toJSONString(params).toString())); datasourceDTO.setParamsStr(RsaUtils.symmetricEncrypt(JsonUtil.toJSONString(params).toString()));
} }
if(CollectionUtils.isNotEmpty(apiDefinitionListWithStatus)){ if (CollectionUtils.isNotEmpty(apiDefinitionListWithStatus)) {
datasourceDTO.setApiConfigurationStr(RsaUtils.symmetricEncrypt(JsonUtil.toJSONString(apiDefinitionListWithStatus).toString())); datasourceDTO.setApiConfigurationStr(RsaUtils.symmetricEncrypt(JsonUtil.toJSONString(apiDefinitionListWithStatus).toString()));
} }
if (success == apiDefinitionList.size()) { if (success == apiDefinitionList.size()) {

View File

@ -51,16 +51,16 @@ public class DatasourceTaskServer {
return CollectionUtils.isEmpty(coreDatasourceTasks) ? new CoreDatasourceTask() : coreDatasourceTasks.get(0); return CollectionUtils.isEmpty(coreDatasourceTasks) ? new CoreDatasourceTask() : coreDatasourceTasks.get(0);
} }
public CoreDatasourceTaskLog lastSyncLogForTable(Long dsId, String tableName){ public CoreDatasourceTaskLog lastSyncLogForTable(Long dsId, String tableName) {
List<CoreDatasourceTaskLog> coreDatasourceTaskLogs = new ArrayList<>(); List<CoreDatasourceTaskLog> coreDatasourceTaskLogs = new ArrayList<>();
QueryWrapper<CoreDatasourceTaskLog> queryWrapper = new QueryWrapper<>(); QueryWrapper<CoreDatasourceTaskLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("ds_id", dsId); queryWrapper.eq("ds_id", dsId);
queryWrapper.eq("table_name", tableName); queryWrapper.eq("table_name", tableName);
queryWrapper.orderByDesc("start_time"); queryWrapper.orderByDesc("start_time");
List<CoreDatasourceTaskLog> logs = coreDatasourceTaskLogMapper.selectList(queryWrapper); List<CoreDatasourceTaskLog> logs = coreDatasourceTaskLogMapper.selectList(queryWrapper);
if(!CollectionUtils.isEmpty(logs)){ if (!CollectionUtils.isEmpty(logs)) {
return logs.get(0); return logs.get(0);
}else { } else {
return null; return null;
} }
} }
@ -69,7 +69,7 @@ public class DatasourceTaskServer {
QueryWrapper<CoreDatasourceTask> queryWrapper = new QueryWrapper<>(); QueryWrapper<CoreDatasourceTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("ds_id", dsId); queryWrapper.eq("ds_id", dsId);
List<CoreDatasourceTask> coreDatasourceTasks = datasourceTaskMapper.selectList(queryWrapper); List<CoreDatasourceTask> coreDatasourceTasks = datasourceTaskMapper.selectList(queryWrapper);
if(!CollectionUtils.isEmpty(coreDatasourceTasks)){ if (!CollectionUtils.isEmpty(coreDatasourceTasks)) {
datasourceSyncManage.deleteSchedule(coreDatasourceTasks.get(0)); datasourceSyncManage.deleteSchedule(coreDatasourceTasks.get(0));
} }
datasourceTaskMapper.delete(queryWrapper); datasourceTaskMapper.delete(queryWrapper);
@ -79,14 +79,15 @@ public class DatasourceTaskServer {
coreDatasourceTask.setId(IDUtils.snowID()); coreDatasourceTask.setId(IDUtils.snowID());
datasourceTaskMapper.insert(coreDatasourceTask); datasourceTaskMapper.insert(coreDatasourceTask);
} }
public void delete(Long id) { public void delete(Long id) {
datasourceTaskMapper.deleteById(id); datasourceTaskMapper.deleteById(id);
} }
public void update(CoreDatasourceTask coreDatasourceTask) { public void update(CoreDatasourceTask coreDatasourceTask) {
if(coreDatasourceTask.getId() == null){ if (coreDatasourceTask.getId() == null) {
datasourceTaskMapper.insert(coreDatasourceTask); datasourceTaskMapper.insert(coreDatasourceTask);
}else { } else {
UpdateWrapper<CoreDatasourceTask> updateWrapper = new UpdateWrapper<>(); UpdateWrapper<CoreDatasourceTask> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("id", coreDatasourceTask.getId()); updateWrapper.eq("id", coreDatasourceTask.getId());
datasourceTaskMapper.updateById(coreDatasourceTask); datasourceTaskMapper.updateById(coreDatasourceTask);
@ -94,15 +95,16 @@ public class DatasourceTaskServer {
} }
public void updateByDsIds(List<Long> dsIds){ public void updateByDsIds(List<Long> dsIds) {
UpdateWrapper<CoreDatasourceTask> updateWrapper = new UpdateWrapper<>(); UpdateWrapper<CoreDatasourceTask> updateWrapper = new UpdateWrapper<>();
updateWrapper.in("ds_id", dsIds); updateWrapper.in("ds_id", dsIds);
CoreDatasourceTask record = new CoreDatasourceTask(); CoreDatasourceTask record = new CoreDatasourceTask();
record.setTaskStatus(TaskStatus.WaitingForExecution.name()); record.setTaskStatus(TaskStatus.WaitingForExecution.name());
datasourceTaskMapper.update(record, updateWrapper); datasourceTaskMapper.update(record, updateWrapper);
} }
public void checkTaskIsStopped(CoreDatasourceTask coreDatasourceTask) { public void checkTaskIsStopped(CoreDatasourceTask coreDatasourceTask) {
if (coreDatasourceTask.getEndLimit() != null && StringUtils.equalsIgnoreCase(coreDatasourceTask.getEndLimit(), "1")) { // 结束限制 0 无限制 1 设定结束时间' if (coreDatasourceTask.getEndTime() != null && coreDatasourceTask.getEndTime() > 0) {
List<CoreDatasourceTaskDTO> dataSetTaskDTOS = taskWithTriggers(coreDatasourceTask.getId()); List<CoreDatasourceTaskDTO> dataSetTaskDTOS = taskWithTriggers(coreDatasourceTask.getId());
if (CollectionUtils.isEmpty(dataSetTaskDTOS)) { if (CollectionUtils.isEmpty(dataSetTaskDTOS)) {
return; return;
@ -170,7 +172,7 @@ public class DatasourceTaskServer {
if (coreDatasourceTask.getSyncRate().equalsIgnoreCase(ScheduleType.RIGHTNOW.name())) { if (coreDatasourceTask.getSyncRate().equalsIgnoreCase(ScheduleType.RIGHTNOW.name())) {
record.setTaskStatus(TaskStatus.Stopped.name()); record.setTaskStatus(TaskStatus.Stopped.name());
} else { } else {
if (coreDatasourceTask.getEndLimit() != null && StringUtils.equalsIgnoreCase(coreDatasourceTask.getEndLimit(), "1")) { if (coreDatasourceTask.getEndTime() != null && coreDatasourceTask.getEndTime() > 0) {
List<CoreDatasourceTaskDTO> dataSetTaskDTOS = taskWithTriggers(coreDatasourceTask.getId()); List<CoreDatasourceTaskDTO> dataSetTaskDTOS = taskWithTriggers(coreDatasourceTask.getId());
if (CollectionUtils.isEmpty(dataSetTaskDTOS)) { if (CollectionUtils.isEmpty(dataSetTaskDTOS)) {
return; return;

View File

@ -38,25 +38,23 @@ public class DataSourceInitStartListener implements ApplicationListener<Applicat
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
try { try {
engineManage.initSimpleEngine(); engineManage.initSimpleEngine();
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
try { try {
calciteProvider.initConnectionPool(); calciteProvider.initConnectionPool();
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
List<CoreDatasourceTask> list = datasourceTaskServer.listAll(); List<CoreDatasourceTask> list = datasourceTaskServer.listAll();
for (CoreDatasourceTask task : list) { for (CoreDatasourceTask task : list) {
try { try {
if (!StringUtils.equalsIgnoreCase(task.getSyncRate(), DatasourceTaskServer.ScheduleType.RIGHTNOW.toString())) { if (!StringUtils.equalsIgnoreCase(task.getSyncRate(), DatasourceTaskServer.ScheduleType.RIGHTNOW.toString())) {
if (StringUtils.equalsIgnoreCase(task.getEndLimit(), "1")) { if (task.getEndTime() != null && task.getEndTime() > 0) {
if (task.getEndTime() != null && task.getEndTime() > 0) { if (task.getEndTime() > System.currentTimeMillis()) {
if (task.getEndTime() > System.currentTimeMillis()) {
datasourceSyncManage.addSchedule(task);
}
} else {
datasourceSyncManage.addSchedule(task); datasourceSyncManage.addSchedule(task);
} else {
datasourceSyncManage.deleteSchedule(task);
} }
} else { } else {
datasourceSyncManage.addSchedule(task); datasourceSyncManage.addSchedule(task);
@ -70,12 +68,11 @@ public class DataSourceInitStartListener implements ApplicationListener<Applicat
try { try {
List<CoreSysSetting> coreSysSettings = sysParameterManage.groupList("basic."); List<CoreSysSetting> coreSysSettings = sysParameterManage.groupList("basic.");
datasourceServer.addJob(coreSysSettings); datasourceServer.addJob(coreSysSettings);
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }