From 9c3ca09973f127b21e2a129762d0103a62ead224 Mon Sep 17 00:00:00 2001 From: taojinlong Date: Thu, 29 Apr 2021 10:47:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8A=BD=E5=8F=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=88=B0doris?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 54 +++--- .../commons/utils/DorisTableUtils.java | 8 +- .../java/io/dataease/config/CommonConfig.java | 1 + .../datasource/dto/DorisConfigration.java | 11 ++ ...grationDTO.java => MysqlConfigration.java} | 2 +- .../datasource/provider/JdbcProvider.java | 26 +-- .../service/dataset/ExtractDataService.java | 173 +++++++++++------- 7 files changed, 162 insertions(+), 113 deletions(-) create mode 100644 backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java rename backend/src/main/java/io/dataease/datasource/dto/{MysqlConfigrationDTO.java => MysqlConfigration.java} (88%) diff --git a/backend/pom.xml b/backend/pom.xml index b2216cd9b2..1ca0c22e7f 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -457,33 +457,33 @@ - - org.apache.maven.plugins - maven-antrun-plugin - - - main-class-placement - generate-resources - - - - - - - - - - - - - - - - run - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java b/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java index 74c6e007c8..1b61020a14 100644 --- a/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java +++ b/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java @@ -6,8 +6,12 @@ public class DorisTableUtils { return "ds_" + datasetId.replace("-", "_"); } - public static String doristmpName(String dorisName){ - return "tmp" + dorisName; + public static String dorisTmpName(String dorisName){ + return "tmp_" + dorisName; + } + + public static String dorisDeleteName(String dorisName){ + return "delete_" + dorisName; } } diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java index 36917dd40e..d7ce6c3e5a 100644 --- a/backend/src/main/java/io/dataease/config/CommonConfig.java +++ b/backend/src/main/java/io/dataease/config/CommonConfig.java @@ -33,6 +33,7 @@ public class CommonConfig { jsonObject.put("password", env.getProperty("doris.password", "dataease")); jsonObject.put("host", env.getProperty("doris.host", "doris")); jsonObject.put("port", env.getProperty("doris.port", "9030")); + jsonObject.put("httpPort", env.getProperty("doris.httpPort", "8030")); Datasource datasource = new Datasource(); datasource.setId("doris"); diff --git a/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java b/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java new file mode 100644 index 0000000000..b3f7306b5c --- /dev/null +++ b/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java @@ -0,0 +1,11 @@ +package io.dataease.datasource.dto; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DorisConfigration extends MysqlConfigration { + + private Integer httpPort; +} diff --git a/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java b/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java similarity index 88% rename from backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java rename to backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java index 9e2d444480..2bb05ee873 100644 --- a/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java +++ b/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java @@ -6,7 +6,7 @@ import org.apache.commons.lang3.StringUtils; @Getter @Setter -public class MysqlConfigrationDTO extends JdbcDTO { +public class MysqlConfigration extends JdbcDTO { private String driver = "com.mysql.cj.jdbc.Driver"; diff --git a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java index 5f157aa163..d1d03efa9f 100644 --- a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java +++ b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java @@ -3,7 +3,7 @@ package io.dataease.datasource.provider; import com.google.gson.Gson; import com.mchange.v2.c3p0.ComboPooledDataSource; import io.dataease.datasource.constants.DatasourceTypes; -import io.dataease.datasource.dto.MysqlConfigrationDTO; +import io.dataease.datasource.dto.MysqlConfigration; import io.dataease.datasource.dto.SqlServerConfigration; import io.dataease.datasource.dto.TableFiled; import io.dataease.datasource.request.DatasourceRequest; @@ -292,11 +292,11 @@ public class JdbcProvider extends DatasourceProvider { DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType()); switch (datasourceType) { case mysql: - MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class); - username = mysqlConfigrationDTO.getUsername(); - password = mysqlConfigrationDTO.getPassword(); - driver = mysqlConfigrationDTO.getDriver(); - jdbcurl = mysqlConfigrationDTO.getJdbc(); + MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class); + username = mysqlConfigration.getUsername(); + password = mysqlConfigration.getPassword(); + driver = mysqlConfigration.getDriver(); + jdbcurl = mysqlConfigration.getJdbc(); break; case sqlServer: SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class); @@ -323,11 +323,11 @@ public class JdbcProvider extends DatasourceProvider { DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType()); switch (datasourceType) { case mysql: - MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class); - dataSource.setUser(mysqlConfigrationDTO.getUsername()); - dataSource.setDriverClass(mysqlConfigrationDTO.getDriver()); - dataSource.setPassword(mysqlConfigrationDTO.getPassword()); - dataSource.setJdbcUrl(mysqlConfigrationDTO.getJdbc()); + MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class); + dataSource.setUser(mysqlConfigration.getUsername()); + dataSource.setDriverClass(mysqlConfigration.getDriver()); + dataSource.setPassword(mysqlConfigration.getPassword()); + dataSource.setJdbcUrl(mysqlConfigration.getJdbc()); break; case sqlServer: SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class); @@ -345,8 +345,8 @@ public class JdbcProvider extends DatasourceProvider { DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType()); switch (datasourceType) { case mysql: - MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class); - return mysqlConfigrationDTO.getDataBase(); + MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class); + return mysqlConfigration.getDataBase(); case sqlServer: SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class); return sqlServerConfigration.getDataBase(); diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index d32518b05f..f6f94e4c4c 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -10,7 +10,8 @@ import io.dataease.commons.utils.CommonBeanFactory; import io.dataease.commons.utils.DorisTableUtils; import io.dataease.commons.utils.LogUtil; import io.dataease.datasource.constants.DatasourceTypes; -import io.dataease.datasource.dto.MysqlConfigrationDTO; +import io.dataease.datasource.dto.DorisConfigration; +import io.dataease.datasource.dto.MysqlConfigration; import io.dataease.datasource.provider.JdbcProvider; import io.dataease.datasource.request.DatasourceRequest; import io.dataease.dto.dataset.DataSetTaskLogDTO; @@ -25,6 +26,7 @@ import org.pentaho.di.job.Job; import org.pentaho.di.job.JobExecutionConfiguration; import org.pentaho.di.job.JobHopMeta; import org.pentaho.di.job.JobMeta; +import org.pentaho.di.job.entries.shell.JobEntryShell; import org.pentaho.di.job.entries.special.JobEntrySpecial; import org.pentaho.di.job.entries.success.JobEntrySuccess; import org.pentaho.di.job.entries.trans.JobEntryTrans; @@ -36,7 +38,8 @@ import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.sql.ExecSQLMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; -import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta; +import org.pentaho.di.trans.steps.textfileoutput.TextFileField; +import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta; import org.pentaho.di.www.SlaveServerJobStatus; @@ -68,11 +71,9 @@ public class ExtractDataService { private static String lastUpdateTime = "${__last_update_time__}"; private static String currentUpdateTime = "${__current_update_time__}"; - private static String dataease_column_family = "dataease"; + private static String separator = "|"; + private static String extention = "txt"; private static String root_path = "/opt/dataease/data/kettle/"; - private static String data_path = "/opt/dataease/data/db/"; - private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml"; - private static String pentaho_mappings = "pentaho_mappings"; @Value("${carte.host:127.0.0.1}") private String carte; @@ -85,8 +86,14 @@ public class ExtractDataService { private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" + "Column_Fields" + + "UNIQUE KEY(dataease_uuid)\n" + + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" + "PROPERTIES(\"replication_num\" = \"1\");"; + private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" + +// "rm -rf %s\n" + + "return $?"; + private String createDorisTablColumnSql( List datasetTableFields){ String Column_Fields = "dataease_uuid varchar(50),"; for (DatasetTableField datasetTableField : datasetTableFields) { @@ -110,7 +117,7 @@ public class ExtractDataService { } } Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 ); - Column_Fields = "(" + Column_Fields + ")" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n"; + Column_Fields = "(" + Column_Fields + ")\n"; return Column_Fields; } @@ -120,6 +127,7 @@ public class ExtractDataService { DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(dorisDatasource); datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql)); + System.out.println(datasourceRequest.getQuery()); jdbcProvider.exec(datasourceRequest); } @@ -128,7 +136,7 @@ public class ExtractDataService { JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);; DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(dorisDatasource); - datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.doristmpName(dorisTableName))); + datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.dorisTmpName(dorisTableName))); jdbcProvider.exec(datasourceRequest); } @@ -139,6 +147,16 @@ public class ExtractDataService { DatasetTable datasetTable = dataSetTableService.get(datasetTableId); Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); List datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); + datasetTableFields.sort((o1, o2) -> { + if (o1.getOriginName() == null) { + return -1; + } + if (o2.getOriginName() == null) { + return 1; + } + return o1.getOriginName().compareTo(o2.getOriginName()); + }); + String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable(); String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields); switch (updateType) { @@ -147,9 +165,9 @@ public class ExtractDataService { writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); // TODO before: check doris table column type createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); - createDorisTable(DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); + createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null); - generateJobFile("all_scope", datasetTable); + generateJobFile("all_scope", datasetTable, datasetTableFields); extractData(datasetTable, "all_scope"); replaceTable(DorisTableUtils.dorisName(datasetTableId)); datasetTableTaskLog.setStatus(JobStatus.Completed.name()); @@ -177,7 +195,7 @@ public class ExtractDataService { String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql); - generateJobFile("incremental_add", datasetTable); + generateJobFile("incremental_add", datasetTable, datasetTableFields); extractData(datasetTable, "incremental_add"); } @@ -186,7 +204,7 @@ public class ExtractDataService { String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql); - generateJobFile("incremental_delete", datasetTable); + generateJobFile("incremental_delete", datasetTable, datasetTableFields); extractData(datasetTable, "incremental_delete"); } datasetTableTaskLog.setStatus(JobStatus.Completed.name()); @@ -242,9 +260,14 @@ public class ExtractDataService { jobExecutionConfiguration.setRepository(repository); String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null); SlaveServerJobStatus jobStatus = null; - do { + boolean running = true; + while(running) { jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0); - } while (jobStatus != null && jobStatus.isRunning()); + running = jobStatus.isRunning(); + if(!running) + break; + Thread.sleep(1000); + } if (jobStatus.getStatusDescription().equals("Finished")) { return; } else { @@ -266,16 +289,25 @@ public class ExtractDataService { return remoteSlaveServer; } - private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception { + private void generateJobFile(String extractType, DatasetTable datasetTable, List datasetTableFields) throws Exception { + String dorisOutputTable = null; String jobName = null; + String script = null; + Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); + DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class); + String columns = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())) + ",dataease_uuid"; + switch (extractType) { case "all_scope": jobName = "job_" + datasetTable.getId(); + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); break; case "incremental_add": jobName = "job_add_" + datasetTable.getId(); + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); break; case "incremental_delete": + script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); jobName = "job_delete_" + datasetTable.getId(); break; default: @@ -286,12 +318,15 @@ public class ExtractDataService { switch (extractType) { case "all_scope": transName = "trans_" + datasetTable.getId(); + dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); break; case "incremental_add": transName = "trans_add_" + datasetTable.getId(); + dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); break; case "incremental_delete": transName = "trans_delete_" + datasetTable.getId(); + dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); break; default: break; @@ -299,6 +334,8 @@ public class ExtractDataService { JobMeta jobMeta = new JobMeta(); jobMeta.setName(jobName); + + //start JobEntrySpecial start = new JobEntrySpecial(); start.setName("START"); start.setStart(true); @@ -307,6 +344,7 @@ public class ExtractDataService { startEntry.setLocation(100, 100); jobMeta.addJobEntry(startEntry); + //trans JobEntryTrans transrans = new JobEntryTrans(); transrans.setTransname(transName); transrans.setName("Transformation"); @@ -317,41 +355,52 @@ public class ExtractDataService { jobMeta.addJobHop(new JobHopMeta(startEntry, transEntry)); + //exec shell + JobEntryShell shell = new JobEntryShell(); + shell.setScript(script); + shell.insertScript = true; + shell.setName("shell"); + JobEntryCopy shellEntry = new JobEntryCopy(shell); + shellEntry.setDrawn(true); + shellEntry.setLocation(500, 100); + jobMeta.addJobEntry(shellEntry); + + JobHopMeta transHop = new JobHopMeta(transEntry, shellEntry); + transHop.setEvaluation(true); + jobMeta.addJobHop(transHop); + + //success JobEntrySuccess success = new JobEntrySuccess(); success.setName("Success"); JobEntryCopy successEntry = new JobEntryCopy(success); successEntry.setDrawn(true); - successEntry.setLocation(500, 100); + successEntry.setLocation(700, 100); jobMeta.addJobEntry(successEntry); - JobHopMeta greenHop = new JobHopMeta(transEntry, successEntry); + + JobHopMeta greenHop = new JobHopMeta(shellEntry, successEntry); greenHop.setEvaluation(true); jobMeta.addJobHop(greenHop); + + if(!extractType.equals("incremental_delete")){ + + } + String jobXml = jobMeta.getXML(); File file = new File(root_path + jobName + ".kjb"); FileUtils.writeStringToFile(file, jobXml, "UTF-8"); } private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List datasetTableFields, String selectSQL) throws Exception { - datasetTableFields.sort((o1, o2) -> { - if (o1.getOriginName() == null) { - return -1; - } - if (o2.getOriginName() == null) { - return 1; - } - return o1.getOriginName().compareTo(o2.getOriginName()); - }); - TransMeta transMeta = new TransMeta(); - String dorisOutputTable = DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTable.getId())); + String dorisOutputTable = null; DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); DatabaseMeta dataMeta = null; switch (datasourceType) { case mysql: - MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigrationDTO.class); - dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigrationDTO.getHost(), mysqlConfigrationDTO.getDataBase(), mysqlConfigrationDTO.getPort().toString(), mysqlConfigrationDTO.getUsername(), mysqlConfigrationDTO.getPassword()); + MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class); + dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword()); transMeta.addDatabase(dataMeta); break; default: @@ -359,7 +408,7 @@ public class ExtractDataService { } Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); - MysqlConfigrationDTO dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigrationDTO.class); + MysqlConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigration.class); DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword()); transMeta.addDatabase(dorisDataMeta); StepMeta inputStep = null; @@ -368,56 +417,39 @@ public class ExtractDataService { TransHopMeta hi1 = null; TransHopMeta hi2 = null; String transName = null; + switch (extractType) { case "all_scope": transName = "trans_" + datasetTable.getId(); + dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new)); transMeta.setName(transName); - inputStep = inputStep(transMeta, selectSQL); - udjcStep = udjc(datasetTableFields); - outputStep = outputStep(transMeta, dorisOutputTable); - hi1 = new TransHopMeta(inputStep, udjcStep); - hi2 = new TransHopMeta(udjcStep, outputStep); - transMeta.addTransHop(hi1); - transMeta.addTransHop(hi2); - transMeta.addStep(inputStep); - transMeta.addStep(udjcStep); - transMeta.addStep(outputStep); break; case "incremental_add": transName = "trans_add_" + datasetTable.getId(); dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); transMeta.setName(transName); - inputStep = inputStep(transMeta, selectSQL); - udjcStep = udjc(datasetTableFields); - outputStep = outputStep(transMeta, dorisOutputTable); - hi1 = new TransHopMeta(inputStep, udjcStep); - hi2 = new TransHopMeta(udjcStep, outputStep); - transMeta.addTransHop(hi1); - transMeta.addTransHop(hi2); - transMeta.addStep(inputStep); - transMeta.addStep(udjcStep); - transMeta.addStep(outputStep); break; case "incremental_delete": - dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); + dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId())); transName = "trans_delete_" + datasetTable.getId(); transMeta.setName(transName); - inputStep = inputStep(transMeta, selectSQL); - udjcStep = udjc(datasetTableFields); - outputStep = execSqlStep(transMeta, dorisOutputTable, datasetTableFields); - hi1 = new TransHopMeta(inputStep, udjcStep); - hi2 = new TransHopMeta(udjcStep, outputStep); - transMeta.addTransHop(hi1); - transMeta.addTransHop(hi2); - transMeta.addStep(inputStep); - transMeta.addStep(udjcStep); - transMeta.addStep(outputStep); break; default: break; } + inputStep = inputStep(transMeta, selectSQL); + udjcStep = udjc(datasetTableFields); + outputStep = outputStep(dorisOutputTable); + hi1 = new TransHopMeta(inputStep, udjcStep); + hi2 = new TransHopMeta(udjcStep, outputStep); + transMeta.addTransHop(hi1); + transMeta.addTransHop(hi2); + transMeta.addStep(inputStep); + transMeta.addStep(udjcStep); + transMeta.addStep(outputStep); + String transXml = transMeta.getXML(); File file = new File(root_path + transName + ".ktr"); FileUtils.writeStringToFile(file, transXml, "UTF-8"); @@ -434,14 +466,15 @@ public class ExtractDataService { return fromStep; } - private StepMeta outputStep(TransMeta transMeta, String dorisOutputTable){ - TableOutputMeta tableOutputMeta = new TableOutputMeta(); - DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris"); - tableOutputMeta.setDatabaseMeta(dorisDatabaseMeta); - tableOutputMeta.setTableName(dorisOutputTable); - tableOutputMeta.setCommitSize(10000); - tableOutputMeta.setUseBatchUpdate(true); - StepMeta outputStep = new StepMeta("TableOutput", "TableOutput", tableOutputMeta); + private StepMeta outputStep(String dorisOutputTable){ + TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta(); + textFileOutputMeta.setEncoding("UTF-8"); + textFileOutputMeta.setHeaderEnabled(false); + textFileOutputMeta.setFilename(root_path + dorisOutputTable); + textFileOutputMeta.setSeparator(separator); + textFileOutputMeta.setExtension(extention); + textFileOutputMeta.setOutputFields(new TextFileField[0]); + StepMeta outputStep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta); outputStep.setLocation(600, 100); outputStep.setDraw(true); return outputStep;