From 92718d1678fdc156ae5395332cd8f79e8259902c Mon Sep 17 00:00:00 2001 From: taojinlong Date: Sat, 29 May 2021 12:36:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=99=90=E5=88=B6=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=9B=86=E5=8F=AA=E6=9C=89=E4=B8=80=E4=B8=AA=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/dataset/ExtractDataService.java | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 1ec769d7eb..e3f6d27ae5 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -117,15 +117,15 @@ public class ExtractDataService { private String user; @Value("${carte.passwd:cluster}") private String passwd; - private static String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" + "Column_Fields" + "UNIQUE KEY(dataease_uuid)\n" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" + "PROPERTIES(\"replication_num\" = \"1\");"; + + private static String dropTableSql = "DROP TABLE IF EXISTS TABLE_NAME;"; private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" + "rm -rf %s\n"; - private String createDorisTablColumnSql(List datasetTableFields) { String Column_Fields = "dataease_uuid varchar(50), `"; for (DatasetTableField datasetTableField : datasetTableFields) { @@ -163,13 +163,23 @@ public class ExtractDataService { private void createDorisTable(String dorisTableName, String dorisTablColumnSql) throws Exception { Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class); - ; DatasourceRequest datasourceRequest = new DatasourceRequest(); datasourceRequest.setDatasource(dorisDatasource); datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql)); jdbcProvider.exec(datasourceRequest); } + private void dropDorisTable(String dorisTableName) { + try { + Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); + JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class); + DatasourceRequest datasourceRequest = new DatasourceRequest(); + datasourceRequest.setDatasource(dorisDatasource); + datasourceRequest.setQuery(dropTableSql.replace("TABLE_NAME", dorisTableName)); + jdbcProvider.exec(datasourceRequest); + }catch (Exception ignore){} + } + private void replaceTable(String dorisTableName) throws Exception { Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class); @@ -180,7 +190,6 @@ public class ExtractDataService { jdbcProvider.exec(datasourceRequest); } - public synchronized boolean updateSyncStatus(DatasetTable datasetTable ){ datasetTable.setSyncStatus(JobStatus.Underway.name()); DatasetTableExample example = new DatasetTableExample(); @@ -188,11 +197,15 @@ public class ExtractDataService { datasetTableMapper.selectByExample(example); example.clear(); example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name()); + example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull()); return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0; } public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) { - DatasetTable datasetTable = dataSetTableService.get(datasetTableId); + DatasetTable datasetTable = getDatasetTable(datasetTableId); + if(datasetTable == null){ + LogUtil.error("Can not find DatasetTable: " + datasetTableId); + } DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId); boolean isSIMPLEJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())); if(updateSyncStatus(datasetTable) && !isSIMPLEJob){ @@ -288,10 +301,15 @@ public class ExtractDataService { datasetTableTaskLog.setInfo(ExceptionUtils.getStackTrace(e)); datasetTableTaskLog.setEndTime(System.currentTimeMillis()); dataSetTableTaskLogService.save(datasetTableTaskLog); + datasetTable.setSyncStatus(JobStatus.Error.name()); DatasetTableExample example = new DatasetTableExample(); example.createCriteria().andIdEqualTo(datasetTableId); datasetTableMapper.updateByExampleSelective(datasetTable, example); + + if(updateType.name().equalsIgnoreCase("all_scope")){ + dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId))); + } } finally { if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString()); @@ -300,6 +318,20 @@ public class ExtractDataService { } } + private DatasetTable getDatasetTable(String datasetTableId){ + for (int i=0;i<5;i++){ + DatasetTable datasetTable = dataSetTableService.get(datasetTableId); + if(datasetTable == null){ + try { + Thread.sleep(1000); + }catch (Exception ignore){} + }else { + return datasetTable; + } + } + return null; + } + private DatasetTableTaskLog writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId) { datasetTableTaskLog.setTableId(datasetTableId); datasetTableTaskLog.setTaskId(taskId);