feat(backend): spark

junjie 2021-04-14 16:00:09 +08:00
parent 1276109f47
commit daf784f2df
2 changed files with 20 additions and 2 deletions

@@ -40,7 +40,22 @@ public class CommonConfig {
         SparkSession spark = SparkSession.builder()
                 .appName(env.getProperty("spark.appName", "DataeaseJob"))
                 .master(env.getProperty("spark.master", "local[*]"))
-                .config("spark.scheduler.mode", "FAIR")
+                .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
+                .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
+                .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
+                .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
+                .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
+                .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
+                .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
+                .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
+                .config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
+                .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
+                .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
+                .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
+                .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
+                .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
+                .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
+                .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
                 .getOrCreate();
         return spark;
     }
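
Every setting above is now resolved through the Spring Environment, with the previously hard-coded value kept as the fallback, so a deployment can retune executor sizing, AQE, and shuffle behavior without rebuilding. A minimal sketch of the override-with-fallback pattern, using a plain JVM system property instead of the project's Spring wiring; the "4g" override value is purely illustrative:

// Sketch: checking that an externalized override reaches the session.
// Stand-alone, outside Spring; the property key matches the keys above,
// but the override value is an assumption for illustration.
import org.apache.spark.sql.SparkSession;

public class SparkConfigSmokeTest {
    public static void main(String[] args) {
        // -D flags / system properties are one of the sources Spring's
        // Environment also resolves.
        System.setProperty("spark.executor.memory", "4g");

        SparkSession spark = SparkSession.builder()
                .appName("DataeaseJob")
                .master("local[*]")
                // CommonConfig does the same lookup via env.getProperty(...)
                .config("spark.executor.memory",
                        System.getProperty("spark.executor.memory", "6442450944b"))
                .getOrCreate();

        // RuntimeConfig reports the effective session setting: prints "4g".
        System.out.println(spark.conf().get("spark.executor.memory"));
        spark.stop();
    }
}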

@@ -91,7 +91,10 @@ public class SparkCalc {
     public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
         Scan scan = new Scan();
-        scan.addFamily(column_family.getBytes());
+        scan.addFamily(Bytes.toBytes(column_family));
+        for (DatasetTableField field : fields) {
+            scan.addColumn(Bytes.toBytes(column_family), Bytes.toBytes(field.getOriginName()));
+        }
         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
         String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
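
Two things change here: scan.addFamily(column_family.getBytes()) depended on the JVM's default charset, while Bytes.toBytes(column_family) always encodes UTF-8; and the new scan.addColumn calls narrow the scan to exactly the dataset's fields instead of the whole family. The Base64-encoded protobuf Scan produced at the end is the format org.apache.hadoop.hbase.mapreduce.TableInputFormat expects in the job configuration. A sketch of the usual consuming side, using the standard TableInputFormat keys; this is the conventional pattern, not necessarily DataEase's exact follow-up code:

// Sketch: feeding the serialized Scan to Spark via TableInputFormat.
// Standard HBase-on-Spark pattern; the class and method names here are
// illustrative, only the TableInputFormat keys are fixed by the API.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseScanRead {
    public static JavaPairRDD<ImmutableBytesWritable, Result> read(
            JavaSparkContext sparkContext, String hTable, String scanToString) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, hTable);  // table to scan
        conf.set(TableInputFormat.SCAN, scanToString);   // Base64 protobuf Scan
        // Each HBase region becomes one input split, i.e. one RDD partition.
        return sparkContext.newAPIHadoopRDD(conf,
                TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    }
}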