mirror of
https://github.com/dataease/dataease.git
synced 2025-02-25 12:03:05 +08:00
feat: 限制数据集只有一个同步任务
This commit is contained in:
parent
f83f4fb12f
commit
92718d1678
@ -117,15 +117,15 @@ public class ExtractDataService {
|
|||||||
private String user;
|
private String user;
|
||||||
@Value("${carte.passwd:cluster}")
|
@Value("${carte.passwd:cluster}")
|
||||||
private String passwd;
|
private String passwd;
|
||||||
|
|
||||||
private static String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
|
private static String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
|
||||||
"Column_Fields" +
|
"Column_Fields" +
|
||||||
"UNIQUE KEY(dataease_uuid)\n" +
|
"UNIQUE KEY(dataease_uuid)\n" +
|
||||||
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
|
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
|
||||||
"PROPERTIES(\"replication_num\" = \"1\");";
|
"PROPERTIES(\"replication_num\" = \"1\");";
|
||||||
|
|
||||||
|
private static String dropTableSql = "DROP TABLE IF EXISTS TABLE_NAME;";
|
||||||
private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" +
|
private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" +
|
||||||
"rm -rf %s\n";
|
"rm -rf %s\n";
|
||||||
|
|
||||||
private String createDorisTablColumnSql(List<DatasetTableField> datasetTableFields) {
|
private String createDorisTablColumnSql(List<DatasetTableField> datasetTableFields) {
|
||||||
String Column_Fields = "dataease_uuid varchar(50), `";
|
String Column_Fields = "dataease_uuid varchar(50), `";
|
||||||
for (DatasetTableField datasetTableField : datasetTableFields) {
|
for (DatasetTableField datasetTableField : datasetTableFields) {
|
||||||
@ -163,13 +163,23 @@ public class ExtractDataService {
|
|||||||
private void createDorisTable(String dorisTableName, String dorisTablColumnSql) throws Exception {
|
private void createDorisTable(String dorisTableName, String dorisTablColumnSql) throws Exception {
|
||||||
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
|
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
|
||||||
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
|
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
|
||||||
;
|
|
||||||
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
||||||
datasourceRequest.setDatasource(dorisDatasource);
|
datasourceRequest.setDatasource(dorisDatasource);
|
||||||
datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
|
datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
|
||||||
jdbcProvider.exec(datasourceRequest);
|
jdbcProvider.exec(datasourceRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void dropDorisTable(String dorisTableName) {
|
||||||
|
try {
|
||||||
|
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
|
||||||
|
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
|
||||||
|
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
||||||
|
datasourceRequest.setDatasource(dorisDatasource);
|
||||||
|
datasourceRequest.setQuery(dropTableSql.replace("TABLE_NAME", dorisTableName));
|
||||||
|
jdbcProvider.exec(datasourceRequest);
|
||||||
|
}catch (Exception ignore){}
|
||||||
|
}
|
||||||
|
|
||||||
private void replaceTable(String dorisTableName) throws Exception {
|
private void replaceTable(String dorisTableName) throws Exception {
|
||||||
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
|
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
|
||||||
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
|
JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
|
||||||
@ -180,7 +190,6 @@ public class ExtractDataService {
|
|||||||
jdbcProvider.exec(datasourceRequest);
|
jdbcProvider.exec(datasourceRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public synchronized boolean updateSyncStatus(DatasetTable datasetTable ){
|
public synchronized boolean updateSyncStatus(DatasetTable datasetTable ){
|
||||||
datasetTable.setSyncStatus(JobStatus.Underway.name());
|
datasetTable.setSyncStatus(JobStatus.Underway.name());
|
||||||
DatasetTableExample example = new DatasetTableExample();
|
DatasetTableExample example = new DatasetTableExample();
|
||||||
@ -188,11 +197,15 @@ public class ExtractDataService {
|
|||||||
datasetTableMapper.selectByExample(example);
|
datasetTableMapper.selectByExample(example);
|
||||||
example.clear();
|
example.clear();
|
||||||
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
|
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
|
||||||
|
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
|
||||||
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
|
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
|
||||||
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
DatasetTable datasetTable = getDatasetTable(datasetTableId);
|
||||||
|
if(datasetTable == null){
|
||||||
|
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
|
||||||
|
}
|
||||||
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
|
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
|
||||||
boolean isSIMPLEJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString()));
|
boolean isSIMPLEJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString()));
|
||||||
if(updateSyncStatus(datasetTable) && !isSIMPLEJob){
|
if(updateSyncStatus(datasetTable) && !isSIMPLEJob){
|
||||||
@ -288,10 +301,15 @@ public class ExtractDataService {
|
|||||||
datasetTableTaskLog.setInfo(ExceptionUtils.getStackTrace(e));
|
datasetTableTaskLog.setInfo(ExceptionUtils.getStackTrace(e));
|
||||||
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
|
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
|
||||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||||
|
|
||||||
datasetTable.setSyncStatus(JobStatus.Error.name());
|
datasetTable.setSyncStatus(JobStatus.Error.name());
|
||||||
DatasetTableExample example = new DatasetTableExample();
|
DatasetTableExample example = new DatasetTableExample();
|
||||||
example.createCriteria().andIdEqualTo(datasetTableId);
|
example.createCriteria().andIdEqualTo(datasetTableId);
|
||||||
datasetTableMapper.updateByExampleSelective(datasetTable, example);
|
datasetTableMapper.updateByExampleSelective(datasetTable, example);
|
||||||
|
|
||||||
|
if(updateType.name().equalsIgnoreCase("all_scope")){
|
||||||
|
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||||
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
|
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
|
||||||
@ -300,6 +318,20 @@ public class ExtractDataService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private DatasetTable getDatasetTable(String datasetTableId){
|
||||||
|
for (int i=0;i<5;i++){
|
||||||
|
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
||||||
|
if(datasetTable == null){
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}catch (Exception ignore){}
|
||||||
|
}else {
|
||||||
|
return datasetTable;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private DatasetTableTaskLog writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId) {
|
private DatasetTableTaskLog writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId) {
|
||||||
datasetTableTaskLog.setTableId(datasetTableId);
|
datasetTableTaskLog.setTableId(datasetTableId);
|
||||||
datasetTableTaskLog.setTaskId(taskId);
|
datasetTableTaskLog.setTaskId(taskId);
|
||||||
|
Loading…
Reference in New Issue
Block a user