feat: integrate Spark

TaoJinlong 2021-03-15 18:21:37 +08:00
parent bacabf8422
commit 52e4155a08
4 changed files with 49 additions and 2 deletions

View File pom.xml

@@ -17,6 +17,7 @@
<java.version>1.8</java.version>
<graalvm.version>20.1.0</graalvm.version>
<jwt.version>3.12.1</jwt.version>
<spark.version>3.1.1</spark.version>
</properties>
<dependencies>
@@ -333,6 +334,36 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
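Note: of the three artifacts added above, spark-sql is the one that provides SparkSession. A minimal smoke test that runs on these dependencies alone (the class name and the range/count workload are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession;

// Hypothetical smoke test for the dependencies added above; not part of this commit.
public class SparkSmokeTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataeaseJob")   // same default appName as CommonConfig below
                .master("local[*]")       // same default master as CommonConfig below
                .getOrCreate();
        // range/count exercises spark-sql end to end without any external data
        long n = spark.range(100).count();
        System.out.println("row count: " + n);
        spark.stop();
    }
}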

View File io/dataease/config/CommonConfig.java

@@ -1,6 +1,9 @@
package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
@@ -12,7 +15,7 @@ import javax.annotation.Resource;
@Configuration
@AutoConfigureBefore(QuartzAutoConfiguration.class)
public class HbaseConfig {
public class CommonConfig {
@Resource
private Environment env; // holds the settings loaded from the configuration files
@@ -27,4 +30,14 @@
configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
return configuration;
}
@Bean
@ConditionalOnMissingBean
public JavaSparkContext javaSparkContext(){
SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob")).setMaster(env.getProperty("spark.master", "local[*]"));
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
return sc;
}
}
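Note: with the javaSparkContext bean above registered (guarded by @ConditionalOnMissingBean), any Spring component can share the same context. A minimal consumer sketch; the service name and the sample data are hypothetical:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Arrays;

// Hypothetical consumer of the javaSparkContext bean defined in CommonConfig.
@Service
public class SparkDemoService {
    @Resource
    private JavaSparkContext sparkContext;

    public long countSample() {
        // distribute a small in-memory list and run a count on it
        JavaRDD<Integer> rdd = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        return rdd.count();
    }
}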

View File DeScheduleJob.java

@@ -18,7 +18,6 @@ public abstract class DeScheduleJob implements Job {
this.taskId = jobDataMap.getString("taskId");
LogUtil.info(jobKey.getGroup() + " Running: " + datasetTableId);
LogUtil.info(jobKey.getName() + " Running: " + datasetTableId);
LogUtil.info("CronExpression: " + expression);
businessExecute(context);
}
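Note: DeScheduleJob is abstract, and businessExecute(context) is the hook each concrete job implements. An illustrative subclass, assuming the hook's signature is businessExecute(JobExecutionContext) and that datasetTableId is visible to subclasses (neither is shown in this hunk):

import io.dataease.commons.utils.LogUtil;
import org.quartz.JobExecutionContext;

// Illustrative subclass; the class name and body are hypothetical.
public class DemoScheduleJob extends DeScheduleJob {
    @Override
    public void businessExecute(JobExecutionContext context) {
        // a real job would start its work here, e.g. trigger a data extract
        LogUtil.info("DemoScheduleJob fired for dataset: " + datasetTableId);
    }
}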

View File ExtractDataService.java

@@ -6,6 +6,7 @@ import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.DatasetTableTaskLog;
import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.LogUtil;
import io.dataease.dto.dataset.DataTableInfoDTO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -76,6 +77,9 @@ public class ExtractDataService {
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
} catch (Exception e) {
e.printStackTrace();
LogUtil.error("ExtractData error, dataset: " + datasetTableId);
LogUtil.error(e.getMessage(), e);
datasetTableTaskLog.setStatus(JobStatus.Error.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
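Note: the catch block now routes through LogUtil.error(message, throwable) instead of e.printStackTrace(), and records JobStatus.Error plus an end time on the task log. The same pattern as a self-contained sketch, with java.util.logging standing in for LogUtil and a local enum standing in for JobStatus (both stand-ins, not the DataEase classes):

import java.util.logging.Level;
import java.util.logging.Logger;

// Self-contained sketch of the error-handling pattern above; names are illustrative.
public class ExtractErrorHandlingSketch {
    enum JobStatus { Completed, Error }

    private static final Logger LOG = Logger.getLogger("ExtractData");

    public static void main(String[] args) {
        JobStatus status;
        try {
            runExtract(); // simulated unit of work
            status = JobStatus.Completed;
        } catch (Exception e) {
            // log the message AND the throwable, as LogUtil.error(msg, e) does above
            LOG.log(Level.SEVERE, "ExtractData error, dataset: demo", e);
            status = JobStatus.Error;
        }
        System.out.println("final status: " + status.name());
    }

    private static void runExtract() throws Exception {
        throw new Exception("simulated extract failure");
    }
}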