feat(backend): spark calc fix

junjie 2021-04-01 17:30:06 +08:00
parent c8be2636fa
commit c4d671fc31
2 changed files with 28 additions and 11 deletions

View File

@@ -119,7 +119,8 @@ public class ChartViewService {
             data = datasourceProvider.getData(datasourceRequest);
         } else if (table.getMode() == 1) {// extraction
             DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
-            data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// TODO: the HBase table name may change
+            String tableName = dataTableInfoDTO.getTable() + "-" + table.getDataSourceId();// TODO: the HBase table name may change
+            data = sparkCalc.getData(tableName, xAxis, yAxis, view.getId().split("-")[0]);
         }
         // the chart component can be extended further
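The point of this change: the temp view name passed to sparkCalc.getData is no longer the shared literal "tmp" but is derived from the chart view's id, so concurrent chart queries each work against their own Spark temp view. A minimal sketch of the naming, assuming view ids are UUID-style strings (an assumption, not confirmed by the diff):

    // Hypothetical illustration of the new per-view temp table name.
    String viewId = "3f2a9c1e-7b4d-4e2a-9c1e-123456789abc"; // hypothetical UUID-style id
    String tmpTable = viewId.split("-")[0];                 // -> "3f2a9c1e"
    // Two views rendering concurrently now register distinct temp views
    // instead of both overwriting a shared "tmp" view.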

View File

@@ -16,16 +16,15 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Service;
 import scala.Tuple2;
+import javax.annotation.Resource;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -39,6 +38,8 @@ import java.util.List;
 @Service
 public class SparkCalc {
     private static String column_family = "dataease";
+    @Resource
+    private Environment env; // holds the settings from the configuration file
     public List<String[]> getData(String hTable, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
         Scan scan = new Scan();
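The injected org.springframework.core.env.Environment is what lets the calc pull its Spark and HBase settings from DataEase's configuration instead of relying on preconfigured beans. A small sketch of how the two-argument getProperty used below behaves (the keys are the ones from this diff; the surrounding values are assumptions):

    // getProperty(key, default) falls back to the default when the key is unset.
    String master = env.getProperty("spark.master", "local[*]"); // "local[*]" unless configured
    String quorum = env.getProperty("hbase.zookeeper.quorum");   // null when unset
    // Note: Hadoop's Configuration.set(...) rejects null values, so
    // hbase.zookeeper.quorum must actually be present in the configuration.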
@@ -46,8 +47,21 @@ public class SparkCalc {
         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
         String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
-        JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
-        Configuration conf = CommonBeanFactory.getBean(Configuration.class);
+        // Spark Context
+        // JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
+        SparkSession spark = SparkSession.builder()
+                .appName(env.getProperty("spark.appName", "DataeaseJob"))
+                .master(env.getProperty("spark.master", "local[*]"))
+                .config("spark.scheduler.mode", "FAIR")
+                .getOrCreate();
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+        // HBase config
+        // Configuration conf = CommonBeanFactory.getBean(Configuration.class);
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
+        conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
+        conf.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
         conf.set(TableInputFormat.INPUT_TABLE, hTable);
         conf.set(TableInputFormat.SCAN, scanToString);
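With INPUT_TABLE and the Base64-encoded SCAN set on the configuration, the part of getData elided from this hunk presumably reads the HBase table through TableInputFormat. A sketch of the standard Spark/HBase call, not the commit's exact lines (key/value types are ImmutableBytesWritable and Result from the HBase client; the variable name is illustrative):

    // Read the HBase table as a pair RDD via the conf prepared above (sketch).
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
            sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);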
@@ -103,12 +117,15 @@ public class SparkCalc {
         });
         StructType structType = DataTypes.createStructType(structFields);
-        SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        // Spark SQL Context
+        // SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
         Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
         dataFrame.createOrReplaceTempView(tmpTable);
         Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
         // transform
         List<String[]> data = new ArrayList<>();
         List<Row> list = sql.collectAsList();
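getSQL is not part of this diff; given the temp view registered just above, it presumably builds a group-by aggregation over the x-axis fields with aggregated y-axis measures. A purely hypothetical example of the statement it might emit (the column names and the SUM aggregate are assumptions):

    // Hypothetical output of getSQL(xAxis, yAxis, tmpTable) for one dimension and one measure.
    String sql = "SELECT region, SUM(amount) FROM " + tmpTable + " GROUP BY region";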
@@ -119,7 +136,6 @@ public class SparkCalc {
             }
             data.add(r);
         }
         return data;
     }
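Taken together with the ChartViewService change, each chart query now builds its own SparkSession-backed computation against a per-view temp table. A hypothetical call matching the new signature (the HBase table name and id prefix are illustrative, not from the repo):

    // Hypothetical invocation mirroring the new call site in ChartViewService.
    List<String[]> data = sparkCalc.getData("order_detail-ds01", xAxis, yAxis, "3f2a9c1e");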