forked from github/dataease
fix: 合并
This commit is contained in:
commit
aae5fc59e0
@ -0,0 +1,6 @@
|
||||
package io.dataease.commons.constants;
|
||||
|
||||
/**
 * String codes identifying how a dataset obtains its data.
 *
 * <p>This is a pure constants holder: it is {@code final} and has a private
 * constructor so it can be neither instantiated nor subclassed.
 */
public final class DatasetMode {

    /**
     * Mode code {@code "1"}.
     * NOTE(review): presumably "data is extracted/copied before use" - confirm against callers.
     */
    public static final String EXTRACT = "1";

    /**
     * Mode code {@code "0"}.
     * NOTE(review): presumably "data is queried directly from the source" - confirm against callers.
     */
    public static final String DIRECT = "0";

    private DatasetMode() {
        // constants only; never instantiated
    }
}
|
30
backend/src/main/java/io/dataease/config/HbaseConfig.java
Normal file
30
backend/src/main/java/io/dataease/config/HbaseConfig.java
Normal file
@ -0,0 +1,30 @@
|
||||
package io.dataease.config;
|
||||
|
||||
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Configuration
|
||||
@AutoConfigureBefore(QuartzAutoConfiguration.class)
|
||||
public class HbaseConfig {
|
||||
|
||||
@Resource
|
||||
private Environment env; // 保存了配置文件的信息
|
||||
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public org.apache.hadoop.conf.Configuration configuration(){
|
||||
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
|
||||
configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
|
||||
configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
|
||||
configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
|
||||
return configuration;
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package io.dataease.job.sechedule;
|
||||
|
||||
import io.dataease.commons.utils.CommonBeanFactory;
|
||||
import io.dataease.service.dataset.ExtractDataService;
|
||||
import org.quartz.JobExecutionContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class ExtractDataJob extends DeScheduleJob{
|
||||
private ExtractDataService extractDataService;
|
||||
|
||||
public ExtractDataJob() {
|
||||
extractDataService = (ExtractDataService) CommonBeanFactory.getBean(ExtractDataService.class);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
void businessExecute(JobExecutionContext context) {
|
||||
extractDataService.extractData(datasetTableId, taskId);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
package io.dataease.service.dataset;
|
||||
|
||||
import com.google.gson.Gson;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.DatasetTableTaskLog;
import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.dto.dataset.DataTableInfoDTO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
|
||||
|
||||
@Service
|
||||
public class ExtractDataService {
|
||||
|
||||
@Resource
|
||||
private DataSetTableService dataSetTableService;
|
||||
@Resource
|
||||
private DataSetTableFieldsService dataSetTableFieldsService;
|
||||
@Resource
|
||||
private DataSetTableTaskLogService dataSetTableTaskLogService;
|
||||
private Long pageSize = 10000l;
|
||||
private static ExecutorService pool = Executors.newScheduledThreadPool(50); //设置连接池
|
||||
private Connection connection;
|
||||
|
||||
public void extractData(String datasetTableId, String taskId) {
|
||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||
try {
|
||||
datasetTableTaskLog.setTableId(datasetTableId);
|
||||
datasetTableTaskLog.setTaskId(taskId);
|
||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||
Admin admin = getConnection().getAdmin();
|
||||
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
||||
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
|
||||
String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
|
||||
TableName tableName = TableName.valueOf(table + "-" + datasetTable.getDataSourceId());
|
||||
if(!admin.tableExists(tableName)){
|
||||
TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
|
||||
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of("cf");
|
||||
descBuilder.setColumnFamily(hcd);
|
||||
TableDescriptor desc = descBuilder.build();
|
||||
admin.createTable(desc);
|
||||
}
|
||||
admin.disableTable(tableName);
|
||||
admin.truncateTable(tableName, true);
|
||||
|
||||
Table tab = getConnection().getTable(tableName);
|
||||
Long total = dataSetTableService.getDataSetTotalData(datasetTable.getDataSourceId(), table);
|
||||
Long pageCount = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1;
|
||||
|
||||
for (Long pageIndex = 1l; pageIndex <= pageCount; pageIndex++) {
|
||||
List<String[]> data = dataSetTableService.getDataSetPageData(datasetTable.getDataSourceId(), table, datasetTableFields, pageIndex, pageSize);
|
||||
for (String[] d : data) {
|
||||
for(int i=0;i<datasetTableFields.size();i++){
|
||||
Put put = new Put(UUID.randomUUID().toString().getBytes());
|
||||
String value = d[i];
|
||||
if(value == null){
|
||||
value = "null";
|
||||
}
|
||||
put.addColumn("cf".getBytes(), datasetTableFields.get(i).getOriginName().getBytes(), value.getBytes());
|
||||
tab.put(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
|
||||
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
|
||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||
}catch (Exception e){
|
||||
datasetTableTaskLog.setStatus(JobStatus.Error.name());
|
||||
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
|
||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private synchronized Connection getConnection() throws Exception{
|
||||
if(connection == null || connection.isClosed()){
|
||||
Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
|
||||
connection = ConnectionFactory.createConnection(cfg, pool);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
}
|
4
pom.xml
4
pom.xml
@ -10,14 +10,14 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<!--<version>2.2.6.RELEASE</version>-->
|
||||
<version>2.4.3</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
<relativePath/>
|
||||
</parent>
|
||||
|
||||
<name>dataease</name>
|
||||
<modules>
|
||||
<module>backend</module>
|
||||
<module>frontend</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
Loading…
Reference in New Issue
Block a user