From 0db59a7fbea37d21cd1d66e7fe0bbbb833e2d2fd Mon Sep 17 00:00:00 2001 From: taojinlong Date: Tue, 27 Apr 2021 17:32:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8A=BD=E5=8F=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=88=B0doris?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 80 +--- .../base/domain/DatasetTableField.java | 8 +- .../base/domain/DatasetTableFieldExample.java | 180 ++++--- .../base/mapper/DatasetTableFieldMapper.xml | 63 ++- .../java/io/dataease/config/CommonConfig.java | 55 +-- .../provider/DatasourceProvider.java | 13 +- .../datasource/provider/JdbcProvider.java | 188 +++++--- .../datasource/service/DatasourceService.java | 29 -- .../AppStartInitDataSourceListener.java | 22 - .../service/dataset/DataSetTableService.java | 71 +-- .../service/dataset/ExtractDataService.java | 447 ++++++++---------- .../io/dataease/service/spark/CacheUtil.java | 53 --- .../io/dataease/service/spark/SparkCalc.java | 407 ---------------- .../src/main/resources/generatorConfig.xml | 2 +- 14 files changed, 525 insertions(+), 1093 deletions(-) delete mode 100644 backend/src/main/java/io/dataease/listener/AppStartInitDataSourceListener.java delete mode 100644 backend/src/main/java/io/dataease/service/spark/CacheUtil.java delete mode 100644 backend/src/main/java/io/dataease/service/spark/SparkCalc.java diff --git a/backend/pom.xml b/backend/pom.xml index 6de71fa692..b2216cd9b2 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -17,7 +17,6 @@ 1.8 20.1.0 3.12.1 - 3.1.1 @@ -315,70 +314,12 @@ ehcache 2.9.1 - - - org.apache.hbase - hbase-client - 2.4.1 - - - org.apache.hbase - hbase-common - 2.4.1 - - - org.apache.hbase - hbase-mapreduce - 2.4.1 - - org.testng testng 6.8 test - - - org.apache.spark - spark-core_2.12 - ${spark.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - org.objenesis - objenesis - - - provided - - - - org.apache.spark - spark-streaming_2.12 - ${spark.version} - provided - - - - org.apache.spark - spark-sql_2.12 - ${spark.version} - - - janino - org.codehaus.janino - - - - org.codehaus.janino janino @@ -400,27 +341,16 @@ metastore 8.3.0.18-1084 - - pentaho - pentaho-big-data-kettle-plugins-hbase-meta - 8.3.0.18-1084 - - - pentaho - pentaho-big-data-kettle-plugins-hbase - 8.3.0.18-1084 - - - pentaho - pentaho-big-data-impl-cluster - 8.3.0.18-1084 - org.pentaho.di.plugins pdi-engine-configuration-impl 8.3.0.7-683 - + + c3p0 + c3p0 + 0.9.1.2 + diff --git a/backend/src/main/java/io/dataease/base/domain/DatasetTableField.java b/backend/src/main/java/io/dataease/base/domain/DatasetTableField.java index 9cd293962b..360ab4b24e 100644 --- a/backend/src/main/java/io/dataease/base/domain/DatasetTableField.java +++ b/backend/src/main/java/io/dataease/base/domain/DatasetTableField.java @@ -18,13 +18,15 @@ public class DatasetTableField implements Serializable { private String type; + private Integer size; + + private Integer deType; + private Boolean checked; private Integer columnIndex; private Long lastSyncTime; - private Integer deType; - private static final long serialVersionUID = 1L; -} +} \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/base/domain/DatasetTableFieldExample.java b/backend/src/main/java/io/dataease/base/domain/DatasetTableFieldExample.java index 4b4a354d34..462a9a2b31 100644 --- a/backend/src/main/java/io/dataease/base/domain/DatasetTableFieldExample.java +++ b/backend/src/main/java/io/dataease/base/domain/DatasetTableFieldExample.java @@ -454,6 +454,126 @@ public 
class DatasetTableFieldExample { return (Criteria) this; } + public Criteria andSizeIsNull() { + addCriterion("`size` is null"); + return (Criteria) this; + } + + public Criteria andSizeIsNotNull() { + addCriterion("`size` is not null"); + return (Criteria) this; + } + + public Criteria andSizeEqualTo(Integer value) { + addCriterion("`size` =", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeNotEqualTo(Integer value) { + addCriterion("`size` <>", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeGreaterThan(Integer value) { + addCriterion("`size` >", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeGreaterThanOrEqualTo(Integer value) { + addCriterion("`size` >=", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeLessThan(Integer value) { + addCriterion("`size` <", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeLessThanOrEqualTo(Integer value) { + addCriterion("`size` <=", value, "size"); + return (Criteria) this; + } + + public Criteria andSizeIn(List values) { + addCriterion("`size` in", values, "size"); + return (Criteria) this; + } + + public Criteria andSizeNotIn(List values) { + addCriterion("`size` not in", values, "size"); + return (Criteria) this; + } + + public Criteria andSizeBetween(Integer value1, Integer value2) { + addCriterion("`size` between", value1, value2, "size"); + return (Criteria) this; + } + + public Criteria andSizeNotBetween(Integer value1, Integer value2) { + addCriterion("`size` not between", value1, value2, "size"); + return (Criteria) this; + } + + public Criteria andDeTypeIsNull() { + addCriterion("de_type is null"); + return (Criteria) this; + } + + public Criteria andDeTypeIsNotNull() { + addCriterion("de_type is not null"); + return (Criteria) this; + } + + public Criteria andDeTypeEqualTo(Integer value) { + addCriterion("de_type =", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeNotEqualTo(Integer value) { + addCriterion("de_type <>", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeGreaterThan(Integer value) { + addCriterion("de_type >", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) { + addCriterion("de_type >=", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeLessThan(Integer value) { + addCriterion("de_type <", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeLessThanOrEqualTo(Integer value) { + addCriterion("de_type <=", value, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeIn(List values) { + addCriterion("de_type in", values, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeNotIn(List values) { + addCriterion("de_type not in", values, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeBetween(Integer value1, Integer value2) { + addCriterion("de_type between", value1, value2, "deType"); + return (Criteria) this; + } + + public Criteria andDeTypeNotBetween(Integer value1, Integer value2) { + addCriterion("de_type not between", value1, value2, "deType"); + return (Criteria) this; + } + public Criteria andCheckedIsNull() { addCriterion("`checked` is null"); return (Criteria) this; @@ -633,66 +753,6 @@ public class DatasetTableFieldExample { addCriterion("last_sync_time not between", value1, value2, "lastSyncTime"); return (Criteria) this; } - - public Criteria andDeTypeIsNull() { - 
addCriterion("de_type is null"); - return (Criteria) this; - } - - public Criteria andDeTypeIsNotNull() { - addCriterion("de_type is not null"); - return (Criteria) this; - } - - public Criteria andDeTypeEqualTo(Integer value) { - addCriterion("de_type =", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeNotEqualTo(Integer value) { - addCriterion("de_type <>", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeGreaterThan(Integer value) { - addCriterion("de_type >", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) { - addCriterion("de_type >=", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeLessThan(Integer value) { - addCriterion("de_type <", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeLessThanOrEqualTo(Integer value) { - addCriterion("de_type <=", value, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeIn(List values) { - addCriterion("de_type in", values, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeNotIn(List values) { - addCriterion("de_type not in", values, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeBetween(Integer value1, Integer value2) { - addCriterion("de_type between", value1, value2, "deType"); - return (Criteria) this; - } - - public Criteria andDeTypeNotBetween(Integer value1, Integer value2) { - addCriterion("de_type not between", value1, value2, "deType"); - return (Criteria) this; - } } public static class Criteria extends GeneratedCriteria { diff --git a/backend/src/main/java/io/dataease/base/mapper/DatasetTableFieldMapper.xml b/backend/src/main/java/io/dataease/base/mapper/DatasetTableFieldMapper.xml index 7902706f17..6bea2c8c99 100644 --- a/backend/src/main/java/io/dataease/base/mapper/DatasetTableFieldMapper.xml +++ b/backend/src/main/java/io/dataease/base/mapper/DatasetTableFieldMapper.xml @@ -7,10 +7,11 @@ + + - @@ -71,8 +72,8 @@ - id, table_id, origin_name, `name`, `type`, `checked`, column_index, last_sync_time, - de_type + id, table_id, origin_name, `name`, `type`, `size`, de_type, `checked`, column_index, + last_sync_time @@ -197,6 +206,12 @@ `type` = #{record.type,jdbcType=VARCHAR}, + + `size` = #{record.size,jdbcType=INTEGER}, + + + de_type = #{record.deType,jdbcType=INTEGER}, + `checked` = #{record.checked,jdbcType=BIT}, @@ -206,9 +221,6 @@ last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT}, - - de_type = #{record.deType,jdbcType=INTEGER}, - @@ -221,10 +233,11 @@ origin_name = #{record.originName,jdbcType=VARCHAR}, `name` = #{record.name,jdbcType=VARCHAR}, `type` = #{record.type,jdbcType=VARCHAR}, + `size` = #{record.size,jdbcType=INTEGER}, + de_type = #{record.deType,jdbcType=INTEGER}, `checked` = #{record.checked,jdbcType=BIT}, column_index = #{record.columnIndex,jdbcType=INTEGER}, - last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT}, - de_type = #{record.deType,jdbcType=INTEGER} + last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT} @@ -244,6 +257,12 @@ `type` = #{type,jdbcType=VARCHAR}, + + `size` = #{size,jdbcType=INTEGER}, + + + de_type = #{deType,jdbcType=INTEGER}, + `checked` = #{checked,jdbcType=BIT}, @@ -253,9 +272,6 @@ last_sync_time = #{lastSyncTime,jdbcType=BIGINT}, - - de_type = #{deType,jdbcType=INTEGER}, - where id = #{id,jdbcType=VARCHAR} @@ -265,10 +281,11 @@ origin_name = #{originName,jdbcType=VARCHAR}, `name` = #{name,jdbcType=VARCHAR}, `type` = #{type,jdbcType=VARCHAR}, + 
`size` = #{size,jdbcType=INTEGER}, + de_type = #{deType,jdbcType=INTEGER}, `checked` = #{checked,jdbcType=BIT}, column_index = #{columnIndex,jdbcType=INTEGER}, - last_sync_time = #{lastSyncTime,jdbcType=BIGINT}, - de_type = #{deType,jdbcType=INTEGER} + last_sync_time = #{lastSyncTime,jdbcType=BIGINT} where id = #{id,jdbcType=VARCHAR} \ No newline at end of file diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java index bb9cb36cbb..36917dd40e 100644 --- a/backend/src/main/java/io/dataease/config/CommonConfig.java +++ b/backend/src/main/java/io/dataease/config/CommonConfig.java @@ -1,6 +1,8 @@ package io.dataease.config; +import com.alibaba.fastjson.JSONObject; import com.fit2cloud.autoconfigure.QuartzAutoConfiguration; +import io.dataease.base.domain.Datasource; import io.dataease.commons.utils.CommonThreadPool; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.repository.filerep.KettleFileRepository; @@ -21,41 +23,26 @@ public class CommonConfig { private Environment env; // 保存了配置文件的信息 private static String root_path = "/opt/dataease/data/kettle/"; -// @Bean -// @ConditionalOnMissingBean -// public org.apache.hadoop.conf.Configuration configuration() { -// org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); -// configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum")); -// configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort")); -// configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1")); -// return configuration; -// } + @Bean(name = "DorisDatasource") + @ConditionalOnMissingBean + public Datasource configuration() { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("dataSourceType", "jdbc"); + jsonObject.put("dataBase", env.getProperty("doris.db", "doris")); + jsonObject.put("username", env.getProperty("doris.user", "root")); + jsonObject.put("password", env.getProperty("doris.password", "dataease")); + jsonObject.put("host", env.getProperty("doris.host", "doris")); + jsonObject.put("port", env.getProperty("doris.port", "9030")); + + Datasource datasource = new Datasource(); + datasource.setId("doris"); + datasource.setName("doris"); + datasource.setDesc("doris"); + datasource.setType("mysql"); + datasource.setConfiguration(jsonObject.toJSONString()); + return datasource; + } -// @Bean -// @ConditionalOnMissingBean -// public SparkSession javaSparkSession() { -// SparkSession spark = SparkSession.builder() -// .appName(env.getProperty("spark.appName", "DataeaseJob")) -// .master(env.getProperty("spark.master", "local[*]")) -// .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR")) -//// .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")) -//// .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8")) -//// .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b")) -//// .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000")) -//// .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m")) -//// .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false")) -//// .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true")) -//// 
.config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true")) -//// .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M")) -//// .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000")) -//// .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false")) -//// .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000")) -//// .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000")) -//// .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false")) -//// .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600")) -// .getOrCreate(); -// return spark; -// } @Bean @ConditionalOnMissingBean diff --git a/backend/src/main/java/io/dataease/datasource/provider/DatasourceProvider.java b/backend/src/main/java/io/dataease/datasource/provider/DatasourceProvider.java index 05bb3c86e6..8f0eb17331 100644 --- a/backend/src/main/java/io/dataease/datasource/provider/DatasourceProvider.java +++ b/backend/src/main/java/io/dataease/datasource/provider/DatasourceProvider.java @@ -8,6 +8,7 @@ import io.dataease.datasource.request.DatasourceRequest; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; +import java.util.Map; public abstract class DatasourceProvider { @@ -15,8 +16,6 @@ public abstract class DatasourceProvider { abstract public List getData(DatasourceRequest datasourceRequest) throws Exception; - abstract public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception; - abstract public List getTables(DatasourceRequest datasourceRequest) throws Exception; public List getTableFileds(DatasourceRequest datasourceRequest) throws Exception { @@ -27,13 +26,11 @@ public abstract class DatasourceProvider { getData(datasourceRequest); } - abstract public Long count(DatasourceRequest datasourceRequest) throws Exception; + abstract public List fetchResult(DatasourceRequest datasourceRequest) throws Exception; - abstract public List getPageData(DatasourceRequest datasourceRequest) throws Exception; + abstract public List fetchResultField(DatasourceRequest datasourceRequest) throws Exception; - abstract public List fetchResult(ResultSet rs) throws Exception; + abstract public Map fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception; - abstract public List fetchResultField(ResultSet rs) throws Exception; - - abstract public void initConnectionPool(DatasourceRequest datasourceRequest) throws Exception; + abstract public void initDataSource(DatasourceRequest datasourceRequest) throws Exception; } diff --git a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java index e3868bc2d2..5f157aa163 100644 --- a/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java +++ b/backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java @@ -1,26 +1,25 @@ package io.dataease.datasource.provider; import com.google.gson.Gson; -import io.dataease.base.domain.DatasetTableField; +import com.mchange.v2.c3p0.ComboPooledDataSource; import io.dataease.datasource.constants.DatasourceTypes; import 
io.dataease.datasource.dto.MysqlConfigrationDTO; import io.dataease.datasource.dto.SqlServerConfigration; import io.dataease.datasource.dto.TableFiled; import io.dataease.datasource.request.DatasourceRequest; -import org.apache.arrow.util.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import java.beans.PropertyVetoException; import java.sql.*; -import java.text.MessageFormat; import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; @Service("jdbc") public class JdbcProvider extends DatasourceProvider { - private static Map> jdbcConnection = new HashMap<>(); - private static int poolSize = 20; + private static Map jdbcConnection = new HashMap<>(); + private static int initPoolSize = 5; + private static int maxConnections = 200; @Override public List getData(DatasourceRequest datasourceRequest) throws Exception { List list = new LinkedList<>(); @@ -35,67 +34,46 @@ public class JdbcProvider extends DatasourceProvider { } catch (Exception e) { throw new Exception("ERROR:" + e.getMessage(), e); }finally { - returnSource(connection, datasourceRequest.getDatasource().getId()); + connection.close(); } return list; } - @VisibleForTesting public void exec(DatasourceRequest datasourceRequest) throws Exception { Connection connection = null; try { connection = getConnectionFromPool(datasourceRequest); Statement stat = connection.createStatement(); - stat.execute(datasourceRequest.getQuery()); + Boolean result = stat.execute(datasourceRequest.getQuery()); + stat.close(); } catch (SQLException e) { throw new Exception("ERROR:" + e.getMessage(), e); } catch (Exception e) { throw new Exception("ERROR:" + e.getMessage(), e); }finally { - returnSource(connection, datasourceRequest.getDatasource().getId()); + connection.close(); } } - @Override - public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception { + public List fetchResult(DatasourceRequest datasourceRequest) throws Exception { ResultSet rs; Connection connection = null; try { connection = getConnectionFromPool(datasourceRequest); Statement stat = connection.createStatement(); rs = stat.executeQuery(datasourceRequest.getQuery()); + return fetchResult(rs); } catch (SQLException e) { throw new Exception("ERROR:" + e.getMessage(), e); } catch (Exception e) { throw new Exception("ERROR:" + e.getMessage(), e); }finally { - returnSource(connection, datasourceRequest.getDatasource().getId()); + connection.close(); } - return rs; } - @Override - public List getPageData(DatasourceRequest datasourceRequest) throws Exception { - List list = new LinkedList<>(); - Connection connection = null; - try { - connection = getConnectionFromPool(datasourceRequest); - Statement stat = connection.createStatement(); - ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize())); - list = fetchResult(rs); - } catch (SQLException e) { - throw new Exception("ERROR:" + e.getMessage(), e); - } catch (Exception e) { - throw new Exception("ERROR:" + e.getMessage(), e); - }finally { - returnSource(connection, datasourceRequest.getDatasource().getId()); - } - return list; - } - - @Override - public List fetchResult(ResultSet rs) throws Exception { + private List fetchResult(ResultSet rs) throws Exception { List list = new LinkedList<>(); ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); 
@@ -104,7 +82,7 @@ public class JdbcProvider extends DatasourceProvider { for (int j = 0; j < columnCount; j++) { int columType = metaData.getColumnType(j + 1); switch (columType) { - case java.sql.Types.DATE: + case Types.DATE: row[j] = rs.getDate(j + 1).toString(); break; default: @@ -118,7 +96,49 @@ public class JdbcProvider extends DatasourceProvider { } @Override - public List fetchResultField(ResultSet rs) throws Exception { + public List fetchResultField(DatasourceRequest datasourceRequest) throws Exception { + ResultSet rs; + Connection connection = null; + try { + connection = getConnectionFromPool(datasourceRequest); + Statement stat = connection.createStatement(); + rs = stat.executeQuery(datasourceRequest.getQuery()); + return fetchResultField(rs); + } catch (SQLException e) { + throw new Exception("ERROR:" + e.getMessage(), e); + } catch (Exception e) { + throw new Exception("ERROR:" + e.getMessage(), e); + }finally { + connection.close(); + } + } + + @Override + public Map fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception { + ResultSet rs; + Map result = new HashMap<>(); + Connection connection = null; + List dataList = new LinkedList<>(); + List fieldList = new ArrayList<>(); + try { + connection = getConnectionFromPool(datasourceRequest); + Statement stat = connection.createStatement(); + rs = stat.executeQuery(datasourceRequest.getQuery()); + dataList = fetchResult(rs); + fieldList = fetchResultField(rs); + result.put("dataList", dataList); + result.put("fieldList", fieldList); + return result; + } catch (SQLException e) { + throw new Exception("ERROR:" + e.getMessage(), e); + } catch (Exception e) { + throw new Exception("ERROR:" + e.getMessage(), e); + }finally { + connection.close(); + } + } + + private List fetchResultField(ResultSet rs) throws Exception { List fieldList = new ArrayList<>(); ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); @@ -130,6 +150,7 @@ public class JdbcProvider extends DatasourceProvider { field.setFieldName(l); field.setRemarks(l); field.setFieldType(t); + field.setFieldSize(metaData.getColumnDisplaySize(j + 1)); fieldList.add(field); } return fieldList; @@ -142,16 +163,18 @@ public class JdbcProvider extends DatasourceProvider { Connection con = null; try { con = getConnectionFromPool(datasourceRequest); - Statement ps = con.createStatement(); - ResultSet resultSet = ps.executeQuery(queryStr); + Statement statement = con.createStatement(); + ResultSet resultSet = statement.executeQuery(queryStr); while (resultSet.next()) { tables.add(resultSet.getString(1)); } + resultSet.close(); + statement.close(); return tables; } catch (Exception e) { throw new Exception("ERROR: " + e.getMessage(), e); }finally { - returnSource(con, datasourceRequest.getDatasource().getId()); + con.close(); } } @@ -175,17 +198,19 @@ public class JdbcProvider extends DatasourceProvider { remarks = colName; } tableFiled.setRemarks(remarks); + tableFiled.setFieldSize(Integer.valueOf(resultSet.getString("COLUMN_SIZE"))); String dbType = resultSet.getString("TYPE_NAME"); tableFiled.setFieldType(dbType); list.add(tableFiled); } } + resultSet.close(); } catch (SQLException e) { throw new Exception("ERROR:" + e.getMessage(), e); } catch (Exception e) { throw new Exception("ERROR:" + e.getMessage(), e); }finally { - returnSource(connection, datasourceRequest.getDatasource().getId()); + connection.close(); } return list; } @@ -198,6 +223,8 @@ public class JdbcProvider extends DatasourceProvider { con = 
getConnection(datasourceRequest); Statement ps = con.createStatement(); ResultSet resultSet = ps.executeQuery(queryStr); + resultSet.close(); + ps.close(); } catch (Exception e) { throw new Exception("ERROR: " + e.getMessage(), e); }finally { @@ -217,45 +244,43 @@ public class JdbcProvider extends DatasourceProvider { } catch (Exception e) { throw new Exception("ERROR: " + e.getMessage(), e); }finally { - returnSource(con, datasourceRequest.getDatasource().getId()); + con.close(); } return 0L; } - private void returnSource(Connection connection, String dataSourceId) throws Exception{ - if(connection != null && !connection.isClosed()){ - ArrayBlockingQueue connections = jdbcConnection.get(dataSourceId); - connections.put(connection); - } - } - private Connection getConnectionFromPool(DatasourceRequest datasourceRequest)throws Exception { - ArrayBlockingQueue connections = jdbcConnection.get(datasourceRequest.getDatasource().getId()); - if (connections == null) { - initConnectionPool(datasourceRequest); + ComboPooledDataSource dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId()); + if (dataSource == null) { + initDataSource(datasourceRequest); } - connections = jdbcConnection.get(datasourceRequest.getDatasource().getId()); - Connection co = connections.take(); + dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId()); + Connection co = dataSource.getConnection(); return co; } @Override - public void initConnectionPool(DatasourceRequest datasourceRequest)throws Exception{ - ArrayBlockingQueue connections = jdbcConnection.get(datasourceRequest.getDatasource().getId()); - if (connections == null) { - connections = new ArrayBlockingQueue<>(poolSize); - for (int i = 0; i < poolSize ; i++) { - Connection connection = getConnection(datasourceRequest); - connections.add(connection); - } - jdbcConnection.put(datasourceRequest.getDatasource().getId(), connections); - }else { - for (int i = 0; i < poolSize ; i++) { - Connection connection = connections.take(); - connection.close(); - connection = getConnection(datasourceRequest); - connections.add(connection); - } + public void initDataSource(DatasourceRequest datasourceRequest)throws Exception{ + ComboPooledDataSource dataSource = jdbcConnection.get(datasourceRequest.getDatasource().getId()); + if (dataSource == null) { + dataSource = new ComboPooledDataSource(); + setCredential(datasourceRequest, dataSource); + dataSource.setMaxIdleTime(30); // 最大空闲时间 + dataSource.setAcquireIncrement(5);// 增长数 + dataSource.setInitialPoolSize(initPoolSize);// 初始连接数 + dataSource.setMinPoolSize(initPoolSize); // 最小连接数 + dataSource.setMaxPoolSize(maxConnections); // 最大连接数 + dataSource.setAcquireRetryAttempts(30);// 获取连接重试次数 + dataSource.setIdleConnectionTestPeriod(60); // 每60s检查数据库空闲连接 + dataSource.setMaxStatements(0); // c3p0全局的PreparedStatements缓存的大小 + dataSource.setBreakAfterAcquireFailure(false); // 获取连接失败将会引起所有等待连接池来获取连接的线程抛出异常。但是数据源仍有效保留,并在下次调用getConnection()的时候继续尝试获取连接。如果设为true,那么在尝试获取连接失败后该数据源将申明已断开并永久关闭。Default: false + dataSource.setTestConnectionOnCheckout(false); // 在每个connection 提交是校验有效性 + dataSource.setTestConnectionOnCheckin(true); // 取得连接的同时将校验连接的有效性 + dataSource.setCheckoutTimeout(60000); // 从连接池获取连接的超时时间,如设为0则无限期等待。单位毫秒,默认为0 + dataSource.setPreferredTestQuery("SELECT 1"); + dataSource.setDebugUnreturnedConnectionStackTraces(true); + dataSource.setUnreturnedConnectionTimeout(3600); + jdbcConnection.put(datasourceRequest.getDatasource().getId(), dataSource); } } @@ -293,6 +318,29 @@ public class JdbcProvider 
extends DatasourceProvider { return DriverManager.getConnection(jdbcurl, props); } + + private void setCredential(DatasourceRequest datasourceRequest, ComboPooledDataSource dataSource) throws PropertyVetoException { + DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType()); + switch (datasourceType) { + case mysql: + MysqlConfigrationDTO mysqlConfigrationDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), MysqlConfigrationDTO.class); + dataSource.setUser(mysqlConfigrationDTO.getUsername()); + dataSource.setDriverClass(mysqlConfigrationDTO.getDriver()); + dataSource.setPassword(mysqlConfigrationDTO.getPassword()); + dataSource.setJdbcUrl(mysqlConfigrationDTO.getJdbc()); + break; + case sqlServer: + SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), SqlServerConfigration.class); + dataSource.setUser(sqlServerConfigration.getUsername()); + dataSource.setDriverClass(sqlServerConfigration.getDriver()); + dataSource.setPassword(sqlServerConfigration.getPassword()); + dataSource.setJdbcUrl(sqlServerConfigration.getJdbc()); + break; + default: + break; + } + } + private String getDatabase(DatasourceRequest datasourceRequest) { DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceRequest.getDatasource().getType()); switch (datasourceType) { diff --git a/backend/src/main/java/io/dataease/datasource/service/DatasourceService.java b/backend/src/main/java/io/dataease/datasource/service/DatasourceService.java index 7fb24d6fae..beb5483366 100644 --- a/backend/src/main/java/io/dataease/datasource/service/DatasourceService.java +++ b/backend/src/main/java/io/dataease/datasource/service/DatasourceService.java @@ -41,7 +41,6 @@ public class DatasourceService { datasource.setUpdateTime(currentTimeMillis); datasource.setCreateTime(currentTimeMillis); datasourceMapper.insertSelective(datasource); - initConnectionPool(datasource); return datasource; } @@ -71,7 +70,6 @@ public class DatasourceService { datasource.setCreateTime(null); datasource.setUpdateTime(System.currentTimeMillis()); datasourceMapper.updateByPrimaryKeySelective(datasource); - initConnectionPool(datasource); } public void validate(Datasource datasource) throws Exception { @@ -92,31 +90,4 @@ public class DatasourceService { public Datasource get(String id) { return datasourceMapper.selectByPrimaryKey(id); } - - private void initConnectionPool(Datasource datasource){ - commonThreadPool.addTask(() ->{ - try { - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(datasource); - datasourceProvider.initConnectionPool(datasourceRequest); - }catch (Exception e){} - }); - } - - public void initAllDataSourceConnectionPool(){ - List datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample()); - datasources.forEach(datasource -> { - commonThreadPool.addTask(() ->{ - try { - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(datasource); - datasourceProvider.initConnectionPool(datasourceRequest); - }catch (Exception e){ - e.printStackTrace(); - } - }); - }); - } } diff --git a/backend/src/main/java/io/dataease/listener/AppStartInitDataSourceListener.java 
b/backend/src/main/java/io/dataease/listener/AppStartInitDataSourceListener.java deleted file mode 100644 index 245d7c674a..0000000000 --- a/backend/src/main/java/io/dataease/listener/AppStartInitDataSourceListener.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.dataease.listener; - -import io.dataease.datasource.service.DatasourceService; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.core.annotation.Order; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; - -@Component -@Order(value = 2) -public class AppStartInitDataSourceListener implements ApplicationListener { - @Resource - private DatasourceService datasourceService; - @Override - public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { - System.out.println("================= Init datasource connection pool ================="); - // 项目启动,从数据集中找到定时抽取的表,从HBase中读取放入缓存 - datasourceService.initAllDataSourceConnectionPool(); - } -} diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java index b670fe1b0f..7e5f034ad0 100644 --- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java +++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java @@ -240,9 +240,9 @@ public class DataSetTableService { datasourceRequest.setDatasource(ds); String sql = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql(); datasourceRequest.setQuery(sql); - ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest); - List data = datasourceProvider.fetchResult(dataResultSet); - List fields = datasourceProvider.fetchResultField(dataResultSet); + Map result = datasourceProvider.fetchResultAndField(datasourceRequest); + List data = result.get("dataList"); + List fields = result.get("fieldList"); String[] fieldArray = fields.stream().map(TableFiled::getFieldName).toArray(String[]::new); List> jsonArray = new ArrayList<>(); @@ -263,67 +263,6 @@ public class DataSetTableService { return map; } - public List getDataSetData(String datasourceId, String table, List fields) { - List data = new ArrayList<>(); - Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId); - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(ds); - String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new); - datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0, 10"); - try { - data.addAll(datasourceProvider.getData(datasourceRequest)); - } catch (Exception e) { - } - return data; - } - - public Long getDataSetTotalData(String datasourceId, String table) { - List data = new ArrayList<>(); - Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId); - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(ds); - datasourceRequest.setQuery("select count(*) from " + table); - try { - return datasourceProvider.count(datasourceRequest); - } catch (Exception e) { - - } - return 0l; - } - - public List getDataSetPageData(String datasourceId, String table, List fields, Long startPage, Long pageSize) { 
- List data = new ArrayList<>(); - Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId); - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(ds); - String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new); - datasourceRequest.setPageSize(pageSize); - datasourceRequest.setStartPage(startPage); - datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray)); - try { - return datasourceProvider.getData(datasourceRequest); - } catch (Exception e) { - } - return data; - } - - public List getDataSetDataBySql(String datasourceId, String table, String sql) { - List data = new ArrayList<>(); - Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId); - DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); - DatasourceRequest datasourceRequest = new DatasourceRequest(); - datasourceRequest.setDatasource(ds); - datasourceRequest.setQuery(sql); - try { - return datasourceProvider.getData(datasourceRequest); - } catch (Exception e) { - } - return data; - } - public void saveTableField(DatasetTable datasetTable) throws Exception { Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); DataSetTableRequest dataSetTableRequest = new DataSetTableRequest(); @@ -338,8 +277,7 @@ public class DataSetTableService { DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(ds); datasourceRequest.setQuery(new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql()); - ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest); - fields = datasourceProvider.fetchResultField(dataResultSet); + fields = datasourceProvider.fetchResultField(datasourceRequest); } else if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) { DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class); String path = dataTableInfoDTO.getData(); @@ -367,6 +305,7 @@ public class DataSetTableService { } else { datasetTableField.setDeType(transFieldType(ds.getType(), filed.getFieldType())); } + datasetTableField.setSize(filed.getFieldSize()); datasetTableField.setChecked(true); datasetTableField.setColumnIndex(i); datasetTableField.setLastSyncTime(syncTime); diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 714521d31b..4d3123438a 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -7,20 +7,20 @@ import io.dataease.commons.constants.JobStatus; import io.dataease.commons.constants.ScheduleType; import io.dataease.commons.constants.UpdateType; import io.dataease.commons.utils.CommonBeanFactory; +import io.dataease.commons.utils.DorisTableUtils; import io.dataease.commons.utils.LogUtil; import io.dataease.datasource.constants.DatasourceTypes; import io.dataease.datasource.dto.MysqlConfigrationDTO; +import io.dataease.datasource.provider.JdbcProvider; +import io.dataease.datasource.request.DatasourceRequest; import io.dataease.dto.dataset.DataSetTaskLogDTO; import io.dataease.dto.dataset.DataTableInfoDTO; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import 
org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; import org.pentaho.di.cluster.SlaveServer; import org.pentaho.di.core.database.DatabaseMeta; -import org.pentaho.di.core.plugins.PluginRegistry; -import org.pentaho.di.core.plugins.StepPluginType; +import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobExecutionConfiguration; import org.pentaho.di.job.JobHopMeta; @@ -34,18 +34,22 @@ import org.pentaho.di.repository.filerep.KettleFileRepository; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.steps.sql.ExecSQLMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; -import org.pentaho.di.trans.steps.textfileoutput.TextFileField; -import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta; +import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta; +import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef; +import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta; import org.pentaho.di.www.SlaveServerJobStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; @Service public class ExtractDataService { @@ -61,7 +65,6 @@ public class ExtractDataService { @Resource private DatasourceMapper datasourceMapper; private static ExecutorService pool = Executors.newScheduledThreadPool(50); //设置连接池 - private Connection connection; private static String lastUpdateTime = "${__last_update_time__}"; private static String currentUpdateTime = "${__current_update_time__}"; @@ -79,61 +82,83 @@ public class ExtractDataService { private String user; @Value("${carte.passwd:cluster}") private String passwd; - @Value("${hbase.zookeeper.quorum:zookeeper}") - private String zkHost; - @Value("${hbase.zookeeper.property.clientPort:2181}") - private String zkPort; -// @Resource -// private SparkCalc sparkCalc; + private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" + + "Column_Fields" + + "PROPERTIES(\"replication_num\" = \"1\");"; + private String createDorisTablColumnSql( List datasetTableFields){ + String Column_Fields = "dataease_uuid varchar(50),"; + for (DatasetTableField datasetTableField : datasetTableFields) { + Column_Fields = Column_Fields + datasetTableField.getOriginName() + " "; + switch (datasetTableField.getDeType()){ + case 0: + Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; + break; + case 1: + Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; + break; + case 2: + Column_Fields = Column_Fields + "bigint(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; + break; + case 3: + Column_Fields = Column_Fields + "DOUBLE(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; + break; + default: + Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; + break; + } + } + Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 ); + Column_Fields = 
"(" + Column_Fields + ")" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n"; + return Column_Fields; + } + + private void createDorisTable(String dorisTableName, String dorisTablColumnSql) throws Exception{ + Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); + JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);; + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(dorisDatasource); + datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql)); + jdbcProvider.exec(datasourceRequest); + } + + private void replaceTable (String dorisTableName) throws Exception{ + Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); + JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);; + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(dorisDatasource); + datasourceRequest.setQuery("ALTER TABLE DORIS_TABLE REPLACE WITH TABLE DORIS_TMP_TABLE PROPERTIES('swap' = 'false');".replace("DORIS_TABLE", dorisTableName).replace("DORIS_TMP_TABLE", DorisTableUtils.doristmpName(dorisTableName))); + jdbcProvider.exec(datasourceRequest); + } public void extractData(String datasetTableId, String taskId, String type) { DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); UpdateType updateType = UpdateType.valueOf(type); try { -// Admin admin = getConnection().getAdmin(); DatasetTable datasetTable = dataSetTableService.get(datasetTableId); Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); List datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); - String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable(); - TableName hbaseTable = TableName.valueOf(datasetTableId); + String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable(); + String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields); switch (updateType) { // 全量更新 case all_scope: writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); - - //check pentaho_mappings table -// TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings); -// if (!admin.tableExists(pentaho_mappings)) { -// creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key")); -// } - - //check pentaho files - if (!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")) { - generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null); - generateJobFile("all_scope", datasetTable); - } - -// if (!admin.tableExists(hbaseTable)) { -// creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family)); -// } -// admin.disableTable(hbaseTable); -// admin.truncateTable(hbaseTable, true); - + // TODO before: check doris table column type + createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); + createDorisTable(DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); + generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null); + generateJobFile("all_scope", datasetTable); extractData(datasetTable, "all_scope"); - // after sync complete,read data to cache from HBase -// sparkCalc.getHBaseDataAndCache(datasetTableId, 
dataSetTableFieldsService.getFieldsByTableId(datasetTableId)); + replaceTable(DorisTableUtils.dorisName(datasetTableId)); datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setEndTime(System.currentTimeMillis()); dataSetTableTaskLogService.save(datasetTableTaskLog); break; + + // 增量更新 case add_scope: - // 增量更新 -// if (!admin.tableExists(hbaseTable)) { -// LogUtil.error("TableName error, dataaset: " + datasetTableId); -// return; -// } DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId); if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) { return; @@ -149,15 +174,10 @@ public class ExtractDataService { // 增量添加 if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) { - System.out.println("datasetTableIncrementalConfig.getIncrementalAdd(): " + datasetTableIncrementalConfig.getIncrementalAdd()); String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); - - if (!isExitFile("job_add_" + datasetTableId + ".kjb") || !isExitFile("trans_add_" + datasetTableId + ".ktr")) { - generateTransFile("incremental_add", datasetTable, datasource, table, datasetTableFields, sql); - generateJobFile("incremental_add", datasetTable); - } - + generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql); + generateJobFile("incremental_add", datasetTable); extractData(datasetTable, "incremental_add"); } @@ -165,14 +185,10 @@ public class ExtractDataService { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) { String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); - if (!isExitFile("job_delete_" + datasetTableId + ".kjb") || !isExitFile("trans_delete_" + datasetTableId + ".ktr")) { - generateTransFile("incremental_delete", datasetTable, datasource, table, datasetTableFields, sql); - generateJobFile("incremental_delete", datasetTable); - } + generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql); + generateJobFile("incremental_delete", datasetTable); extractData(datasetTable, "incremental_delete"); } - // after sync complete,read data to cache from HBase -// sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId)); datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setEndTime(System.currentTimeMillis()); dataSetTableTaskLogService.save(datasetTableTaskLog); @@ -202,18 +218,6 @@ public class ExtractDataService { dataSetTableTaskLogService.save(datasetTableTaskLog); } -// private void creatHaseTable(TableName tableName, Admin admin, List columnFamily) throws Exception { -// TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); -// Collection families = new ArrayList<>(); -// for (String s : columnFamily) { -// ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s); -// families.add(hcd); -// } -// descBuilder.setColumnFamilies(families); -// TableDescriptor desc = descBuilder.build(); -// admin.createTable(desc); -// } - private void extractData(DatasetTable datasetTable, 
String extractType) throws Exception { KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class); RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree(); @@ -248,14 +252,6 @@ public class ExtractDataService { } } -// private synchronized Connection getConnection() throws Exception { -// if (connection == null || connection.isClosed()) { -// Configuration cfg = CommonBeanFactory.getBean(Configuration.class); -// connection = ConnectionFactory.createConnection(cfg, pool); -// } -// return connection; -// } - private boolean isExitFile(String fileName) { File file = new File(root_path + fileName); return file.exists(); @@ -338,33 +334,18 @@ public class ExtractDataService { } private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List datasetTableFields, String selectSQL) throws Exception { - TransMeta transMeta = new TransMeta(); - String transName = null; - switch (extractType) { - case "all_scope": - transName = "trans_" + datasetTable.getId(); - datasetTableFields.sort((o1, o2) -> { - if (o1.getOriginName() == null) { - return -1; - } - if (o2.getOriginName() == null) { - return 1; - } - return o1.getOriginName().compareTo(o2.getOriginName()); - }); - selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new)); - break; - case "incremental_add": - transName = "trans_add_" + datasetTable.getId(); - break; - case "incremental_delete": - transName = "trans_delete_" + datasetTable.getId(); - break; - default: - break; - } + datasetTableFields.sort((o1, o2) -> { + if (o1.getOriginName() == null) { + return -1; + } + if (o2.getOriginName() == null) { + return 1; + } + return o1.getOriginName().compareTo(o2.getOriginName()); + }); - transMeta.setName(transName); + TransMeta transMeta = new TransMeta(); + String dorisOutputTable = DorisTableUtils.doristmpName(DorisTableUtils.dorisName(datasetTable.getId())); DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); DatabaseMeta dataMeta = null; switch (datasourceType) { @@ -377,124 +358,126 @@ public class ExtractDataService { break; } - //registry是给每个步骤生成一个标识id - PluginRegistry registry = PluginRegistry.getInstance(); - //第一个表输入步骤(TableInputMeta) - TableInputMeta tableInput = new TableInputMeta(); - - //给表输入添加一个DatabaseMeta连接数据库 - DatabaseMeta database_bjdt = transMeta.findDatabase("db"); - tableInput.setDatabaseMeta(database_bjdt); - tableInput.setSQL(selectSQL); - //添加TableInputMeta到转换中 - String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); - StepMeta fromStep = new StepMeta(tableInputPluginId, "Data Input", tableInput); - //给步骤添加在spoon工具中的显示位置 - fromStep.setDraw(true); - fromStep.setLocation(100, 100); - transMeta.addStep(fromStep); - - //第二个 (TextFileOutput) - TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta(); - textFileOutputMeta.setFilename(data_path + datasetTable.getId()); - textFileOutputMeta.setExtension("txt"); - textFileOutputMeta.setSeparator(";"); - textFileOutputMeta.setFileCompression("None"); - textFileOutputMeta.setEnclosure("\""); - textFileOutputMeta.setEncoding("UTF-8"); - TextFileField[] outputFields = new TextFileField[1]; - outputFields[0] = new TextFileField(); - textFileOutputMeta.setOutputFields(outputFields); - - StepMeta tostep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta); - 
tostep.setLocation(600, 100); - tostep.setDraw(true); - transMeta.addStep(tostep); - TransHopMeta hi1 = new TransHopMeta(fromStep, tostep); - transMeta.addTransHop(hi1); - - -// //第二个 (User defined Java class) -// UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta(); -// List fields = new ArrayList<>(); -// UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1); -// fields.add(fieldInfo); -// userDefinedJavaClassMeta.setFieldInfo(fields); -// List definitions = new ArrayList(); -// UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code); -// userDefinedJavaClassDef.setActive(true); -// definitions.add(userDefinedJavaClassDef); -// userDefinedJavaClassMeta.replaceDefinitions(definitions); -// -// StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta); -// userDefinedJavaClassStep.setLocation(300, 100); -// userDefinedJavaClassStep.setDraw(true); -// transMeta.addStep(userDefinedJavaClassStep); -// -// //第三个 (HBaseOutputMeta) -// NamedClusterService namedClusterService = new NamedClusterManager(); -// NamedCluster clusterTemplate = new NamedClusterImpl(); -// clusterTemplate.setName("hadoop"); -// clusterTemplate.setZooKeeperHost(zkHost); -// clusterTemplate.setZooKeeperPort(zkPort); -// clusterTemplate.setStorageScheme("HDFS"); -// namedClusterService.setClusterTemplate(clusterTemplate); -// -// List providers = new ArrayList<>(); -// ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers); -// NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer); -// -// List runtimeTestActionHandlers = new ArrayList<>(); -// RuntimeTestActionHandler defaultHandler = null; -// -// RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler); -// RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules"); -// -// Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes()); -// for (DatasetTableField datasetTableField : datasetTableFields) { -// put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes()); -// } -// put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes()); -// TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings); -// Table tab = getConnection().getTable(pentaho_mappings); -// tab.put(put); -// -// HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester); -// hBaseOutputMeta.setTargetTableName(datasetTable.getId()); -// hBaseOutputMeta.setTargetMappingName("target_mapping"); -// hBaseOutputMeta.setNamedCluster(clusterTemplate); -// hBaseOutputMeta.setCoreConfigURL(hbase_conf_file); -// hBaseOutputMeta.setDisableWriteToWAL(true); -// hBaseOutputMeta.setWriteBufferSize("31457280"); //30M -// if (extractType.equalsIgnoreCase("incremental_delete")) { -// hBaseOutputMeta.setDeleteRowKey(true); -// } -// StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta); -// tostep.setLocation(600, 
100);
-//
-//        tostep.setDraw(true);
-//        transMeta.addStep(tostep);
-//        TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
-//        TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
-//        transMeta.addTransHop(hi1);
-//        transMeta.addTransHop(hi2);
+        Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
+        MysqlConfigrationDTO dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigrationDTO.class);
+        DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword());
+        transMeta.addDatabase(dorisDataMeta);
+        StepMeta inputStep = null;
+        StepMeta outputStep = null;
+        StepMeta udjcStep = null;
+        TransHopMeta hi1 = null;
+        TransHopMeta hi2 = null;
+        String transName = null;
+        switch (extractType) {
+            case "all_scope":
+                transName = "trans_" + datasetTable.getId();
+                selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
+                transMeta.setName(transName);
+                inputStep = inputStep(transMeta, selectSQL);
+                udjcStep = udjc(datasetTableFields);
+                outputStep = outputStep(transMeta, dorisOutputTable);
+                hi1 = new TransHopMeta(inputStep, udjcStep);
+                hi2 = new TransHopMeta(udjcStep, outputStep);
+                transMeta.addTransHop(hi1);
+                transMeta.addTransHop(hi2);
+                transMeta.addStep(inputStep);
+                transMeta.addStep(udjcStep);
+                transMeta.addStep(outputStep);
+                break;
+            case "incremental_add":
+                transName = "trans_add_" + datasetTable.getId();
+                dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
+                transMeta.setName(transName);
+                inputStep = inputStep(transMeta, selectSQL);
+                udjcStep = udjc(datasetTableFields);
+                outputStep = outputStep(transMeta, dorisOutputTable);
+                hi1 = new TransHopMeta(inputStep, udjcStep);
+                hi2 = new TransHopMeta(udjcStep, outputStep);
+                transMeta.addTransHop(hi1);
+                transMeta.addTransHop(hi2);
+                transMeta.addStep(inputStep);
+                transMeta.addStep(udjcStep);
+                transMeta.addStep(outputStep);
+                break;
+            case "incremental_delete":
+                dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
+                transName = "trans_delete_" + datasetTable.getId();
+                transMeta.setName(transName);
+                inputStep = inputStep(transMeta, selectSQL);
+                udjcStep = udjc(datasetTableFields);
+                outputStep = execSqlStep(transMeta, dorisOutputTable, datasetTableFields);
+                hi1 = new TransHopMeta(inputStep, udjcStep);
+                hi2 = new TransHopMeta(udjcStep, outputStep);
+                transMeta.addTransHop(hi1);
+                transMeta.addTransHop(hi2);
+                transMeta.addStep(inputStep);
+                transMeta.addStep(udjcStep);
+                transMeta.addStep(outputStep);
+                break;
+            default:
+                break;
+        }
         String transXml = transMeta.getXML();
         File file = new File(root_path + transName + ".ktr");
         FileUtils.writeStringToFile(file, transXml, "UTF-8");
     }

-    public String transToColumnType(Integer field) {
-        switch (field) {
-            case 0:
-                return "String";
-            case 1:
-                return "Date";
-            case 2:
-                return "BigNumber";
-            default:
-                return "String";
-        }
+    private StepMeta inputStep(TransMeta transMeta, String selectSQL){
+        TableInputMeta tableInput = new TableInputMeta();
+        DatabaseMeta database = transMeta.findDatabase("db");
+        tableInput.setDatabaseMeta(database);
+        tableInput.setSQL(selectSQL);
+        StepMeta fromStep = new StepMeta("TableInput", "Data Input", tableInput);
+        fromStep.setDraw(true);
+        fromStep.setLocation(100, 100);
+        return fromStep;
+    }
+
+    private StepMeta outputStep(TransMeta transMeta, String dorisOutputTable){
+        TableOutputMeta tableOutputMeta = new TableOutputMeta();
+        DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
+        tableOutputMeta.setDatabaseMeta(dorisDatabaseMeta);
+        tableOutputMeta.setTableName(dorisOutputTable);
+        tableOutputMeta.setCommitSize(10000);
+        tableOutputMeta.setUseBatchUpdate(true);
+        StepMeta outputStep = new StepMeta("TableOutput", "TableOutput", tableOutputMeta);
+        outputStep.setLocation(600, 100);
+        outputStep.setDraw(true);
+        return outputStep;
+    }
+
+    private StepMeta udjc(List<DatasetTableField> datasetTableFields){
+        UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
+        List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
+        UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("dataease_uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
+        fields.add(fieldInfo);
+        userDefinedJavaClassMeta.setFieldInfo(fields);
+        List<UserDefinedJavaClassDef> definitions = new ArrayList<UserDefinedJavaClassDef>();
+        UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code.replace("Column_Fields", String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()))));
+        userDefinedJavaClassDef.setActive(true);
+        definitions.add(userDefinedJavaClassDef);
+        userDefinedJavaClassMeta.replaceDefinitions(definitions);
+
+        StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
+        userDefinedJavaClassStep.setLocation(300, 100);
+        userDefinedJavaClassStep.setDraw(true);
+        return userDefinedJavaClassStep;
+    }
+
+    private StepMeta execSqlStep(TransMeta transMeta, String dorisOutputTable, List<DatasetTableField> datasetTableFields){
+        ExecSQLMeta execSQLMeta = new ExecSQLMeta();
+        DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
+        execSQLMeta.setDatabaseMeta(dorisDatabaseMeta);
+        String deleteSql = "delete from DORIS_TABLE where dataease_uuid='?';".replace("DORIS_TABLE", dorisOutputTable);
+        execSQLMeta.setSql(deleteSql);
+        execSQLMeta.setExecutedEachInputRow(true);
+        execSQLMeta.setArguments(new String[]{"dataease_uuid"});
+        StepMeta execSQLStep = new StepMeta("ExecSQL", "ExecSQL", execSQLMeta);
+        execSQLStep.setLocation(600, 100);
+        execSQLStep.setDraw(true);
+        return execSQLStep;
     }

     private static String code = "import org.pentaho.di.core.row.ValueMetaInterface;\n" +
@@ -506,29 +489,11 @@ public class ExtractDataService {
             "import java.util.List;\n" +
             "import java.util.concurrent.ExecutorService;\n" +
             "import java.util.concurrent.Executors;\n" +
+            "import org.pentaho.di.core.util.StringUtil;\n" +
             "\n" +
             "public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {\n" +
             "    if (first) {\n" +
             "        first = false;\n" +
-            "\n" +
-            "        /* TODO: Your code here. (Using info fields)\n" +
-            "\n" +
-            "        FieldHelper infoField = get(Fields.Info, \"info_field_name\");\n" +
-            "\n" +
-            "        RowSet infoStream = findInfoRowSet(\"info_stream_tag\");\n" +
-            "\n" +
-            "        Object[] infoRow = null;\n" +
-            "\n" +
-            "        int infoRowCount = 0;\n" +
-            "\n" +
-            "        // Read all rows from info step before calling getRow() method, which returns first row from any\n" +
-            "        // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.\n" +
-            "        while((infoRow = getRowFrom(infoStream)) != null){\n" +
-            "\n" +
-            "            // do something with info data\n" +
-            "            infoRowCount++;\n" +
-            "        }\n" +
-            "        */\n" +
             "    }\n" +
             "\n" +
             "    Object[] r = getRow();\n" +
@@ -538,19 +503,17 @@ public class ExtractDataService {
             "        return false;\n" +
             "    }\n" +
             "\n" +
-            "    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large\n" +
-            "    // enough to handle any new fields you are creating in this step.\n" +
             "    r = createOutputRow(r, data.outputRowMeta.size());\n" +
             "    String str = \"\";\n" +
-            "    List<ValueMetaInterface> valueMetaList = data.outputRowMeta.getValueMetaList();\n" +
-            "    for (ValueMetaInterface valueMetaInterface : valueMetaList) {\n" +
-            "\t    if(!valueMetaInterface.getName().equalsIgnoreCase(\"uuid\")){\n" +
-            "            str = str + get(Fields.In, valueMetaInterface.getName()).getString(r);\n" +
-            "        }\n" +
-            "    }\n" +
+            "\n" +
+            "    List<String> fileds = Arrays.asList(\"Column_Fields\".split(\",\"));\n" +
+            "    for (String filed : fileds) {\n" +
+            "        String tmp = get(Fields.In, filed).getString(r);\n" +
+            "        str = str + tmp;\n" +
+            "    }\n" +
             "\n" +
             "    String md5 = md5(str);\n" +
-            "    get(Fields.Out, \"uuid\").setValue(r, md5);\n" +
+            "    get(Fields.Out, \"dataease_uuid\").setValue(r, md5);\n" +
             "\n" +
             "    putRow(data.outputRowMeta, r);\n" +
             "\n" +
@@ -590,6 +553,6 @@ public class ExtractDataService {
             "            str = str + d[i];\n" +
             "        }\n" +
             "        return str;\n" +
-            "    }\n";
+            "    }";

 }
diff --git a/backend/src/main/java/io/dataease/service/spark/CacheUtil.java b/backend/src/main/java/io/dataease/service/spark/CacheUtil.java
deleted file mode 100644
index 56986f5cb5..0000000000
--- a/backend/src/main/java/io/dataease/service/spark/CacheUtil.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package io.dataease.service.spark;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @Author gin
- * @Date 2021/4/13 12:32 下午
- */
-public class CacheUtil {
-    private static CacheUtil cacheUtil;
-    private static Map<String, Dataset<Row>> cacheMap;
-
-    private CacheUtil(){
-        cacheMap = new HashMap<String, Dataset<Row>>();
-    }
-
-    public static CacheUtil getInstance(){
-        if (cacheUtil == null){
-            cacheUtil = new CacheUtil();
-        }
-        return cacheUtil;
-    }
-
-    /**
-     * 添加缓存
-     * @param key
-     * @param obj
-     */
-    public void addCacheData(String key,Dataset<Row> obj){
-        cacheMap.put(key,obj);
-    }
-
-    /**
-     * 取出缓存
-     * @param key
-     * @return
-     */
-    public Dataset<Row> getCacheData(String key){
-        return cacheMap.get(key);
-    }
-
-    /**
-     * 清楚缓存
-     * @param key
-     */
-    public void removeCacheData(String key){
-        cacheMap.remove(key);
-    }
-}
diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
deleted file mode 100644
index f75dd84fb5..0000000000
--- a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
+++ /dev/null
@@ -1,407 +0,0 @@
-//package io.dataease.service.spark;
-//
-//import io.dataease.base.domain.DatasetTableField;
-//import io.dataease.commons.utils.CommonBeanFactory;
-//import io.dataease.controller.request.chart.ChartExtFilterRequest;
-//import io.dataease.dto.chart.ChartViewFieldDTO;
-//import org.antlr.analysis.MachineProbe;
-//import org.apache.commons.collections4.CollectionUtils;
-//import org.apache.commons.lang3.ObjectUtils;
-//import org.apache.commons.lang3.StringUtils;
-//import org.apache.hadoop.hbase.client.Result;
-//import org.apache.hadoop.hbase.client.Scan;
-//import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; -//import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -//import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -//import org.apache.hadoop.hbase.util.Bytes; -//import org.apache.spark.api.java.JavaPairRDD; -//import org.apache.spark.api.java.JavaRDD; -//import org.apache.spark.api.java.JavaSparkContext; -//import org.apache.spark.api.java.function.FlatMapFunction; -//import org.apache.spark.api.java.function.Function; -//import org.apache.spark.sql.*; -//import org.apache.spark.sql.types.DataTypes; -//import org.apache.spark.sql.types.StructField; -//import org.apache.spark.sql.types.StructType; -//import org.apache.spark.storage.StorageLevel; -//import org.springframework.core.env.Environment; -//import org.springframework.stereotype.Service; -//import scala.Tuple2; -// -//import javax.annotation.Resource; -//import java.math.BigDecimal; -//import java.text.MessageFormat; -//import java.util.*; -// -///** -// * @Author gin -// * @Date 2021/3/26 3:49 下午 -// */ -//@Service -//public class SparkCalc { -// private static String column_family = "dataease"; -// private static String data_path = "/opt/dataease/data/db/"; -// @Resource -// private Environment env; // 保存了配置文件的信息 -// -// public List getData(String hTable, List fields, List xAxis, List yAxis, String tmpTable, List requestList) throws Exception { -// // Spark Context -// SparkSession spark = CommonBeanFactory.getBean(SparkSession.class); -// JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext()); -// -// // Spark SQL Context -// SQLContext sqlContext = new SQLContext(sparkContext); -// sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1")); -// sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); -// -// /*Map dataFrame = getData(sparkContext, sqlContext, hTable, fields); -// List data = new ArrayList<>(); -// Iterator> iterator = dataFrame.entrySet().iterator(); -// while (iterator.hasNext()) { -// String[] r = new String[2]; -// Map.Entry next = iterator.next(); -// String key = next.getKey(); -// BigDecimal value = next.getValue(); -// r[0] = key; -// r[1] = value.toString(); -// data.add(r); -// }*/ -// -//// Dataset dataFrame = getData(sparkContext, sqlContext, hTable, fields); -// Dataset dataFrame = CacheUtil.getInstance().getCacheData(hTable); -// if (ObjectUtils.isEmpty(dataFrame)) { -// dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields); -// } -// -// dataFrame.createOrReplaceTempView(tmpTable); -// Dataset sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable, requestList)); -// // transform -// List data = new ArrayList<>(); -// List list = sql.collectAsList(); -// for (Row row : list) { -// String[] r = new String[row.length()]; -// for (int i = 0; i < row.length(); i++) { -// r[i] = row.get(i) == null ? 
"null" : row.get(i).toString(); -// } -// data.add(r); -// } -// return data; -// } -// -// public Dataset getHBaseDataAndCache(String hTable, List fields) throws Exception { -// // Spark Context -// SparkSession spark = CommonBeanFactory.getBean(SparkSession.class); -// JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext()); -// -// // Spark SQL Context -// SQLContext sqlContext = new SQLContext(sparkContext); -// sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1")); -// sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); -// return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields); -// } -// -// public Map getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List fields) throws Exception { -// fields.sort((o1, o2) -> { -// if (o1.getOriginName() == null) { -// return -1; -// } -// if (o2.getOriginName() == null) { -// return 1; -// } -// return o1.getOriginName().compareTo(o2.getOriginName()); -// }); -// -// JavaRDD pairRDD = sparkContext.textFile(data_path + tableId + ".txt"); -//// System.out.println(pairRDD.count()); -// -//// JavaRDD> rdd = pairRDD.map((Function>) v1 -> { -//// Map map = new HashMap<>(); -//// String[] items = v1.split(";"); -//// String day = null; -//// BigDecimal res = new BigDecimal(0); -//// for (int i = 0; i < items.length; i++) { -//// String l = items[i]; -//// DatasetTableField x = fields.get(i); -//// if (x.getOriginName().equalsIgnoreCase("sync_day")) { -//// day = l; -//// } -//// if (x.getOriginName().equalsIgnoreCase("usage_cost")) { -//// res = new BigDecimal(l); -//// } -//// } -//// BigDecimal bigDecimal = map.get(day); -//// if (bigDecimal == null) { -//// map.put(day, res); -//// } else { -//// map.put(day, bigDecimal.add(res)); -//// } -//// return map.entrySet().iterator().next(); -//// }); -// -// JavaRDD> rdd = pairRDD.mapPartitions((FlatMapFunction, Map.Entry>) tuple2Iterator -> { -// Map map = new HashMap<>(); -// while (tuple2Iterator.hasNext()) { -// String[] items = tuple2Iterator.next().split(";"); -// String day = null; -// BigDecimal res = new BigDecimal(0); -// for (int i = 0; i < items.length; i++) { -// String l = items[i]; -// DatasetTableField x = fields.get(i); -// if (x.getOriginName().equalsIgnoreCase("sync_day")) { -// day = l; -// } -// if (x.getOriginName().equalsIgnoreCase("usage_cost")) { -// res = new BigDecimal(l); -// } -// } -// BigDecimal bigDecimal = map.get(day); -// if (bigDecimal == null) { -// map.put(day, res); -// } else { -// map.put(day, bigDecimal.add(res)); -// } -// } -// return map.entrySet().iterator(); -// }); -// -// -//// System.out.println(rdd.count()); -// -// Map map = new HashMap<>(); -// List> collect = rdd.collect(); -//// System.out.println(collect.size()); -// -// collect.forEach(stringBigDecimalEntry -> { -// String key = stringBigDecimalEntry.getKey(); -// BigDecimal value = stringBigDecimalEntry.getValue(); -// -// BigDecimal bigDecimal = map.get(key); -// if (bigDecimal == null) { -// map.put(key, value); -// } else { -// map.put(key, bigDecimal.add(value)); -// } -// }); -// -// return map; -// } -// -//// public Dataset getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List fields) throws Exception { -//// fields.sort((o1, o2) -> { -//// if (o1.getOriginName() == null) { -//// return -1; -//// } -//// if (o2.getOriginName() == null) { -//// return 1; -//// } -//// return 
o1.getOriginName().compareTo(o2.getOriginName()); -//// }); -//// -//// JavaRDD pairRDD = sparkContext.textFile(data_path + tableId + ".txt"); -//// -//// JavaRDD rdd = pairRDD.mapPartitions((FlatMapFunction, Row>) tuple2Iterator -> { -//// List iterator = new ArrayList<>(); -//// while (tuple2Iterator.hasNext()) { -//// String[] items = tuple2Iterator.next().split(";"); -//// List list = new ArrayList<>(); -//// for (int i = 0; i < items.length; i++) { -//// String l = items[i]; -//// DatasetTableField x = fields.get(i); -//// if (x.getDeType() == 0 || x.getDeType() == 1) { -//// list.add(l); -//// } else if (x.getDeType() == 2) { -//// if (StringUtils.isEmpty(l)) { -//// l = "0"; -//// } -//// if (StringUtils.equalsIgnoreCase(l, "Y")) { -//// l = "1"; -//// } -//// if (StringUtils.equalsIgnoreCase(l, "N")) { -//// l = "0"; -//// } -//// list.add(Long.valueOf(l)); -//// } else if (x.getDeType() == 3) { -//// if (StringUtils.isEmpty(l)) { -//// l = "0.0"; -//// } -//// list.add(Double.valueOf(l)); -//// } -//// } -//// iterator.add(RowFactory.create(list.toArray())); -//// } -//// return iterator.iterator(); -//// }); -//// -//// List structFields = new ArrayList<>(); -//// // struct顺序要与rdd顺序一致 -//// fields.forEach(x -> { -//// if (x.getDeType() == 0 || x.getDeType() == 1) { -//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true)); -//// } else if (x.getDeType() == 2) { -//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true)); -//// } else if (x.getDeType() == 3) { -//// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true)); -//// } -//// }); -//// StructType structType = DataTypes.createStructType(structFields); -//// -//// Dataset dataFrame = sqlContext.createDataFrame(rdd, structType); -//// return dataFrame; -//// } -// -// public Dataset getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List fields) throws Exception { -// Scan scan = new Scan(); -// scan.addFamily(Bytes.toBytes(column_family)); -// for (DatasetTableField field : fields) { -// scan.addColumn(Bytes.toBytes(column_family), Bytes.toBytes(field.getOriginName())); -// } -// ClientProtos.Scan proto = ProtobufUtil.toScan(scan); -// String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray())); -// -// // HBase config -// org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); -// conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum")); -// conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort")); -// conf.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1")); -// conf.set(TableInputFormat.INPUT_TABLE, hTable); -// conf.set(TableInputFormat.SCAN, scanToString); -// -// JavaPairRDD pairRDD = sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); -// -// JavaRDD rdd = pairRDD.mapPartitions((FlatMapFunction>, Row>) tuple2Iterator -> { -// List iterator = new ArrayList<>(); -// while (tuple2Iterator.hasNext()) { -// Result result = tuple2Iterator.next()._2; -// List list = new ArrayList<>(); -// fields.forEach(x -> { -// String l = Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes())); -// if (x.getDeType() == 0 || x.getDeType() == 1) { -// list.add(l); -// } else if (x.getDeType() == 2) { -// if (StringUtils.isEmpty(l)) { -// l = "0"; -// 
} -// list.add(Long.valueOf(l)); -// } else if (x.getDeType() == 3) { -// if (StringUtils.isEmpty(l)) { -// l = "0.0"; -// } -// list.add(Double.valueOf(l)); -// } -// }); -// iterator.add(RowFactory.create(list.toArray())); -// } -// return iterator.iterator(); -// }); -// -// List structFields = new ArrayList<>(); -// // struct顺序要与rdd顺序一致 -// fields.forEach(x -> { -// if (x.getDeType() == 0 || x.getDeType() == 1) { -// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true)); -// } else if (x.getDeType() == 2) { -// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true)); -// } else if (x.getDeType() == 3) { -// structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true)); -// } -// }); -// StructType structType = DataTypes.createStructType(structFields); -// -// Dataset dataFrame = sqlContext.createDataFrame(rdd, structType).persist(StorageLevel.MEMORY_AND_DISK_SER()); -// CacheUtil.getInstance().addCacheData(hTable, dataFrame); -// dataFrame.count(); -// return dataFrame; -// } -// -// public String getSQL(List xAxis, List yAxis, String table, List extFilterRequestList) { -// // 字段汇总 排序等 -// String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new); -// String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new); -// String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none")) -// .map(y -> "_" + y.getSummary() + "_" + y.getOriginName() + " " + y.getSort()).toArray(String[]::new); -// -// String sql = MessageFormat.format("SELECT {0},{1} FROM {2} WHERE 1=1 {3} GROUP BY {4} ORDER BY null,{5}", -// StringUtils.join(group, ","), -// StringUtils.join(field, ","), -// table, -// transExtFilter(extFilterRequestList),// origin field filter and panel field filter, -// StringUtils.join(group, ","), -// StringUtils.join(order, ",")); -// if (sql.endsWith(",")) { -// sql = sql.substring(0, sql.length() - 1); -// } -// // 如果是对结果字段过滤,则再包裹一层sql -// String[] resultFilter = yAxis.stream().filter(y -> CollectionUtils.isNotEmpty(y.getFilter()) && y.getFilter().size() > 0) -// .map(y -> { -// String[] s = y.getFilter().stream().map(f -> "AND _" + y.getSummary() + "_" + y.getOriginName() + transFilterTerm(f.getTerm()) + f.getValue()).toArray(String[]::new); -// return StringUtils.join(s, " "); -// }).toArray(String[]::new); -// if (resultFilter.length == 0) { -// return sql; -// } else { -// String filterSql = MessageFormat.format("SELECT * FROM {0} WHERE 1=1 {1}", -// "(" + sql + ") AS tmp", -// StringUtils.join(resultFilter, " ")); -// return filterSql; -// } -// } -// -// public String transFilterTerm(String term) { -// switch (term) { -// case "eq": -// return " = "; -// case "not_eq": -// return " <> "; -// case "lt": -// return " < "; -// case "le": -// return " <= "; -// case "gt": -// return " > "; -// case "ge": -// return " >= "; -// case "in": -// return " IN "; -// case "not in": -// return " NOT IN "; -// case "like": -// return " LIKE "; -// case "not like": -// return " NOT LIKE "; -// case "null": -// return " IS NULL "; -// case "not_null": -// return " IS NOT NULL "; -// default: -// return ""; -// } -// } -// -// public String transExtFilter(List requestList) { -// if (CollectionUtils.isEmpty(requestList)) { -// return ""; -// } -// StringBuilder filter 
= new StringBuilder();
-//        for (ChartExtFilterRequest request : requestList) {
-//            List<String> value = request.getValue();
-//            if (CollectionUtils.isEmpty(value)) {
-//                continue;
-//            }
-//            DatasetTableField field = request.getDatasetTableField();
-//            filter.append(" AND ")
-//                    .append(field.getOriginName())
-//                    .append(" ")
-//                    .append(transFilterTerm(request.getOperator()))
-//                    .append(" ");
-//            if (StringUtils.containsIgnoreCase(request.getOperator(), "in")) {
-//                filter.append("('").append(StringUtils.join(value, "','")).append("')");
-//            } else if (StringUtils.containsIgnoreCase(request.getOperator(), "like")) {
-//                filter.append("'%").append(value.get(0)).append("%'");
-//            } else {
-//                filter.append("'").append(value.get(0)).append("'");
-//            }
-//        }
-//        return filter.toString();
-//    }
-//}
diff --git a/backend/src/main/resources/generatorConfig.xml b/backend/src/main/resources/generatorConfig.xml
index e900c8a49f..6aa551a59a 100644
--- a/backend/src/main/resources/generatorConfig.xml
+++ b/backend/src/main/resources/generatorConfig.xml
@@ -67,7 +67,7 @@
-
+
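Note: the UserDefinedJavaClass step built above fingerprints every extracted row by concatenating the selected column values (the Column_Fields placeholder is replaced with the dataset's origin column names) and MD5-hashing the result into dataease_uuid; the incremental_delete transformation then passes that value to the ExecSQL step's "delete from ... where dataease_uuid='?'" statement. The standalone sketch below illustrates that fingerprint idea outside Kettle; the class and method names (RowUuidSketch, buildRowUuid) are invented for illustration and are not part of this patch.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class RowUuidSketch {

    // Concatenate the column values in their configured order, then MD5 the result.
    // Like the generated step's "str = str + tmp", a null value contributes the text "null".
    public static String buildRowUuid(List<String> columnValues) {
        StringBuilder str = new StringBuilder();
        for (String value : columnValues) {
            str.append(value);
        }
        return md5Hex(str.toString());
    }

    // Hex-encoded MD5 digest; the patch ships its own md5 helper inside the code template,
    // while this sketch simply uses java.security.MessageDigest.
    private static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Rows that differ in any column get different uuids, so a re-extracted row
        // can be matched (and deleted) in Doris before the fresh copy is inserted.
        System.out.println(buildRowUuid(Arrays.asList("1", "alice", "2021-04-27")));
        System.out.println(buildRowUuid(Arrays.asList("1", "alice", "2021-04-28")));
    }
}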