From 0056839869ec5de44ef7ccf448a1a0d42cecd6c5 Mon Sep 17 00:00:00 2001 From: taojinlong Date: Thu, 8 Apr 2021 16:58:48 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20kettle=20=E6=8A=BD=E5=8F=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=88=B0=20hbase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 52 ++ .../java/io/dataease/config/CommonConfig.java | 16 +- .../service/dataset/ExtractDataService.java | 540 +++++++++++++++--- frontend/package.json | 1 + 4 files changed, 522 insertions(+), 87 deletions(-) diff --git a/backend/pom.xml b/backend/pom.xml index 38822a30a2..6de71fa692 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -385,6 +385,42 @@ 3.0.8 + + pentaho-kettle + kettle-core + 8.3.0.18-1084 + + + pentaho-kettle + kettle-engine + 8.3.0.18-1084 + + + pentaho + metastore + 8.3.0.18-1084 + + + pentaho + pentaho-big-data-kettle-plugins-hbase-meta + 8.3.0.18-1084 + + + pentaho + pentaho-big-data-kettle-plugins-hbase + 8.3.0.18-1084 + + + pentaho + pentaho-big-data-impl-cluster + 8.3.0.18-1084 + + + org.pentaho.di.plugins + pdi-engine-configuration-impl + 8.3.0.7-683 + + @@ -521,4 +557,20 @@ + + + pentaho-public + Pentaho Public + http://nexus.pentaho.org/content/groups/omni + + true + always + + + true + always + + + + diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java index e3a73048ed..94e5fb0050 100644 --- a/backend/src/main/java/io/dataease/config/CommonConfig.java +++ b/backend/src/main/java/io/dataease/config/CommonConfig.java @@ -4,6 +4,9 @@ import com.fit2cloud.autoconfigure.QuartzAutoConfiguration; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; +import org.pentaho.di.core.KettleEnvironment; +import org.pentaho.di.repository.filerep.KettleFileRepository; +import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -18,7 +21,7 @@ public class CommonConfig { @Resource private Environment env; // 保存了配置文件的信息 - + private static String root_path = "/opt/dataease/data/kettle/"; @Bean @ConditionalOnMissingBean @@ -51,4 +54,15 @@ public class CommonConfig { sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); return sqlContext; } + + @Bean + @ConditionalOnMissingBean + public KettleFileRepository kettleFileRepository()throws Exception{ + KettleEnvironment.init(); + KettleFileRepository repository = new KettleFileRepository(); + KettleFileRepositoryMeta kettleDatabaseMeta = new KettleFileRepositoryMeta("KettleFileRepository", "repo", + "dataease kettle repo", root_path); + repository.init(kettleDatabaseMeta); + return repository; + } } diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 7078f03f5f..55bbf4d263 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -1,28 +1,89 @@ package io.dataease.service.dataset; import com.google.gson.Gson; +import com.sun.org.apache.bcel.internal.generic.SWITCH; import io.dataease.base.domain.*; +import io.dataease.base.mapper.DatasourceMapper; import io.dataease.commons.constants.JobStatus; import io.dataease.commons.constants.ScheduleType; import io.dataease.commons.constants.UpdateType; import io.dataease.commons.utils.CommonBeanFactory; import io.dataease.commons.utils.LogUtil; +import io.dataease.datasource.constants.DatasourceTypes; +import io.dataease.datasource.dto.MysqlConfigrationDTO; import io.dataease.dto.dataset.DataSetTaskLogDTO; import io.dataease.dto.dataset.DataTableInfoDTO; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; +import org.pentaho.big.data.api.cluster.NamedCluster; +import org.pentaho.big.data.api.cluster.NamedClusterService; +import org.pentaho.big.data.api.cluster.service.locator.NamedClusterServiceLocator; +import org.pentaho.big.data.api.cluster.service.locator.impl.NamedClusterServiceLocatorImpl; +import org.pentaho.big.data.api.initializer.ClusterInitializer; +import org.pentaho.big.data.api.initializer.ClusterInitializerProvider; +import org.pentaho.big.data.api.initializer.impl.ClusterInitializerImpl; +import org.pentaho.big.data.impl.cluster.NamedClusterImpl; +import org.pentaho.big.data.impl.cluster.NamedClusterManager; +import org.pentaho.big.data.kettle.plugins.hbase.MappingDefinition; +import org.pentaho.big.data.kettle.plugins.hbase.output.HBaseOutputMeta; +import org.pentaho.di.cluster.SlaveServer; +import org.pentaho.di.core.KettleEnvironment; +import org.pentaho.di.core.database.DatabaseMeta; +import org.pentaho.di.core.plugins.PluginRegistry; +import org.pentaho.di.core.plugins.StepPluginType; +import org.pentaho.di.core.util.EnvUtil; +import org.pentaho.di.engine.configuration.impl.pentaho.DefaultRunConfiguration; +import org.pentaho.di.job.Job; +import org.pentaho.di.job.JobExecutionConfiguration; +import org.pentaho.di.job.JobHopMeta; +import org.pentaho.di.job.JobMeta; +import org.pentaho.di.job.entries.special.JobEntrySpecial; +import org.pentaho.di.job.entries.success.JobEntrySuccess; +import org.pentaho.di.job.entries.trans.JobEntryTrans; +import org.pentaho.di.job.entries.writetolog.JobEntryWriteToLog; +import org.pentaho.di.job.entry.JobEntryCopy; +import org.pentaho.di.repository.RepositoryDirectoryInterface; +import org.pentaho.di.repository.filerep.KettleFileRepository; +import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta; +import org.pentaho.di.trans.TransConfiguration; +import org.pentaho.di.trans.TransExecutionConfiguration; +import org.pentaho.di.trans.TransHopMeta; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.steps.tableinput.TableInputMeta; +import org.pentaho.di.trans.steps.userdefinedjavaclass.InfoStepDefinition; +import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef; +import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta; +import org.pentaho.di.www.SlaveServerJobStatus; +import org.pentaho.runtime.test.RuntimeTest; +import org.pentaho.runtime.test.RuntimeTester; +import org.pentaho.runtime.test.action.RuntimeTestActionHandler; +import org.pentaho.runtime.test.action.RuntimeTestActionService; +import org.pentaho.runtime.test.action.impl.RuntimeTestActionServiceImpl; +import org.pentaho.runtime.test.impl.RuntimeTesterImpl; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.pentaho.di.core.row.ValueMetaInterface; +import scala.annotation.meta.field; import javax.annotation.Resource; +import javax.sound.sampled.Line; +import java.io.File; import java.security.MessageDigest; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.mockito.Mockito.mock; + @Service public class ExtractDataService { @@ -34,12 +95,31 @@ public class ExtractDataService { private DataSetTableTaskLogService dataSetTableTaskLogService; @Resource private DataSetTableTaskService dataSetTableTaskService; - private Long pageSize = 10000l; + @Resource + private DatasourceMapper datasourceMapper; private static ExecutorService pool = Executors.newScheduledThreadPool(50); //设置连接池 private Connection connection; + private static String lastUpdateTime = "${__last_update_time__}"; private static String currentUpdateTime = "${__current_update_time__}"; - private static String column_family = "dataease"; + private static String dataease_column_family = "dataease"; + private static String root_path = "/opt/dataease/data/kettle/"; + private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml"; + private static String pentaho_mappings = "pentaho_mappings"; + + @Value("${carte.host:127.0.0.1}") + private String carte; + @Value("${carte.port:8080}") + private String port; + @Value("${carte.user:cluster}") + private String user; + @Value("${carte.passwd:cluster}") + private String passwd; + @Value("${hbase.zookeeper.quorum:zookeeper}") + private String zkHost; + @Value("${hbase.zookeeper.property.clientPort:2181}") + private String zkPort; + public void extractData(String datasetTableId, String taskId, String type) { DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); @@ -47,24 +127,41 @@ public class ExtractDataService { try { Admin admin = getConnection().getAdmin(); DatasetTable datasetTable = dataSetTableService.get(datasetTableId); + Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); List datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable(); - TableName tableName = TableName.valueOf(table + "-" + datasetTable.getDataSourceId()); + TableName hbaseTable = TableName.valueOf(datasetTableId); switch (updateType){ // 全量更新 case all_scope: - writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId); - if(!admin.tableExists(tableName)){ - creatHaseTable(tableName, admin); + writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); + + //check pentaho_mappings table + TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings); + if(!admin.tableExists(pentaho_mappings)){ + creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns","key")); } - extractAllData(admin, tableName, table, datasetTable, datasetTableFields); + + //check pentaho files + if(!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")){ + generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null); + generateJobFile("all_scope", datasetTable); + } + + if(!admin.tableExists(hbaseTable)){ + creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family)); + } + admin.disableTable(hbaseTable); + admin.truncateTable(hbaseTable, true); + + extractData(datasetTable, "all_scope"); datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setEndTime(System.currentTimeMillis()); dataSetTableTaskLogService.save(datasetTableTaskLog); break; case add_scope: // 增量更新 - if(!admin.tableExists(tableName)){ + if(!admin.tableExists(hbaseTable)){ LogUtil.error("TableName error, dataaset: " + datasetTableId); return; } @@ -82,17 +179,28 @@ public class ExtractDataService { writeDatasetTableTaskLog(datasetTableTaskLog,datasetTableId, taskId); // 增量添加 - if(StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd())){ + if(StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))){ + System.out.println("datasetTableIncrementalConfig.getIncrementalAdd(): " + datasetTableIncrementalConfig.getIncrementalAdd()); String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); - extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "add"); + + if(!isExitFile("job_add_" + datasetTableId + ".kjb") || !isExitFile("trans_add_" + datasetTableId + ".ktr")){ + generateTransFile("incremental_add", datasetTable, datasource, table, datasetTableFields, sql); + generateJobFile("incremental_add", datasetTable); + } + + extractData(datasetTable, "incremental_add"); } // 增量删除 if( StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())){ String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); - extractIncrementalData(tableName,table,datasetTable, datasetTableFields, sql, "delete"); + if(!isExitFile("job_delete_" + datasetTableId + ".kjb") || !isExitFile("trans_delete_" + datasetTableId + ".ktr")){ + generateTransFile("incremental_delete", datasetTable, datasource, table, datasetTableFields, sql); + generateJobFile("incremental_delete", datasetTable); + } + extractData(datasetTable, "incremental_delete"); } datasetTableTaskLog.setStatus(JobStatus.Completed.name()); @@ -125,57 +233,50 @@ public class ExtractDataService { dataSetTableTaskLogService.save(datasetTableTaskLog); } - private void creatHaseTable(TableName tableName, Admin admin)throws Exception{ + private void creatHaseTable(TableName tableName, Admin admin, List columnFamily)throws Exception{ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); - ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(column_family); - descBuilder.setColumnFamily(hcd); + Collection families = new ArrayList<>(); + for (String s : columnFamily) { + ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s); + families.add(hcd); + } + descBuilder.setColumnFamilies(families); TableDescriptor desc = descBuilder.build(); admin.createTable(desc); } - private void extractAllData(Admin admin, TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields)throws Exception{ - admin.disableTable(tableName); - admin.truncateTable(tableName, true); - - Table tab = getConnection().getTable(tableName); - Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table); - Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1; - - for (Long pageIndex = 1l; pageIndex <= pageCount; pageIndex++) { - List data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize); - insertDataToHbaseTable(data,datasetTableFields,tab); + private void extractData(DatasetTable datasetTable, String extractType)throws Exception{ + KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class); + RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree(); + JobMeta jobMeta = null; + switch (extractType){ + case "all_scope": + jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); + break; + case "incremental_add": + jobMeta = repository.loadJob("job_add_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); + break; + case "incremental_delete": + jobMeta = repository.loadJob("job_delete_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); + break; + default: + break; } - } - private void extractIncrementalData(TableName tableName, String table, DatasetTable datasetTable, List datasetTableFields, String sql, String type)throws Exception{ - Table tab = getConnection().getTable(tableName); - List data = dataSetTableService.getDataSetDataBySql(datasetTable.getDataSourceId(), table, sql); - if (type.equalsIgnoreCase("add")){ - insertDataToHbaseTable(data,datasetTableFields,tab); - }else { - deleteDataFromHbaseTable(data,datasetTableFields,tab); - } - } - - private void insertDataToHbaseTable(List data, List datasetTableFields, Table tab)throws Exception{ - for (String[] d : data) { - Put put = new Put(md5(generateStr(datasetTableFields.size(), d)).getBytes()); - for(int i=0;i data, List datasetTableFields, Table tab)throws Exception{ - for (String[] d : data) { - Delete delete = new Delete(md5(generateStr(datasetTableFields.size(), d)).getBytes()); - tab.delete(delete); - } + SlaveServer remoteSlaveServer = getSlaveServer(); + JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); + jobExecutionConfiguration.setRemoteServer(remoteSlaveServer); + jobExecutionConfiguration.setRepository(repository); + String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null); + SlaveServerJobStatus jobStatus = null; + do { + jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0); + } while (jobStatus != null && jobStatus.isRunning()); + if(jobStatus.getStatusDescription().equals("Finished")){ + return; + }else { + throw new Exception(jobStatus.getLoggingString()); + } } private synchronized Connection getConnection() throws Exception{ @@ -186,42 +287,309 @@ public class ExtractDataService { return connection; } - - private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; - private static final String UTF_8 = "UTF-8"; - - public static String md5(String src) { - return md5(src, UTF_8); + private boolean isExitFile(String fileName){ + File file=new File(root_path + fileName); + return file.exists(); } - public static String md5(String src, String charset) { - try { - byte[] strTemp = io.micrometer.core.instrument.util.StringUtils.isEmpty(charset) ? src.getBytes() : src.getBytes(charset); - MessageDigest mdTemp = MessageDigest.getInstance("MD5"); - mdTemp.update(strTemp); + private SlaveServer getSlaveServer(){ + SlaveServer remoteSlaveServer = new SlaveServer(); + remoteSlaveServer.setHostname(carte);// 设置远程IP + remoteSlaveServer.setPort(port);// 端口 + remoteSlaveServer.setUsername(user); + remoteSlaveServer.setPassword(passwd); + return remoteSlaveServer; + } - byte[] md = mdTemp.digest(); - int j = md.length; - char[] str = new char[j * 2]; - int k = 0; + private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception{ + String jobName = null; + switch (extractType) { + case "all_scope": + jobName = "job_" + datasetTable.getId(); + break; + case "incremental_add": + jobName = "job_add_" + datasetTable.getId(); + break; + case "incremental_delete": + jobName = "job_delete_" + datasetTable.getId(); + break; + default: + break; + } - for (byte byte0 : md) { - str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf]; - str[k++] = HEX_DIGITS[byte0 & 0xf]; - } + String transName = null; + switch (extractType) { + case "all_scope": + transName = "trans_" + datasetTable.getId(); + break; + case "incremental_add": + transName = "trans_add_" + datasetTable.getId(); + break; + case "incremental_delete": + transName = "trans_delete_" + datasetTable.getId(); + break; + default: + break; + } - return new String(str); - } catch (Exception e) { - throw new RuntimeException("MD5 encrypt error:", e); + JobMeta jobMeta = new JobMeta(); + jobMeta.setName(jobName); + JobEntrySpecial start = new JobEntrySpecial(); + start.setName("START"); + start.setStart(true); + JobEntryCopy startEntry = new JobEntryCopy(start); + startEntry.setDrawn(true); + startEntry.setLocation(100, 100); + jobMeta.addJobEntry(startEntry); + + JobEntryTrans transrans = new JobEntryTrans(); + transrans.setTransname(transName); + transrans.setName("Transformation"); + JobEntryCopy transEntry = new JobEntryCopy(transrans); + transEntry.setDrawn(true); + transEntry.setLocation(300, 100); + jobMeta.addJobEntry(transEntry); + + jobMeta.addJobHop(new JobHopMeta(startEntry, transEntry)); + + JobEntrySuccess success = new JobEntrySuccess(); + success.setName("Success"); + JobEntryCopy successEntry = new JobEntryCopy(success); + successEntry.setDrawn(true); + successEntry.setLocation(500, 100); + jobMeta.addJobEntry(successEntry); + + JobHopMeta greenHop = new JobHopMeta(transEntry, successEntry); + greenHop.setEvaluation(true); + jobMeta.addJobHop(greenHop); + + String jobXml = jobMeta.getXML(); + File file = new File( root_path + jobName + ".kjb"); + FileUtils.writeStringToFile(file, jobXml, "UTF-8"); + } + + private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List datasetTableFields, String selectSQL) throws Exception{ + TransMeta transMeta = new TransMeta(); + String transName = null; + switch (extractType) { + case "all_scope": + transName = "trans_" + datasetTable.getId(); + selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new)); + break; + case "incremental_add": + transName = "trans_add_" + datasetTable.getId(); + break; + case "incremental_delete": + transName = "trans_delete_" + datasetTable.getId(); + break; + default: + break; + } + + transMeta.setName(transName); + DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); + DatabaseMeta dataMeta = null; + switch (datasourceType) { + case mysql: + MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigrationDTO.class); + dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigrationDTO.getHost(), mysqlConfigrationDTO.getDataBase(), mysqlConfigrationDTO.getPort().toString(), mysqlConfigrationDTO.getUsername(), mysqlConfigrationDTO.getPassword()); + transMeta.addDatabase(dataMeta); + break; + default: + break; + + } + //registry是给每个步骤生成一个标识id + PluginRegistry registry = PluginRegistry.getInstance(); + //第一个表输入步骤(TableInputMeta) + TableInputMeta tableInput = new TableInputMeta(); + + //给表输入添加一个DatabaseMeta连接数据库 + DatabaseMeta database_bjdt = transMeta.findDatabase("db"); + tableInput.setDatabaseMeta(database_bjdt); + tableInput.setSQL(selectSQL); + //添加TableInputMeta到转换中 + String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); + StepMeta fromStep = new StepMeta(tableInputPluginId, "Data Input", tableInput); + //给步骤添加在spoon工具中的显示位置 + fromStep.setDraw(true); + fromStep.setLocation(100, 100); + transMeta.addStep(fromStep); + + //第二个 (User defined Java class) + UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta(); + List fields = new ArrayList<>(); + UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1); + fields.add(fieldInfo); + userDefinedJavaClassMeta.setFieldInfo(fields); + List definitions = new ArrayList(); + UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code); + userDefinedJavaClassDef.setActive(true); + definitions.add(userDefinedJavaClassDef); + userDefinedJavaClassMeta.replaceDefinitions(definitions); + + StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta); + userDefinedJavaClassStep.setLocation(300, 100); + userDefinedJavaClassStep.setDraw(true); + transMeta.addStep(userDefinedJavaClassStep); + + //第三个 (HBaseOutputMeta) + NamedClusterService namedClusterService = new NamedClusterManager(); + NamedCluster clusterTemplate = new NamedClusterImpl(); + clusterTemplate.setName("hadoop"); + clusterTemplate.setZooKeeperHost(zkHost); + clusterTemplate.setZooKeeperPort(zkPort); + clusterTemplate.setStorageScheme("HDFS"); + namedClusterService.setClusterTemplate(clusterTemplate); + + List providers = new ArrayList<>(); + ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers); + NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer); + + List runtimeTestActionHandlers = new ArrayList<>(); + RuntimeTestActionHandler defaultHandler = null; + + RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler); + RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>( Arrays.asList( mock( RuntimeTest.class ) ) ), mock( ExecutorService.class ), "modules"); + + Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes()); + for (DatasetTableField datasetTableField : datasetTableFields) { + put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes()); + } + put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes()); + TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings); + Table tab = getConnection().getTable(pentaho_mappings); + tab.put(put); + + HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester); + hBaseOutputMeta.setTargetTableName(datasetTable.getId()); + hBaseOutputMeta.setTargetMappingName("target_mapping"); + hBaseOutputMeta.setNamedCluster(clusterTemplate); + hBaseOutputMeta.setCoreConfigURL(hbase_conf_file); + if(extractType.equalsIgnoreCase("incremental_delete")){ + hBaseOutputMeta.setDeleteRowKey(true); + } + StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta); + tostep.setLocation(600, 100); + + tostep.setDraw(true); + transMeta.addStep(tostep); + TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep); + TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep); + transMeta.addTransHop(hi1); + transMeta.addTransHop(hi2); + + String transXml = transMeta.getXML(); + File file = new File(root_path + transName + ".ktr"); + FileUtils.writeStringToFile(file, transXml, "UTF-8"); + } + + public String transToColumnType(Integer field) { + switch (field) { + case 0: + return "String"; + case 1: + return "Date"; + case 2: + return "Long"; + default: + return "String"; } } - public String generateStr(int size, String[] d ){ - String str = null; - for(int i=0;i valueMetaList = data.outputRowMeta.getValueMetaList();\n" + + " for (ValueMetaInterface valueMetaInterface : valueMetaList) {\n" + + "\t if(!valueMetaInterface.getName().equalsIgnoreCase(\"uuid\")){\n" + + " str = str + get(Fields.In, valueMetaInterface.getName()).getString(r);\n" + + " }\n" + + " }\n" + + "\n" + + " String md5 = md5(str);\n" + + " get(Fields.Out, \"uuid\").setValue(r, md5);\n" + + "\n" + + " putRow(data.outputRowMeta, r);\n" + + "\n" + + " return true;\n" + + "}\n" + + "\n" + + " private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};\n" + + " private static final String UTF_8 = \"UTF-8\";\n" + + " public static String md5(String src) {\n" + + " return md5(src, UTF_8);\n" + + " }\n" + + " public static String md5(String src, String charset) {\n" + + " try {\n" + + " byte[] strTemp = charset == null || charset.equals(\"\") ? src.getBytes() : src.getBytes(charset);\n" + + " MessageDigest mdTemp = MessageDigest.getInstance(\"MD5\");\n" + + " mdTemp.update(strTemp);\n" + + "\n" + + " byte[] md = mdTemp.digest();\n" + + " int j = md.length;\n" + + " char[] str = new char[j * 2];\n" + + " int k = 0;\n" + + "\n" + + " for (byte byte0 : md) {\n" + + " str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf];\n" + + " str[k++] = HEX_DIGITS[byte0 & 0xf];\n" + + " }\n" + + "\n" + + " return new String(str);\n" + + " } catch (Exception e) {\n" + + " throw new RuntimeException(\"MD5 encrypt error:\", e);\n" + + " }\n" + + " }\n" + + "\n" + + " public String generateStr(int size, String[] d ){\n" + + " String str = null;\n" + + " for(int i=0;i Date: Thu, 8 Apr 2021 17:04:14 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat(=E8=A7=86=E5=9B=BE):fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/views/chart/view/ChartEdit.vue | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/src/views/chart/view/ChartEdit.vue b/frontend/src/views/chart/view/ChartEdit.vue index f27727dafc..74c1456f66 100644 --- a/frontend/src/views/chart/view/ChartEdit.vue +++ b/frontend/src/views/chart/view/ChartEdit.vue @@ -598,6 +598,7 @@ export default { .view-panel { display: flex; height: calc(100% - 40px); + background-color: #f7f8fa; } .drag-list { From ce1d2035ad1de620f642d1c47d26e2459fd202e1 Mon Sep 17 00:00:00 2001 From: junjie Date: Thu, 8 Apr 2021 17:22:03 +0800 Subject: [PATCH 3/4] feat(frontend):css --- frontend/src/views/panel/list/PanelViewShow.vue | 4 ---- 1 file changed, 4 deletions(-) diff --git a/frontend/src/views/panel/list/PanelViewShow.vue b/frontend/src/views/panel/list/PanelViewShow.vue index 98bc5b945e..9fb452cf9e 100644 --- a/frontend/src/views/panel/list/PanelViewShow.vue +++ b/frontend/src/views/panel/list/PanelViewShow.vue @@ -170,10 +170,6 @@ export default { box-sizing: border-box; } - span { - font-size: 12px; - } - .custom-position { flex: 1; display: flex; From 81a802655a2e16477cb778d2a39531f5f7b3343d Mon Sep 17 00:00:00 2001 From: taojinlong Date: Thu, 8 Apr 2021 17:40:30 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat:=20kettle=20=E6=8A=BD=E5=8F=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=88=B0=20hbase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/frontend/package.json b/frontend/package.json index 2f53ac8bf3..d99ab683db 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -17,6 +17,7 @@ "dependencies": { "@riophae/vue-treeselect": "0.4.0", "axios": "^0.21.1", + "core-js": "^2.6.5", "echarts": "^5.0.2", "element-resize-detector": "^1.2.2", "element-ui": "2.13.0", @@ -46,7 +47,7 @@ "@babel/register": "7.0.0", "@vue/cli-plugin-babel": "3.6.0", "@vue/cli-plugin-eslint": "^3.9.1", - "@vue/cli-service": "3.6.0", + "@vue/cli-service": "^4.5.12", "babel-eslint": "10.0.1", "chalk": "2.4.2", "connect": "3.6.6", @@ -57,7 +58,7 @@ "less": "^4.1.1", "less-loader": "^8.0.0", "mockjs": "1.0.1-beta3", - "runjs": "^4.3.2", + "runjs": "^4.1.3", "sass": "^1.32.5", "sass-loader": "^10.1.1", "script-ext-html-webpack-plugin": "2.1.3",