forked from github/dataease
fix: 合并冲突解决
This commit is contained in:
parent
f1e3621fac
commit
bc06646e32
@ -24,8 +24,8 @@ import io.dataease.provider.QueryProvider;
|
|||||||
import io.dataease.service.message.DeMsgutil;
|
import io.dataease.service.message.DeMsgutil;
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
@ -133,23 +133,23 @@ public class ExtractDataService {
|
|||||||
"fi\n" +
|
"fi\n" +
|
||||||
"rm -rf %s\n";
|
"rm -rf %s\n";
|
||||||
|
|
||||||
public synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask){
|
public synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask) {
|
||||||
datasetTable.setSyncStatus(JobStatus.Underway.name());
|
datasetTable.setSyncStatus(JobStatus.Underway.name());
|
||||||
DatasetTableExample example = new DatasetTableExample();
|
DatasetTableExample example = new DatasetTableExample();
|
||||||
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
|
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
|
||||||
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
|
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
|
||||||
Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
||||||
if(existSyncTask){
|
if (existSyncTask) {
|
||||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||||
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
|
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
|
||||||
datasetTableTaskLog.setTableId(datasetTable.getId());
|
datasetTableTaskLog.setTableId(datasetTable.getId());
|
||||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||||
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
||||||
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())){
|
if (CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}else {
|
} else {
|
||||||
datasetTableTask.setLastExecTime(System.currentTimeMillis());
|
datasetTableTask.setLastExecTime(System.currentTimeMillis());
|
||||||
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
|
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
|
||||||
datasetTableTask.setStatus(TaskStatus.Exec.name());
|
datasetTableTask.setStatus(TaskStatus.Exec.name());
|
||||||
@ -230,7 +230,7 @@ public class ExtractDataService {
|
|||||||
}
|
}
|
||||||
//侵入式清除下属视图缓存
|
//侵入式清除下属视图缓存
|
||||||
List<String> viewIds = extChartViewMapper.allViewIds(datasetTableId);
|
List<String> viewIds = extChartViewMapper.allViewIds(datasetTableId);
|
||||||
if (CollectionUtils.isNotEmpty(viewIds)){
|
if (CollectionUtils.isNotEmpty(viewIds)) {
|
||||||
viewIds.forEach(viewId -> {
|
viewIds.forEach(viewId -> {
|
||||||
CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId);
|
CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId);
|
||||||
});
|
});
|
||||||
@ -239,26 +239,26 @@ public class ExtractDataService {
|
|||||||
|
|
||||||
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
|
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
|
||||||
DatasetTable datasetTable = getDatasetTable(datasetTableId);
|
DatasetTable datasetTable = getDatasetTable(datasetTableId);
|
||||||
if(datasetTable == null){
|
if (datasetTable == null) {
|
||||||
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
|
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
|
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
|
||||||
if(datasetTableTask == null){
|
if (datasetTableTask == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name())|| datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Pending.name())){
|
if (datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name()) || datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Pending.name())) {
|
||||||
LogUtil.info("Skip synchronization task, task ID : " + datasetTableTask.getId());
|
LogUtil.info("Skip synchronization task, task ID : " + datasetTableTask.getId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(existSyncTask(datasetTable, datasetTableTask)){
|
if (existSyncTask(datasetTable, datasetTableTask)) {
|
||||||
LogUtil.info("Skip synchronization task for dataset, dataset ID : " + datasetTableId);
|
LogUtil.info("Skip synchronization task for dataset, dataset ID : " + datasetTableId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||||
UpdateType updateType = UpdateType.valueOf(type);
|
UpdateType updateType = UpdateType.valueOf(type);
|
||||||
if(context != null){
|
if (context != null) {
|
||||||
datasetTable.setQrtzInstance(context.getFireInstanceId());
|
datasetTable.setQrtzInstance(context.getFireInstanceId());
|
||||||
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
|
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
|
||||||
}
|
}
|
||||||
@ -282,19 +282,19 @@ public class ExtractDataService {
|
|||||||
|
|
||||||
switch (updateType) {
|
switch (updateType) {
|
||||||
case all_scope: // 全量更新
|
case all_scope: // 全量更新
|
||||||
try{
|
try {
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
}
|
}
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||||
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
}
|
}
|
||||||
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
|
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
|
||||||
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
|
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
|
||||||
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
|
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
|
||||||
if(datasetTable.getType().equalsIgnoreCase("sql")){
|
if (datasetTable.getType().equalsIgnoreCase("sql")) {
|
||||||
generateJobFile("all_scope", datasetTable, fetchSqlField(new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql(), datasource));
|
generateJobFile("all_scope", datasetTable, fetchSqlField(new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql(), datasource));
|
||||||
}else {
|
} else {
|
||||||
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
|
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
|
||||||
}
|
}
|
||||||
Long execTime = System.currentTimeMillis();
|
Long execTime = System.currentTimeMillis();
|
||||||
@ -302,7 +302,7 @@ public class ExtractDataService {
|
|||||||
replaceTable(DorisTableUtils.dorisName(datasetTableId));
|
replaceTable(DorisTableUtils.dorisName(datasetTableId));
|
||||||
saveSucessLog(datasetTableTaskLog);
|
saveSucessLog(datasetTableTaskLog);
|
||||||
|
|
||||||
sendWebMsg(datasetTable, taskId,true);
|
sendWebMsg(datasetTable, datasetTableTask, true);
|
||||||
|
|
||||||
deleteFile("all_scope", datasetTableId);
|
deleteFile("all_scope", datasetTableId);
|
||||||
|
|
||||||
@ -310,16 +310,16 @@ public class ExtractDataService {
|
|||||||
|
|
||||||
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Completed);
|
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Completed);
|
||||||
|
|
||||||
}catch (Exception e){
|
} catch (Exception e) {
|
||||||
saveErrorLog(datasetTableId, taskId, e);
|
saveErrorLog(datasetTableId, taskId, e);
|
||||||
|
|
||||||
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Error);
|
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Error);
|
||||||
|
|
||||||
sendWebMsg(datasetTable, taskId,false);
|
sendWebMsg(datasetTable, datasetTableTask, false);
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
||||||
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
||||||
deleteFile("all_scope", datasetTableId);
|
deleteFile("all_scope", datasetTableId);
|
||||||
}finally {
|
} finally {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -336,10 +336,10 @@ public class ExtractDataService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())) {
|
||||||
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
}
|
}
|
||||||
if(datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||||
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
|
||||||
}
|
}
|
||||||
Long execTime = System.currentTimeMillis();
|
Long execTime = System.currentTimeMillis();
|
||||||
@ -360,7 +360,7 @@ public class ExtractDataService {
|
|||||||
}
|
}
|
||||||
saveSucessLog(datasetTableTaskLog);
|
saveSucessLog(datasetTableTaskLog);
|
||||||
|
|
||||||
sendWebMsg(datasetTable, taskId,true);
|
sendWebMsg(datasetTable, datasetTableTask, true);
|
||||||
|
|
||||||
deleteFile("incremental_add", datasetTableId);
|
deleteFile("incremental_add", datasetTableId);
|
||||||
deleteFile("incremental_delete", datasetTableId);
|
deleteFile("incremental_delete", datasetTableId);
|
||||||
@ -368,17 +368,17 @@ public class ExtractDataService {
|
|||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
||||||
|
|
||||||
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Completed);
|
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Completed);
|
||||||
}catch (Exception e){
|
} catch (Exception e) {
|
||||||
saveErrorLog(datasetTableId, taskId, e);
|
saveErrorLog(datasetTableId, taskId, e);
|
||||||
sendWebMsg(datasetTable, taskId,false);
|
sendWebMsg(datasetTable, datasetTableTask, false);
|
||||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
||||||
|
|
||||||
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Error);
|
dataSetTableTaskService.updateTaskStatus(datasetTableTask, JobStatus.Error);
|
||||||
|
|
||||||
deleteFile("incremental_add", datasetTableId);
|
deleteFile("incremental_add", datasetTableId);
|
||||||
deleteFile("incremental_delete", datasetTableId);
|
deleteFile("incremental_delete", datasetTableId);
|
||||||
}finally {
|
} finally {
|
||||||
if(datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())){
|
if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||||
datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||||
dataSetTableTaskService.update(datasetTableTask);
|
dataSetTableTaskService.update(datasetTableTask);
|
||||||
}
|
}
|
||||||
@ -387,7 +387,7 @@ public class ExtractDataService {
|
|||||||
}
|
}
|
||||||
//侵入式清除下属视图缓存
|
//侵入式清除下属视图缓存
|
||||||
List<String> viewIds = extChartViewMapper.allViewIds(datasetTableId);
|
List<String> viewIds = extChartViewMapper.allViewIds(datasetTableId);
|
||||||
if (CollectionUtils.isNotEmpty(viewIds)){
|
if (CollectionUtils.isNotEmpty(viewIds)) {
|
||||||
viewIds.forEach(viewId -> {
|
viewIds.forEach(viewId -> {
|
||||||
CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId);
|
CacheUtils.remove(JdbcConstants.VIEW_CACHE_KEY, viewId);
|
||||||
});
|
});
|
||||||
@ -395,7 +395,8 @@ public class ExtractDataService {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendWebMsg(DatasetTable datasetTable, String taskId, Boolean status) {
|
private void sendWebMsg(DatasetTable datasetTable, DatasetTableTask datasetTableTask, Boolean status) {
|
||||||
|
String taskId = datasetTableTask.getId();
|
||||||
String msg = status ? "成功" : "失败";
|
String msg = status ? "成功" : "失败";
|
||||||
Long typeId = status ? 5L : 6L;
|
Long typeId = status ? 5L : 6L;
|
||||||
String id = datasetTable.getId();
|
String id = datasetTable.getId();
|
||||||
@ -403,18 +404,22 @@ public class ExtractDataService {
|
|||||||
Set<Long> userIds = AuthUtils.userIdsByURD(authURD);
|
Set<Long> userIds = AuthUtils.userIdsByURD(authURD);
|
||||||
Gson gson = new Gson();
|
Gson gson = new Gson();
|
||||||
userIds.forEach(userId -> {
|
userIds.forEach(userId -> {
|
||||||
Map<String,Object> param = new HashMap<>();
|
Map<String, Object> param = new HashMap<>();
|
||||||
param.put("tableId", id);
|
param.put("tableId", id);
|
||||||
if(StringUtils.isNotEmpty(taskId)){
|
if (StringUtils.isNotEmpty(taskId)) {
|
||||||
param.put("taskId", taskId);
|
param.put("taskId", taskId);
|
||||||
}
|
}
|
||||||
DeMsgutil.sendMsg(userId, typeId, 1L, "数据集【"+datasetTable.getName()+"】同步"+msg, gson.toJson(param));
|
String content = "数据集【" + datasetTable.getName() + "】同步" + msg;
|
||||||
|
if (ObjectUtils.isNotEmpty(datasetTableTask) && ObjectUtils.isNotEmpty(datasetTableTask.getName())) {
|
||||||
|
content += " 任务名称【" + datasetTableTask.getName() + "】";
|
||||||
|
}
|
||||||
|
DeMsgutil.sendMsg(userId, typeId, 1L, content, gson.toJson(param));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed, Long execTime) {
|
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed, Long execTime) {
|
||||||
datasetTable.setSyncStatus(completed.name());
|
datasetTable.setSyncStatus(completed.name());
|
||||||
if(execTime != null){
|
if (execTime != null) {
|
||||||
datasetTable.setLastUpdateTime(execTime);
|
datasetTable.setLastUpdateTime(execTime);
|
||||||
}
|
}
|
||||||
DatasetTableExample example = new DatasetTableExample();
|
DatasetTableExample example = new DatasetTableExample();
|
||||||
@ -428,16 +433,16 @@ public class ExtractDataService {
|
|||||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void saveErrorLog(String datasetTableId, String taskId, Exception e){
|
private void saveErrorLog(String datasetTableId, String taskId, Exception e) {
|
||||||
LogUtil.error("Extract data error: " + datasetTableId, e);
|
LogUtil.error("Extract data error: " + datasetTableId, e);
|
||||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||||
datasetTableTaskLog.setTableId(datasetTableId);
|
datasetTableTaskLog.setTableId(datasetTableId);
|
||||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||||
if(StringUtils.isNotEmpty(taskId)){
|
if (StringUtils.isNotEmpty(taskId)) {
|
||||||
datasetTableTaskLog.setTaskId(taskId);
|
datasetTableTaskLog.setTaskId(taskId);
|
||||||
}
|
}
|
||||||
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
||||||
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs)){
|
if (CollectionUtils.isNotEmpty(datasetTableTaskLogs)) {
|
||||||
datasetTableTaskLog = datasetTableTaskLogs.get(0);
|
datasetTableTaskLog = datasetTableTaskLogs.get(0);
|
||||||
datasetTableTaskLog.setStatus(JobStatus.Error.name());
|
datasetTableTaskLog.setStatus(JobStatus.Error.name());
|
||||||
datasetTableTaskLog.setInfo(e.getMessage());
|
datasetTableTaskLog.setInfo(e.getMessage());
|
||||||
@ -456,7 +461,7 @@ public class ExtractDataService {
|
|||||||
if (datasetTableField.getSize() > 65533 || datasetTableField.getSize() * 3 > 65533) {
|
if (datasetTableField.getSize() > 65533 || datasetTableField.getSize() * 3 > 65533) {
|
||||||
Column_Fields = Column_Fields + "varchar(65533)" + ",`";
|
Column_Fields = Column_Fields + "varchar(65533)" + ",`";
|
||||||
} else {
|
} else {
|
||||||
Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize()*3)) + ",`";
|
Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize() * 3)) + ",`";
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
@ -499,7 +504,8 @@ public class ExtractDataService {
|
|||||||
datasourceRequest.setDatasource(dorisDatasource);
|
datasourceRequest.setDatasource(dorisDatasource);
|
||||||
datasourceRequest.setQuery(dropTableSql.replace("TABLE_NAME", dorisTableName));
|
datasourceRequest.setQuery(dropTableSql.replace("TABLE_NAME", dorisTableName));
|
||||||
jdbcProvider.exec(datasourceRequest);
|
jdbcProvider.exec(datasourceRequest);
|
||||||
}catch (Exception ignore){}
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void replaceTable(String dorisTableName) throws Exception {
|
private void replaceTable(String dorisTableName) throws Exception {
|
||||||
@ -512,14 +518,15 @@ public class ExtractDataService {
|
|||||||
jdbcProvider.exec(datasourceRequest);
|
jdbcProvider.exec(datasourceRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DatasetTable getDatasetTable(String datasetTableId){
|
private DatasetTable getDatasetTable(String datasetTableId) {
|
||||||
for (int i=0;i<5;i++){
|
for (int i = 0; i < 5; i++) {
|
||||||
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
|
||||||
if(datasetTable == null){
|
if (datasetTable == null) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}catch (Exception ignore){}
|
} catch (Exception ignore) {
|
||||||
}else {
|
}
|
||||||
|
} else {
|
||||||
return datasetTable;
|
return datasetTable;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -532,11 +539,11 @@ public class ExtractDataService {
|
|||||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||||
datasetTableTaskLog.setTriggerType(TriggerType.Cron.name());
|
datasetTableTaskLog.setTriggerType(TriggerType.Cron.name());
|
||||||
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
||||||
if(CollectionUtils.isEmpty(datasetTableTaskLogs)){
|
if (CollectionUtils.isEmpty(datasetTableTaskLogs)) {
|
||||||
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
||||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||||
return datasetTableTaskLog;
|
return datasetTableTaskLog;
|
||||||
}else {
|
} else {
|
||||||
return datasetTableTaskLogs.get(0);
|
return datasetTableTaskLogs.get(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -545,14 +552,15 @@ public class ExtractDataService {
|
|||||||
datasetTableTaskLog.setTableId(datasetTableId);
|
datasetTableTaskLog.setTableId(datasetTableId);
|
||||||
datasetTableTaskLog.setTaskId(taskId);
|
datasetTableTaskLog.setTaskId(taskId);
|
||||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||||
for (int i=0;i<5;i++){
|
for (int i = 0; i < 5; i++) {
|
||||||
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
||||||
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs)){
|
if (CollectionUtils.isNotEmpty(datasetTableTaskLogs)) {
|
||||||
return datasetTableTaskLogs.get(0);
|
return datasetTableTaskLogs.get(0);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}catch (Exception ignore){}
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
||||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||||
@ -706,7 +714,7 @@ public class ExtractDataService {
|
|||||||
datasourceRequest.setDatasource(ds);
|
datasourceRequest.setDatasource(ds);
|
||||||
datasourceRequest.setQuery(qp.wrapSql(sql));
|
datasourceRequest.setQuery(qp.wrapSql(sql));
|
||||||
List<String> dorisFileds = new ArrayList<>();
|
List<String> dorisFileds = new ArrayList<>();
|
||||||
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{
|
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed -> {
|
||||||
dorisFileds.add(DorisTableUtils.columnName(filed));
|
dorisFileds.add(DorisTableUtils.columnName(filed));
|
||||||
});
|
});
|
||||||
return String.join(",", dorisFileds);
|
return String.join(",", dorisFileds);
|
||||||
@ -728,7 +736,7 @@ public class ExtractDataService {
|
|||||||
case mysql:
|
case mysql:
|
||||||
MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class);
|
MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class);
|
||||||
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
|
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
|
||||||
dataMeta.addExtraOption("MYSQL","characterEncoding", "UTF-8");
|
dataMeta.addExtraOption("MYSQL", "characterEncoding", "UTF-8");
|
||||||
transMeta.addDatabase(dataMeta);
|
transMeta.addDatabase(dataMeta);
|
||||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||||
inputStep = inputStep(transMeta, selectSQL);
|
inputStep = inputStep(transMeta, selectSQL);
|
||||||
@ -744,13 +752,13 @@ public class ExtractDataService {
|
|||||||
break;
|
break;
|
||||||
case oracle:
|
case oracle:
|
||||||
OracleConfigration oracleConfigration = new Gson().fromJson(datasource.getConfiguration(), OracleConfigration.class);
|
OracleConfigration oracleConfigration = new Gson().fromJson(datasource.getConfiguration(), OracleConfigration.class);
|
||||||
if(oracleConfigration.getConnectionType().equalsIgnoreCase("serviceName")){
|
if (oracleConfigration.getConnectionType().equalsIgnoreCase("serviceName")) {
|
||||||
String database = "(DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = ORACLE_HOSTNAME)(PORT = ORACLE_PORT))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = ORACLE_SERVICE_NAME )))".replace("ORACLE_HOSTNAME", oracleConfigration.getHost()).replace("ORACLE_PORT", oracleConfigration.getPort().toString()).replace("ORACLE_SERVICE_NAME", oracleConfigration.getDataBase());
|
String database = "(DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = ORACLE_HOSTNAME)(PORT = ORACLE_PORT))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = ORACLE_SERVICE_NAME )))".replace("ORACLE_HOSTNAME", oracleConfigration.getHost()).replace("ORACLE_PORT", oracleConfigration.getPort().toString()).replace("ORACLE_SERVICE_NAME", oracleConfigration.getDataBase());
|
||||||
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", "", database, "-1", oracleConfigration.getUsername(), oracleConfigration.getPassword());
|
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", "", database, "-1", oracleConfigration.getUsername(), oracleConfigration.getPassword());
|
||||||
}else {
|
} else {
|
||||||
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", oracleConfigration.getHost(), oracleConfigration.getDataBase(), oracleConfigration.getPort().toString(), oracleConfigration.getUsername(), oracleConfigration.getPassword());
|
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", oracleConfigration.getHost(), oracleConfigration.getDataBase(), oracleConfigration.getPort().toString(), oracleConfigration.getUsername(), oracleConfigration.getPassword());
|
||||||
}
|
}
|
||||||
transMeta.addDatabase(dataMeta);
|
transMeta.addDatabase(dataMeta);
|
||||||
|
|
||||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||||
inputStep = inputStep(transMeta, selectSQL);
|
inputStep = inputStep(transMeta, selectSQL);
|
||||||
@ -806,12 +814,12 @@ public class ExtractDataService {
|
|||||||
selectSQL = qp.createRawQuerySQL(tableName, datasetTableFields);
|
selectSQL = qp.createRawQuerySQL(tableName, datasetTableFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase("sql")){
|
if (extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase("sql")) {
|
||||||
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
|
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
|
||||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||||
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
||||||
}
|
}
|
||||||
if(!extractType.equalsIgnoreCase("all_scope")){
|
if (!extractType.equalsIgnoreCase("all_scope")) {
|
||||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||||
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
||||||
}
|
}
|
||||||
@ -834,23 +842,25 @@ public class ExtractDataService {
|
|||||||
ExcelInputMeta excelInputMeta = new ExcelInputMeta();
|
ExcelInputMeta excelInputMeta = new ExcelInputMeta();
|
||||||
if (StringUtils.equalsIgnoreCase(suffix, "xlsx")) {
|
if (StringUtils.equalsIgnoreCase(suffix, "xlsx")) {
|
||||||
excelInputMeta.setSpreadSheetType(SpreadSheetType.SAX_POI);
|
excelInputMeta.setSpreadSheetType(SpreadSheetType.SAX_POI);
|
||||||
try{
|
try {
|
||||||
InputStream inputStream = new FileInputStream(filePath);
|
InputStream inputStream = new FileInputStream(filePath);
|
||||||
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(inputStream);
|
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(inputStream);
|
||||||
XSSFSheet sheet0 = xssfWorkbook.getSheetAt(0);
|
XSSFSheet sheet0 = xssfWorkbook.getSheetAt(0);
|
||||||
excelInputMeta.setSheetName(new String[]{sheet0.getSheetName()});
|
excelInputMeta.setSheetName(new String[]{sheet0.getSheetName()});
|
||||||
}catch (Exception e){
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (StringUtils.equalsIgnoreCase(suffix, "xls")) {
|
if (StringUtils.equalsIgnoreCase(suffix, "xls")) {
|
||||||
excelInputMeta.setSpreadSheetType(SpreadSheetType.JXL);
|
excelInputMeta.setSpreadSheetType(SpreadSheetType.JXL);
|
||||||
try{
|
try {
|
||||||
InputStream inputStream = new FileInputStream(filePath);
|
InputStream inputStream = new FileInputStream(filePath);
|
||||||
HSSFWorkbook workbook = new HSSFWorkbook(inputStream);
|
HSSFWorkbook workbook = new HSSFWorkbook(inputStream);
|
||||||
HSSFSheet sheet0 = workbook.getSheetAt(0);
|
HSSFSheet sheet0 = workbook.getSheetAt(0);
|
||||||
excelInputMeta.setSheetName(new String[]{sheet0.getSheetName()});
|
excelInputMeta.setSheetName(new String[]{sheet0.getSheetName()});
|
||||||
}catch (Exception e){e.printStackTrace();}
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
excelInputMeta.setPassword("Encrypted");
|
excelInputMeta.setPassword("Encrypted");
|
||||||
excelInputMeta.setFileName(new String[]{filePath});
|
excelInputMeta.setFileName(new String[]{filePath});
|
||||||
@ -860,10 +870,10 @@ public class ExtractDataService {
|
|||||||
for (int i = 0; i < datasetTableFields.size(); i++) {
|
for (int i = 0; i < datasetTableFields.size(); i++) {
|
||||||
ExcelInputField field = new ExcelInputField();
|
ExcelInputField field = new ExcelInputField();
|
||||||
field.setName(datasetTableFields.get(i).getOriginName());
|
field.setName(datasetTableFields.get(i).getOriginName());
|
||||||
if(datasetTableFields.get(i).getDeExtractType() == 1){
|
if (datasetTableFields.get(i).getDeExtractType() == 1) {
|
||||||
field.setType("String");
|
field.setType("String");
|
||||||
field.setFormat("yyyy-MM-dd HH:mm:ss");
|
field.setFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
}else {
|
} else {
|
||||||
field.setType("String");
|
field.setType("String");
|
||||||
}
|
}
|
||||||
fields[i] = field;
|
fields[i] = field;
|
||||||
@ -904,14 +914,14 @@ public class ExtractDataService {
|
|||||||
|
|
||||||
tmp_code = tmp_code.replace("handleWraps", handleWraps);
|
tmp_code = tmp_code.replace("handleWraps", handleWraps);
|
||||||
String Column_Fields = "";
|
String Column_Fields = "";
|
||||||
if(datasourceType.equals(DatasourceTypes.excel) || datasourceType.equals(DatasourceTypes.oracle)){
|
if (datasourceType.equals(DatasourceTypes.excel) || datasourceType.equals(DatasourceTypes.oracle)) {
|
||||||
Column_Fields = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()));
|
Column_Fields = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()));
|
||||||
}else {
|
} else {
|
||||||
Column_Fields = String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()));
|
Column_Fields = String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
if(datasourceType.equals(DatasourceTypes.excel)){
|
if (datasourceType.equals(DatasourceTypes.excel)) {
|
||||||
tmp_code = tmp_code.replace("handleExcelIntColumn", handleExcelIntColumn).replace("Column_Fields", Column_Fields);
|
tmp_code = tmp_code.replace("handleExcelIntColumn", handleExcelIntColumn).replace("Column_Fields", Column_Fields);
|
||||||
}else {
|
} else {
|
||||||
tmp_code = tmp_code.replace("handleExcelIntColumn", "").replace("Column_Fields", Column_Fields);
|
tmp_code = tmp_code.replace("handleExcelIntColumn", "").replace("Column_Fields", Column_Fields);
|
||||||
}
|
}
|
||||||
UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", tmp_code);
|
UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", tmp_code);
|
||||||
@ -926,7 +936,7 @@ public class ExtractDataService {
|
|||||||
return userDefinedJavaClassStep;
|
return userDefinedJavaClassStep;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteFile(String type, String dataSetTableId){
|
public void deleteFile(String type, String dataSetTableId) {
|
||||||
String transName = null;
|
String transName = null;
|
||||||
String jobName = null;
|
String jobName = null;
|
||||||
String fileName = null;
|
String fileName = null;
|
||||||
@ -941,7 +951,7 @@ public class ExtractDataService {
|
|||||||
transName = "trans_add_" + DorisTableUtils.dorisName(dataSetTableId);
|
transName = "trans_add_" + DorisTableUtils.dorisName(dataSetTableId);
|
||||||
jobName = "job_add_" + DorisTableUtils.dorisName(dataSetTableId);
|
jobName = "job_add_" + DorisTableUtils.dorisName(dataSetTableId);
|
||||||
fileName = DorisTableUtils.dorisAddName(dataSetTableId);
|
fileName = DorisTableUtils.dorisAddName(dataSetTableId);
|
||||||
break;
|
break;
|
||||||
case "incremental_delete":
|
case "incremental_delete":
|
||||||
transName = "trans_delete_" + DorisTableUtils.dorisName(dataSetTableId);
|
transName = "trans_delete_" + DorisTableUtils.dorisName(dataSetTableId);
|
||||||
jobName = "job_delete_" + DorisTableUtils.dorisName(dataSetTableId);
|
jobName = "job_delete_" + DorisTableUtils.dorisName(dataSetTableId);
|
||||||
@ -950,28 +960,31 @@ public class ExtractDataService {
|
|||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try{
|
try {
|
||||||
File file = new File(root_path + fileName + "." + extention);
|
File file = new File(root_path + fileName + "." + extention);
|
||||||
FileUtils.forceDelete(file);
|
FileUtils.forceDelete(file);
|
||||||
}catch (Exception e){}
|
} catch (Exception e) {
|
||||||
try{
|
}
|
||||||
|
try {
|
||||||
File file = new File(root_path + jobName + ".kjb");
|
File file = new File(root_path + jobName + ".kjb");
|
||||||
FileUtils.forceDelete(file);
|
FileUtils.forceDelete(file);
|
||||||
}catch (Exception e){}
|
} catch (Exception e) {
|
||||||
try{
|
}
|
||||||
|
try {
|
||||||
File file = new File(root_path + transName + ".ktr");
|
File file = new File(root_path + transName + ".ktr");
|
||||||
FileUtils.forceDelete(file);
|
FileUtils.forceDelete(file);
|
||||||
}catch (Exception e){}
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isKettleRunning() {
|
public boolean isKettleRunning() {
|
||||||
try {
|
try {
|
||||||
if (!InetAddress.getByName(carte).isReachable(1000)) {
|
if (!InetAddress.getByName(carte).isReachable(1000)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}catch (Exception e){
|
} catch (Exception e) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
HttpGet getMethod = new HttpGet("http://" + carte + ":" + port);
|
HttpGet getMethod = new HttpGet("http://" + carte + ":" + port);
|
||||||
HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder();
|
HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder();
|
||||||
|
Loading…
Reference in New Issue
Block a user