feat: extract data to HBase

taojinlong 2021-03-10 18:13:27 +08:00
parent 27ea12ed88
commit c72d79999d
15 changed files with 198 additions and 80 deletions

View File

@ -303,17 +303,6 @@
<artifactId>reflections8</artifactId>
<version>0.11.7</version>
</dependency>
<!-- k8s client -->
<!--<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>4.13.0</version>
</dependency>
<dependency>
<groupId>com.github.fge</groupId>
<artifactId>json-schema-validator</artifactId>
<version>2.2.6</version>
</dependency>-->
<!-- enable cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -325,6 +314,40 @@
<artifactId>ehcache</artifactId>
<version>2.9.1</version>
</dependency>
<!-- hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>2.4.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
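
The hbase-client / hbase-common / hbase-server / hbase-mapreduce 2.4.1 dependencies added above pull in the standard HBase client API that the extraction path can write through. The write code itself is not among the hunks shown here; below is a minimal sketch of putting extracted rows into HBase with hbase-client 2.4.1, assuming a reachable ZooKeeper quorum and a pre-created table and column family (table name, column family, and row-key scheme are all hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;

public class HbaseWriteSketch {
    // rows come from the JDBC side as List<String[]>; fieldNames from DatasetTableField.getOriginName()
    public static void write(List<String[]> rows, String[] fieldNames) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost"); // assumption: quorum address comes from configuration
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("dataset_table"))) { // hypothetical table name
            for (int i = 0; i < rows.size(); i++) {
                Put put = new Put(Bytes.toBytes(String.valueOf(i))); // illustrative row key: row index
                for (int j = 0; j < fieldNames.length; j++) {
                    String value = rows.get(i)[j] == null ? "" : rows.get(i)[j];
                    // "cf" is a hypothetical column family
                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(fieldNames[j]), Bytes.toBytes(value));
                }
                table.put(put);
            }
        }
    }
}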

View File

@ -1,9 +1,12 @@
package io.dataease.base.domain;
import java.io.Serializable;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class DatasetTableField implements Serializable {
private String id;
@ -24,4 +27,4 @@ public class DatasetTableField implements Serializable {
private Integer deType;
private static final long serialVersionUID = 1L;
}
}

View File

@ -0,0 +1,5 @@
package io.dataease.commons.constants;
public enum JobStatus {
Prepare, Underway, Completed, Error
}

View File

@ -1,5 +0,0 @@
package io.dataease.commons.constants;
public enum TestPlanStatus {
Prepare, Underway, Completed
}

View File

@ -19,7 +19,7 @@ public class DataSetTableFieldController {
@PostMapping("list/{tableId}")
public List<DatasetTableField> list(@PathVariable String tableId) {
DatasetTableField datasetTableField = new DatasetTableField();
DatasetTableField datasetTableField = DatasetTableField.builder().build();
datasetTableField.setTableId(tableId);
return dataSetTableFieldsService.list(datasetTableField);
}

View File

@ -23,4 +23,8 @@ public abstract class DatasourceProvider {
getData(datasourceRequest);
}
abstract public Long count(DatasourceRequest datasourceRequest)throws Exception;
abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;
}

View File

@ -9,6 +9,7 @@ import io.dataease.datasource.request.DatasourceRequest;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.text.MessageFormat;
import java.util.*;
@Service("jdbc")
@ -23,23 +24,7 @@ public class JdbcProvider extends DatasourceProvider{
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery())
) {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
String[] row = new String[columnCount];
for (int j = 0; j < columnCount; j++) {
int columType = metaData.getColumnType(j + 1);
switch (columType) {
case java.sql.Types.DATE:
row[j] = rs.getDate(j + 1).toString();
break;
default:
row[j] = rs.getString(j + 1);
break;
}
}
list.add(row);
}
list = fetchResult(rs);
} catch (SQLException e){
throw new Exception("ERROR:" + e.getMessage(), e);
}catch (Exception e) {
@ -48,6 +33,46 @@ public class JdbcProvider extends DatasourceProvider{
return list;
}
@Override
public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() -1)*datasourceRequest.getPageSize(), datasourceRequest.getPageSize()))
) {
list = fetchResult(rs);
} catch (SQLException e){
throw new Exception("ERROR:" + e.getMessage(), e);
}catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}
return list;
}
private List<String[]> fetchResult( ResultSet rs) throws Exception{
List<String[]> list = new LinkedList<>();
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
String[] row = new String[columnCount];
for (int j = 0; j < columnCount; j++) {
int columType = metaData.getColumnType(j + 1);
switch (columType) {
case java.sql.Types.DATE:
row[j] = rs.getDate(j + 1).toString();
break;
default:
row[j] = rs.getString(j + 1);
break;
}
}
list.add(row);
}
return list;
}
@Override
public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception {
List<String> tables = new ArrayList<>();
@ -106,6 +131,19 @@ public class JdbcProvider extends DatasourceProvider{
}
}
public Long count(DatasourceRequest datasourceRequest)throws Exception{
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
ResultSet resultSet = ps.executeQuery(datasourceRequest.getQuery());
while (resultSet.next()) {
return resultSet.getLong(1);
}
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}
return 0L;
}
private Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
String username = null;
String password = null;
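
For reference, getPageData builds a MySQL-style LIMIT clause from the 1-based startPage and pageSize carried on DatasourceRequest. A small illustration (not part of the commit) of the offset arithmetic, plus one caveat about MessageFormat worth keeping in mind:

// Illustration only: how getPageData derives its LIMIT clause.
long startPage = 3L, pageSize = 100L;
String clause = java.text.MessageFormat.format(" LIMIT {0}, {1}",
        (startPage - 1) * pageSize, pageSize);
// clause is " LIMIT 200, 100": skip the first 200 rows, return rows 201-300.
// Caveat: MessageFormat applies locale-aware number formatting, so an offset of
// 1000 renders as "1,000" in an English locale and breaks the SQL; converting the
// numbers to strings first (e.g. String.valueOf) avoids that.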

View File

@ -11,5 +11,8 @@ public class DatasourceRequest {
protected String query;
protected String table;
protected Datasource datasource;
private Long pageSize;
private Long startPage;
}

View File

@ -1,22 +1,25 @@
package io.dataease.job.sechedule;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import io.dataease.commons.utils.LogUtil;
import org.quartz.*;
public abstract class DeScheduleJob implements Job {
protected String datasetTableId;
protected String expression;
protected String taskId;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobKey jobKey = context.getTrigger().getJobKey();
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
this.datasetTableId = jobDataMap.getString("datasetTableId");
this.expression = jobDataMap.getString("expression");
this.taskId = jobDataMap.getString("taskId");
// JobKey jobKey = context.getTrigger().getJobKey();
// JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
// this.resourceId = jobDataMap.getString("resourceId");
// this.userId = jobDataMap.getString("userId");
// this.expression = jobDataMap.getString("expression");
//
// LogUtil.info(jobKey.getGroup() + " Running: " + resourceId);
// LogUtil.info("CronExpression: " + expression);
LogUtil.info(jobKey.getGroup() + " Running: " + datasetTableId);
LogUtil.info(jobKey.getName() + " Running: " + datasetTableId);
LogUtil.info("CronExpression: " + expression);
businessExecute(context);
}
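
DeScheduleJob now pulls datasetTableId, taskId, and expression out of the JobDataMap (matching the keys set in ScheduleManager.getDefaultJobDataMap below) and hands control to businessExecute. The commit's ExtractDataJob is among the 15 changed files but its hunk is not shown here; a minimal sketch of a subclass under this contract, assuming businessExecute(JobExecutionContext) is the abstract hook:

// Hypothetical subclass for illustration; not the commit's actual ExtractDataJob.
public class ExtractDataJobSketch extends DeScheduleJob {
    @Override
    public void businessExecute(JobExecutionContext context) {
        // datasetTableId and taskId were populated from the JobDataMap in execute();
        // an extract job would page through the source table and write the rows to HBase
        // (see the paging and hbase-client sketches elsewhere in this diff).
        LogUtil.info("Extracting dataset table " + datasetTableId + " for task " + taskId);
    }
}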

View File

@ -369,11 +369,11 @@ public class ScheduleManager {
addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String userId) {
public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId) {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("resourceId", resourceId);
jobDataMap.put("datasetTableId", resourceId);
jobDataMap.put("taskId", taskId);
jobDataMap.put("expression", expression);
jobDataMap.put("userId", userId);
return jobDataMap;
}

View File

@ -20,13 +20,7 @@ public class AppStartListener implements ApplicationListener<ApplicationReadyEve
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
System.out.println("================= 应用启动 =================");
/* cron schedule */
// scheduleManager.addCronJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, "*/10 * * * * ?");
/* single schedule*/
// long timestamp = System.currentTimeMillis() + 90 * 1000;
// Date date = new Date(timestamp);
// scheduleManager.addSingleJob(new JobKey("abc", "def"), new TriggerKey("abc", "def"), TestJob.class, date);
System.out.println("================= Application start =================");
// On startup, read scheduled tasks from the database and register them with Quartz
List<DatasetTableTask> list = dataSetTableTaskService.list(new DatasetTableTask());
for (DatasetTableTask task : list) {

View File

@ -1,8 +1,8 @@
package io.dataease.service;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.job.sechedule.ExtractDataJob;
import io.dataease.job.sechedule.ScheduleManager;
import io.dataease.job.sechedule.TestJob;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobKey;
import org.quartz.TriggerKey;
@ -24,8 +24,8 @@ public class ScheduleService {
if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "0")) {
scheduleManager.addOrUpdateSingleJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
TestJob.class,//TODO
new Date(datasetTableTask.getStartTime()));
ExtractDataJob.class,
new Date(datasetTableTask.getStartTime()), scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
} else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "1")) {
Date endTime;
if (datasetTableTask.getEndTime() == null || datasetTableTask.getEndTime() == 0) {
@ -36,8 +36,9 @@ public class ScheduleService {
scheduleManager.addOrUpdateCronJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
TestJob.class,// TODO
datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime);
ExtractDataJob.class,
datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime,
scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
}
}

View File

@ -71,7 +71,9 @@ public class DataSetTableService {
public List<DatasetTable> list(DataSetTableRequest dataSetTableRequest) {
DatasetTableExample datasetTableExample = new DatasetTableExample();
datasetTableExample.createCriteria().andSceneIdEqualTo(dataSetTableRequest.getSceneId());
if(StringUtils.isNotEmpty(dataSetTableRequest.getSceneId())){
datasetTableExample.createCriteria().andSceneIdEqualTo(dataSetTableRequest.getSceneId());
}
if (StringUtils.isNotEmpty(dataSetTableRequest.getSort())) {
datasetTableExample.setOrderByClause(dataSetTableRequest.getSort());
}
@ -92,7 +94,7 @@ public class DataSetTableService {
}
public Map<String, List<DatasetTableField>> getFieldsFromDE(DataSetTableRequest dataSetTableRequest) throws Exception {
DatasetTableField datasetTableField = new DatasetTableField();
DatasetTableField datasetTableField = DatasetTableField.builder().build();
datasetTableField.setTableId(dataSetTableRequest.getId());
datasetTableField.setChecked(Boolean.TRUE);
List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);
@ -121,7 +123,7 @@ public class DataSetTableService {
datasourceRequest.setDatasource(ds);
String table = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getTable();
DatasetTableField datasetTableField = new DatasetTableField();
DatasetTableField datasetTableField = DatasetTableField.builder().build();
datasetTableField.setTableId(dataSetTableRequest.getId());
datasetTableField.setChecked(Boolean.TRUE);
List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);
@ -137,15 +139,13 @@ public class DataSetTableService {
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
String table = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getTable();
// datasourceRequest.setTable(table);
DatasetTableField datasetTableField = new DatasetTableField();
DatasetTableField datasetTableField = DatasetTableField.builder().build();
datasetTableField.setTableId(dataSetTableRequest.getId());
datasetTableField.setChecked(Boolean.TRUE);
List<DatasetTableField> fields = dataSetTableFieldsService.list(datasetTableField);
String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
// datasourceRequest.setQuery("SELECT " + StringUtils.join(fieldArray, ",") + " FROM " + table + " LIMIT 0,10;");
datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0,10");
List<String[]> data = new ArrayList<>();
@ -154,17 +154,6 @@ public class DataSetTableService {
} catch (Exception e) {
}
/*JSONArray jsonArray = new JSONArray();
if (CollectionUtils.isNotEmpty(data)) {
data.forEach(ele -> {
JSONObject jsonObject = new JSONObject();
for (int i = 0; i < ele.length; i++) {
jsonObject.put(fieldArray[i], ele[i]);
}
jsonArray.add(jsonObject);
});
}*/
List<Map<String, Object>> jsonArray = new ArrayList<>();
if (CollectionUtils.isNotEmpty(data)) {
jsonArray = data.stream().map(ele -> {
@ -184,6 +173,53 @@ public class DataSetTableService {
return map;
}
public List<String[]> getDataSetData(String datasourceId, String table, List<DatasetTableField> fields){
List<String[]> data = new ArrayList<>();
Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0, 10");
try {
data.addAll(datasourceProvider.getData(datasourceRequest));
} catch (Exception e) {
}
return data;
}
public Long getDataSetTotalData(String datasourceId, String table){
List<String[]> data = new ArrayList<>();
Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
datasourceRequest.setQuery("select count(*) from " + table);
try {
return datasourceProvider.count(datasourceRequest);
} catch (Exception e) {
}
return 0l;
}
public List<String[]> getDataSetPageData(String datasourceId, String table, List<DatasetTableField> fields, Long startPage, Long pageSize){
List<String[]> data = new ArrayList<>();
Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
datasourceRequest.setPageSize(pageSize);
datasourceRequest.setStartPage(startPage);
datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray));
try {
return datasourceProvider.getData(datasourceRequest);
} catch (Exception e) {
}
return data;
}
public void saveTableField(DatasetTable datasetTable) throws Exception {
Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
DataSetTableRequest dataSetTableRequest = new DataSetTableRequest();
@ -193,7 +229,7 @@ public class DataSetTableService {
if (CollectionUtils.isNotEmpty(fields)) {
for (int i = 0; i < fields.size(); i++) {
TableFiled filed = fields.get(i);
DatasetTableField datasetTableField = new DatasetTableField();
DatasetTableField datasetTableField = DatasetTableField.builder().build();
datasetTableField.setTableId(datasetTable.getId());
datasetTableField.setOriginName(filed.getFieldName());
datasetTableField.setName(filed.getRemarks());
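
The three helpers added above (getDataSetData, getDataSetTotalData, getDataSetPageData) give a caller everything needed to read a source table page by page. The caller that actually feeds HBase is not visible in these hunks; a minimal sketch of how the paging could be driven, with the page size as an assumed tuning value:

// Hypothetical caller (e.g. the extract job); pageSize is an assumed value.
long pageSize = 1000L;
long total = dataSetTableService.getDataSetTotalData(datasourceId, table);
long pageCount = (total + pageSize - 1) / pageSize; // ceiling division
for (long page = 1; page <= pageCount; page++) {
    List<String[]> rows = dataSetTableService.getDataSetPageData(
            datasourceId, table, fields, page, pageSize);
    // write `rows` to HBase here, e.g. with the hbase-client sketch near the pom.xml changes
}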

View File

@ -48,4 +48,11 @@ public class DataSetTableTaskLogService {
return extDataSetTaskMapper.list(request);
}
public void deleteByTaskId(String taskId){
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
DatasetTableTaskLogExample.Criteria criteria = datasetTableTaskLogExample.createCriteria();
criteria.andTaskIdEqualTo(taskId);
datasetTableTaskLogMapper.deleteByExample(datasetTableTaskLogExample);
}
}

View File

@ -20,7 +20,8 @@ import java.util.UUID;
public class DataSetTableTaskService {
@Resource
private DatasetTableTaskMapper datasetTableTaskMapper;
@Resource
private DataSetTableTaskLogService dataSetTableTaskLogService;
@Resource
private ScheduleService scheduleService;
@ -46,6 +47,11 @@ public class DataSetTableTaskService {
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(id);
datasetTableTaskMapper.deleteByPrimaryKey(id);
scheduleService.deleteSchedule(datasetTableTask);
dataSetTableTaskLogService.deleteByTaskId(id);
}
public DatasetTableTask get(String id) {
return datasetTableTaskMapper.selectByPrimaryKey(id);
}
public List<DatasetTableTask> list(DatasetTableTask datasetTableTask) {