diff --git a/backend/pom.xml b/backend/pom.xml
index b2216cd9b2..1ca0c22e7f 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -457,33 +457,33 @@
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>main-class-placement</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <!-- ant task body not recoverable from this hunk -->
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <!-- replacement plugin configuration not recoverable from this hunk -->
diff --git a/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java b/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java
index 74c6e007c8..1b61020a14 100644
--- a/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java
+++ b/backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java
@@ -6,8 +6,12 @@ public class DorisTableUtils {
return "ds_" + datasetId.replace("-", "_");
}
- public static String doristmpName(String dorisName){
- return "tmp" + dorisName;
+ public static String dorisTmpName(String dorisName){
+ return "tmp_" + dorisName;
+ }
+
+ public static String dorisDeleteName(String dorisName){
+ return "delete_" + dorisName;
}
}
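
For reference, a minimal sketch of the naming scheme the renamed helpers produce; the dataset id below is made up, the prefixes come straight from this patch:

    import io.dataease.commons.utils.DorisTableUtils;

    public class DorisTableNamingDemo {
        public static void main(String[] args) {
            String datasetId = "abc-123";                         // hypothetical dataset id
            String base = DorisTableUtils.dorisName(datasetId);   // ds_abc_123
            String tmp = DorisTableUtils.dorisTmpName(base);      // tmp_ds_abc_123 (full-extract staging table)
            String del = DorisTableUtils.dorisDeleteName(base);   // delete_ds_abc_123 (incremental-delete output name)
            System.out.println(base + ", " + tmp + ", " + del);
        }
    }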
diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java
index 36917dd40e..d7ce6c3e5a 100644
--- a/backend/src/main/java/io/dataease/config/CommonConfig.java
+++ b/backend/src/main/java/io/dataease/config/CommonConfig.java
@@ -33,6 +33,7 @@ public class CommonConfig {
jsonObject.put("password", env.getProperty("doris.password", "dataease"));
jsonObject.put("host", env.getProperty("doris.host", "doris"));
jsonObject.put("port", env.getProperty("doris.port", "9030"));
+ jsonObject.put("httpPort", env.getProperty("doris.httpPort", "8030"));
Datasource datasource = new Datasource();
datasource.setId("doris");
diff --git a/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java b/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java
new file mode 100644
index 0000000000..b3f7306b5c
--- /dev/null
+++ b/backend/src/main/java/io/dataease/datasource/dto/DorisConfigration.java
@@ -0,0 +1,11 @@
+package io.dataease.datasource.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class DorisConfigration extends MysqlConfigration {
+
+ private Integer httpPort;
+}
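
A minimal sketch of how this class is populated: generateJobFile() (later in this diff) feeds the internal doris datasource's JSON configuration through Gson, so httpPort rides along with the inherited MysqlConfigration fields. The keys below mirror what CommonConfig assembles; the username and dataBase values are assumptions.

    import com.google.gson.Gson;
    import io.dataease.datasource.dto.DorisConfigration;

    public class DorisConfigrationDemo {
        public static void main(String[] args) {
            // values are the defaults from this diff, except username/dataBase which are illustrative
            String configuration = "{\"username\":\"root\",\"password\":\"dataease\",\"host\":\"doris\","
                    + "\"port\":9030,\"httpPort\":8030,\"dataBase\":\"dataease\"}";
            DorisConfigration doris = new Gson().fromJson(configuration, DorisConfigration.class);
            // 9030 is the MySQL-protocol query port, 8030 the FE HTTP port used by Stream Load
            System.out.println(doris.getHost() + ":" + doris.getPort() + " / http " + doris.getHttpPort());
        }
    }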
diff --git a/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java b/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java
similarity index 88%
rename from backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java
rename to backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java
index 9e2d444480..2bb05ee873 100644
--- a/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigrationDTO.java
+++ b/backend/src/main/java/io/dataease/datasource/dto/MysqlConfigration.java
@@ -6,7 +6,7 @@ import org.apache.commons.lang3.StringUtils;
@Getter
@Setter
-public class MysqlConfigrationDTO extends JdbcDTO {
+public class MysqlConfigration extends JdbcDTO {
private String driver = "com.mysql.cj.jdbc.Driver";
diff --git a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
index 5f157aa163..d1d03efa9f 100644
--- a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
+++ b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
@@ -3,7 +3,7 @@ package io.dataease.datasource.provider;
import com.google.gson.Gson;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import io.dataease.datasource.constants.DatasourceTypes;
-import io.dataease.datasource.dto.MysqlConfigrationDTO;
+import io.dataease.datasource.dto.MysqlConfigration;
import io.dataease.datasource.dto.SqlServerConfigration;
import io.dataease.datasource.dto.TableFiled;
import io.dataease.datasource.request.DatasourceRequest;
@@ -292,11 +292,11 @@ public class JdbcProvider extends DatasourceProvider {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType());
switch (datasourceType) {
case mysql:
- MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class);
- username = mysqlConfigrationDTO.getUsername();
- password = mysqlConfigrationDTO.getPassword();
- driver = mysqlConfigrationDTO.getDriver();
- jdbcurl = mysqlConfigrationDTO.getJdbc();
+ MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class);
+ username = mysqlConfigration.getUsername();
+ password = mysqlConfigration.getPassword();
+ driver = mysqlConfigration.getDriver();
+ jdbcurl = mysqlConfigration.getJdbc();
break;
case sqlServer:
SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class);
@@ -323,11 +323,11 @@ public class JdbcProvider extends DatasourceProvider {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType());
switch (datasourceType) {
case mysql:
- MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class);
- dataSource.setUser(mysqlConfigrationDTO.getUsername());
- dataSource.setDriverClass(mysqlConfigrationDTO.getDriver());
- dataSource.setPassword(mysqlConfigrationDTO.getPassword());
- dataSource.setJdbcUrl(mysqlConfigrationDTO.getJdbc());
+ MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class);
+ dataSource.setUser(mysqlConfigration.getUsername());
+ dataSource.setDriverClass(mysqlConfigration.getDriver());
+ dataSource.setPassword(mysqlConfigration.getPassword());
+ dataSource.setJdbcUrl(mysqlConfigration.getJdbc());
break;
case sqlServer:
SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class);
@@ -345,8 +345,8 @@ public class JdbcProvider extends DatasourceProvider {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType());
switch (datasourceType) {
case mysql:
- MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class);
- return mysqlConfigrationDTO.getDataBase();
+ MysqlConfigration mysqlConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigration.class);
+ return mysqlConfigration.getDataBase();
case sqlServer:
SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class);
return sqlServerConfigration.getDataBase();
diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
index d32518b05f..f6f94e4c4c 100644
--- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
+++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
@@ -10,7 +10,8 @@ import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.DorisTableUtils;
import io.dataease.commons.utils.LogUtil;
import io.dataease.datasource.constants.DatasourceTypes;
-import io.dataease.datasource.dto.MysqlConfigrationDTO;
+import io.dataease.datasource.dto.DorisConfigration;
+import io.dataease.datasource.dto.MysqlConfigration;
import io.dataease.datasource.provider.JdbcProvider;
import io.dataease.datasource.request.DatasourceRequest;
import io.dataease.dto.dataset.DataSetTaskLogDTO;
@@ -25,6 +26,7 @@ import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
+import org.pentaho.di.job.entries.shell.JobEntryShell;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
@@ -36,7 +38,8 @@ import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.sql.ExecSQLMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
-import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
+import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
+import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
import org.pentaho.di.www.SlaveServerJobStatus;
@@ -68,11 +71,9 @@ public class ExtractDataService {
private static String lastUpdateTime = "${__last_update_time__}";
private static String currentUpdateTime = "${__current_update_time__}";
- private static String dataease_column_family = "dataease";
+ private static String separator = "|";
+ private static String extention = "txt";
private static String root_path = "/opt/dataease/data/kettle/";
- private static String data_path = "/opt/dataease/data/db/";
- private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml";
- private static String pentaho_mappings = "pentaho_mappings";
@Value("${carte.host:127.0.0.1}")
private String carte;
@@ -85,8 +86,14 @@ public class ExtractDataService {
private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" +
"Column_Fields" +
+ "UNIQUE KEY(dataease_uuid)\n" +
+ "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
"PROPERTIES(\"replication_num\" = \"1\");";
+ private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" +
+// "rm -rf %s\n" +
+ "return $?";
+
private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){
String Column_Fields = "dataease_uuid varchar(50),";
for (DatasetTableField datasetTableField : datasetTableFields) {
@@ -110,7 +117,7 @@ public class ExtractDataService {
}
}
Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 );
- Column_Fields = "(" + Column_Fields + ")" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n";
+ Column_Fields = "(" + Column_Fields + ")\n";
return Column_Fields;
}
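
Put together, the DDL that createDorisTable() ends up executing looks roughly like the following; the table name and the two value columns are made-up examples, and the per-field type mapping lives in the unchanged body of createDorisTablColumnSql:

    public class CreateDorisTableDemo {
        public static void main(String[] args) {
            String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME"
                    + "Column_Fields"
                    + "UNIQUE KEY(dataease_uuid)\n"
                    + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n"
                    + "PROPERTIES(\"replication_num\" = \"1\");";
            // as createDorisTablColumnSql would render it: the dataease_uuid key first, then the dataset fields
            String columnFields = "(dataease_uuid varchar(50),city varchar(255),amount bigint(20))\n";
            String sql = creatTableSql.replace("TABLE_NAME", "tmp_ds_abc_123").replace("Column_Fields", columnFields);
            System.out.println(sql);
            // CREATE TABLE IF NOT EXISTS tmp_ds_abc_123(dataease_uuid varchar(50),city varchar(255),amount bigint(20))
            // UNIQUE KEY(dataease_uuid)
            // DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10
            // PROPERTIES("replication_num" = "1");
        }
    }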
@@ -120,6 +127,7 @@ public class ExtractDataService {
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(dorisDatasource);
datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
+ System.out.println(datasourceRequest.getQuery());
jdbcProvider.exec(datasourceRequest);
}
@@ -128,7 +136,7 @@ public class ExtractDataService {
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);;
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(dorisDatasource);
- datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.doristmpName(dorisTableName)));
+ datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.dorisTmpName(dorisTableName)));
jdbcProvider.exec(datasourceRequest);
}
@@ -139,6 +147,16 @@ public class ExtractDataService {
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
+ datasetTableFields.sort((o1, o2) -> {
+ if (o1.getOriginName() == null) {
+ return -1;
+ }
+ if (o2.getOriginName() == null) {
+ return 1;
+ }
+ return o1.getOriginName().compareTo(o2.getOriginName());
+ });
+
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
switch (updateType) {
@@ -147,9 +165,9 @@ public class ExtractDataService {
writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
// TODO before: check doris table column type
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
- createDorisTable(DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
+ createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null);
- generateJobFile("all_scope", datasetTable);
+ generateJobFile("all_scope", datasetTable, datasetTableFields);
extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
@@ -177,7 +195,7 @@ public class ExtractDataService {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql);
- generateJobFile("incremental_add", datasetTable);
+ generateJobFile("incremental_add", datasetTable, datasetTableFields);
extractData(datasetTable, "incremental_add");
}
@@ -186,7 +204,7 @@ public class ExtractDataService {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql);
- generateJobFile("incremental_delete", datasetTable);
+ generateJobFile("incremental_delete", datasetTable, datasetTableFields);
extractData(datasetTable, "incremental_delete");
}
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
@@ -242,9 +260,14 @@ public class ExtractDataService {
jobExecutionConfiguration.setRepository(repository);
String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null);
SlaveServerJobStatus jobStatus = null;
- do {
+ boolean running = true;
+ while (running) {
jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
- } while (jobStatus != null && jobStatus.isRunning());
+ running = jobStatus != null && jobStatus.isRunning();
+ if (running) {
+ Thread.sleep(1000);
+ }
+ }
if (jobStatus.getStatusDescription().equals("Finished")) {
return;
} else {
@@ -266,16 +289,25 @@ public class ExtractDataService {
return remoteSlaveServer;
}
- private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception {
+ private void generateJobFile(String extractType, DatasetTable datasetTable, List<DatasetTableField> datasetTableFields) throws Exception {
+ // target table of the stream load; resolved here so the script below is formatted with a real name
+ String dorisOutputTable = extractType.equals("all_scope") ? DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())) : DorisTableUtils.dorisName(datasetTable.getId());
String jobName = null;
+ String script = null;
+ Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
+ DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class);
+ String columns = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())) + ",dataease_uuid";
+
switch (extractType) {
case "all_scope":
jobName = "job_" + datasetTable.getId();
+ script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break;
case "incremental_add":
jobName = "job_add_" + datasetTable.getId();
+ script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break;
case "incremental_delete":
+ script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
jobName = "job_delete_" + datasetTable.getId();
break;
default:
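
For a concrete picture of what the shell job entry runs, here is the format string filled in for a hypothetical full extract; credentials, host, database and column names are placeholders, at runtime they come from DorisConfigration and the dataset fields:

    public class StreamLoadScriptDemo {
        public static void main(String[] args) {
            String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n"
                    + "exit $?";
            String table = "tmp_ds_abc_123";
            String file = "/opt/dataease/data/kettle/" + table + ".txt";   // written by the TextFileOutput step
            String script = String.format(shellScript, "root", "dataease", System.currentTimeMillis(), "|",
                    "amount,city,dataease_uuid", "APPEND", file, "doris", 8030, "dataease", table);
            System.out.println(script);
            // (wrapped here for readability)
            // curl --location-trusted -u root:dataease -H "label:1620000000000" -H "column_separator:|"
            //      -H "columns:amount,city,dataease_uuid" -H "merge_type: APPEND"
            //      -T /opt/dataease/data/kettle/tmp_ds_abc_123.txt
            //      -XPUT http://doris:8030/api/dataease/tmp_ds_abc_123/_stream_load
            // incremental_delete formats the same command with merge_type DELETE
        }
    }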
@@ -286,12 +318,15 @@ public class ExtractDataService {
switch (extractType) {
case "all_scope":
transName = "trans_" + datasetTable.getId();
+ dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
break;
case "incremental_add":
transName = "trans_add_" + datasetTable.getId();
+ dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
break;
case "incremental_delete":
transName = "trans_delete_" + datasetTable.getId();
+ dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
break;
default:
break;
@@ -299,6 +334,8 @@ public class ExtractDataService {
JobMeta jobMeta = new JobMeta();
jobMeta.setName(jobName);
+
+ //start
JobEntrySpecial start = new JobEntrySpecial();
start.setName("START");
start.setStart(true);
@@ -307,6 +344,7 @@ public class ExtractDataService {
startEntry.setLocation(100, 100);
jobMeta.addJobEntry(startEntry);
+ //trans
JobEntryTrans transrans = new JobEntryTrans();
transrans.setTransname(transName);
transrans.setName("Transformation");
@@ -317,41 +355,52 @@ public class ExtractDataService {
jobMeta.addJobHop(new JobHopMeta(startEntry, transEntry));
+ //exec shell
+ JobEntryShell shell = new JobEntryShell();
+ shell.setScript(script);
+ shell.insertScript = true;
+ shell.setName("shell");
+ JobEntryCopy shellEntry = new JobEntryCopy(shell);
+ shellEntry.setDrawn(true);
+ shellEntry.setLocation(500, 100);
+ jobMeta.addJobEntry(shellEntry);
+
+ JobHopMeta transHop = new JobHopMeta(transEntry, shellEntry);
+ transHop.setEvaluation(true);
+ jobMeta.addJobHop(transHop);
+
+ //success
JobEntrySuccess success = new JobEntrySuccess();
success.setName("Success");
JobEntryCopy successEntry = new JobEntryCopy(success);
successEntry.setDrawn(true);
- successEntry.setLocation(500, 100);
+ successEntry.setLocation(700, 100);
jobMeta.addJobEntry(successEntry);
- JobHopMeta greenHop = new JobHopMeta(transEntry, successEntry);
+
+ JobHopMeta greenHop = new JobHopMeta(shellEntry, successEntry);
greenHop.setEvaluation(true);
jobMeta.addJobHop(greenHop);
+
String jobXml = jobMeta.getXML();
File file = new File(root_path + jobName + ".kjb");
FileUtils.writeStringToFile(file, jobXml, "UTF-8");
}
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
- datasetTableFields.sort((o1, o2) -> {
- if (o1.getOriginName() == null) {
- return -1;
- }
- if (o2.getOriginName() == null) {
- return 1;
- }
- return o1.getOriginName().compareTo(o2.getOriginName());
- });
-
TransMeta transMeta = new TransMeta();
- String dorisOutputTable = DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTable.getId()));
+ String dorisOutputTable = null;
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
DatabaseMeta dataMeta = null;
switch (datasourceType) {
case mysql:
- MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigrationDTO.class);
- dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigrationDTO.getHost(), mysqlConfigrationDTO.getDataBase(), mysqlConfigrationDTO.getPort().toString(), mysqlConfigrationDTO.getUsername(), mysqlConfigrationDTO.getPassword());
+ MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class);
+ dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
transMeta.addDatabase(dataMeta);
break;
default:
@@ -359,7 +408,7 @@ public class ExtractDataService {
}
Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
- MysqlConfigrationDTO dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigrationDTO.class);
+ MysqlConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigration.class);
DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword());
transMeta.addDatabase(dorisDataMeta);
StepMeta inputStep = null;
@@ -368,56 +417,39 @@ public class ExtractDataService {
TransHopMeta hi1 = null;
TransHopMeta hi2 = null;
String transName = null;
+
switch (extractType) {
case "all_scope":
transName = "trans_" + datasetTable.getId();
+ dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
transMeta.setName(transName);
- inputStep = inputStep(transMeta, selectSQL);
- udjcStep = udjc(datasetTableFields);
- outputStep = outputStep(transMeta, dorisOutputTable);
- hi1 = new TransHopMeta(inputStep, udjcStep);
- hi2 = new TransHopMeta(udjcStep, outputStep);
- transMeta.addTransHop(hi1);
- transMeta.addTransHop(hi2);
- transMeta.addStep(inputStep);
- transMeta.addStep(udjcStep);
- transMeta.addStep(outputStep);
break;
case "incremental_add":
transName = "trans_add_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
transMeta.setName(transName);
- inputStep = inputStep(transMeta, selectSQL);
- udjcStep = udjc(datasetTableFields);
- outputStep = outputStep(transMeta, dorisOutputTable);
- hi1 = new TransHopMeta(inputStep, udjcStep);
- hi2 = new TransHopMeta(udjcStep, outputStep);
- transMeta.addTransHop(hi1);
- transMeta.addTransHop(hi2);
- transMeta.addStep(inputStep);
- transMeta.addStep(udjcStep);
- transMeta.addStep(outputStep);
break;
case "incremental_delete":
- dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
+ dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
transName = "trans_delete_" + datasetTable.getId();
transMeta.setName(transName);
- inputStep = inputStep(transMeta, selectSQL);
- udjcStep = udjc(datasetTableFields);
- outputStep = execSqlStep(transMeta, dorisOutputTable, datasetTableFields);
- hi1 = new TransHopMeta(inputStep, udjcStep);
- hi2 = new TransHopMeta(udjcStep, outputStep);
- transMeta.addTransHop(hi1);
- transMeta.addTransHop(hi2);
- transMeta.addStep(inputStep);
- transMeta.addStep(udjcStep);
- transMeta.addStep(outputStep);
break;
default:
break;
}
+ inputStep = inputStep(transMeta, selectSQL);
+ udjcStep = udjc(datasetTableFields);
+ outputStep = outputStep(dorisOutputTable);
+ hi1 = new TransHopMeta(inputStep, udjcStep);
+ hi2 = new TransHopMeta(udjcStep, outputStep);
+ transMeta.addTransHop(hi1);
+ transMeta.addTransHop(hi2);
+ transMeta.addStep(inputStep);
+ transMeta.addStep(udjcStep);
+ transMeta.addStep(outputStep);
+
String transXml = transMeta.getXML();
File file = new File(root_path + transName + ".ktr");
FileUtils.writeStringToFile(file, transXml, "UTF-8");
@@ -434,14 +466,15 @@ public class ExtractDataService {
return fromStep;
}
- private StepMeta outputStep(TransMeta transMeta, String dorisOutputTable){
- TableOutputMeta tableOutputMeta = new TableOutputMeta();
- DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
- tableOutputMeta.setDatabaseMeta(dorisDatabaseMeta);
- tableOutputMeta.setTableName(dorisOutputTable);
- tableOutputMeta.setCommitSize(10000);
- tableOutputMeta.setUseBatchUpdate(true);
- StepMeta outputStep = new StepMeta("TableOutput", "TableOutput", tableOutputMeta);
+ private StepMeta outputStep(String dorisOutputTable){
+ TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta();
+ textFileOutputMeta.setEncoding("UTF-8");
+ textFileOutputMeta.setHeaderEnabled(false);
+ textFileOutputMeta.setFilename(root_path + dorisOutputTable);
+ textFileOutputMeta.setSeparator(separator);
+ textFileOutputMeta.setExtension(extention);
+ textFileOutputMeta.setOutputFields(new TextFileField[0]);
+ StepMeta outputStep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta);
outputStep.setLocation(600, 100);
outputStep.setDraw(true);
return outputStep;
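
To tie the two ends together: with setOutputFields(new TextFileField[0]) the step falls back to writing every incoming field, so the transformation leaves a header-less, pipe-separated text file that the shell entry's -T argument then streams to Doris. A small sketch of the expected artifact; the table name and the sample row are illustrative:

    public class DorisLoadFileDemo {
        public static void main(String[] args) {
            String rootPath = "/opt/dataease/data/kettle/";    // root_path in ExtractDataService
            String dorisOutputTable = "tmp_ds_abc_123";         // hypothetical full-extract staging table
            String loadFile = rootPath + dorisOutputTable + ".txt";
            System.out.println(loadFile);                       // /opt/dataease/data/kettle/tmp_ds_abc_123.txt
            // one row as TextFileOutput writes it: dataset fields in sorted origin-name order,
            // followed by the generated dataease_uuid
            System.out.println("120|beijing|0f6c9a1e-hypothetical-uuid");
        }
    }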