diff --git a/backend/src/main/java/io/dataease/listener/DataSourceInitStartListener.java b/backend/src/main/java/io/dataease/listener/DataSourceInitStartListener.java index 8918c488b8..d3170c606b 100644 --- a/backend/src/main/java/io/dataease/listener/DataSourceInitStartListener.java +++ b/backend/src/main/java/io/dataease/listener/DataSourceInitStartListener.java @@ -19,8 +19,7 @@ public class DataSourceInitStartListener implements ApplicationListener qrtzSchedulerState.getLastCheckinTime() + qrtzSchedulerState.getCheckinInterval() + 1000 > utilMapper.currentTimestamp()) .map(QrtzSchedulerStateKey::getInstanceName).collect(Collectors.toList()); - List jobStoppeddDatasetTables = new ArrayList<>(); DatasetTableExample example = new DatasetTableExample(); example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()); + List jobStoppeddDatasetTables = new ArrayList<>(); + datasetTableMapper.selectByExample(example).forEach(datasetTable -> { if (StringUtils.isEmpty(datasetTable.getQrtzInstance()) || !activeQrtzInstances.contains( datasetTable.getQrtzInstance().substring(0, datasetTable.getQrtzInstance().length() - 13))) { @@ -2235,21 +2236,25 @@ public class DataSetTableService { return; } + //Task + DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample(); + DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria(); + criteria.andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())).andLastExecStatusEqualTo(JobStatus.Underway.name()); + List datasetTableTasks = dataSetTableTaskService.list(datasetTableTaskExample); + if (CollectionUtils.isEmpty(datasetTableTasks)) { + return; + } + + dataSetTableTaskService.updateTaskStatus(datasetTableTasks, JobStatus.Error); + //DatasetTable DatasetTable record = new DatasetTable(); record.setSyncStatus(JobStatus.Error.name()); example.clear(); example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()) - .andIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())); + .andIdIn(datasetTableTasks.stream().map(DatasetTableTask::getTableId).collect(Collectors.toList())); datasetTableMapper.updateByExampleSelective(record, example); - //Task - DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample(); - DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria(); - criteria.andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())).andStatusEqualTo(JobStatus.Underway.name()); - List datasetTableTasks = dataSetTableTaskService.list(datasetTableTaskExample); - dataSetTableTaskService.updateTaskStatus(datasetTableTasks, JobStatus.Error); - //TaskLog DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); datasetTableTaskLog.setStatus(JobStatus.Error.name()); @@ -2257,13 +2262,13 @@ public class DataSetTableService { DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample(); datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()) - .andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())); + .andTableIdIn(datasetTableTasks.stream().map(DatasetTableTask::getTableId).collect(Collectors.toList())); datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample); - for (DatasetTable jobStoppeddDatasetTable : jobStoppeddDatasetTables) { - extractDataService.deleteFile("all_scope", jobStoppeddDatasetTable.getId()); - extractDataService.deleteFile("incremental_add", jobStoppeddDatasetTable.getId()); - extractDataService.deleteFile("incremental_delete", jobStoppeddDatasetTable.getId()); + for (DatasetTableTask datasetTableTask : datasetTableTasks) { + extractDataService.deleteFile("all_scope", datasetTableTask.getTableId()); + extractDataService.deleteFile("incremental_add", datasetTableTask.getTableId()); + extractDataService.deleteFile("incremental_delete", datasetTableTask.getTableId()); } }