feat[数据源]: 数据源支持ssh隧道

This commit is contained in:
taojinlong 2024-07-18 18:12:48 +08:00
parent 25f972b943
commit 1aed843a14
20 changed files with 316 additions and 63 deletions

View File

@ -1,6 +1,8 @@
package io.dataease.datasource.provider;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.dataset.utils.FieldUtils;
import io.dataease.datasource.dao.auto.entity.CoreDatasource;
@ -24,7 +26,6 @@ import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDialect;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -34,6 +35,7 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
@ -53,7 +55,6 @@ public class CalciteProvider extends Provider {
private final String FILE_PATH = "/opt/dataease2.0/drivers";
private final String CUSTOM_PATH = "/opt/dataease2.0/custom-drivers/";
private static String split = "DE";
@Resource
private CommonThreadPool commonThreadPool;
@ -85,7 +86,7 @@ public class CalciteProvider extends Provider {
public List<String> getSchema(DatasourceRequest datasourceRequest) {
List<String> schemas = new ArrayList<>();
String queryStr = getSchemaSql(datasourceRequest.getDatasource());
try (Connection con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con, 30); ResultSet resultSet = statement.executeQuery(queryStr)) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30); ResultSet resultSet = statement.executeQuery(queryStr)) {
while (resultSet.next()) {
schemas.add(resultSet.getString(1));
}
@ -100,7 +101,7 @@ public class CalciteProvider extends Provider {
List<DatasetTableDTO> tables = new ArrayList<>();
List<String> tablesSqls = getTablesSql(datasourceRequest);
for (String tablesSql : tablesSqls) {
try (Connection con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con, 30); ResultSet resultSet = statement.executeQuery(tablesSql)) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30); ResultSet resultSet = statement.executeQuery(tablesSql)) {
while (resultSet.next()) {
tables.add(getTableDesc(datasourceRequest, resultSet));
}
@ -111,6 +112,7 @@ public class CalciteProvider extends Provider {
return tables;
}
@Override
public String checkStatus(DatasourceRequest datasourceRequest) throws Exception {
DatasourceConfiguration.DatasourceType datasourceType = DatasourceConfiguration.DatasourceType.valueOf(datasourceRequest.getDatasource().getType());
@ -126,7 +128,7 @@ public class CalciteProvider extends Provider {
break;
}
String querySql = getTablesSql(datasourceRequest).get(0);
try (Connection con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con, 30); ResultSet resultSet = statement.executeQuery(querySql)) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30); ResultSet resultSet = statement.executeQuery(querySql)) {
} catch (Exception e) {
throw e;
}
@ -189,14 +191,12 @@ public class CalciteProvider extends Provider {
List<TableField> datasetTableFields = new ArrayList<>();
DatasourceSchemaDTO datasourceSchemaDTO = datasourceRequest.getDsList().entrySet().iterator().next().getValue();
datasourceRequest.setDatasource(datasourceSchemaDTO);
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
String table = datasourceRequest.getTable();
if (StringUtils.isEmpty(table)) {
ResultSet resultSet = null;
try (Connection con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con, 30)) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con.getConnection(), 30)) {
if (DatasourceConfiguration.DatasourceType.valueOf(datasourceSchemaDTO.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -215,8 +215,8 @@ public class CalciteProvider extends Provider {
}
} else {
ResultSet resultSet = null;
try (Connection con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con, 30)) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con.getConnection(), 30)) {
if (DatasourceConfiguration.DatasourceType.valueOf(datasourceSchemaDTO.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -251,7 +251,8 @@ public class CalciteProvider extends Provider {
}
@Override
public Connection getConnection(DatasourceDTO coreDatasource) throws DEException {
public ConnectionObj getConnection(DatasourceDTO coreDatasource) throws Exception {
ConnectionObj connectionObj = new ConnectionObj();
DatasourceConfiguration configuration = null;
DatasourceConfiguration.DatasourceType datasourceType = DatasourceConfiguration.DatasourceType.valueOf(coreDatasource.getType());
switch (datasourceType) {
@ -290,6 +291,7 @@ public class CalciteProvider extends Provider {
default:
configuration = JsonUtil.parseObject(coreDatasource.getConfiguration(), Mysql.class);
}
startSshSession(configuration, connectionObj, null);
Properties props = new Properties();
if (StringUtils.isNotBlank(configuration.getUsername())) {
props.setProperty("user", configuration.getUsername());
@ -303,12 +305,59 @@ public class CalciteProvider extends Provider {
try {
Driver driverClass = (Driver) jdbcClassLoader.loadClass(driverClassName).newInstance();
conn = driverClass.connect(configuration.getJdbc(), props);
} catch (Exception e) {
DEException.throwException(e.getMessage());
}
return conn;
connectionObj.setConnection(conn);
return connectionObj;
}
private void startSshSession(DatasourceConfiguration configuration, ConnectionObj connectionObj, Long datacourseId) throws Exception {
if (configuration.isUseSSH()) {
if (datacourseId == null) {
configuration.setLPort(getLport(null));
connectionObj.setLPort(configuration.getLPort());
connectionObj.setConfiguration(configuration);
Session session = initSession(configuration);
connectionObj.setSession(session);
} else {
Integer lport = Provider.getLPorts().get(datacourseId);
configuration.setLPort(lport);
if (lport != null) {
if (Provider.getSessions().get(datacourseId) == null || !Provider.getSessions().get(datacourseId).isConnected()) {
Session session = initSession(configuration);
Provider.getSessions().put(datacourseId, session);
}
} else {
configuration.setLPort(getLport(datacourseId));
Session session = initSession(configuration);
Provider.getSessions().put(datacourseId, session);
}
configuration.setLPort(lport);
}
}
}
private Session initSession(DatasourceConfiguration configuration) throws Exception{
JSch jsch = new JSch();
Session session = jsch.getSession(configuration.getSshUserName(), configuration.getSshHost(), configuration.getSshPort());
if (!configuration.getSshType().equalsIgnoreCase("password")) {
session.setConfig("PreferredAuthentications", "publickey");
jsch.addIdentity("sshkey", configuration.getSshKey().getBytes(StandardCharsets.UTF_8), null, configuration.getSshKeyPassword() == null ? null : configuration.getSshKeyPassword().getBytes(StandardCharsets.UTF_8));
}
if (configuration.getSshType().equalsIgnoreCase("password")) {
session.setPassword(configuration.getSshPassword());
}
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
session.setPortForwardingL(configuration.getLPort(), configuration.getHost(), configuration.getPort());
return session;
}
private DatasetTableDTO getTableDesc(DatasourceRequest datasourceRequest, ResultSet resultSet) throws SQLException {
DatasetTableDTO tableDesc = new DatasetTableDTO();
tableDesc.setDatasourceId(datasourceRequest.getDatasource().getId());
@ -341,8 +390,8 @@ public class CalciteProvider extends Provider {
// schema
ResultSet resultSet = null;
try (Connection con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con, datasourceConfiguration.getQueryTimeout())) {
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource());
Statement statement = getStatement(con.getConnection(), datasourceConfiguration.getQueryTimeout())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -752,6 +801,7 @@ public class CalciteProvider extends Provider {
dataSource.setInitialSize(configuration.getInitialPoolSize());
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getDataBase());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -764,6 +814,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getDataBase());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -776,6 +827,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getSchema());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -788,6 +840,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getSchema());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -800,6 +853,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getSchema());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -812,6 +866,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getDataBase());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -824,6 +879,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getSchema());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -836,6 +892,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getSchema());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -848,6 +905,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getDataBase());
rootSchema.add(ds.getSchemaAlias(), schema);
break;
@ -860,6 +918,7 @@ public class CalciteProvider extends Provider {
dataSource.setMaxTotal(configuration.getMaxPoolSize());
dataSource.setMinIdle(configuration.getMinPoolSize());
dataSource.setDefaultQueryTimeout(Integer.valueOf(configuration.getQueryTimeout()));
startSshSession(configuration, null, ds.getId());
schema = JdbcSchema.create(rootSchema, ds.getSchemaAlias(), dataSource, null, configuration.getDataBase());
rootSchema.add(ds.getSchemaAlias(), schema);
}
@ -1196,11 +1255,15 @@ public class CalciteProvider extends Provider {
} catch (Exception e) {
DEException.throwException(e.getMessage());
}
Provider.getLPorts().remove(datasource.getId());
if (Provider.getSessions().get(datasource.getId()) != null) {
Provider.getSessions().get(datasource.getId()).disconnect();
}
Provider.getSessions().remove(datasource.getId());
}
public Connection take() {
// 为了避免出现线程安全问题这里使用 synchronized 也可以使用 cas
if (connection == null) {
DEException.throwException("初始化连接池失败!");
}

View File

@ -5,6 +5,7 @@ import io.dataease.dataset.utils.TableUtils;
import io.dataease.datasource.dao.auto.entity.CoreDeEngine;
import io.dataease.datasource.request.EngineRequest;
import io.dataease.datasource.type.H2;
import io.dataease.extensions.datasource.dto.ConnectionObj;
import io.dataease.extensions.datasource.dto.DatasourceDTO;
import io.dataease.extensions.datasource.dto.TableField;
import io.dataease.extensions.datasource.vo.DatasourceConfiguration;
@ -13,7 +14,6 @@ import io.dataease.utils.JsonUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Arrays;
@ -28,8 +28,8 @@ public class H2EngineProvider extends EngineProvider {
int queryTimeout = configuration.getQueryTimeout();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engineRequest.getEngine());
try (Connection connection = getConnection(datasource); Statement stat = getStatement(connection, queryTimeout)) {
PreparedStatement preparedStatement = connection.prepareStatement(engineRequest.getQuery());
try (ConnectionObj connection = getConnection(datasource); Statement stat = getStatement(connection.getConnection(), queryTimeout)) {
PreparedStatement preparedStatement = connection.getConnection().prepareStatement(engineRequest.getQuery());
preparedStatement.setQueryTimeout(queryTimeout);
Boolean result = preparedStatement.execute();
} catch (Exception e) {

View File

@ -5,6 +5,7 @@ import io.dataease.dataset.utils.TableUtils;
import io.dataease.datasource.dao.auto.entity.CoreDeEngine;
import io.dataease.datasource.request.EngineRequest;
import io.dataease.datasource.type.Mysql;
import io.dataease.extensions.datasource.dto.ConnectionObj;
import io.dataease.extensions.datasource.dto.DatasourceDTO;
import io.dataease.extensions.datasource.dto.TableField;
import io.dataease.extensions.datasource.vo.DatasourceConfiguration;
@ -13,7 +14,6 @@ import io.dataease.utils.JsonUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Arrays;
@ -32,8 +32,8 @@ public class MysqlEngineProvider extends EngineProvider {
int queryTimeout = configuration.getQueryTimeout();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engineRequest.getEngine());
try (Connection connection = getConnection(datasource); Statement stat = getStatement(connection, queryTimeout)) {
PreparedStatement preparedStatement = connection.prepareStatement(engineRequest.getQuery());
try (ConnectionObj connection = getConnection(datasource); Statement stat = getStatement(connection.getConnection(), queryTimeout)) {
PreparedStatement preparedStatement = connection.getConnection().prepareStatement(engineRequest.getQuery());
preparedStatement.setQueryTimeout(queryTimeout);
Boolean result = preparedStatement.execute();
} catch (Exception e) {

View File

@ -17,13 +17,13 @@ public class CK extends DatasourceConfiguration {
}
if(StringUtils.isEmpty(extraParams.trim())){
return "jdbc:clickhouse://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
}else {
return "jdbc:clickhouse://HOSTNAME:PORT/DATABASE?EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -18,20 +18,20 @@ public class Db2 extends DatasourceConfiguration {
if(StringUtils.isEmpty(extraParams.trim())){
if (StringUtils.isEmpty(getSchema())) {
return "jdbc:db2://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
} else {
return "jdbc:db2://HOSTNAME:PORT/DATABASE:currentSchema=SCHEMA;"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("SCHEMA",getSchema().trim());
}
}else {
return "jdbc:db2://HOSTNAME:PORT/DATABASE:EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -22,13 +22,13 @@ public class Impala extends DatasourceConfiguration {
}
if(StringUtils.isEmpty(extraParams.trim())){
return "jdbc:impala://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
}else {
return "jdbc:impala://HOSTNAME:PORT/DATABASE;EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -22,8 +22,8 @@ public class Mongo extends DatasourceConfiguration {
}
if (StringUtils.isEmpty(extraParams.trim())) {
return "jdbc:mysql://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
} else {
for (String illegalParameter : illegalParameters) {
@ -33,8 +33,8 @@ public class Mongo extends DatasourceConfiguration {
}
return "jdbc:mysql://HOSTNAME:PORT/DATABASE?EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -24,8 +24,8 @@ public class Mysql extends DatasourceConfiguration {
}
if (StringUtils.isEmpty(extraParams.trim())) {
return "jdbc:mysql://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
} else {
for (String illegalParameter : illegalParameters) {
@ -33,10 +33,9 @@ public class Mysql extends DatasourceConfiguration {
DEException.throwException("Illegal parameter: " + illegalParameter);
}
}
return "jdbc:mysql://HOSTNAME:PORT/DATABASE?EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -17,13 +17,13 @@ public class Oracle extends DatasourceConfiguration {
}
if (StringUtils.isNotEmpty(getConnectionType()) && getConnectionType().equalsIgnoreCase("serviceName")) {
return "jdbc:oracle:thin:@HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
}else {
return "jdbc:oracle:thin:@HOSTNAME:PORT:DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
}
}

View File

@ -18,20 +18,20 @@ public class Pg extends DatasourceConfiguration {
if(StringUtils.isEmpty(extraParams.trim())){
if (StringUtils.isEmpty(getSchema())) {
return "jdbc:postgresql://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
} else {
return "jdbc:postgresql://HOSTNAME:PORT/DATABASE?currentSchema=SCHEMA"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("SCHEMA", getSchema().trim());
}
}else {
return "jdbc:postgresql://HOSTNAME:PORT/DATABASE?EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());

View File

@ -16,8 +16,8 @@ public class Redshift extends DatasourceConfiguration {
return getJdbcUrl();
}
return "jdbc:redshift://HOSTNAME:PORT/DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
}
}

View File

@ -22,13 +22,13 @@ public class Sqlserver extends DatasourceConfiguration {
}
if (StringUtils.isEmpty(extraParams.trim())) {
return "jdbc:sqlserver://HOSTNAME:PORT;DatabaseName=DATABASE"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim());
} else {
return "jdbc:sqlserver://HOSTNAME:PORT;DatabaseName=DATABASE;EXTRA_PARAMS"
.replace("HOSTNAME", getHost().trim())
.replace("PORT", getPort().toString().trim())
.replace("HOSTNAME", getLHost().trim())
.replace("PORT", getLPort().toString().trim())
.replace("DATABASE", getDataBase().trim())
.replace("EXTRA_PARAMS", getExtraParams().trim());
}

View File

@ -107,6 +107,7 @@ const initForm = type => {
dataBase: '',
jdbcUrl: '',
urlType: 'hostName',
sshType: 'password',
extraParams: '',
username: '',
password: '',
@ -340,6 +341,7 @@ const addApiItem = item => {
const activeName = ref('table')
const showPriority = ref(false)
const showSSH = ref(false)
const deleteItem = (item, idx) => {
form.value.apiConfiguration.splice(form.value.apiConfiguration.indexOf(item), 1)
@ -1014,6 +1016,87 @@ defineExpose({
autocomplete="off"
/>
</el-form-item>
<span
v-if="!['es', 'api'].includes(form.type)"
class="de-expand"
@click="showSSH = !showSSH"
>SSH 设置
<el-icon>
<Icon :name="showSSH ? 'icon_down_outlined' : 'icon_down_outlined-1'"></Icon>
</el-icon>
</span>
<template v-if="showSSH">
<el-form-item prop="configuration.sshHost">
<el-checkbox v-model="form.configuration.useSSH">启用SSH</el-checkbox>
</el-form-item>
<el-form-item label="主机" prop="configuration.sshHost">
<el-input
v-model="form.configuration.sshHost"
placeholder="请输入主机名"
autocomplete="off"
/>
</el-form-item>
<el-form-item label="端口" prop="configuration.sshPort">
<el-input-number
v-model="form.configuration.sshPort"
autocomplete="off"
step-strictly
class="text-left"
:min="0"
:max="65535"
:placeholder="t('common.inputText') + t('datasource.port')"
controls-position="right"
/>
</el-form-item>
<el-form-item :label="t('datasource.user_name')">
<el-input
:placeholder="t('common.inputText') + t('datasource.user_name')"
v-model="form.configuration.sshUserName"
autocomplete="off"
:maxlength="255"
/>
</el-form-item>
<el-form-item label="连接方式" prop="type">
<el-radio-group v-model="form.configuration.sshType">
<el-radio label="password">密码</el-radio>
<el-radio label="sshkey">ssh key</el-radio>
</el-radio-group>
</el-form-item>
<el-form-item
:label="t('datasource.password')"
v-if="form.configuration.sshType === 'password'"
>
<CustomPassword
:placeholder="t('common.inputText') + t('datasource.password')"
show-password
type="password"
v-model="form.configuration.sshPassword"
/>
</el-form-item>
<el-form-item
label="ssh key"
prop="configuration.sshKey"
v-if="form.configuration.sshType === 'sshkey'"
>
<el-input
type="textarea"
:rows="6"
v-model="form.configuration.sshKey"
placeholder="请输入ssh key"
autocomplete="off"
/>
</el-form-item>
<el-form-item label="ssh key 密码" v-if="form.configuration.sshType === 'sshkey'">
<CustomPassword
:placeholder="t('common.inputText') + t('datasource.password')"
show-password
type="password"
v-model="form.configuration.sshKeyPassword"
/>
</el-form-item>
</template>
<span
v-if="!['es', 'api'].includes(form.type)"
class="de-expand"

View File

@ -500,6 +500,9 @@ const init = (nodeInfo: Form | Param, id?: string, res?: object) => {
if (form.hasOwnProperty('configuration') && form.configuration.urlType == undefined) {
form.configuration.urlType = 'hostName'
}
if (form.hasOwnProperty('configuration') && form.configuration.sshType == undefined) {
form.configuration.sshType = 'password'
}
}
pid.value = nodeInfo.pid || '0'
} else {

View File

@ -136,6 +136,12 @@ export interface Configuration {
minPoolSize: string
maxPoolSize: string
queryTimeout: string
useSSH: boolean
sshHost: string
sshPort: string
sshUserName: string
sshType: string
sshPassword: string
}
export interface ApiConfiguration {

View File

@ -52,6 +52,7 @@
<flexmark.version>0.62.2</flexmark.version>
<mybatis-spring.version>3.0.3</mybatis-spring.version>
<commons-compress.version>1.26.2</commons-compress.version>
<jsch.version>0.1.55</jsch.version>
</properties>
<dependencyManagement>

View File

@ -29,6 +29,11 @@
</exclusions>
<classifier>de</classifier>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,35 @@
package io.dataease.extensions.datasource.dto;
import com.jcraft.jsch.Session;
import io.dataease.extensions.datasource.provider.Provider;
import io.dataease.extensions.datasource.vo.DatasourceConfiguration;
import lombok.Data;
import java.sql.Connection;
@Data
public class ConnectionObj implements AutoCloseable {
private Connection connection;
private Session session;
private Integer lPort;
private DatasourceConfiguration configuration;
@Override
public void close() throws Exception {
if (this.connection != null) {
this.connection.close();
}
if (session != null) {
System.out.println("session.disconnect()");
session.disconnect();
}
if(lPort != null){
Provider.getLPorts().remove(Long.valueOf(lPort));
}
}
}

View File

@ -1,10 +1,12 @@
package io.dataease.extensions.datasource.provider;
import com.jcraft.jsch.Session;
import io.dataease.exception.DEException;
import io.dataease.extensions.datasource.constant.SqlPlaceholderConstants;
import io.dataease.extensions.datasource.dto.*;
import io.dataease.extensions.datasource.model.SQLMeta;
import io.dataease.extensions.datasource.vo.DatasourceConfiguration;
import lombok.Getter;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;
@ -13,7 +15,8 @@ import org.apache.calcite.sql.parser.SqlParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,7 +33,7 @@ public abstract class Provider {
public abstract List<DatasetTableDTO> getTables(DatasourceRequest datasourceRequest);
public abstract Connection getConnection(DatasourceDTO coreDatasource) throws DEException;
public abstract ConnectionObj getConnection(DatasourceDTO coreDatasource) throws Exception;
public abstract String checkStatus(DatasourceRequest datasourceRequest) throws Exception;
@ -38,6 +41,11 @@ public abstract class Provider {
public abstract List<TableField> fetchTableField(DatasourceRequest datasourceRequest) throws DEException;
@Getter
private static final Map<Long, Integer> lPorts = new HashMap<>();
@Getter
private static final Map<Long, Session> sessions = new HashMap<>();
public String rebuildSQL(String sql, SQLMeta sqlMeta, boolean crossDs, Map<Long, DatasourceSchemaDTO> dsMap) {
logger.info("calcite sql: " + sql);
if (crossDs) {
@ -141,4 +149,28 @@ public abstract class Provider {
}
return sqlDialect;
}
synchronized public Integer getLport(Long datasourceId) throws Exception {
for (int i = 10000; i < 20000; i++) {
if (isPortAvailable(i) && !lPorts.values().contains(i)) {
if (datasourceId == null) {
lPorts.put((long) i, i);
} else {
lPorts.put(datasourceId, i);
}
return i;
}
}
throw new Exception("localhost无可用端口");
}
public boolean isPortAvailable(int port) {
try {
Socket socket = new Socket("127.0.0.1", port);
socket.close();
return false;
} catch (IOException e) {
return true;
}
}
}

View File

@ -32,5 +32,31 @@ public class Configuration {
private int minPoolSize = 5;
private int maxPoolSize = 50;
private int queryTimeout = 30;
private boolean useSSH = false;
private String sshHost;
private Integer sshPort;
private Integer lPort;
private String sshUserName;
private String sshType = "password";
private String sshPassword;
private String sshKey;
private String sshKeyPassword;
public String getLHost(){
if(useSSH){
return "127.0.0.1";
}else {
return this.host;
}
}
public Integer getLPort(){
if(useSSH && lPort != null){
return lPort;
}else {
return this.port;
}
}
}