fix: 检测并更新失败任务状态

This commit is contained in:
taojinlong 2022-04-05 17:30:49 +08:00
parent 0a9d2a8810
commit 9a9a1a1c0f
2 changed files with 20 additions and 16 deletions

View File

@ -19,8 +19,7 @@ public class DataSourceInitStartListener implements ApplicationListener<Applicat
@Override @Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
datasourceService.initAllDataSourceConnectionPool(); datasourceService.initAllDataSourceConnectionPool();
dataSetTableService.updateDatasetTableStatus(); // dataSetTableService.updateDatasetTableStatus();
} }

View File

@ -2220,10 +2220,11 @@ public class DataSetTableService {
.filter(qrtzSchedulerState -> qrtzSchedulerState.getLastCheckinTime() .filter(qrtzSchedulerState -> qrtzSchedulerState.getLastCheckinTime()
+ qrtzSchedulerState.getCheckinInterval() + 1000 > utilMapper.currentTimestamp()) + qrtzSchedulerState.getCheckinInterval() + 1000 > utilMapper.currentTimestamp())
.map(QrtzSchedulerStateKey::getInstanceName).collect(Collectors.toList()); .map(QrtzSchedulerStateKey::getInstanceName).collect(Collectors.toList());
List<DatasetTable> jobStoppeddDatasetTables = new ArrayList<>();
DatasetTableExample example = new DatasetTableExample(); DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()); example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name());
List<DatasetTable> jobStoppeddDatasetTables = new ArrayList<>();
datasetTableMapper.selectByExample(example).forEach(datasetTable -> { datasetTableMapper.selectByExample(example).forEach(datasetTable -> {
if (StringUtils.isEmpty(datasetTable.getQrtzInstance()) || !activeQrtzInstances.contains( if (StringUtils.isEmpty(datasetTable.getQrtzInstance()) || !activeQrtzInstances.contains(
datasetTable.getQrtzInstance().substring(0, datasetTable.getQrtzInstance().length() - 13))) { datasetTable.getQrtzInstance().substring(0, datasetTable.getQrtzInstance().length() - 13))) {
@ -2235,21 +2236,25 @@ public class DataSetTableService {
return; return;
} }
//Task
DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample();
DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria();
criteria.andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())).andLastExecStatusEqualTo(JobStatus.Underway.name());
List<DatasetTableTask> datasetTableTasks = dataSetTableTaskService.list(datasetTableTaskExample);
if (CollectionUtils.isEmpty(datasetTableTasks)) {
return;
}
dataSetTableTaskService.updateTaskStatus(datasetTableTasks, JobStatus.Error);
//DatasetTable //DatasetTable
DatasetTable record = new DatasetTable(); DatasetTable record = new DatasetTable();
record.setSyncStatus(JobStatus.Error.name()); record.setSyncStatus(JobStatus.Error.name());
example.clear(); example.clear();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()) example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name())
.andIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())); .andIdIn(datasetTableTasks.stream().map(DatasetTableTask::getTableId).collect(Collectors.toList()));
datasetTableMapper.updateByExampleSelective(record, example); datasetTableMapper.updateByExampleSelective(record, example);
//Task
DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample();
DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria();
criteria.andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())).andStatusEqualTo(JobStatus.Underway.name());
List<DatasetTableTask> datasetTableTasks = dataSetTableTaskService.list(datasetTableTaskExample);
dataSetTableTaskService.updateTaskStatus(datasetTableTasks, JobStatus.Error);
//TaskLog //TaskLog
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setStatus(JobStatus.Error.name()); datasetTableTaskLog.setStatus(JobStatus.Error.name());
@ -2257,13 +2262,13 @@ public class DataSetTableService {
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample(); DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()) datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name())
.andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())); .andTableIdIn(datasetTableTasks.stream().map(DatasetTableTask::getTableId).collect(Collectors.toList()));
datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample); datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample);
for (DatasetTable jobStoppeddDatasetTable : jobStoppeddDatasetTables) { for (DatasetTableTask datasetTableTask : datasetTableTasks) {
extractDataService.deleteFile("all_scope", jobStoppeddDatasetTable.getId()); extractDataService.deleteFile("all_scope", datasetTableTask.getTableId());
extractDataService.deleteFile("incremental_add", jobStoppeddDatasetTable.getId()); extractDataService.deleteFile("incremental_add", datasetTableTask.getTableId());
extractDataService.deleteFile("incremental_delete", jobStoppeddDatasetTable.getId()); extractDataService.deleteFile("incremental_delete", datasetTableTask.getTableId());
} }
} }