From daf784f2dfecc891ca7e0acd8cac82a2037e30d9 Mon Sep 17 00:00:00 2001
From: junjie
Date: Wed, 14 Apr 2021 16:00:09 +0800
Subject: [PATCH] feat(backend):spark

---
 .../java/io/dataease/config/CommonConfig.java      | 17 ++++++++++++++++-
 .../io/dataease/service/spark/SparkCalc.java       |  5 ++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java
index f22c749e62..0728b2a831 100644
--- a/backend/src/main/java/io/dataease/config/CommonConfig.java
+++ b/backend/src/main/java/io/dataease/config/CommonConfig.java
@@ -40,7 +40,22 @@ public class CommonConfig {
         SparkSession spark = SparkSession.builder()
                 .appName(env.getProperty("spark.appName", "DataeaseJob"))
                 .master(env.getProperty("spark.master", "local[*]"))
-                .config("spark.scheduler.mode", "FAIR")
+                .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
+                .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
+                .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
+                .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
+                .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
+                .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
+                .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
+                .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
+                .config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
+                .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
+                .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
+                .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
+                .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
+                .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
+                .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
+                .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
                 .getOrCreate();
         return spark;
     }
diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
index 9c42468c9b..b3c8cf71a2 100644
--- a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
+++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
@@ -91,7 +91,10 @@ public class SparkCalc {
 
     public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
         Scan scan = new Scan();
-        scan.addFamily(column_family.getBytes());
+        scan.addFamily(Bytes.toBytes(column_family));
+        for (DatasetTableField field : fields) {
+            scan.addColumn(Bytes.toBytes(column_family), Bytes.toBytes(field.getOriginName()));
+        }
 
         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
         String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
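
Note on the CommonConfig change: because every setting now goes through
env.getProperty(key, default), any of these spark.* keys can be overridden per
deployment (e.g. in application.properties or via -D system properties) without
code changes. A minimal sketch of the pattern, using Spring's MockEnvironment
purely for illustration; the "4g" override value is illustrative, not from the
patch:

    import org.apache.spark.sql.SparkSession;
    import org.springframework.mock.env.MockEnvironment;

    public class SparkConfigSketch {
        public static void main(String[] args) {
            // Stand-in for the injected Spring Environment; illustration only.
            MockEnvironment env = new MockEnvironment();
            env.setProperty("spark.executor.memory", "4g"); // hypothetical override

            SparkSession spark = SparkSession.builder()
                    .appName(env.getProperty("spark.appName", "DataeaseJob"))
                    .master(env.getProperty("spark.master", "local[*]"))
                    // No override set, so the patch default applies.
                    .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
                    // Override set above wins over the hard-coded default.
                    .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
                    .getOrCreate();

            System.out.println(spark.conf().get("spark.executor.memory")); // prints 4g
            spark.stop();
        }
    }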
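
Note on the SparkCalc change: Bytes.toBytes() always encodes UTF-8, whereas
String.getBytes() depends on the JVM's default charset, and adding each field
with addColumn() narrows the scan from "whole family" to just the requested
qualifiers, so the region servers ship back less data. A self-contained sketch
of the resulting Scan construction; the family name "cf" and qualifiers "f1",
"f2" are hypothetical stand-ins for column_family and field.getOriginName():

    import java.util.Arrays;
    import java.util.Base64;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanSketch {
        public static void main(String[] args) throws Exception {
            String columnFamily = "cf";                      // hypothetical family
            List<String> fields = Arrays.asList("f1", "f2"); // hypothetical qualifiers

            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(columnFamily));
            for (String field : fields) {
                // addColumn() replaces the "all columns" family entry with an
                // explicit qualifier set, pruning what the scan returns.
                scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(field));
            }

            // Same protobuf/Base64 hand-off the patch uses to pass the Scan on.
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String scanToString = Base64.getEncoder().encodeToString(proto.toByteArray());
            System.out.println(scanToString);
        }
    }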