From 3b0b5d2f0c98dc7d4dc1af89d104fc32439ae7a4 Mon Sep 17 00:00:00 2001 From: taojinlong Date: Fri, 9 Jul 2021 16:06:43 +0800 Subject: [PATCH] =?UTF-8?q?fix(=E6=95=B8=E6=93=9A=E9=9B=86):=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=9B=86=E4=BB=BB=E5=8A=A1=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E7=AE=80=E5=8D=95=E9=87=8D=E5=A4=8D=E6=89=A7=E8=A1=8C=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/dataset/DataSetTableService.java | 9 +- .../dataset/DataSetTableTaskService.java | 57 +++++---- .../service/dataset/ExtractDataService.java | 120 ++++++++++++++---- 3 files changed, 128 insertions(+), 58 deletions(-) diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java index b62fe0f8e7..9146ccc54d 100644 --- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java +++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java @@ -100,8 +100,9 @@ public class DataSetTableService { private void extractData(DataSetTableRequest datasetTable) throws Exception{ if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) { commonThreadPool.addTask(() -> { - extractDataService.extractData(datasetTable.getId(), null, "all_scope", null); + extractDataService.extractExcelData(datasetTable.getId(), "all_scope"); }); + return; } if (StringUtils.isNotEmpty(datasetTable.getSyncType()) && datasetTable.getSyncType().equalsIgnoreCase("sync_now")) { DataSetTaskRequest dataSetTaskRequest = new DataSetTaskRequest(); @@ -111,8 +112,6 @@ public class DataSetTableService { datasetTableTask.setType("all_scope"); datasetTableTask.setName(datasetTable.getName() + " 更新设置"); datasetTableTask.setEnd("0"); - datasetTableTask.setStatus(TaskStatus.Underway.name()); - datasetTableTask.setStartTime(System.currentTimeMillis()); dataSetTaskRequest.setDatasetTableTask(datasetTableTask); dataSetTableTaskService.save(dataSetTaskRequest); } @@ -149,11 +148,11 @@ public class DataSetTableService { if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) { if (datasetTable.getEditType() == 0) { commonThreadPool.addTask(() -> { - extractDataService.extractData(datasetTable.getId(), null, "all_scope", null); + extractDataService.extractExcelData(datasetTable.getId(), "all_scope"); }); } else if (datasetTable.getEditType() == 1) { commonThreadPool.addTask(() -> { - extractDataService.extractData(datasetTable.getId(), null, "add_scope", null); + extractDataService.extractExcelData(datasetTable.getId(), "add_scope"); }); } } diff --git a/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java b/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java index faef6c2f47..ea49e52efa 100644 --- a/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java +++ b/backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java @@ -1,10 +1,8 @@ package io.dataease.service.dataset; import com.google.gson.Gson; -import io.dataease.base.domain.DatasetTable; -import io.dataease.base.domain.DatasetTableTask; -import io.dataease.base.domain.DatasetTableTaskExample; -import io.dataease.base.domain.DatasetTableTaskLog; +import io.dataease.base.domain.*; +import io.dataease.base.mapper.DatasetTableMapper; import io.dataease.base.mapper.DatasetTableTaskMapper; import io.dataease.base.mapper.ext.ExtDataSetTaskMapper; import io.dataease.base.mapper.ext.query.GridExample; @@ -53,6 +51,9 @@ public class DataSetTableTaskService { private ExtractDataService extractDataService; @Resource private ExtDataSetTaskMapper extDataSetTaskMapper; + @Resource + private DatasetTableMapper datasetTableMapper; + public DatasetTableTask save(DataSetTaskRequest dataSetTaskRequest) throws Exception { checkName(dataSetTaskRequest); @@ -86,19 +87,15 @@ public class DataSetTableTaskService { } else { datasetTableTaskMapper.updateByPrimaryKeySelective(datasetTableTask); } + + // simple if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString()) && datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Underway.name())) { // SIMPLE 类型,提前占位 execNow(datasetTableTask); - datasetTableTask.setLastExecStatus(JobStatus.Underway.name()); - datasetTableTask.setLastExecTime(System.currentTimeMillis()); - update(datasetTableTask); + scheduleService.addSchedule(datasetTableTask); } + //cron、simple_cron if(!datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){ scheduleService.addSchedule(datasetTableTask); - }else { - if(datasetTableTask.getStatus().equalsIgnoreCase(JobStatus.Underway.name())){ - System.out.println(new Gson().toJson(datasetTableTask)); - scheduleService.addSchedule(datasetTableTask); - } } return datasetTableTask; @@ -111,17 +108,30 @@ public class DataSetTableTaskService { DataEaseException.throwException(Translator.get("i18n_not_exec_add_sync")); } } - if (extractDataService.existSyncTask(dataSetTableService.get(datasetTableTask.getTableId()), null)) { + if (existSyncTask(dataSetTableService.get(datasetTableTask.getTableId()), datasetTableTask)) { DataEaseException.throwException(Translator.get("i18n_sync_job_exists")); } - //write log - DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); - datasetTableTaskLog.setTableId(datasetTableTask.getTableId()); - datasetTableTaskLog.setTaskId(datasetTableTask.getId()); - datasetTableTaskLog.setStatus(JobStatus.Underway.name()); - datasetTableTaskLog.setStartTime(System.currentTimeMillis()); - datasetTableTaskLog.setTriggerType(TriggerType.Custom.name()); - dataSetTableTaskLogService.save(datasetTableTaskLog); + } + + private synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask) { + datasetTable.setSyncStatus(JobStatus.Underway.name()); + DatasetTableExample example = new DatasetTableExample(); + example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name()); + example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull()); + Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0; + if(!existSyncTask){ + datasetTableTask.setLastExecTime(System.currentTimeMillis()); + datasetTableTask.setLastExecStatus(JobStatus.Underway.name()); + update(datasetTableTask); + DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); + datasetTableTaskLog.setTableId(datasetTableTask.getTableId()); + datasetTableTaskLog.setTaskId(datasetTableTask.getId()); + datasetTableTaskLog.setStatus(JobStatus.Underway.name()); + datasetTableTaskLog.setStartTime(System.currentTimeMillis()); + datasetTableTaskLog.setTriggerType(TriggerType.Custom.name()); + dataSetTableTaskLogService.save(datasetTableTaskLog); + } + return existSyncTask; } public void delete(String id) { @@ -207,11 +217,6 @@ public class DataSetTableTaskService { public void execTask(DatasetTableTask datasetTableTask) throws Exception{ execNow(datasetTableTask); -// datasetTableTask.setStatus(TaskStatus.Underway.name()); - datasetTableTask.setLastExecStatus(JobStatus.Underway.name()); - datasetTableTask.setLastExecTime(System.currentTimeMillis()); - update(datasetTableTask); - if(datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())){ scheduleService.fireNow(datasetTableTask); } diff --git a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java index 008f2309bd..6df0a0c908 100644 --- a/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java +++ b/backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java @@ -138,22 +138,102 @@ public class ExtractDataService { example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull()); Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0; if(existSyncTask){ - if(datasetTableTask != null){ - DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); - datasetTableTaskLog.setTaskId(datasetTableTask.getId()); - datasetTableTaskLog.setTableId(datasetTable.getId()); - datasetTableTaskLog.setStatus(JobStatus.Underway.name()); - List datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog); - if(CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())){ - return false; - } + DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); + datasetTableTaskLog.setTaskId(datasetTableTask.getId()); + datasetTableTaskLog.setTableId(datasetTable.getId()); + datasetTableTaskLog.setStatus(JobStatus.Underway.name()); + List datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog); + if(CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())){ + return false; } return true; }else { + datasetTableTask.setLastExecTime(System.currentTimeMillis()); + datasetTableTask.setLastExecStatus(JobStatus.Underway.name()); + dataSetTableTaskService.update(datasetTableTask); return false; } } + public void extractExcelData(String datasetTableId, String type) { + Datasource datasource = new Datasource(); + datasource.setType("excel"); + DatasetTable datasetTable = getDatasetTable(datasetTableId); + if (datasetTable == null) { + LogUtil.error("Can not find DatasetTable: " + datasetTableId); + return; + } + UpdateType updateType = UpdateType.valueOf(type); + DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); + List datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); + datasetTableFields.sort((o1, o2) -> { + if (o1.getColumnIndex() == null) { + return -1; + } + if (o2.getColumnIndex() == null) { + return 1; + } + return o1.getColumnIndex().compareTo(o2.getColumnIndex()); + }); + String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields); + switch (updateType) { + case all_scope: // 全量更新 + try { + datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null); + createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); + createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); + generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null); + if (datasetTable.getType().equalsIgnoreCase("sql")) { + generateJobFile("all_scope", datasetTable, fetchSqlField(new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql(), datasource)); + } else { + generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()))); + } + Long execTime = System.currentTimeMillis(); + extractData(datasetTable, "all_scope"); + replaceTable(DorisTableUtils.dorisName(datasetTableId)); + saveSucessLog(datasetTableTaskLog); + sendWebMsg(datasetTable, null, true); + deleteFile("all_scope", datasetTableId); + updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); + } catch (Exception e) { + saveErrorLog(datasetTableId, null, e); + sendWebMsg(datasetTable, null, false); + updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null); + dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId))); + deleteFile("all_scope", datasetTableId); + } finally { + } + break; + + case add_scope: // 增量更新 + try { + datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null); + generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null); + generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()))); + Long execTime = System.currentTimeMillis(); + extractData(datasetTable, "incremental_add"); + saveSucessLog(datasetTableTaskLog); + sendWebMsg(datasetTable, null, true); + updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); + } catch (Exception e) { + saveErrorLog(datasetTableId, null, e); + sendWebMsg(datasetTable, null, false); + updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null); + deleteFile("incremental_add", datasetTableId); + deleteFile("incremental_delete", datasetTableId); + } finally { + } + break; + } + //侵入式清除下属视图缓存 + List viewIds = extChartViewMapper.allViewIds(datasetTableId); + if (CollectionUtils.isNotEmpty(viewIds)){ + viewIds.forEach(viewId -> { + CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId); + }); + } + } + public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) { DatasetTable datasetTable = getDatasetTable(datasetTableId); if(datasetTable == null){ @@ -161,7 +241,10 @@ public class ExtractDataService { return; } DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId); - if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name()) && !datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){ + if(datasetTableTask == null){ + return; + } + if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name())){ LogUtil.info("Skip synchronization task, task ID : " + datasetTableTask.getId()); return; } @@ -169,9 +252,6 @@ public class ExtractDataService { LogUtil.info("Skip synchronization task for dataset, dataset ID : " + datasetTableId); return; } - datasetTableTask.setLastExecTime(System.currentTimeMillis()); - datasetTableTask.setLastExecStatus(JobStatus.Underway.name()); - dataSetTableTaskService.update(datasetTableTask); DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); UpdateType updateType = UpdateType.valueOf(type); @@ -227,19 +307,12 @@ public class ExtractDataService { deleteFile("all_scope", datasetTableId); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); - -// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { -// datasetTableTask.setStatus(TaskStatus.Stopped.name()); -// } datasetTableTask.setLastExecStatus(JobStatus.Completed.name()); dataSetTableTaskService.update(datasetTableTask); }catch (Exception e){ saveErrorLog(datasetTableId, taskId, e); datasetTableTask.setLastExecStatus(JobStatus.Error.name()); -// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { -// datasetTableTask.setStatus(TaskStatus.Stopped.name()); -// } dataSetTableTaskService.update(datasetTableTask); sendWebMsg(datasetTable, taskId,false); @@ -279,7 +352,6 @@ public class ExtractDataService { if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); } - Long execTime = System.currentTimeMillis(); if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加 String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString()) @@ -305,9 +377,6 @@ public class ExtractDataService { updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); datasetTableTask.setLastExecStatus(JobStatus.Completed.name()); -// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { -// datasetTableTask.setStatus(TaskStatus.Stopped.name()); -// } dataSetTableTaskService.update(datasetTableTask); } }catch (Exception e){ @@ -315,9 +384,6 @@ public class ExtractDataService { sendWebMsg(datasetTable, taskId,false); updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null); datasetTableTask.setLastExecStatus(JobStatus.Error.name()); -// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { -// datasetTableTask.setStatus(TaskStatus.Stopped.name()); -// } dataSetTableTaskService.update(datasetTableTask); deleteFile("incremental_add", datasetTableId); deleteFile("incremental_delete", datasetTableId);