feat(数据集): Spark初步

This commit is contained in:
junjie 2021-03-29 15:09:16 +08:00
parent c154e642c1
commit 6fb10cdfff
6 changed files with 223 additions and 25 deletions

View File

@ -371,6 +371,18 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
</dependencies>

View File

@ -3,12 +3,12 @@ package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
import javax.annotation.Resource;
@ -23,7 +23,7 @@ public class CommonConfig {
@Bean
@ConditionalOnMissingBean
public org.apache.hadoop.conf.Configuration configuration(){
public org.apache.hadoop.conf.Configuration configuration() {
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
@ -34,10 +34,19 @@ public class CommonConfig {
@Bean
@ConditionalOnMissingBean
public JavaSparkContext javaSparkContext(){
SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob") ).setMaster(env.getProperty("spark.master", "local[*]") );
public JavaSparkContext javaSparkContext() {
SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob")).setMaster(env.getProperty("spark.master", "local[*]"));
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
return sc;
}
@Bean
@ConditionalOnMissingBean
public SQLContext sqlContext(JavaSparkContext javaSparkContext) {
SQLContext sqlContext = new SQLContext(javaSparkContext);
sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
return sqlContext;
}
}

View File

@ -2,6 +2,7 @@ package io.dataease.dto.chart;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
@ -9,7 +10,7 @@ import java.util.List;
* @Date 2021/3/11 1:18 下午
*/
@Data
public class ChartViewFieldDTO {
public class ChartViewFieldDTO implements Serializable {
private String id;
private String tableId;

View File

@ -2,7 +2,10 @@ package io.dataease.service.chart;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.dataease.base.domain.*;
import io.dataease.base.domain.ChartViewExample;
import io.dataease.base.domain.ChartViewWithBLOBs;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.Datasource;
import io.dataease.base.mapper.ChartViewMapper;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.controller.request.chart.ChartViewRequest;
@ -15,8 +18,8 @@ import io.dataease.dto.chart.ChartViewDTO;
import io.dataease.dto.chart.ChartViewFieldDTO;
import io.dataease.dto.chart.Series;
import io.dataease.dto.dataset.DataTableInfoDTO;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@ -25,7 +28,6 @@ import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import java.util.stream.Collectors;
/**
* @Author gin
@ -40,7 +42,7 @@ public class ChartViewService {
@Resource
private DatasourceService datasourceService;
@Resource
private DataSetTableFieldsService dataSetTableFieldsService;
private SparkCalc sparkCalc;
public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) {
long timestamp = System.currentTimeMillis();
@ -102,22 +104,27 @@ public class ChartViewService {
// 获取数据集
DatasetTable table = dataSetTableService.get(view.getTableId());
// todo 判断连接方式直连或者定时抽取 table.mode
Datasource ds = datasourceService.get(table.getDataSourceId());
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
if (StringUtils.equalsIgnoreCase(table.getType(), "db")) {
datasourceRequest.setTable(dataTableInfoDTO.getTable());
datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis));
} else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) {
datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis));
// 判断连接方式直连或者定时抽取 table.mode
List<String[]> data = new ArrayList<>();
if (table.getMode() == 0) {// 直连
Datasource ds = datasourceService.get(table.getDataSourceId());
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
if (StringUtils.equalsIgnoreCase(table.getType(), "db")) {
datasourceRequest.setTable(dataTableInfoDTO.getTable());
datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis));
} else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) {
datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis));
}
data = datasourceProvider.getData(datasourceRequest);
} else if (table.getMode() == 1) {// 抽取
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// todo hBase table name maybe change
}
List<String[]> data = datasourceProvider.getData(datasourceRequest);
// todo 处理结果,目前做一个单系列图表后期图表组件再扩展
// 图表组件可再扩展
for (ChartViewFieldDTO y : yAxis) {
Series series1 = new Series();
series1.setName(y.getName());
@ -163,7 +170,7 @@ public class ChartViewService {
}
public String transMysqlSQL(String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis) {
// TODO 字段汇总 排序等
// 字段汇总 排序等
String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none"))

View File

@ -0,0 +1,169 @@
package io.dataease.service.spark;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.dto.chart.ChartViewFieldDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import static org.reflections8.Reflections.collect;
/**
* @Author gin
* @Date 2021/3/26 3:49 下午
*/
@Service
public class SparkCalc {
private static String column_family = "dataease";
public List<String[]> getData(String hTable, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
Scan scan = new Scan();
scan.addFamily(column_family.getBytes());
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
Configuration conf = CommonBeanFactory.getBean(Configuration.class);
conf.set(TableInputFormat.INPUT_TABLE, hTable);
conf.set(TableInputFormat.SCAN, scanToString);
JavaPairRDD<ImmutableBytesWritable, Result> pairRDD = sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
JavaRDD<Row> rdd = pairRDD.map((Function<Tuple2<ImmutableBytesWritable, Result>, Row>) immutableBytesWritableResultTuple2 ->
{
Result result = immutableBytesWritableResultTuple2._2;
List<Object> list = new ArrayList<>();
xAxis.forEach(x -> {
if (x.getDeType() == 0 || x.getDeType() == 1) {
list.add(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes())));
} else if (x.getDeType() == 2) {
list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()))));
}
});
yAxis.forEach(y -> {
if (y.getDeType() == 0 || y.getDeType() == 1) {
list.add(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes())));
} else if (y.getDeType() == 2) {
list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()))));
}
});
return RowFactory.create(list.toArray());
});
List<StructField> structFields = new ArrayList<>();
// struct顺序要与rdd顺序一致
xAxis.forEach(x -> {
if (x.getDeType() == 0 || x.getDeType() == 1) {
structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
} else if (x.getDeType() == 2) {
structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true));
}
});
yAxis.forEach(y -> {
if (y.getDeType() == 0 || y.getDeType() == 1) {
structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.StringType, true));
} else if (y.getDeType() == 2) {
structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.LongType, true));
}
});
StructType structType = DataTypes.createStructType(structFields);
SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
dataFrame.createOrReplaceTempView(tmpTable);
Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
List<String[]> data = new ArrayList<>();
// transform
JavaRDD<Row> rowJavaRDD = sql.javaRDD();
List<Row> list = rowJavaRDD.collect();
for (Row row : list) {
String[] r = new String[row.length()];
for (int i = 0; i < row.length(); i++) {
r[i] = row.get(i).toString();
}
data.add(r);
}
return data;
}
private String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
// 字段汇总 排序等
String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none"))
.map(y -> "_" + y.getSummary() + "_" + y.getOriginName() + " " + y.getSort()).toArray(String[]::new);
String sql = MessageFormat.format("SELECT {0},{1} FROM {2} WHERE 1=1 {3} GROUP BY {4} ORDER BY null,{5}",
StringUtils.join(group, ","),
StringUtils.join(field, ","),
table,
"",
StringUtils.join(group, ","),
StringUtils.join(order, ","));
if (sql.endsWith(",")) {
sql = sql.substring(0, sql.length() - 1);
}
// 如果是对结果字段过滤则再包裹一层sql
String[] resultFilter = yAxis.stream().filter(y -> CollectionUtils.isNotEmpty(y.getFilter()) && y.getFilter().size() > 0)
.map(y -> {
String[] s = y.getFilter().stream().map(f -> "AND _" + y.getSummary() + "_" + y.getOriginName() + transFilterTerm(f.getTerm()) + f.getValue()).toArray(String[]::new);
return StringUtils.join(s, " ");
}).toArray(String[]::new);
if (resultFilter.length == 0) {
return sql;
} else {
String filterSql = MessageFormat.format("SELECT * FROM {0} WHERE 1=1 {1}",
"(" + sql + ") AS tmp",
StringUtils.join(resultFilter, " "));
return filterSql;
}
}
public String transFilterTerm(String term) {
switch (term) {
case "eq":
return " = ";
case "not_eq":
return " <> ";
case "lt":
return " < ";
case "le":
return " <= ";
case "gt":
return " > ";
case "ge":
return " >= ";
case "null":
return " IS NULL ";
case "not_null":
return " IS NOT NULL ";
default:
return "";
}
}
}

View File

@ -27,7 +27,7 @@
<el-tab-pane :label="$t('dataset.join_view')" name="joinView">
关联视图 TODO
</el-tab-pane>
<el-tab-pane v-if="table.mode === 1" :label="$t('dataset.update_info')" name="updateInfo">
<el-tab-pane :label="$t('dataset.update_info')" name="updateInfo">
<update-info :table="table" />
</el-tab-pane>
</el-tabs>