feat: 收取数据后,删掉 kettle 文件

This commit is contained in:
taojinlong 2021-06-09 13:24:10 +08:00
parent 943d753b29
commit 08a193d4ee
6 changed files with 219 additions and 139 deletions

View File

@ -69,6 +69,7 @@ public class DataSetTableTaskLogService {
if(StringUtils.isNotEmpty(datasetTableTaskLog.getTaskId())){
criteria.andTaskIdEqualTo(datasetTableTaskLog.getTaskId());
}
example.setOrderByClause("create_time desc");
return datasetTableTaskLogMapper.selectByExampleWithBLOBs(example);
}
}

View File

@ -7,6 +7,7 @@ import io.dataease.commons.constants.ScheduleType;
import io.dataease.controller.request.dataset.DataSetTaskRequest;
import io.dataease.i18n.Translator;
import io.dataease.service.ScheduleService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.quartz.CronExpression;
@ -61,7 +62,16 @@ public class DataSetTableTaskService {
datasetTableTask.setCreateTime(System.currentTimeMillis());
// SIMPLE 类型提前占位
if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
if (extractDataService.updateSyncStatus(dataSetTableService.get(datasetTableTask.getTableId()))) {
if(datasetTableTask.getType().equalsIgnoreCase("add_scope")){
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableTask.getTableId());
request.setStatus(JobStatus.Completed.name());
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(request);
if (CollectionUtils.isEmpty(datasetTableTaskLogs)) {
throw new Exception(Translator.get("i18n_not_exec_add_sync"));
}
}
if (extractDataService.updateSyncStatusIsNone(dataSetTableService.get(datasetTableTask.getTableId()))) {
throw new Exception(Translator.get("i18n_sync_job_exists"));
}else {
//write log

View File

@ -130,6 +130,180 @@ public class ExtractDataService {
"fi\n" +
"rm -rf %s\n";
public synchronized boolean updateSyncStatusIsNone(DatasetTable datasetTable ){
datasetTable.setSyncStatus(JobStatus.Underway.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTable.getId());
datasetTableMapper.selectByExample(example);
example.clear();
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
}
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
DatasetTable datasetTable = getDatasetTable(datasetTableId);
if(datasetTable == null){
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
}
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
boolean isCronJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString()));
if(updateSyncStatusIsNone(datasetTable) && isCronJob){
LogUtil.info("Skip synchronization task for table : " + datasetTableId);
return;
}
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type);
Datasource datasource = new Datasource();
if(context != null){
datasetTable.setQrtzInstance(context.getFireInstanceId());
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
}
if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) {
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
} else {
datasource.setType(datasetTable.getType());
}
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
datasetTableFields.sort((o1, o2) -> {
if (o1.getColumnIndex() == null) {
return -1;
}
if (o2.getColumnIndex() == null) {
return 1;
}
return o1.getColumnIndex().compareTo(o2.getColumnIndex());
});
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
switch (updateType) {
case all_scope: // 全量更新
try{
if(datasource.getType().equalsIgnoreCase("excel")){
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
}
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId));
saveSucessLog(datasetTableTaskLog);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
}catch (Exception e){
saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error);
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
}finally {
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
dataSetTableTaskService.update(datasetTableTask);
}
deleteFile("all_scope", datasetTableId);
}
break;
case add_scope: // 增量更新
try {
if(datasource.getType().equalsIgnoreCase("excel")){
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
extractData(datasetTable, "incremental_add");
saveSucessLog(datasetTableTaskLog);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
}else {
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
return;
}
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableId);
request.setStatus(JobStatus.Completed.name());
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(request);
if (CollectionUtils.isEmpty(datasetTableTaskLogs)) {
return;
}
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add");
}
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_delete");
}
saveSucessLog(datasetTableTaskLog);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
}
}catch (Exception e){
saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error);
}finally {
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
dataSetTableTaskService.update(datasetTableTask);
}
deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId);
}
break;
}
}
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed) {
datasetTable.setSyncStatus(completed.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example);
}
private void saveSucessLog(DatasetTableTaskLog datasetTableTaskLog) {
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
}
private void saveErrorLog(String datasetTableId, String taskId, Exception e){
LogUtil.error("Extract data error: " + datasetTableId, e);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setTableId(datasetTableId);
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
if(StringUtils.isNotEmpty(taskId)){
datasetTableTaskLog.setTaskId(taskId);
}
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs)){
datasetTableTaskLog = datasetTableTaskLogs.get(0);
datasetTableTaskLog.setStatus(JobStatus.Error.name());
datasetTableTaskLog.setInfo(ExceptionUtils.getStackTrace(e));
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
}
}
private String createDorisTablColumnSql(List<DatasetTableField> datasetTableFields) {
String Column_Fields = "dataease_uuid varchar(50), `";
for (DatasetTableField datasetTableField : datasetTableFields) {
@ -195,144 +369,6 @@ public class ExtractDataService {
jdbcProvider.exec(datasourceRequest);
}
public synchronized boolean updateSyncStatus(DatasetTable datasetTable ){
datasetTable.setSyncStatus(JobStatus.Underway.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTable.getId());
datasetTableMapper.selectByExample(example);
example.clear();
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
}
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
DatasetTable datasetTable = getDatasetTable(datasetTableId);
if(datasetTable == null){
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
}
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
boolean isCronJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString()));
if(updateSyncStatus(datasetTable) && isCronJob){
LogUtil.info("Skip synchronization task for table : " + datasetTableId);
return;
}
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type);
Datasource datasource = new Datasource();
try {
if(context != null){
datasetTable.setQrtzInstance(context.getFireInstanceId());
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
}
if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) {
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
} else {
datasource.setType(datasetTable.getType());
}
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
datasetTableFields.sort((o1, o2) -> {
if (o1.getColumnIndex() == null) {
return -1;
}
if (o2.getColumnIndex() == null) {
return 1;
}
return o1.getColumnIndex().compareTo(o2.getColumnIndex());
});
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
switch (updateType) {
// 全量更新
case all_scope:
if(datasource.getType().equalsIgnoreCase("excel")){
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
}else {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
break;
// 增量更新
case add_scope:
if(datasource.getType().equalsIgnoreCase("excel")){
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
extractData(datasetTable, "incremental_add");
}else {
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
return;
}
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableId);
request.setStatus(JobStatus.Completed.name());
List<DataSetTaskLogDTO> dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request);
if (CollectionUtils.isEmpty(dataSetTaskLogDTOS)) {
return;
}
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
// 增量添加
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add");
}
// 增量删除
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_delete");
}
}
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
break;
}
datasetTable.setSyncStatus(JobStatus.Completed.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example);
} catch (Exception e) {
e.printStackTrace();
LogUtil.error("Extract data error: " + datasetTableId, e);
datasetTableTaskLog.setStatus(JobStatus.Error.name());
datasetTableTaskLog.setInfo(ExceptionUtils.getStackTrace(e));
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
datasetTable.setSyncStatus(JobStatus.Error.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example);
if(updateType.name().equalsIgnoreCase("all_scope")){
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
}
} finally {
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
dataSetTableTaskService.update(datasetTableTask);
}
}
}
private DatasetTable getDatasetTable(String datasetTableId){
for (int i=0;i<5;i++){
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
@ -724,6 +760,36 @@ public class ExtractDataService {
return userDefinedJavaClassStep;
}
private void deleteFile(String type, String dataSetTableId){
String transName = null;
String jobName = null;
switch (type) {
case "all_scope":
transName = "trans_" + dataSetTableId;
jobName = "job_" + dataSetTableId;
break;
case "incremental_add":
transName = "trans_add_" + dataSetTableId;
jobName = "job_add_" + dataSetTableId;
break;
case "incremental_delete":
transName = "trans_delete_" + dataSetTableId;
jobName = "job_delete_" + dataSetTableId;
break;
default:
break;
}
try{
File file = new File(root_path + jobName + ".kjb");
FileUtils.forceDelete(file);
}catch (Exception e){}
try{
File file = new File(root_path + transName + ".ktr");
FileUtils.forceDelete(file);
}catch (Exception e){}
}
public boolean isKettleRunning() {
try {
if (!InetAddress.getByName(carte).isReachable(1000)) {

View File

@ -252,3 +252,4 @@ i18n_id_or_pwd_error=Invalid ID or password
i18n_datasource_delete=Data source is delete
i18n_dataset_delete=Data set is delete
i18n_chart_delete=Chart is delete
i18n_not_exec_add_sync=There is no completed synchronization task. Incremental synchronization cannot be performed

View File

@ -252,3 +252,4 @@ i18n_id_or_pwd_error=无效的ID或密码
i18n_datasource_delete=当前用到的数据源已被删除
i18n_dataset_delete=当前用到的数据集已被删除
i18n_chart_delete=当前用到的视图已被删除
i18n_not_exec_add_sync=没有已完成的同步任务,无法进行增量同步

View File

@ -254,3 +254,4 @@ i18n_id_or_pwd_error=無效的ID或密碼
i18n_datasource_delete=當前用到的數據源已被刪除
i18n_dataset_delete=當前用到的數據集已被刪除
i18n_chart_delete=當前用到的視圖已被刪除
i18n_not_exec_add_sync=沒有已經完成的同步任務,無法進行增量同步