Merge branch 'main' of fit2cloud:dataease/dataease into main

This commit is contained in:
fit2cloud-chenyw 2021-06-16 18:04:17 +08:00
commit 6bce1414c6
7 changed files with 143 additions and 45 deletions

View File

@@ -1,4 +1,4 @@
FROM registry.fit2cloud.com/fit2cloud3/fabric8-java-alpine-openjdk8-jre
FROM registry.cn-qingdao.aliyuncs.com/fit2cloud3/fabric8-java-alpine-openjdk8-jre
ARG IMAGE_TAG

View File

@@ -25,6 +25,8 @@ public class DatasetTable implements Serializable {
private String syncStatus;
private Long lastUpdateTime;
private String info;
private static final long serialVersionUID = 1L;
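The new field is read and written through getLastUpdateTime/setLastUpdateTime later in this commit. A minimal sketch of those accessors; whether they are hand-written or generated (e.g. by Lombok) is outside this hunk:

    public Long getLastUpdateTime() {
        return lastUpdateTime;
    }

    public void setLastUpdateTime(Long lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }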

View File

@@ -783,6 +783,66 @@ public class DatasetTableExample {
addCriterion("sync_status not between", value1, value2, "syncStatus");
return (Criteria) this;
}
public Criteria andLastUpdateTimeIsNull() {
addCriterion("last_update_time is null");
return (Criteria) this;
}
public Criteria andLastUpdateTimeIsNotNull() {
addCriterion("last_update_time is not null");
return (Criteria) this;
}
public Criteria andLastUpdateTimeEqualTo(Long value) {
addCriterion("last_update_time =", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeNotEqualTo(Long value) {
addCriterion("last_update_time <>", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeGreaterThan(Long value) {
addCriterion("last_update_time >", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeGreaterThanOrEqualTo(Long value) {
addCriterion("last_update_time >=", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeLessThan(Long value) {
addCriterion("last_update_time <", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeLessThanOrEqualTo(Long value) {
addCriterion("last_update_time <=", value, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeIn(List<Long> values) {
addCriterion("last_update_time in", values, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeNotIn(List<Long> values) {
addCriterion("last_update_time not in", values, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeBetween(Long value1, Long value2) {
addCriterion("last_update_time between", value1, value2, "lastUpdateTime");
return (Criteria) this;
}
public Criteria andLastUpdateTimeNotBetween(Long value1, Long value2) {
addCriterion("last_update_time not between", value1, value2, "lastUpdateTime");
return (Criteria) this;
}
}
public static class Criteria extends GeneratedCriteria {
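The generated andLastUpdateTime* methods follow the standard MyBatis-generator criteria pattern. A minimal usage sketch, assuming the usual generated selectByExample method on the mapper (the one-hour threshold is illustrative):

    // Find completed dataset tables whose last sync is more than an hour old.
    DatasetTableExample example = new DatasetTableExample();
    example.createCriteria()
            .andSyncStatusEqualTo(JobStatus.Completed.name())
            .andLastUpdateTimeLessThan(System.currentTimeMillis() - 3600 * 1000L);
    List<DatasetTable> stale = datasetTableMapper.selectByExample(example);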

View File

@@ -12,6 +12,7 @@
<result column="create_time" jdbcType="BIGINT" property="createTime" />
<result column="qrtz_instance" jdbcType="VARCHAR" property="qrtzInstance" />
<result column="sync_status" jdbcType="VARCHAR" property="syncStatus" />
<result column="last_update_time" jdbcType="BIGINT" property="lastUpdateTime" />
</resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable">
<result column="info" jdbcType="LONGVARCHAR" property="info" />
@@ -76,7 +77,7 @@
</sql>
<sql id="Base_Column_List">
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, qrtz_instance,
sync_status
sync_status, last_update_time
</sql>
<sql id="Blob_Column_List">
info
@@ -133,11 +134,13 @@
insert into dataset_table (id, `name`, scene_id,
data_source_id, `type`, `mode`,
create_by, create_time, qrtz_instance,
sync_status, info)
sync_status, last_update_time, info
)
values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR},
#{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER},
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{qrtzInstance,jdbcType=VARCHAR},
#{syncStatus,jdbcType=VARCHAR}, #{info,jdbcType=LONGVARCHAR})
#{syncStatus,jdbcType=VARCHAR}, #{lastUpdateTime,jdbcType=BIGINT}, #{info,jdbcType=LONGVARCHAR}
)
</insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table
@@ -172,6 +175,9 @@
<if test="syncStatus != null">
sync_status,
</if>
<if test="lastUpdateTime != null">
last_update_time,
</if>
<if test="info != null">
info,
</if>
@@ -207,6 +213,9 @@
<if test="syncStatus != null">
#{syncStatus,jdbcType=VARCHAR},
</if>
<if test="lastUpdateTime != null">
#{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="info != null">
#{info,jdbcType=LONGVARCHAR},
</if>
@@ -251,6 +260,9 @@
<if test="record.syncStatus != null">
sync_status = #{record.syncStatus,jdbcType=VARCHAR},
</if>
<if test="record.lastUpdateTime != null">
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="record.info != null">
info = #{record.info,jdbcType=LONGVARCHAR},
</if>
@@ -271,6 +283,7 @@
create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR},
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT},
info = #{record.info,jdbcType=LONGVARCHAR}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
@@ -287,7 +300,8 @@
create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR}
sync_status = #{record.syncStatus,jdbcType=VARCHAR},
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
@@ -322,6 +336,9 @@
<if test="syncStatus != null">
sync_status = #{syncStatus,jdbcType=VARCHAR},
</if>
<if test="lastUpdateTime != null">
last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="info != null">
info = #{info,jdbcType=LONGVARCHAR},
</if>
@@ -339,6 +356,7 @@
create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR},
last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
info = #{info,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=VARCHAR}
</update>
@@ -352,7 +370,8 @@
create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR}
sync_status = #{syncStatus,jdbcType=VARCHAR},
last_update_time = #{lastUpdateTime,jdbcType=BIGINT}
where id = #{id,jdbcType=VARCHAR}
</update>
</mapper>
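Every new column reference above sits behind an <if test="... != null"> guard, so the *Selective statements touch last_update_time only when a value is actually set. That is what lets the service pass a null watermark to preserve the stored one, for example:

    // A null lastUpdateTime is skipped by the <if> guard, so the stored
    // watermark survives an error-path status update.
    DatasetTable record = new DatasetTable();
    record.setSyncStatus(JobStatus.Error.name());  // written
    // record.setLastUpdateTime(...) not called    // column left untouched
    DatasetTableExample example = new DatasetTableExample();
    example.createCriteria().andIdEqualTo(datasetTableId);
    datasetTableMapper.updateByExampleSelective(record, example);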

View File

@@ -1055,6 +1055,11 @@ public class DataSetTableService {
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()).andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample);
for (DatasetTable jobStoppeddDatasetTable : jobStoppeddDatasetTables) {
extractDataService.deleteFile("all_scope", jobStoppeddDatasetTable.getId());
extractDataService.deleteFile("incremental_add", jobStoppeddDatasetTable.getId());
extractDataService.deleteFile("incremental_delete", jobStoppeddDatasetTable.getId());
}
}
}
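A dataset whose job was stopped mid-run may have generated files left over from any extract phase, so all three types are cleaned up; the calls depend on deleteFile being widened from private to public further down in this commit. An equivalent loop form of the body above, shown only to make the three extract types explicit (java.util.Arrays assumed imported):

    for (String type : Arrays.asList("all_scope", "incremental_add", "incremental_delete")) {
        extractDataService.deleteFile(type, jobStoppeddDatasetTable.getId());
    }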

View File

@@ -196,14 +196,15 @@ public class ExtractDataService {
}else {
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
}
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId));
saveSucessLog(datasetTableTaskLog);
deleteFile("all_scope", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}catch (Exception e){
saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
deleteFile("all_scope", datasetTableId);
}finally {
@@ -220,21 +221,19 @@
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "incremental_add");
saveSucessLog(datasetTableTaskLog);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}else {
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
return;
}
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableId);
request.setStatus(JobStatus.Completed.name());
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(request);
if (CollectionUtils.isEmpty(datasetTableTaskLogs)) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
if (datasetTable.getLastUpdateTime() == null || datasetTable.getLastUpdateTime() == 0) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
return;
}
@@ -245,8 +244,9 @@
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
Long execTime = System.currentTimeMillis();
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString())
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
@@ -254,7 +254,7 @@
}
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString())
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
@@ -263,11 +263,11 @@
saveSucessLog(datasetTableTaskLog);
deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}
}catch (Exception e){
saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId);
}finally {
@@ -280,8 +280,11 @@
}
}
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed) {
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed, Long execTime) {
datasetTable.setSyncStatus(completed.name());
if(execTime != null){
datasetTable.setLastUpdateTime(execTime);
}
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example);
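Note the call pattern this signature change enables: execTime is sampled before extraction starts and persisted only on success, so rows modified while a run is in flight land after the recorded watermark and are re-read by the next incremental pass rather than silently skipped. On error and skip paths the callers pass null, which the selective update ignores, leaving the previous watermark intact:

    Long execTime = System.currentTimeMillis();  // watermark = run start, not run end
    extractData(datasetTable, "incremental_add");
    saveSucessLog(datasetTableTaskLog);
    updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);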
@@ -429,13 +432,13 @@ public class ExtractDataService {
JobMeta jobMeta = null;
switch (extractType) {
case "all_scope":
jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
jobMeta = repository.loadJob("job_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break;
case "incremental_add":
jobMeta = repository.loadJob("job_add_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
jobMeta = repository.loadJob("job_add_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break;
case "incremental_delete":
jobMeta = repository.loadJob("job_delete_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
jobMeta = repository.loadJob("job_delete_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break;
default:
break;
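The lookup names here have to match the names generateJobFile (below) now saves under; both sides derive them from the same helper. Illustrative only, since the body of DorisTableUtils.dorisName is not part of this diff:

    // Assumed shape of the shared naming convention:
    String base = DorisTableUtils.dorisName(datasetTable.getId());
    String jobName = "job_add_" + base;  // written by generateJobFile, read by loadJob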
@@ -477,7 +480,7 @@
}
private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFeilds) throws Exception {
String dorisOutputTable = null;
String outFile = null;
String jobName = null;
String script = null;
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
@@ -486,22 +489,22 @@
String transName = null;
switch (extractType) {
case "all_scope":
transName = "trans_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
transName = "trans_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
jobName = "job_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
break;
case "incremental_add":
transName = "trans_add_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
jobName = "job_add_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
transName = "trans_add_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisName(datasetTable.getId());
jobName = "job_add_" + DorisTableUtils.dorisName(datasetTable.getId());
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
break;
case "incremental_delete":
transName = "trans_delete_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + dorisOutputTable + "." + extention);
jobName = "job_delete_" + datasetTable.getId();
transName = "trans_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
jobName = "job_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
break;
default:
break;
@@ -586,7 +589,7 @@
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
TransMeta transMeta = new TransMeta();
String dorisOutputTable = null;
String outFile = null;
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
DatabaseMeta dataMeta = null;
StepMeta inputStep = null;
@@ -640,18 +643,18 @@
switch (extractType) {
case "all_scope":
transName = "trans_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
transName = "trans_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
transMeta.setName(transName);
break;
case "incremental_add":
transName = "trans_add_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
transName = "trans_add_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisName(datasetTable.getId());
transMeta.setName(transName);
break;
case "incremental_delete":
dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
transName = "trans_delete_" + datasetTable.getId();
transName = "trans_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
outFile = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
transMeta.setName(transName);
break;
default:
@@ -659,7 +662,7 @@
}
outputStep = outputStep(dorisOutputTable);
outputStep = outputStep(outFile);
hi1 = new TransHopMeta(inputStep, udjcStep);
hi2 = new TransHopMeta(udjcStep, outputStep);
transMeta.addTransHop(hi1);
@@ -779,26 +782,34 @@
return userDefinedJavaClassStep;
}
private void deleteFile(String type, String dataSetTableId){
public void deleteFile(String type, String dataSetTableId){
String transName = null;
String jobName = null;
String fileName = null;
switch (type) {
case "all_scope":
transName = "trans_" + dataSetTableId;
jobName = "job_" + dataSetTableId;
fileName = DorisTableUtils.dorisTmpName(dataSetTableId);
break;
case "incremental_add":
transName = "trans_add_" + dataSetTableId;
jobName = "job_add_" + dataSetTableId;
fileName = DorisTableUtils.dorisName(dataSetTableId);
break;
case "incremental_delete":
transName = "trans_delete_" + dataSetTableId;
jobName = "job_delete_" + dataSetTableId;
fileName = DorisTableUtils.dorisDeleteName(dataSetTableId);
break;
default:
break;
}
try{
File file = new File(root_path + fileName + "." + extention);
FileUtils.forceDelete(file);
}catch (Exception e){ /* best-effort cleanup: the file may already be gone */ }
try{
File file = new File(root_path + jobName + ".kjb");
FileUtils.forceDelete(file);

View File

@@ -0,0 +1 @@
ALTER TABLE `dataset_table` ADD COLUMN `last_update_time` BIGINT(13) NULL DEFAULT 0 AFTER `sync_status`;
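The default of 0 for pre-existing rows dovetails with the new guard in ExtractDataService: a null or zero watermark is treated as never-synced, so the incremental branch exits early instead of substituting a meaningless timestamp into the user's SQL. A sketch of that interplay, restating the guard shown earlier in this commit:

    Long last = datasetTable.getLastUpdateTime();
    if (last == null || last == 0L) {  // migration default, or never synced
        updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
        return;  // skip incremental extraction until a full sync records a watermark
    }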