forked from github/dataease
feat: extract data to HBase
This commit is contained in:
parent 27ea12ed88
commit d449e41cc1
@@ -303,17 +303,6 @@
             <artifactId>reflections8</artifactId>
             <version>0.11.7</version>
         </dependency>
-        <!-- k8s client -->
-        <!--<dependency>
-            <groupId>io.fabric8</groupId>
-            <artifactId>kubernetes-client</artifactId>
-            <version>4.13.0</version>
-        </dependency>
-        <dependency>
-            <groupId>com.github.fge</groupId>
-            <artifactId>json-schema-validator</artifactId>
-            <version>2.2.6</version>
-        </dependency>-->
         <!-- enable cache -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -325,6 +314,40 @@
             <artifactId>ehcache</artifactId>
             <version>2.9.1</version>
         </dependency>
+        <!-- hbase -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>2.4.1</version>
+            <type>pom</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-mapreduce</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.8</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
     <build>
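The block above pulls in the full HBase 2.4.1 client stack (plus TestNG for tests). A minimal smoke test of the new dependencies might look like the following sketch; it is not part of the commit, and the quorum address is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HbaseSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // picks up hbase-site.xml if present on the classpath
        conf.set("hbase.zookeeper.quorum", "localhost");  // placeholder quorum address
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            System.out.println("Tables visible: " + admin.listTableNames().length);
        }
    }
}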
@@ -1,9 +1,12 @@
 package io.dataease.base.domain;
 
 import java.io.Serializable;
 
+import lombok.Builder;
 import lombok.Data;
 
 @Data
+@Builder
 public class DatasetTableField implements Serializable {
     private String id;
 
@@ -24,4 +27,4 @@ public class DatasetTableField implements Serializable {
     private Integer deType;
 
     private static final long serialVersionUID = 1L;
 }
@@ -0,0 +1,6 @@
+package io.dataease.commons.constants;
+
+public class DatasetMode {
+    public static final String EXTRACT = "1";
+    public static final String DIRECT = "0";
+}
@@ -0,0 +1,5 @@
+package io.dataease.commons.constants;
+
+public enum JobStatus {
+    Prepare, Underway, Completed, Error
+}
@@ -1,5 +0,0 @@
-package io.dataease.commons.constants;
-
-public enum TestPlanStatus {
-    Prepare, Underway, Completed
-}
backend/src/main/java/io/dataease/config/HbaseConfig.java (new file, 30 lines)
@@ -0,0 +1,30 @@
+package io.dataease.config;
+
+import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.core.env.Environment;
+
+import javax.annotation.Resource;
+
+@Configuration
+@AutoConfigureBefore(QuartzAutoConfiguration.class)
+public class HbaseConfig {
+
+    @Resource
+    private Environment env; // holds the externalized configuration properties
+
+    @Bean
+    @ConditionalOnMissingBean
+    public org.apache.hadoop.conf.Configuration configuration() {
+        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+        configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
+        configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
+        configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
+        return configuration;
+    }
+}
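The three keys are read from the Spring Environment, so they are expected in the application's property files; hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort have no default here, and Hadoop's Configuration.set rejects a null value, so missing properties fail at startup. A minimal sketch of consuming this bean (assumed usage; ExtractDataService below does the same thing through CommonBeanFactory):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;

@Component
public class HbaseConnections {

    @Resource
    private Configuration configuration; // the bean defined in HbaseConfig above

    // ConnectionFactory.createConnection is heavyweight; create once and reuse
    // rather than opening a connection per request.
    public Connection open() throws IOException {
        return ConnectionFactory.createConnection(configuration);
    }
}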
@@ -19,7 +19,7 @@ public class DataSetTableFieldController {
 
     @PostMapping("list/{tableId}")
     public List<DatasetTableField> list(@PathVariable String tableId) {
-        DatasetTableField datasetTableField = new DatasetTableField();
+        DatasetTableField datasetTableField = DatasetTableField.builder().build();
         datasetTableField.setTableId(tableId);
         return dataSetTableFieldsService.list(datasetTableField);
     }
@@ -23,4 +23,8 @@ public abstract class DatasourceProvider {
         getData(datasourceRequest);
     }
 
+    abstract public Long count(DatasourceRequest datasourceRequest) throws Exception;
+
+    abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;
+
 }
@@ -9,6 +9,7 @@ import io.dataease.datasource.request.DatasourceRequest;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import java.sql.*;
+import java.text.MessageFormat;
 import java.util.*;
 
 @Service("jdbc")

@@ -23,23 +24,7 @@ public class JdbcProvider extends DatasourceProvider {
             Statement stat = connection.createStatement();
             ResultSet rs = stat.executeQuery(datasourceRequest.getQuery())
         ) {
-            ResultSetMetaData metaData = rs.getMetaData();
-            int columnCount = metaData.getColumnCount();
-            while (rs.next()) {
-                String[] row = new String[columnCount];
-                for (int j = 0; j < columnCount; j++) {
-                    int columType = metaData.getColumnType(j + 1);
-                    switch (columType) {
-                        case java.sql.Types.DATE:
-                            row[j] = rs.getDate(j + 1).toString();
-                            break;
-                        default:
-                            row[j] = rs.getString(j + 1);
-                            break;
-                    }
-                }
-                list.add(row);
-            }
+            list = fetchResult(rs);
         } catch (SQLException e) {
             throw new Exception("ERROR:" + e.getMessage(), e);
         } catch (Exception e) {
@@ -48,6 +33,46 @@ public class JdbcProvider extends DatasourceProvider {
         return list;
     }
 
+    @Override
+    public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception {
+        List<String[]> list = new LinkedList<>();
+        try (
+            Connection connection = getConnection(datasourceRequest);
+            Statement stat = connection.createStatement();
+            ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()))
+        ) {
+            list = fetchResult(rs);
+        } catch (SQLException e) {
+            throw new Exception("ERROR:" + e.getMessage(), e);
+        } catch (Exception e) {
+            throw new Exception("ERROR:" + e.getMessage(), e);
+        }
+        return list;
+    }
+
+    private List<String[]> fetchResult(ResultSet rs) throws Exception {
+        List<String[]> list = new LinkedList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            String[] row = new String[columnCount];
+            for (int j = 0; j < columnCount; j++) {
+                int columType = metaData.getColumnType(j + 1);
+                switch (columType) {
+                    case java.sql.Types.DATE:
+                        row[j] = rs.getDate(j + 1).toString();
+                        break;
+                    default:
+                        row[j] = rs.getString(j + 1);
+                        break;
+                }
+            }
+            list.add(row);
+        }
+        return list;
+    }
+
     @Override
     public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception {
         List<String> tables = new ArrayList<>();
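One caveat in getPageData above: MessageFormat formats numeric arguments with the default locale's grouping separators, so a page size of 10000 renders as "10,000" and the appended clause becomes " LIMIT 0, 10,000", which is not valid SQL. Using the format element {0,number,#} would avoid this; so would String.format, as in this sketch (an assumption, not the committed code):

    private String pagedQuery(DatasourceRequest datasourceRequest) {
        long offset = (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize();
        // %d never inserts grouping separators, regardless of locale
        return datasourceRequest.getQuery()
                + String.format(" LIMIT %d, %d", offset, datasourceRequest.getPageSize());
    }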
@@ -106,6 +131,19 @@ public class JdbcProvider extends DatasourceProvider {
         }
     }
 
+    public Long count(DatasourceRequest datasourceRequest) throws Exception {
+        try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
+            ResultSet resultSet = ps.executeQuery(datasourceRequest.getQuery());
+            while (resultSet.next()) {
+                return resultSet.getLong(1);
+            }
+        } catch (Exception e) {
+            throw new Exception("ERROR: " + e.getMessage(), e);
+        }
+        return 0L;
+    }
+
     private Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
         String username = null;
         String password = null;
@@ -11,5 +11,8 @@ public class DatasourceRequest {
     protected String query;
     protected String table;
     protected Datasource datasource;
+    private Long pageSize;
+    private Long startPage;
 
 }
@@ -1,22 +1,25 @@
 package io.dataease.job.sechedule;
 
-import org.quartz.Job;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
+import io.dataease.commons.utils.LogUtil;
+import org.quartz.*;
 
 public abstract class DeScheduleJob implements Job {
 
+    protected String datasetTableId;
+    protected String expression;
+    protected String taskId;
+
     @Override
     public void execute(JobExecutionContext context) throws JobExecutionException {
+        JobKey jobKey = context.getTrigger().getJobKey();
+        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+        this.datasetTableId = jobDataMap.getString("datasetTableId");
+        this.expression = jobDataMap.getString("expression");
+        this.taskId = jobDataMap.getString("taskId");
 
-        // JobKey jobKey = context.getTrigger().getJobKey();
-        // JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-        // this.resourceId = jobDataMap.getString("resourceId");
-        // this.userId = jobDataMap.getString("userId");
-        // this.expression = jobDataMap.getString("expression");
-        //
-        // LogUtil.info(jobKey.getGroup() + " Running: " + resourceId);
-        // LogUtil.info("CronExpression: " + expression);
+        LogUtil.info(jobKey.getGroup() + " Running: " + datasetTableId);
+        LogUtil.info(jobKey.getName() + " Running: " + datasetTableId);
+        LogUtil.info("CronExpression: " + expression);
         businessExecute(context);
     }
@@ -0,0 +1,22 @@
+package io.dataease.job.sechedule;
+
+import io.dataease.commons.utils.CommonBeanFactory;
+import io.dataease.service.dataset.ExtractDataService;
+import org.quartz.JobExecutionContext;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExtractDataJob extends DeScheduleJob {
+    private ExtractDataService extractDataService;
+
+    public ExtractDataJob() {
+        extractDataService = (ExtractDataService) CommonBeanFactory.getBean(ExtractDataService.class);
+    }
+
+    @Override
+    void businessExecute(JobExecutionContext context) {
+        extractDataService.extractData(datasetTableId, taskId);
+    }
+
+}
@@ -369,11 +369,11 @@ public class ScheduleManager {
         addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
     }
 
-    public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String userId) {
+    public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId) {
         JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put("resourceId", resourceId);
+        jobDataMap.put("datasetTableId", resourceId);
+        jobDataMap.put("taskId", taskId);
         jobDataMap.put("expression", expression);
-        jobDataMap.put("userId", userId);
         return jobDataMap;
     }
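Note that the first parameter is still named resourceId but is now stored under the "datasetTableId" key, which is exactly what DeScheduleJob.execute reads back at trigger time. The handshake, spelled out as a self-contained illustration (not part of the commit):

import org.quartz.JobDataMap;

public class JobDataMapHandshake {
    public static void main(String[] args) {
        // what getDefaultJobDataMap("table-1", "0 0/5 * * * ?", "task-1") produces
        JobDataMap map = new JobDataMap();
        map.put("datasetTableId", "table-1");
        map.put("taskId", "task-1");
        map.put("expression", "0 0/5 * * * ?");
        // the keys DeScheduleJob.execute reads when the trigger fires
        System.out.println(map.getString("datasetTableId")); // table-1
        System.out.println(map.getString("taskId"));         // task-1
        System.out.println(map.getString("expression"));     // 0 0/5 * * * ?
    }
}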
@@ -20,13 +20,7 @@ public class AppStartListener implements ApplicationListener<ApplicationReadyEvent> {
 
     @Override
     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
-        System.out.println("================= 应用启动 =================");
-        /* cron schedule */
-        // scheduleManager.addCronJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, "*/10 * * * * ?");
-        /* single schedule */
-        // long timestamp = System.currentTimeMillis() + 90 * 1000;
-        // Date date = new Date(timestamp);
-        // scheduleManager.addSingleJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, date);
+        System.out.println("================= Application start =================");
         // on startup, read tasks from the database and register them with Quartz
         List<DatasetTableTask> list = dataSetTableTaskService.list(new DatasetTableTask());
         for (DatasetTableTask task : list) {
@@ -1,8 +1,8 @@
 package io.dataease.service;
 
 import io.dataease.base.domain.DatasetTableTask;
+import io.dataease.job.sechedule.ExtractDataJob;
 import io.dataease.job.sechedule.ScheduleManager;
-import io.dataease.job.sechedule.TestJob;
 import org.apache.commons.lang3.StringUtils;
 import org.quartz.JobKey;
 import org.quartz.TriggerKey;

@@ -24,8 +24,8 @@ public class ScheduleService {
         if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "0")) {
             scheduleManager.addOrUpdateSingleJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
                     new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
-                    TestJob.class, //TODO
-                    new Date(datasetTableTask.getStartTime()));
+                    ExtractDataJob.class,
+                    new Date(datasetTableTask.getStartTime()), scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
         } else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "1")) {
             Date endTime;
             if (datasetTableTask.getEndTime() == null || datasetTableTask.getEndTime() == 0) {

@@ -36,8 +36,9 @@ public class ScheduleService {
 
             scheduleManager.addOrUpdateCronJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
                     new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
-                    TestJob.class, // TODO
-                    datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime);
+                    ExtractDataJob.class,
+                    datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime,
+                    scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
         }
     }
@@ -71,7 +71,9 @@ public class DataSetTableService {
 
     public List<DatasetTable> list(DataSetTableRequest dataSetTableRequest) {
         DatasetTableExample datasetTableExample = new DatasetTableExample();
-        datasetTableExample.createCriteria().andSceneIdEqualTo(dataSetTableRequest.getSceneId());
+        if (StringUtils.isNotEmpty(dataSetTableRequest.getSceneId())) {
+            datasetTableExample.createCriteria().andSceneIdEqualTo(dataSetTableRequest.getSceneId());
+        }
         if (StringUtils.isNotEmpty(dataSetTableRequest.getSort())) {
             datasetTableExample.setOrderByClause(dataSetTableRequest.getSort());
         }

@@ -92,7 +94,7 @@ public class DataSetTableService {
     }
 
     public Map<String, List<DatasetTableField>> getFieldsFromDE(DataSetTableRequest dataSetTableRequest) throws Exception {
-        DatasetTableField datasetTableField = new DatasetTableField();
+        DatasetTableField datasetTableField = DatasetTableField.builder().build();
         datasetTableField.setTableId(dataSetTableRequest.getId());
         datasetTableField.setChecked(Boolean.TRUE);
         List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);

@@ -121,7 +123,7 @@ public class DataSetTableService {
         datasourceRequest.setDatasource(ds);
         String table = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getTable();
 
-        DatasetTableField datasetTableField = new DatasetTableField();
+        DatasetTableField datasetTableField = DatasetTableField.builder().build();
         datasetTableField.setTableId(dataSetTableRequest.getId());
         datasetTableField.setChecked(Boolean.TRUE);
         List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);

@@ -137,15 +139,13 @@ public class DataSetTableService {
         DatasourceRequest datasourceRequest = new DatasourceRequest();
         datasourceRequest.setDatasource(ds);
         String table = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getTable();
-        // datasourceRequest.setTable(table);
 
-        DatasetTableField datasetTableField = new DatasetTableField();
+        DatasetTableField datasetTableField = DatasetTableField.builder().build();
         datasetTableField.setTableId(dataSetTableRequest.getId());
         datasetTableField.setChecked(Boolean.TRUE);
         List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);
 
         String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
-        // datasourceRequest.setQuery("SELECT " + StringUtils.join(fieldArray, ",") + " FROM " + table + " LIMIT 0,10;");
         datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0,10");
 
         List<String[]> data = new ArrayList<>();

@@ -154,17 +154,6 @@ public class DataSetTableService {
         } catch (Exception e) {
         }
 
-        /*JSONArray jsonArray = new JSONArray();
-        if (CollectionUtils.isNotEmpty(data)) {
-            data.forEach(ele -> {
-                JSONObject jsonObject = new JSONObject();
-                for (int i = 0; i < ele.length; i++) {
-                    jsonObject.put(fieldArray[i], ele[i]);
-                }
-                jsonArray.add(jsonObject);
-            });
-        }*/
         List<Map<String, Object>> jsonArray = new ArrayList<>();
         if (CollectionUtils.isNotEmpty(data)) {
             jsonArray = data.stream().map(ele -> {
@@ -184,6 +173,53 @@ public class DataSetTableService {
         return map;
     }
 
+    public List<String[]> getDataSetData(String datasourceId, String table, List<DatasetTableField> fields) {
+        List<String[]> data = new ArrayList<>();
+        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
+        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
+        DatasourceRequest datasourceRequest = new DatasourceRequest();
+        datasourceRequest.setDatasource(ds);
+        String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
+        datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0, 10");
+        try {
+            data.addAll(datasourceProvider.getData(datasourceRequest));
+        } catch (Exception e) {
+        }
+        return data;
+    }
+
+    public Long getDataSetTotalData(String datasourceId, String table) {
+        List<String[]> data = new ArrayList<>();
+        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
+        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
+        DatasourceRequest datasourceRequest = new DatasourceRequest();
+        datasourceRequest.setDatasource(ds);
+        datasourceRequest.setQuery("select count(*) from " + table);
+        try {
+            return datasourceProvider.count(datasourceRequest);
+        } catch (Exception e) {
+        }
+        return 0L;
+    }
+
+    public List<String[]> getDataSetPageData(String datasourceId, String table, List<DatasetTableField> fields, Long startPage, Long pageSize) {
+        List<String[]> data = new ArrayList<>();
+        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
+        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
+        DatasourceRequest datasourceRequest = new DatasourceRequest();
+        datasourceRequest.setDatasource(ds);
+        String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
+        datasourceRequest.setPageSize(pageSize);
+        datasourceRequest.setStartPage(startPage);
+        datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray));
+        try {
+            return datasourceProvider.getData(datasourceRequest);
+        } catch (Exception e) {
+        }
+        return data;
+    }
+
     public void saveTableField(DatasetTable datasetTable) throws Exception {
         Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
         DataSetTableRequest dataSetTableRequest = new DataSetTableRequest();
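One observation on getDataSetPageData above: it sets startPage and pageSize but then calls datasourceProvider.getData, which (per JdbcProvider earlier in this commit) ignores both and runs the unbounded query; getPageData appears to be the intended call. A sketch of the likely intent (an assumption, not the committed code):

        datasourceRequest.setPageSize(pageSize);
        datasourceRequest.setStartPage(startPage);
        datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray));
        try {
            // getPageData appends the LIMIT clause built from startPage/pageSize
            return datasourceProvider.getPageData(datasourceRequest);
        } catch (Exception e) {
            // the committed code swallows the exception; logging it would aid debugging
        }
        return data;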
@@ -193,7 +229,7 @@ public class DataSetTableService {
         if (CollectionUtils.isNotEmpty(fields)) {
             for (int i = 0; i < fields.size(); i++) {
                 TableFiled filed = fields.get(i);
-                DatasetTableField datasetTableField = new DatasetTableField();
+                DatasetTableField datasetTableField = DatasetTableField.builder().build();
                 datasetTableField.setTableId(datasetTable.getId());
                 datasetTableField.setOriginName(filed.getFieldName());
                 datasetTableField.setName(filed.getRemarks());
|
@ -48,4 +48,11 @@ public class DataSetTableTaskLogService {
|
||||
return extDataSetTaskMapper.list(request);
|
||||
}
|
||||
|
||||
public void deleteByTaskId(String taskId){
|
||||
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
|
||||
DatasetTableTaskLogExample.Criteria criteria = datasetTableTaskLogExample.createCriteria();
|
||||
criteria.andTaskIdEqualTo(taskId);
|
||||
datasetTableTaskLogMapper.deleteByExample(datasetTableTaskLogExample);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -20,7 +20,8 @@ import java.util.UUID;
 public class DataSetTableTaskService {
     @Resource
     private DatasetTableTaskMapper datasetTableTaskMapper;
+    @Resource
+    private DataSetTableTaskLogService dataSetTableTaskLogService;
     @Resource
     private ScheduleService scheduleService;

@@ -46,6 +47,11 @@ public class DataSetTableTaskService {
         DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(id);
         datasetTableTaskMapper.deleteByPrimaryKey(id);
         scheduleService.deleteSchedule(datasetTableTask);
+        dataSetTableTaskLogService.deleteByTaskId(id);
+    }
+
+    public DatasetTableTask get(String id) {
+        return datasetTableTaskMapper.selectByPrimaryKey(id);
     }
 
     public List<DatasetTableTask> list(DatasetTableTask datasetTableTask) {
@@ -0,0 +1,93 @@
+package io.dataease.service.dataset;
+
+import com.google.gson.Gson;
+import io.dataease.base.domain.DatasetTable;
+import io.dataease.base.domain.DatasetTableField;
+import io.dataease.base.domain.DatasetTableTaskLog;
+import io.dataease.commons.constants.JobStatus;
+import io.dataease.commons.utils.CommonBeanFactory;
+import io.dataease.dto.dataset.DataTableInfoDTO;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Service
+public class ExtractDataService {
+
+    @Resource
+    private DataSetTableService dataSetTableService;
+    @Resource
+    private DataSetTableFieldsService dataSetTableFieldsService;
+    @Resource
+    private DataSetTableTaskLogService dataSetTableTaskLogService;
+    private Long pageSize = 10000L;
+    private static ExecutorService pool = Executors.newScheduledThreadPool(50); // thread pool handed to the HBase connection
+    private Connection connection;
+
+    public void extractData(String datasetTableId, String taskId) {
+        DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
+        try {
+            datasetTableTaskLog.setTableId(datasetTableId);
+            datasetTableTaskLog.setTaskId(taskId);
+            datasetTableTaskLog.setStatus(JobStatus.Underway.name());
+            datasetTableTaskLog.setStartTime(System.currentTimeMillis());
+            dataSetTableTaskLogService.save(datasetTableTaskLog);
+            Admin admin = getConnection().getAdmin();
+            DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
+            List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
+            String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
+            TableName tableName = TableName.valueOf(table + "-" + datasetTable.getDataSourceId());
+            if (!admin.tableExists(tableName)) {
+                TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
+                ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of("cf");
+                descBuilder.setColumnFamily(hcd);
+                TableDescriptor desc = descBuilder.build();
+                admin.createTable(desc);
+            }
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, true);
+
+            Table tab = getConnection().getTable(tableName);
+            Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table);
+            Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1;
+
+            for (Long pageIndex = 1L; pageIndex <= pageCount; pageIndex++) {
+                List<String[]> data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize);
+                for (String[] d : data) {
+                    for (int i = 0; i < datasetTableFields.size(); i++) {
+                        Put put = new Put(UUID.randomUUID().toString().getBytes());
+                        String value = d[i];
+                        if (value == null) {
+                            value = "null";
+                        }
+                        put.addColumn("cf".getBytes(), datasetTableFields.get(i).getOriginName().getBytes(), value.getBytes());
+                        tab.put(put);
+                    }
+                }
+            }
+            datasetTableTaskLog.setStatus(JobStatus.Completed.name());
+            datasetTableTaskLog.setEndTime(System.currentTimeMillis());
+            dataSetTableTaskLogService.save(datasetTableTaskLog);
+        } catch (Exception e) {
+            datasetTableTaskLog.setStatus(JobStatus.Error.name());
+            datasetTableTaskLog.setEndTime(System.currentTimeMillis());
+            dataSetTableTaskLogService.save(datasetTableTaskLog);
+        }
+    }
+
+    private synchronized Connection getConnection() throws Exception {
+        if (connection == null || connection.isClosed()) {
+            Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
+            connection = ConnectionFactory.createConnection(cfg, pool);
+        }
+        return connection;
+    }
+}
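Two things stand out in the write loop above: a fresh Put with a random UUID rowkey is created for every column, so each source row is scattered across as many HBase rows as it has fields, and tab.put is called once per cell. A sketch of a variant that keeps one rowkey per source row and writes in batches (an assumption about the intent, not the committed code; it additionally needs a java.util.ArrayList import):

            List<Put> batch = new ArrayList<>(1000);
            for (String[] d : data) {
                // one rowkey per source row, shared by all of its columns
                Put put = new Put(UUID.randomUUID().toString().getBytes());
                for (int i = 0; i < datasetTableFields.size(); i++) {
                    String value = d[i] == null ? "null" : d[i];
                    put.addColumn("cf".getBytes(), datasetTableFields.get(i).getOriginName().getBytes(), value.getBytes());
                }
                batch.add(put);
                if (batch.size() >= 1000) {
                    tab.put(batch); // Table.put(List<Put>) sends one batched request
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                tab.put(batch);
            }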
pom.xml (4 changed lines)
@@ -10,14 +10,14 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
         <!--<version>2.2.6.RELEASE</version>-->
         <version>2.4.3</version>
-        <relativePath/> <!-- lookup parent from repository -->
+        <relativePath/>
     </parent>
 
     <name>dataease</name>
     <modules>
         <module>backend</module>
         <module>frontend</module>
     </modules>
 
 </project>