forked from github/dataease
feat: 定时任务采用分页的方式拉取数据
This commit is contained in:
parent
4f72f2c3c3
commit
b6798c6fee
@ -820,6 +820,14 @@ public class CKQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
return "SELECT COUNT(*) from " + String.format(CKConstants.KEYWORD_TABLE, sql);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -831,12 +839,12 @@ public class CKQueryProvider extends QueryProvider {
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_OFFSET, DE_PAGE_SIZE ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_OFFSET, DE_PAGE_SIZE ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -808,6 +808,17 @@ public class Db2QueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), JdbcConfiguration.class).getSchema();
|
||||
schema = String.format(Db2Constants.KEYWORD_TABLE, schema);
|
||||
return "SELECT COUNT(*) from " + schema + "." + String.format(Db2Constants.KEYWORD_TABLE, sql);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -817,15 +828,15 @@ public class Db2QueryProvider extends QueryProvider {
|
||||
}).toArray(String[]::new);
|
||||
if (ds != null) {
|
||||
Db2Configuration db2Configuration = new Gson().fromJson(ds.getConfiguration(), Db2Configuration.class);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), db2Configuration.getSchema() + ".\"" + table + "\"");
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_OFFSET, DE_PAGE_SIZE ", StringUtils.join(array, ","), db2Configuration.getSchema() + String.format(Db2Constants.KEYWORD_TABLE, table));
|
||||
} else {
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_OFFSET, DE_PAGE_SIZE ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS de_tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS de_tmp LIMIT DE_OFFSET, DE_PAGE_SIZE ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -770,6 +770,14 @@ public class HiveQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
return "SELECT COUNT(*) from " + String.format(HiveConstants.KEYWORD_TABLE, sql);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -781,12 +789,12 @@ public class HiveQueryProvider extends QueryProvider {
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_OFFSET, DE_PAGE_SIZE ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_OFFSET, DE_PAGE_SIZE ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -768,6 +768,14 @@ public class ImpalaQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
return "SELECT COUNT(*) from " + String.format(ImpalaConstants.KEYWORD_TABLE, sql);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -779,12 +787,12 @@ public class ImpalaQueryProvider extends QueryProvider {
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -808,12 +808,12 @@ public class MysqlQueryProvider extends QueryProvider {
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_OFFSET, DE_PAGE_SIZE ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_OFFSET, DE_PAGE_SIZE ", fields, null);
|
||||
}
|
||||
|
||||
public String transTreeItem(SQLObj tableObj, DatasetRowPermissionsTreeItem item) {
|
||||
@ -1346,4 +1346,5 @@ public class MysqlQueryProvider extends QueryProvider {
|
||||
"{\"dateformat\": \"%Y%m%d %H:%i:%S\"}\n" +
|
||||
"]", Dateformat.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -860,25 +860,94 @@ public class OracleQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if (isTable) {
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), OracleConfiguration.class).getSchema();
|
||||
schema = String.format(OracleConstants.KEYWORD_TABLE, schema);
|
||||
return "SELECT COUNT(*) from " + schema + "." + String.format(OracleConstants.KEYWORD_TABLE, sql);
|
||||
} else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append(" \"").append(f.getOriginName()).append("\"");
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
OracleConfiguration oracleConfiguration = new Gson().fromJson(ds.getConfiguration(), OracleConfiguration.class);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), oracleConfiguration.getSchema() + ".\"" + table + "\"");
|
||||
List<ChartViewFieldDTO> xAxis = new ArrayList<>();
|
||||
ChartViewFieldDTO chartViewFieldDTO = new ChartViewFieldDTO();
|
||||
chartViewFieldDTO.setOriginName("ROWNUM");
|
||||
xAxis.add(chartViewFieldDTO);
|
||||
fields.forEach(datasetTableField -> {
|
||||
ChartViewFieldDTO f = new ChartViewFieldDTO();
|
||||
f.setOriginName(datasetTableField.getOriginName());
|
||||
f.setDeType(0);
|
||||
xAxis.add(f);
|
||||
});
|
||||
|
||||
List<ChartFieldCustomFilterDTO> fieldCustomFilter = new ArrayList<>();
|
||||
ChartFieldCustomFilterDTO chartFieldCustomFilterDTO = new ChartFieldCustomFilterDTO();
|
||||
DatasetTableField datasetTableField = new DatasetTableField();
|
||||
datasetTableField.setOriginName("ROWNUM");
|
||||
datasetTableField.setDeType(0);
|
||||
chartFieldCustomFilterDTO.setField(datasetTableField);
|
||||
|
||||
List<ChartCustomFilterItemDTO> filterItemDTOS = new ArrayList<>();
|
||||
ChartCustomFilterItemDTO itemDTO = new ChartCustomFilterItemDTO();
|
||||
itemDTO.setTerm("le");
|
||||
itemDTO.setValue("DE_ALL");
|
||||
filterItemDTOS.add(itemDTO);
|
||||
chartFieldCustomFilterDTO.setFilter(filterItemDTOS);
|
||||
fieldCustomFilter.add(chartFieldCustomFilterDTO);
|
||||
|
||||
|
||||
SQLObj tableObj = SQLObj.builder()
|
||||
.tableName((table.startsWith("(") && table.endsWith(")")) ? table : String.format(OracleConstants.KEYWORD_TABLE, table))
|
||||
.tableAlias(String.format(OracleConstants.ALIAS_FIX, String.format(TABLE_ALIAS_PREFIX, 0)))
|
||||
.build();
|
||||
setSchema(tableObj, ds);
|
||||
List<SQLObj> xFields = new ArrayList<>();
|
||||
if (CollectionUtils.isNotEmpty(xAxis)) {
|
||||
for (int i = 0; i < xAxis.size(); i++) {
|
||||
ChartViewFieldDTO x = xAxis.get(i);
|
||||
if (x.getOriginName().equalsIgnoreCase("ROWNUM")) {
|
||||
xFields.add(SQLObj.builder()
|
||||
.fieldName(x.getOriginName())
|
||||
.fieldAlias("DE_ROWNUM")
|
||||
.build());
|
||||
continue;
|
||||
}
|
||||
String originField = String.format(OracleConstants.KEYWORD_FIX, tableObj.getTableAlias(), x.getOriginName());
|
||||
String fieldAlias = String.format(OracleConstants.KEYWORD_TABLE, x.getOriginName());
|
||||
xFields.add(getXFields(x, originField, fieldAlias));
|
||||
}
|
||||
}
|
||||
|
||||
String customWheres = transCustomFilterList(tableObj, fieldCustomFilter);
|
||||
|
||||
List<String> wheres = new ArrayList<>();
|
||||
if (customWheres != null) wheres.add(customWheres);
|
||||
|
||||
STGroup stg = new STGroupFile(SQLConstants.SQL_TEMPLATE);
|
||||
ST st_sql = stg.getInstanceOf("previewSql");
|
||||
st_sql.add("isGroup", false);
|
||||
if (CollectionUtils.isNotEmpty(xFields)) st_sql.add("groups", xFields);
|
||||
if (CollectionUtils.isNotEmpty(wheres)) st_sql.add("filters", wheres);
|
||||
if (ObjectUtils.isNotEmpty(tableObj)) st_sql.add("table", tableObj);
|
||||
String sql = st_sql.render();
|
||||
|
||||
ST st = stg.getInstanceOf("previewSql");
|
||||
st.add("isGroup", false);
|
||||
SQLObj tableSQL = SQLObj.builder()
|
||||
.tableName(String.format(OracleConstants.BRACKETS, sql))
|
||||
.tableAlias(String.format(TABLE_ALIAS_PREFIX, 1))
|
||||
.build();
|
||||
if (ObjectUtils.isNotEmpty(tableSQL)) st.add("table", tableSQL);
|
||||
|
||||
return "SELECT * FROM (" + sqlFix(st.render()) + ") DE_RESULT_TMP " + " WHERE DE_ROWNUM >= DE_OFFSET";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append(" \"").append(f.getOriginName()).append("\"");
|
||||
return stringBuilder.toString();
|
||||
}).toArray(String[]::new);
|
||||
return MessageFormat.format("SELECT {0} FROM {1}", StringUtils.join(array, ","), " (" + sqlFix(sql) + ") DE_TMP ");
|
||||
return createRawQuerySQL("(" + sqlFix(sql) + ")", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -800,6 +800,16 @@ public class PgQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), JdbcConfiguration.class).getSchema();
|
||||
String tableWithSchema = String.format(SqlServerSQLConstants.KEYWORD_TABLE, schema) + "." + String.format(SqlServerSQLConstants.KEYWORD_TABLE, sql);
|
||||
return "SELECT COUNT(*) from " + String.format(PgConstants.KEYWORD_TABLE, tableWithSchema);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -810,15 +820,15 @@ public class PgQueryProvider extends QueryProvider {
|
||||
if (ds != null) {
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), JdbcConfiguration.class).getSchema();
|
||||
String tableWithSchema = String.format(SqlServerSQLConstants.KEYWORD_TABLE, schema) + "." + String.format(SqlServerSQLConstants.KEYWORD_TABLE, table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} ", StringUtils.join(array, ","), tableWithSchema);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", StringUtils.join(array, ","), tableWithSchema);
|
||||
} else {
|
||||
return MessageFormat.format("SELECT {0} FROM {1} ", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -807,6 +807,16 @@ public class RedshiftQueryProvider extends QueryProvider {
|
||||
return tmpSql;
|
||||
}
|
||||
|
||||
public String getTotalCount(boolean isTable, String sql, Datasource ds) {
|
||||
if(isTable){
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), JdbcConfiguration.class).getSchema();
|
||||
String tableWithSchema = String.format(SqlServerSQLConstants.KEYWORD_TABLE, schema) + "." + String.format(SqlServerSQLConstants.KEYWORD_TABLE, sql);
|
||||
return "SELECT COUNT(*) from " + String.format(ImpalaConstants.KEYWORD_TABLE, tableWithSchema);
|
||||
}else {
|
||||
return "SELECT COUNT(*) from ( " + sql + " ) DE_COUNT_TEMP";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQL(String table, List<DatasetTableField> fields, Datasource ds) {
|
||||
String[] array = fields.stream().map(f -> {
|
||||
@ -817,15 +827,15 @@ public class RedshiftQueryProvider extends QueryProvider {
|
||||
if (ds != null) {
|
||||
String schema = new Gson().fromJson(ds.getConfiguration(), JdbcConfiguration.class).getSchema();
|
||||
String tableWithSchema = String.format(SqlServerSQLConstants.KEYWORD_TABLE, schema) + "." + String.format(SqlServerSQLConstants.KEYWORD_TABLE, table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} ", StringUtils.join(array, ","), tableWithSchema);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", StringUtils.join(array, ","), tableWithSchema);
|
||||
} else {
|
||||
return MessageFormat.format("SELECT {0} FROM {1} ", StringUtils.join(array, ","), table);
|
||||
return MessageFormat.format("SELECT {0} FROM {1} LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", StringUtils.join(array, ","), table);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields, null);
|
||||
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS DE_TEMP LIMIT DE_PAGE_SIZE OFFSET DE_OFFSET ", fields, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,6 +96,7 @@ public class ExtractDataService {
|
||||
@Resource
|
||||
private KettleService kettleService;
|
||||
|
||||
|
||||
private static final String lastUpdateTime = "${__last_update_time__}";
|
||||
private static final String currentUpdateTime = "${__current_update_time__}";
|
||||
private static final String separator = "|DE|";
|
||||
@ -104,6 +105,9 @@ public class ExtractDataService {
|
||||
|
||||
@Value("${kettle.files.keep:false}")
|
||||
private boolean kettleFilesKeep;
|
||||
@Value("${extract.page.size:50000}")
|
||||
private Long extractPageSize;
|
||||
|
||||
|
||||
private static final String shellScript = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" +
|
||||
"if [ $? -eq 0 ] ; then\n" +
|
||||
@ -433,7 +437,23 @@ public class ExtractDataService {
|
||||
extractApiData(datasetTable, datasource, datasetTableFields, extractType);
|
||||
return;
|
||||
}
|
||||
extractDataByKettle(datasetTable, datasource, datasetTableFields, extractType, selectSQL);
|
||||
Map<String, String> sql = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
if (StringUtils.isNotEmpty(sql.get("totalSql"))) {
|
||||
DatasourceRequest datasourceRequest = new DatasourceRequest();
|
||||
datasourceRequest.setDatasource(datasource);
|
||||
Provider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
|
||||
datasourceRequest.setQuery(sql.get("totalSql"));
|
||||
List<String[]> tmpData = datasourceProvider.getData(datasourceRequest);
|
||||
Long totalItems = CollectionUtils.isEmpty(tmpData) ? 0 : Long.valueOf(tmpData.get(0)[0]);
|
||||
Long totalPage = (totalItems / extractPageSize) + (totalItems % extractPageSize > 0 ? 1 : 0);
|
||||
for (Long i = 0L; i < totalPage; i++) {
|
||||
Long offset = i * extractPageSize;
|
||||
Long all = offset + extractPageSize;
|
||||
extractDataByKettle(datasetTable, datasource, datasetTableFields, extractType, sql.get("selectSQL").replace("DE_OFFSET", offset.toString()).replace("DE_PAGE_SIZE", extractPageSize.toString()).replace("DE_ALL", all.toString()));
|
||||
}
|
||||
} else {
|
||||
extractDataByKettle(datasetTable, datasource, datasetTableFields, extractType, selectSQL);
|
||||
}
|
||||
}
|
||||
|
||||
private void extractApiData(DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String extractType) throws Exception {
|
||||
@ -923,7 +943,6 @@ public class ExtractDataService {
|
||||
}
|
||||
}
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, mysqlConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.mysql, mysqlConfiguration);
|
||||
break;
|
||||
@ -931,7 +950,6 @@ public class ExtractDataService {
|
||||
SqlServerConfiguration sqlServerConfiguration = new Gson().fromJson(datasource.getConfiguration(), SqlServerConfiguration.class);
|
||||
dataMeta = new DatabaseMeta("db", "MSSQLNATIVE", "Native", sqlServerConfiguration.getHost().trim(), sqlServerConfiguration.getDataBase(), sqlServerConfiguration.getPort().toString(), sqlServerConfiguration.getUsername(), sqlServerConfiguration.getPassword());
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, sqlServerConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.sqlServer, sqlServerConfiguration);
|
||||
break;
|
||||
@ -939,7 +957,6 @@ public class ExtractDataService {
|
||||
PgConfiguration pgConfiguration = new Gson().fromJson(datasource.getConfiguration(), PgConfiguration.class);
|
||||
dataMeta = new DatabaseMeta("db", "POSTGRESQL", "Native", pgConfiguration.getHost().trim(), pgConfiguration.getDataBase(), pgConfiguration.getPort().toString(), pgConfiguration.getUsername(), pgConfiguration.getPassword());
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, pgConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.pg, pgConfiguration);
|
||||
break;
|
||||
@ -952,7 +969,6 @@ public class ExtractDataService {
|
||||
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", oracleConfiguration.getHost().trim(), oracleConfiguration.getDataBase(), oracleConfiguration.getPort().toString(), oracleConfiguration.getUsername(), oracleConfiguration.getPassword());
|
||||
}
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, oracleConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.oracle, oracleConfiguration);
|
||||
break;
|
||||
@ -961,7 +977,6 @@ public class ExtractDataService {
|
||||
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", chConfiguration.getHost().trim(), chConfiguration.getDataBase().trim(), chConfiguration.getPort().toString(), chConfiguration.getUsername(), chConfiguration.getPassword());
|
||||
dataMeta.setDatabaseType("Clickhouse");
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, chConfiguration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.ck, chConfiguration);
|
||||
break;
|
||||
@ -970,7 +985,6 @@ public class ExtractDataService {
|
||||
dataMeta = new DatabaseMeta("db", "DB2", "Native", db2Configuration.getHost().trim(), db2Configuration.getDataBase().trim(), db2Configuration.getPort().toString(), db2Configuration.getUsername(), db2Configuration.getPassword());
|
||||
dataMeta.setDatabaseType("DB2");
|
||||
transMeta.addDatabase(dataMeta);
|
||||
selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
|
||||
inputSteps = inputStep(transMeta, selectSQL, db2Configuration);
|
||||
udjcStep = udjc(datasetTableFields, DatasourceTypes.db2, db2Configuration);
|
||||
break;
|
||||
@ -1019,11 +1033,13 @@ public class ExtractDataService {
|
||||
FileUtils.writeStringToFile(file, transXml, "UTF-8");
|
||||
}
|
||||
|
||||
private String getSelectSQL(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) {
|
||||
private Map<String, String> getSelectSQL(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) {
|
||||
Map<String, String> sql = new HashMap<>();
|
||||
if (extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase(DatasetType.DB.name())) {
|
||||
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
selectSQL = qp.createRawQuerySQL(tableName, datasetTableFields, datasource);
|
||||
sql.put("selectSQL", qp.createRawQuerySQL(tableName, datasetTableFields, datasource));
|
||||
sql.put("totalSql", qp.getTotalCount(true, tableName, datasource));
|
||||
}
|
||||
|
||||
if (extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase(DatasetType.SQL.name())) {
|
||||
@ -1033,13 +1049,17 @@ public class ExtractDataService {
|
||||
selectSQL = new String(java.util.Base64.getDecoder().decode(selectSQL));
|
||||
}
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
||||
sql.put("totalSql", qp.getTotalCount(false, selectSQL, datasource));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields));
|
||||
}
|
||||
|
||||
if (!extractType.equalsIgnoreCase("all_scope")) {
|
||||
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
|
||||
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
|
||||
sql.put("totalSql", qp.getTotalCount(false, selectSQL, datasource));
|
||||
sql.put("selectSQL", qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields));
|
||||
}
|
||||
return selectSQL;
|
||||
|
||||
return sql;
|
||||
}
|
||||
|
||||
private List<StepMeta> inputStep(TransMeta transMeta, String selectSQL, JdbcConfiguration jdbcConfiguration) {
|
||||
|
Loading…
Reference in New Issue
Block a user