fix: API 数据集增量同步

This commit is contained in:
taojinlong 2022-03-04 15:45:43 +08:00
parent 264efa33bc
commit 7675f9c119

View File

@ -342,34 +342,26 @@ public class ExtractDataService {
try {
if(datasource.getType().equalsIgnoreCase(DatasourceTypes.api.name())){
extractData(datasetTable, datasource, datasetTableFields, "incremental_add", null);
return;
}
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
return;
}
if (datasetTable.getLastUpdateTime() == null || datasetTable.getLastUpdateTime() == 0) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
saveErrorLog(datasetTableId, taskId, new Exception("未进行全量同步"));
lastExecStatus = JobStatus.Error;
return;
}
}else{
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTable.getLastUpdateTime() == null || datasetTable.getLastUpdateTime() == 0) {
throw new Exception("未进行全量同步");
}
execTime = System.currentTimeMillis();
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
extractData(datasetTable, datasource, datasetTableFields, "incremental_add", sql);
}
execTime = System.currentTimeMillis();
if (datasetTableIncrementalConfig != null && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
extractData(datasetTable, datasource, datasetTableFields, "incremental_add", sql);
}
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
extractData(datasetTable, datasource, datasetTableFields, "incremental_delete", sql);
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
extractData(datasetTable, datasource, datasetTableFields, "incremental_delete", sql);
}
}
saveSuccessLog(datasetTableTaskLog);
msg = true;
lastExecStatus = JobStatus.Completed;
} catch (Exception e) {