diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index f6f94e4c4c..5b2a638017 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -12,7 +12,10 @@ import io.dataease.commons.utils.LogUtil; import io.dataease.datasource.constants.DatasourceTypes; import io.dataease.datasource.dto.DorisConfigration; import io.dataease.datasource.dto.MysqlConfigration; +import io.dataease.datasource.dto.TableFiled; +import io.dataease.datasource.provider.DatasourceProvider; import io.dataease.datasource.provider.JdbcProvider; +import io.dataease.datasource.provider.ProviderFactory; import io.dataease.datasource.request.DatasourceRequest; import io.dataease.dto.dataset.DataSetTaskLogDTO; import io.dataease.dto.dataset.DataTableInfoDTO; @@ -91,8 +94,7 @@ public class ExtractDataService { "PROPERTIES(\"replication_num\" = \"1\");"; private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" + -// "rm -rf %s\n" + - "return $?"; + "rm -rf %s\n"; private String createDorisTablColumnSql( List datasetTableFields){ String Column_Fields = "dataease_uuid varchar(50),"; @@ -167,7 +169,7 @@ public class ExtractDataService { createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null); - generateJobFile("all_scope", datasetTable, datasetTableFields); + generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()))); extractData(datasetTable, "all_scope"); replaceTable(DorisTableUtils.dorisName(datasetTableId)); datasetTableTaskLog.setStatus(JobStatus.Completed.name()); @@ -194,8 +196,10 @@ public class ExtractDataService { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) { String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); + //TODO sql column + generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql); - generateJobFile("incremental_add", datasetTable, datasetTableFields); + generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource)); extractData(datasetTable, "incremental_add"); } @@ -203,8 +207,10 @@ public class ExtractDataService { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) { String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); + //TODO sql column + ; generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql); - generateJobFile("incremental_delete", datasetTable, datasetTableFields); + generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource)); extractData(datasetTable, "incremental_delete"); } datasetTableTaskLog.setStatus(JobStatus.Completed.name()); @@ -289,44 +295,32 @@ public class ExtractDataService { return remoteSlaveServer; } - private void generateJobFile(String extractType, DatasetTable datasetTable, List datasetTableFields) throws Exception { + private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFeilds) throws Exception { String dorisOutputTable = null; String jobName = null; String script = null; Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class); - String columns = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())) + ",dataease_uuid"; - - switch (extractType) { - case "all_scope": - jobName = "job_" + datasetTable.getId(); - script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); - break; - case "incremental_add": - jobName = "job_add_" + datasetTable.getId(); - script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); - break; - case "incremental_delete": - script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); - jobName = "job_delete_" + datasetTable.getId(); - break; - default: - break; - } - + String columns = columnFeilds + ",dataease_uuid"; String transName = null; switch (extractType) { case "all_scope": transName = "trans_" + datasetTable.getId(); dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); + jobName = "job_" + datasetTable.getId(); + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); break; case "incremental_add": transName = "trans_add_" + datasetTable.getId(); dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); + jobName = "job_add_" + datasetTable.getId(); + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); break; case "incremental_delete": transName = "trans_delete_" + datasetTable.getId(); - dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); + dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId())); + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + dorisOutputTable + "." + extention); + jobName = "job_delete_" + datasetTable.getId(); break; default: break; @@ -382,16 +376,25 @@ public class ExtractDataService { greenHop.setEvaluation(true); jobMeta.addJobHop(greenHop); - - if(!extractType.equals("incremental_delete")){ - - } - String jobXml = jobMeta.getXML(); File file = new File(root_path + jobName + ".kjb"); FileUtils.writeStringToFile(file, jobXml, "UTF-8"); } + private String fetchSqlField(String sql, Datasource ds)throws Exception{ + String tmpSql = sql; + DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(ds); + if(tmpSql.trim().endsWith(";")){ + tmpSql = tmpSql.substring(0, tmpSql.length() -1 ) + " limit 0"; + }else { + tmpSql = tmpSql + " limit 0"; + } + datasourceRequest.setQuery(tmpSql); + return String.join(",", datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).collect(Collectors.toList())); + } + private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List datasetTableFields, String selectSQL) throws Exception { TransMeta transMeta = new TransMeta(); String dorisOutputTable = null;