forked from github/dataease
feat: 增量更新的sql,增加校验
This commit is contained in:
parent
c084ac68cf
commit
a75734b948
@ -81,6 +81,9 @@ public class DataSetTableService {
|
||||
private QrtzSchedulerStateMapper qrtzSchedulerStateMapper;
|
||||
@Resource
|
||||
private DatasetTableTaskLogMapper datasetTableTaskLogMapper;
|
||||
private static String lastUpdateTime = "${__last_update_time__}";
|
||||
private static String currentUpdateTime = "${__current_update_time__}";
|
||||
|
||||
@Value("${upload.file.path}")
|
||||
private String path;
|
||||
|
||||
@ -652,6 +655,7 @@ public class DataSetTableService {
|
||||
} else {
|
||||
return new DatasetTableIncrementalConfig();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public DatasetTableIncrementalConfig incrementalConfig(String datasetTableId) {
|
||||
@ -661,7 +665,7 @@ public class DataSetTableService {
|
||||
}
|
||||
|
||||
|
||||
public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig) {
|
||||
public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception{
|
||||
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
|
||||
return;
|
||||
}
|
||||
@ -671,8 +675,65 @@ public class DataSetTableService {
|
||||
} else {
|
||||
datasetTableIncrementalConfigMapper.updateByPrimaryKey(datasetTableIncrementalConfig);
|
||||
}
|
||||
checkColumes(datasetTableIncrementalConfig);
|
||||
}
|
||||
|
||||
private void checkColumes(DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception {
|
||||
DatasetTable datasetTable = datasetTableMapper.selectByPrimaryKey(datasetTableIncrementalConfig.getTableId());
|
||||
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.getFieldsByTableId(datasetTable.getId());
|
||||
datasetTableFields.sort((o1, o2) -> {
|
||||
if (o1.getOriginName() == null) {
|
||||
return -1;
|
||||
}
|
||||
if (o2.getOriginName() == null) {
|
||||
return 1;
|
||||
}
|
||||
return o1.getOriginName().compareTo(o2.getOriginName());
|
||||
});
|
||||
List<String> originNameFileds = datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList());
|
||||
Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
|
||||
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
|
||||
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
||||
datasourceRequest.setDatasource(ds);
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
|
||||
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())
|
||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||
datasourceRequest.setQuery(sql);
|
||||
List<String> sqlFileds = new ArrayList<>();
|
||||
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{
|
||||
sqlFileds.add(filed);
|
||||
});
|
||||
sort(sqlFileds);
|
||||
if(!originNameFileds.equals(sqlFileds)){
|
||||
throw new Exception(Translator.get("i18n_sql_add_not_matching") + sqlFileds.toString());
|
||||
}
|
||||
}
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
|
||||
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())
|
||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||
datasourceRequest.setQuery(sql);
|
||||
List<String> sqlFileds = new ArrayList<>();
|
||||
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{
|
||||
sqlFileds.add(filed);
|
||||
});
|
||||
sort(sqlFileds);
|
||||
if(!originNameFileds.equals(sqlFileds)){
|
||||
throw new Exception(Translator.get("i18n_sql_delete_not_matching") + sqlFileds.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sort(List<String> sqlFileds){
|
||||
sqlFileds.sort((o1, o2) -> {
|
||||
if (o1 == null) {
|
||||
return -1;
|
||||
}
|
||||
if (o2 == null) {
|
||||
return 1;
|
||||
}
|
||||
return o1.compareTo(o2);
|
||||
});
|
||||
}
|
||||
private void checkName(DatasetTable datasetTable) {
|
||||
// if (StringUtils.isEmpty(datasetTable.getId()) && StringUtils.equalsIgnoreCase("db", datasetTable.getType())) {
|
||||
// return;
|
||||
|
@ -24,7 +24,7 @@ import java.util.UUID;
|
||||
* @Date 2021/3/4 1:26 下午
|
||||
*/
|
||||
@Service
|
||||
@Transactional
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public class DataSetTableTaskService {
|
||||
@Resource
|
||||
private DatasetTableTaskMapper datasetTableTaskMapper;
|
||||
|
@ -245,7 +245,7 @@ public class ExtractDataService {
|
||||
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
|
||||
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString())
|
||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
|
||||
@ -253,7 +253,7 @@ public class ExtractDataService {
|
||||
extractData(datasetTable, "incremental_add");
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
|
||||
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
|
||||
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString())
|
||||
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
|
||||
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
|
||||
@ -563,23 +563,26 @@ public class ExtractDataService {
|
||||
}
|
||||
|
||||
private String fetchSqlField(String sql, Datasource ds) throws Exception {
|
||||
String tmpSql = sql;
|
||||
String tmpSql = "SELECT * FROM (" + sqlFix(sql) + ") AS tmp " + " LIMIT 0";
|
||||
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
|
||||
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
||||
datasourceRequest.setDatasource(ds);
|
||||
if (tmpSql.trim().endsWith(";")) {
|
||||
tmpSql = tmpSql.substring(0, tmpSql.length() - 1) + " limit 0";
|
||||
} else {
|
||||
tmpSql = tmpSql + " limit 0";
|
||||
}
|
||||
datasourceRequest.setQuery(tmpSql);
|
||||
List<String>dorisFileds = new ArrayList<>();
|
||||
List<String> dorisFileds = new ArrayList<>();
|
||||
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{
|
||||
dorisFileds.add(DorisTableUtils.columnName(filed));
|
||||
});
|
||||
return String.join(",", dorisFileds);
|
||||
}
|
||||
|
||||
private String sqlFix(String sql) {
|
||||
sql = sql.trim();
|
||||
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
|
||||
sql = sql.substring(0, sql.length() - 1);
|
||||
}
|
||||
return sql;
|
||||
}
|
||||
|
||||
|
||||
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
|
||||
TransMeta transMeta = new TransMeta();
|
||||
|
@ -254,4 +254,6 @@ i18n_dataset_delete=Data set is delete
|
||||
i18n_chart_delete=Chart is delete
|
||||
i18n_not_exec_add_sync=There is no completed synchronization task. Incremental synchronization cannot be performed
|
||||
i18n_excel_header_empty=Excel first row can not empty
|
||||
i18n_custom_ds_delete=Custom dataset union data is deleted,can not display
|
||||
i18n_custom_ds_delete=Custom dataset union data is deleted,can not display
|
||||
i18n_sql_add_not_matching=The data column of incremental SQL does not match the dataset,
|
||||
i18n_sql_delete_not_matching=The data column of incremental delete SQL does not match the dataset,
|
@ -255,3 +255,6 @@ i18n_chart_delete=当前用到的视图已被删除
|
||||
i18n_not_exec_add_sync=没有已完成的同步任务,无法进行增量同步
|
||||
i18n_excel_header_empty=Excel第一行为空
|
||||
i18n_custom_ds_delete=自定义数据集所关联数据被删除,无法正常显示
|
||||
i18n_sql_add_not_matching=增量添加 sql 的数据列与数据集不匹配,
|
||||
i18n_sql_delete_not_matching=增量删除 sql 的数据列与数据集不匹配,
|
||||
|
||||
|
@ -256,4 +256,6 @@ i18n_dataset_delete=當前用到的數據集已被刪除
|
||||
i18n_chart_delete=當前用到的視圖已被刪除
|
||||
i18n_not_exec_add_sync=沒有已經完成的同步任務,無法進行增量同步
|
||||
i18n_excel_header_empty=Excel第一行為空
|
||||
i18n_custom_ds_delete=自定義數據集所關聯數據被刪除,無法正常顯示
|
||||
i18n_custom_ds_delete=自定義數據集所關聯數據被刪除,無法正常顯示
|
||||
i18n_sql_add_not_matching=增量添加 sql 的數據列與數據集不匹配,
|
||||
i18n_sql_delete_not_matching=增量刪除 sql 的數據列與數據集不匹配,
|
@ -472,7 +472,6 @@ export default {
|
||||
})
|
||||
},
|
||||
saveIncrementalConfig() {
|
||||
this.update_setting = false
|
||||
if (this.incrementalUpdateType === 'incrementalAdd') {
|
||||
this.incrementalConfig.incrementalAdd = this.sql
|
||||
} else {
|
||||
@ -485,6 +484,7 @@ export default {
|
||||
type: 'success',
|
||||
showClose: true
|
||||
})
|
||||
this.update_setting = false
|
||||
})
|
||||
},
|
||||
saveTask(task) {
|
||||
|
Loading…
Reference in New Issue
Block a user