forked from github/dataease
commit
a032fe9c88
@ -277,9 +277,6 @@ public class ExtractDataService {
|
|||||||
switch (updateType) {
|
switch (updateType) {
|
||||||
case all_scope: // 全量更新
|
case all_scope: // 全量更新
|
||||||
try{
|
try{
|
||||||
if(datasource.getType().equalsIgnoreCase("excel")){
|
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
|
|
||||||
}
|
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
}
|
}
|
||||||
@ -315,67 +312,56 @@ public class ExtractDataService {
|
|||||||
sendWebMsg(datasetTable, taskId,false);
|
sendWebMsg(datasetTable, taskId,false);
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
||||||
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
||||||
// deleteFile("all_scope", datasetTableId);
|
deleteFile("all_scope", datasetTableId);
|
||||||
}finally {
|
}finally {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case add_scope: // 增量更新
|
case add_scope: // 增量更新
|
||||||
try {
|
try {
|
||||||
if(datasource.getType().equalsIgnoreCase("excel")){
|
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
|
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
|
||||||
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
|
||||||
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
|
return;
|
||||||
Long execTime = System.currentTimeMillis();
|
|
||||||
extractData(datasetTable, "incremental_add");
|
|
||||||
saveSucessLog(datasetTableTaskLog);
|
|
||||||
sendWebMsg(datasetTable, taskId,true);
|
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
|
||||||
}else {
|
|
||||||
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
|
|
||||||
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
|
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (datasetTable.getLastUpdateTime() == 0 || datasetTable.getLastUpdateTime() == null) {
|
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
|
||||||
}
|
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
|
||||||
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
|
||||||
}
|
|
||||||
Long execTime = System.currentTimeMillis();
|
|
||||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
|
|
||||||
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
|
|
||||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
|
||||||
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
|
|
||||||
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
|
|
||||||
extractData(datasetTable, "incremental_add");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
|
|
||||||
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
|
|
||||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
|
||||||
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
|
|
||||||
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
|
|
||||||
extractData(datasetTable, "incremental_delete");
|
|
||||||
}
|
|
||||||
saveSucessLog(datasetTableTaskLog);
|
|
||||||
|
|
||||||
sendWebMsg(datasetTable, taskId,true);
|
|
||||||
|
|
||||||
deleteFile("incremental_add", datasetTableId);
|
|
||||||
deleteFile("incremental_delete", datasetTableId);
|
|
||||||
|
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
|
||||||
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
|
|
||||||
dataSetTableTaskService.update(datasetTableTask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (datasetTable.getLastUpdateTime() == 0 || datasetTable.getLastUpdateTime() == null) {
|
||||||
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
||||||
|
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
|
}
|
||||||
|
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||||
|
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
|
}
|
||||||
|
Long execTime = System.currentTimeMillis();
|
||||||
|
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
|
||||||
|
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
|
||||||
|
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||||
|
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
|
||||||
|
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
|
||||||
|
extractData(datasetTable, "incremental_add");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
|
||||||
|
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
|
||||||
|
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||||
|
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
|
||||||
|
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
|
||||||
|
extractData(datasetTable, "incremental_delete");
|
||||||
|
}
|
||||||
|
saveSucessLog(datasetTableTaskLog);
|
||||||
|
|
||||||
|
sendWebMsg(datasetTable, taskId,true);
|
||||||
|
|
||||||
|
deleteFile("incremental_add", datasetTableId);
|
||||||
|
deleteFile("incremental_delete", datasetTableId);
|
||||||
|
|
||||||
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
||||||
|
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
|
||||||
|
dataSetTableTaskService.update(datasetTableTask);
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
saveErrorLog(datasetTableId, taskId, e);
|
saveErrorLog(datasetTableId, taskId, e);
|
||||||
sendWebMsg(datasetTable, taskId,false);
|
sendWebMsg(datasetTable, taskId,false);
|
||||||
@ -597,7 +583,7 @@ public class ExtractDataService {
|
|||||||
if (jobStatus.getStatusDescription().equals("Finished")) {
|
if (jobStatus.getStatusDescription().equals("Finished")) {
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
DataEaseException.throwException((jobStatus.getLoggingString()));
|
DataEaseException.throwException((jobStatus.getErrorDescription()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user