fix(數據集): 数据集任务,增加简单重复执行的功能

This commit is contained in:
taojinlong 2021-07-09 16:06:43 +08:00
parent 7b345b400f
commit 3b0b5d2f0c
3 changed files with 128 additions and 58 deletions

View File

@ -100,8 +100,9 @@ public class DataSetTableService {
private void extractData(DataSetTableRequest datasetTable) throws Exception{
if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
commonThreadPool.addTask(() -> {
extractDataService.extractData(datasetTable.getId(), null, "all_scope", null);
extractDataService.extractExcelData(datasetTable.getId(), "all_scope");
});
return;
}
if (StringUtils.isNotEmpty(datasetTable.getSyncType()) && datasetTable.getSyncType().equalsIgnoreCase("sync_now")) {
DataSetTaskRequest dataSetTaskRequest = new DataSetTaskRequest();
@ -111,8 +112,6 @@ public class DataSetTableService {
datasetTableTask.setType("all_scope");
datasetTableTask.setName(datasetTable.getName() + " 更新设置");
datasetTableTask.setEnd("0");
datasetTableTask.setStatus(TaskStatus.Underway.name());
datasetTableTask.setStartTime(System.currentTimeMillis());
dataSetTaskRequest.setDatasetTableTask(datasetTableTask);
dataSetTableTaskService.save(dataSetTaskRequest);
}
@ -149,11 +148,11 @@ public class DataSetTableService {
if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
if (datasetTable.getEditType() == 0) {
commonThreadPool.addTask(() -> {
extractDataService.extractData(datasetTable.getId(), null, "all_scope", null);
extractDataService.extractExcelData(datasetTable.getId(), "all_scope");
});
} else if (datasetTable.getEditType() == 1) {
commonThreadPool.addTask(() -> {
extractDataService.extractData(datasetTable.getId(), null, "add_scope", null);
extractDataService.extractExcelData(datasetTable.getId(), "add_scope");
});
}
}

View File

@ -1,10 +1,8 @@
package io.dataease.service.dataset;
import com.google.gson.Gson;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.base.domain.DatasetTableTaskExample;
import io.dataease.base.domain.DatasetTableTaskLog;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasetTableTaskMapper;
import io.dataease.base.mapper.ext.ExtDataSetTaskMapper;
import io.dataease.base.mapper.ext.query.GridExample;
@ -53,6 +51,9 @@ public class DataSetTableTaskService {
private ExtractDataService extractDataService;
@Resource
private ExtDataSetTaskMapper extDataSetTaskMapper;
@Resource
private DatasetTableMapper datasetTableMapper;
public DatasetTableTask save(DataSetTaskRequest dataSetTaskRequest) throws Exception {
checkName(dataSetTaskRequest);
@ -86,19 +87,15 @@ public class DataSetTableTaskService {
} else {
datasetTableTaskMapper.updateByPrimaryKeySelective(datasetTableTask);
}
// simple
if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString()) && datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Underway.name())) { // SIMPLE 类型提前占位
execNow(datasetTableTask);
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
datasetTableTask.setLastExecTime(System.currentTimeMillis());
update(datasetTableTask);
scheduleService.addSchedule(datasetTableTask);
}
//cronsimple_cron
if(!datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){
scheduleService.addSchedule(datasetTableTask);
}else {
if(datasetTableTask.getStatus().equalsIgnoreCase(JobStatus.Underway.name())){
System.out.println(new Gson().toJson(datasetTableTask));
scheduleService.addSchedule(datasetTableTask);
}
}
return datasetTableTask;
@ -111,10 +108,21 @@ public class DataSetTableTaskService {
DataEaseException.throwException(Translator.get("i18n_not_exec_add_sync"));
}
}
if (extractDataService.existSyncTask(dataSetTableService.get(datasetTableTask.getTableId()), null)) {
if (existSyncTask(dataSetTableService.get(datasetTableTask.getTableId()), datasetTableTask)) {
DataEaseException.throwException(Translator.get("i18n_sync_job_exists"));
}
//write log
}
private synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask) {
datasetTable.setSyncStatus(JobStatus.Underway.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
if(!existSyncTask){
datasetTableTask.setLastExecTime(System.currentTimeMillis());
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
update(datasetTableTask);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setTableId(datasetTableTask.getTableId());
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
@ -123,6 +131,8 @@ public class DataSetTableTaskService {
datasetTableTaskLog.setTriggerType(TriggerType.Custom.name());
dataSetTableTaskLogService.save(datasetTableTaskLog);
}
return existSyncTask;
}
public void delete(String id) {
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(id);
@ -207,11 +217,6 @@ public class DataSetTableTaskService {
public void execTask(DatasetTableTask datasetTableTask) throws Exception{
execNow(datasetTableTask);
// datasetTableTask.setStatus(TaskStatus.Underway.name());
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
datasetTableTask.setLastExecTime(System.currentTimeMillis());
update(datasetTableTask);
if(datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())){
scheduleService.fireNow(datasetTableTask);
}

View File

@ -138,7 +138,6 @@ public class ExtractDataService {
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
if(existSyncTask){
if(datasetTableTask != null){
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
datasetTableTaskLog.setTableId(datasetTable.getId());
@ -147,13 +146,94 @@ public class ExtractDataService {
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())){
return false;
}
}
return true;
}else {
datasetTableTask.setLastExecTime(System.currentTimeMillis());
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
dataSetTableTaskService.update(datasetTableTask);
return false;
}
}
public void extractExcelData(String datasetTableId, String type) {
Datasource datasource = new Datasource();
datasource.setType("excel");
DatasetTable datasetTable = getDatasetTable(datasetTableId);
if (datasetTable == null) {
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
return;
}
UpdateType updateType = UpdateType.valueOf(type);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
datasetTableFields.sort((o1, o2) -> {
if (o1.getColumnIndex() == null) {
return -1;
}
if (o2.getColumnIndex() == null) {
return 1;
}
return o1.getColumnIndex().compareTo(o2.getColumnIndex());
});
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
switch (updateType) {
case all_scope: // 全量更新
try {
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
if (datasetTable.getType().equalsIgnoreCase("sql")) {
generateJobFile("all_scope", datasetTable, fetchSqlField(new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql(), datasource));
} else {
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
}
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId));
saveSucessLog(datasetTableTaskLog);
sendWebMsg(datasetTable, null, true);
deleteFile("all_scope", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
} catch (Exception e) {
saveErrorLog(datasetTableId, null, e);
sendWebMsg(datasetTable, null, false);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
deleteFile("all_scope", datasetTableId);
} finally {
}
break;
case add_scope: // 增量更新
try {
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "incremental_add");
saveSucessLog(datasetTableTaskLog);
sendWebMsg(datasetTable, null, true);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
} catch (Exception e) {
saveErrorLog(datasetTableId, null, e);
sendWebMsg(datasetTable, null, false);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId);
} finally {
}
break;
}
//侵入式清除下属视图缓存
List<String> viewIds = extChartViewMapper.allViewIds(datasetTableId);
if (CollectionUtils.isNotEmpty(viewIds)){
viewIds.forEach(viewId -> {
CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId);
});
}
}
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
DatasetTable datasetTable = getDatasetTable(datasetTableId);
if(datasetTable == null){
@ -161,7 +241,10 @@ public class ExtractDataService {
return;
}
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name()) && !datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){
if(datasetTableTask == null){
return;
}
if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name())){
LogUtil.info("Skip synchronization task, task ID : " + datasetTableTask.getId());
return;
}
@ -169,9 +252,6 @@ public class ExtractDataService {
LogUtil.info("Skip synchronization task for dataset, dataset ID : " + datasetTableId);
return;
}
datasetTableTask.setLastExecTime(System.currentTimeMillis());
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
dataSetTableTaskService.update(datasetTableTask);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type);
@ -227,19 +307,12 @@ public class ExtractDataService {
deleteFile("all_scope", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
// }
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
dataSetTableTaskService.update(datasetTableTask);
}catch (Exception e){
saveErrorLog(datasetTableId, taskId, e);
datasetTableTask.setLastExecStatus(JobStatus.Error.name());
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
// }
dataSetTableTaskService.update(datasetTableTask);
sendWebMsg(datasetTable, taskId,false);
@ -279,7 +352,6 @@ public class ExtractDataService {
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
Long execTime = System.currentTimeMillis();
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
@ -305,9 +377,6 @@ public class ExtractDataService {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
// }
dataSetTableTaskService.update(datasetTableTask);
}
}catch (Exception e){
@ -315,9 +384,6 @@ public class ExtractDataService {
sendWebMsg(datasetTable, taskId,false);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
datasetTableTask.setLastExecStatus(JobStatus.Error.name());
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
// }
dataSetTableTaskService.update(datasetTableTask);
deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId);