fix: limit a dataset to a single sync task

taojinlong 2021-05-26 12:05:16 +08:00
parent d65194ef17
commit e522d3935d
7 changed files with 125 additions and 10 deletions

View File

@@ -21,6 +21,8 @@ public class DatasetTable implements Serializable {
private Long createTime;
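// status of the latest sync job for this table (e.g. JobStatus.Underway / JobStatus.Completed)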
private String syncStatus;
private String info;
private static final long serialVersionUID = 1L;

View File

@@ -643,6 +643,76 @@ public class DatasetTableExample {
addCriterion("create_time not between", value1, value2, "createTime");
return (Criteria) this;
}
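// MyBatis-generator criteria methods for the new sync_status column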
public Criteria andSyncStatusIsNull() {
addCriterion("sync_status is null");
return (Criteria) this;
}
public Criteria andSyncStatusIsNotNull() {
addCriterion("sync_status is not null");
return (Criteria) this;
}
public Criteria andSyncStatusEqualTo(String value) {
addCriterion("sync_status =", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusNotEqualTo(String value) {
addCriterion("sync_status <>", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusGreaterThan(String value) {
addCriterion("sync_status >", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusGreaterThanOrEqualTo(String value) {
addCriterion("sync_status >=", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusLessThan(String value) {
addCriterion("sync_status <", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusLessThanOrEqualTo(String value) {
addCriterion("sync_status <=", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusLike(String value) {
addCriterion("sync_status like", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusNotLike(String value) {
addCriterion("sync_status not like", value, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusIn(List<String> values) {
addCriterion("sync_status in", values, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusNotIn(List<String> values) {
addCriterion("sync_status not in", values, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusBetween(String value1, String value2) {
addCriterion("sync_status between", value1, value2, "syncStatus");
return (Criteria) this;
}
public Criteria andSyncStatusNotBetween(String value1, String value2) {
addCriterion("sync_status not between", value1, value2, "syncStatus");
return (Criteria) this;
}
}
public static class Criteria extends GeneratedCriteria {

View File

@@ -10,6 +10,7 @@
<result column="mode" jdbcType="INTEGER" property="mode" />
<result column="create_by" jdbcType="VARCHAR" property="createBy" />
<result column="create_time" jdbcType="BIGINT" property="createTime" />
<result column="sync_status" jdbcType="VARCHAR" property="syncStatus" />
</resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable">
<result column="info" jdbcType="LONGVARCHAR" property="info" />
@@ -73,7 +74,7 @@
</where>
</sql>
<sql id="Base_Column_List">
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, sync_status
</sql>
<sql id="Blob_Column_List">
info
@@ -129,12 +130,12 @@
<insert id="insert" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table (id, `name`, scene_id,
data_source_id, `type`, `mode`,
create_by, create_time, info
)
create_by, create_time, sync_status,
info)
values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR},
#{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER},
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{info,jdbcType=LONGVARCHAR}
)
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{syncStatus,jdbcType=VARCHAR},
#{info,jdbcType=LONGVARCHAR})
</insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table
@@ -163,6 +164,9 @@
<if test="createTime != null">
create_time,
</if>
<if test="syncStatus != null">
sync_status,
</if>
<if test="info != null">
info,
</if>
@@ -192,6 +196,9 @@
<if test="createTime != null">
#{createTime,jdbcType=BIGINT},
</if>
<if test="syncStatus != null">
#{syncStatus,jdbcType=VARCHAR},
</if>
<if test="info != null">
#{info,jdbcType=LONGVARCHAR},
</if>
@@ -230,6 +237,9 @@
<if test="record.createTime != null">
create_time = #{record.createTime,jdbcType=BIGINT},
</if>
<if test="record.syncStatus != null">
sync_status = #{record.syncStatus,jdbcType=VARCHAR},
</if>
<if test="record.info != null">
info = #{record.info,jdbcType=LONGVARCHAR},
</if>
@@ -248,6 +258,7 @@
`mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT},
sync_status = #{record.syncStatus,jdbcType=VARCHAR},
info = #{record.info,jdbcType=LONGVARCHAR}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
@@ -262,7 +273,8 @@
`type` = #{record.type,jdbcType=VARCHAR},
`mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}
create_time = #{record.createTime,jdbcType=BIGINT},
sync_status = #{record.syncStatus,jdbcType=VARCHAR}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
@@ -291,6 +303,9 @@
<if test="createTime != null">
create_time = #{createTime,jdbcType=BIGINT},
</if>
<if test="syncStatus != null">
sync_status = #{syncStatus,jdbcType=VARCHAR},
</if>
<if test="info != null">
info = #{info,jdbcType=LONGVARCHAR},
</if>
@@ -306,6 +321,7 @@
`mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT},
sync_status = #{syncStatus,jdbcType=VARCHAR},
info = #{info,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=VARCHAR}
</update>
@@ -317,7 +333,8 @@
`type` = #{type,jdbcType=VARCHAR},
`mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}
create_time = #{createTime,jdbcType=BIGINT},
sync_status = #{syncStatus,jdbcType=VARCHAR}
where id = #{id,jdbcType=VARCHAR}
</update>
</mapper>

View File

@@ -3,6 +3,7 @@ package io.dataease.listener;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.datasource.service.DatasourceService;
import io.dataease.service.ScheduleService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.dataset.DataSetTableTaskService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
@@ -17,9 +18,12 @@ import java.util.List;
public class DataSourceInitStartListener implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private DatasourceService datasourceService;
@Resource
private DataSetTableService dataSetTableService;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
datasourceService.initAllDataSourceConnectionPool();
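// reset any dataset sync left in the Underway state by a previous shutdown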
dataSetTableService.updateDatasetTableStatus();
}
}

View File

@@ -876,4 +876,12 @@ public class DataSetTableService {
List<String[]> data = jdbcProvider.getData(datasourceRequest);
return CollectionUtils.isNotEmpty(data);
}
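// called at startup: datasets still marked Underway are reset to Completed so they can be synced again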
public void updateDatasetTableStatus(){
DatasetTable record = new DatasetTable();
record.setSyncStatus(JobStatus.Completed.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name());
datasetTableMapper.updateByExampleSelective(record, example);
}
}

View File

@@ -2,6 +2,7 @@ package io.dataease.service.dataset;
import com.google.gson.Gson;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.constants.ScheduleType;
@@ -93,6 +94,8 @@ public class ExtractDataService {
private DataSetTableTaskService dataSetTableTaskService;
@Resource
private DatasourceMapper datasourceMapper;
@Resource
private DatasetTableMapper datasetTableMapper;
private static String lastUpdateTime = "${__last_update_time__}";
private static String currentUpdateTime = "${__current_update_time__}";
@@ -172,12 +175,17 @@ public class ExtractDataService {
}
public void extractData(String datasetTableId, String taskId, String type) {
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
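// flag the dataset as Underway before extracting; skip this run if the update matches no row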
datasetTable.setSyncStatus(JobStatus.Underway.name());
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId);
if (datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0) {
return;
}
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type);
DatasetTable datasetTable = null;
Datasource datasource = new Datasource();
try {
datasetTable = dataSetTableService.get(datasetTableId);
if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) {
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
} else {
@@ -261,6 +269,10 @@
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
dataSetTableTaskService.update(datasetTableTask);
}
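// reset the dataset's sync status to Completed once this run ends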
datasetTable.setSyncStatus(JobStatus.Completed.name());
example.clear();
example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example);
}
}
@@ -273,6 +285,8 @@
}
private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
JobMeta jobMeta = null;

View File

@@ -74,7 +74,7 @@
<!-- </table>-->
<!-- <table tableName="v_dataset"/>-->
<!-- <table tableName="sys_auth_detail"/>-->
<table tableName="panel_group"/>
<table tableName="dataset_table"/>
</context>