fix: 数据直连时使用连接池

This commit is contained in:
taojinlong 2024-10-12 15:31:21 +08:00
parent 43815fa9a5
commit c056a3fdf4
7 changed files with 98 additions and 106 deletions

View File

@ -8,10 +8,7 @@ import io.dataease.datasource.dao.auto.entity.CoreDatasourceTask;
import io.dataease.datasource.dao.auto.entity.CoreDatasourceTaskLog;
import io.dataease.datasource.dao.auto.entity.CoreDeEngine;
import io.dataease.datasource.dao.auto.mapper.CoreDatasourceMapper;
import io.dataease.datasource.provider.ApiUtils;
import io.dataease.datasource.provider.EngineProvider;
import io.dataease.datasource.provider.ExcelUtils;
import io.dataease.datasource.provider.ProviderUtil;
import io.dataease.datasource.provider.*;
import io.dataease.datasource.request.EngineRequest;
import io.dataease.datasource.server.DatasourceServer;
import io.dataease.datasource.server.DatasourceTaskServer;
@ -20,6 +17,8 @@ import io.dataease.extensions.datasource.dto.DatasetTableDTO;
import io.dataease.extensions.datasource.dto.DatasourceDTO;
import io.dataease.extensions.datasource.dto.DatasourceRequest;
import io.dataease.extensions.datasource.dto.TableField;
import io.dataease.extensions.datasource.factory.ProviderFactory;
import io.dataease.extensions.datasource.provider.Provider;
import io.dataease.job.schedule.ExtractDataJob;
import io.dataease.job.schedule.ScheduleManager;
import io.dataease.utils.BeanUtils;
@ -49,6 +48,8 @@ public class DatasourceSyncManage {
private DatasourceTaskServer datasourceTaskServer;
@Resource
private ScheduleManager scheduleManager;
@Resource
private CalciteProvider calciteProvider;
public void extractExcelData(CoreDatasource coreDatasource, String type) {
if (coreDatasource == null) {
@ -248,7 +249,7 @@ public class DatasourceSyncManage {
for (int page = 1; page <= totalPage; page++) {
engineRequest.setQuery(engineProvider.insertSql(engineTableName, dataList, page, pageNumber));
engineProvider.exec(engineRequest);
calciteProvider.exec(engineRequest);
}
}
@ -278,7 +279,7 @@ public class DatasourceSyncManage {
}
for (int page = 1; page <= totalPage; page++) {
engineRequest.setQuery(engineProvider.insertSql(engineTableName, dataList, page, pageNumber));
engineProvider.exec(engineRequest);
calciteProvider.exec(engineRequest);
}
}
@ -291,7 +292,7 @@ public class DatasourceSyncManage {
for (int i = 0; i < replaceTableSql.length; i++) {
if (StringUtils.isNotEmpty(replaceTableSql[i])) {
engineRequest.setQuery(replaceTableSql[i]);
engineProvider.exec(engineRequest);
calciteProvider.exec(engineRequest);
}
}
}
@ -302,7 +303,7 @@ public class DatasourceSyncManage {
engineRequest.setEngine(engine);
EngineProvider engineProvider = ProviderUtil.getEngineProvider(engine.getType());
engineRequest.setQuery(engineProvider.createTableSql(tableName, tableFields, engine));
engineProvider.exec(engineRequest);
calciteProvider.exec(engineRequest);
}
public void dropEngineTable(String tableName) throws Exception {
@ -311,7 +312,7 @@ public class DatasourceSyncManage {
engineRequest.setEngine(engine);
EngineProvider engineProvider = ProviderUtil.getEngineProvider(engine.getType());
engineRequest.setQuery(engineProvider.dropTable(tableName));
engineProvider.exec(engineRequest);
calciteProvider.exec(engineRequest);
}
public void addSchedule(CoreDatasourceTask datasourceTask) throws DEException {

View File

@ -5,13 +5,12 @@ import io.dataease.datasource.dao.auto.entity.CoreDatasource;
import io.dataease.datasource.dao.auto.entity.CoreDeEngine;
import io.dataease.datasource.dao.auto.mapper.CoreDatasourceMapper;
import io.dataease.datasource.dao.auto.mapper.CoreDeEngineMapper;
import io.dataease.datasource.provider.EngineProvider;
import io.dataease.datasource.provider.ProviderUtil;
import io.dataease.datasource.type.H2;
import io.dataease.datasource.type.Mysql;
import io.dataease.exception.DEException;
import io.dataease.extensions.datasource.dto.DatasourceDTO;
import io.dataease.extensions.datasource.dto.DatasourceRequest;
import io.dataease.extensions.datasource.factory.ProviderFactory;
import io.dataease.result.ResultMessage;
import io.dataease.utils.BeanUtils;
import io.dataease.utils.JsonUtil;
@ -75,12 +74,12 @@ public class EngineManage {
throw new Exception("未完整设置数据引擎");
}
try {
EngineProvider provider = ProviderUtil.getEngineProvider(engine.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engine);
datasourceRequest.setDatasource(datasource);
provider.checkStatus(datasourceRequest);
ProviderFactory.getProvider(engine.getType()).checkStatus(datasourceRequest);
} catch (Exception e) {
DEException.throwException("校验失败:" + e.getMessage());
}

View File

@ -6,6 +6,7 @@ import io.dataease.datasource.dao.auto.entity.CoreDatasource;
import io.dataease.datasource.dao.auto.entity.CoreDriver;
import io.dataease.datasource.dao.auto.mapper.CoreDatasourceMapper;
import io.dataease.datasource.manage.EngineManage;
import io.dataease.datasource.request.EngineRequest;
import io.dataease.datasource.type.*;
import io.dataease.engine.constant.SQLConstants;
import io.dataease.exception.DEException;
@ -20,10 +21,13 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.config.NullCollation;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
@ -94,26 +98,6 @@ public class CalciteProvider extends Provider {
return schemas;
}
@Override
public List<DatasetTableDTO> getTables(DatasourceRequest datasourceRequest) {
List<DatasetTableDTO> tables = new ArrayList<>();
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30)) {
datasourceRequest.setDsVersion(con.getConnection().getMetaData().getDatabaseMajorVersion());
List<String> tablesSqls = getTablesSql(datasourceRequest);
for (String tablesSql : tablesSqls) {
ResultSet resultSet = statement.executeQuery(tablesSql);
while (resultSet.next()) {
tables.add(getTableDesc(datasourceRequest, resultSet));
}
}
} catch (Exception e) {
DEException.throwException(e.getMessage());
}
return tables;
}
@Override
public String checkStatus(DatasourceRequest datasourceRequest) throws Exception {
DatasourceConfiguration.DatasourceType datasourceType = DatasourceConfiguration.DatasourceType.valueOf(datasourceRequest.getDatasource().getType());
@ -146,6 +130,24 @@ public class CalciteProvider extends Provider {
return "Success";
}
@Override
public List<DatasetTableDTO> getTables(DatasourceRequest datasourceRequest) {
List<DatasetTableDTO> tables = new ArrayList<>();
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getStatement(con, 30)) {
datasourceRequest.setDsVersion(con.getMetaData().getDatabaseMajorVersion());
List<String> tablesSqls = getTablesSql(datasourceRequest);
for (String tablesSql : tablesSqls) {
ResultSet resultSet = statement.executeQuery(tablesSql);
while (resultSet.next()) {
tables.add(getTableDesc(datasourceRequest, resultSet));
}
}
} catch (Exception e) {
DEException.throwException(e.getMessage());
}
return tables;
}
@Override
public Map<String, Object> fetchResultField(DatasourceRequest datasourceRequest) throws DEException {
// 不跨数据源
@ -197,6 +199,22 @@ public class CalciteProvider extends Provider {
return map;
}
@Override
public String transSqlDialect(String sql, Map<Long, DatasourceSchemaDTO> dsMap) throws DEException {
DatasourceSchemaDTO value = dsMap.entrySet().iterator().next().getValue();
try (Connection connection = getConnectionFromPool(value.getId());) {
// 获取数据库version
if (connection != null) {
value.setDsVersion(connection.getMetaData().getDatabaseMajorVersion());
}
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT.withLex(Lex.JAVA));
SqlNode sqlNode = parser.parseStmt();
return sqlNode.toSqlString(getDialect(value)).toString();
} catch (Exception e) {
DEException.throwException(e.getMessage());
}
return null;
}
private List<TableField> fetchResultField(ResultSet rs) throws Exception {
List<TableField> fieldList = new ArrayList<>();
@ -229,7 +247,7 @@ public class CalciteProvider extends Provider {
String table = datasourceRequest.getTable();
if (StringUtils.isEmpty(table)) {
ResultSet resultSet = null;
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30)) {
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getStatement(con, 30)) {
if (DatasourceConfiguration.DatasourceType.valueOf(datasourceSchemaDTO.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -248,8 +266,8 @@ public class CalciteProvider extends Provider {
}
} else {
ResultSet resultSet = null;
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getStatement(con.getConnection(), 30)) {
datasourceRequest.setDsVersion(con.getConnection().getMetaData().getDatabaseMajorVersion());
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getStatement(con, 30)) {
datasourceRequest.setDsVersion(con.getMetaData().getDatabaseMajorVersion());
if (datasourceRequest.getDatasource().getType().equalsIgnoreCase("mongo") || isDorisCatalog(datasourceRequest)) {
resultSet = statement.executeQuery("select * from " + table + " limit 0 offset 0 ");
return fetchResultField(resultSet);
@ -394,7 +412,7 @@ public class CalciteProvider extends Provider {
// schema
ResultSet resultSet = null;
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getPreparedStatement(con.getConnection(), datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -434,16 +452,13 @@ public class CalciteProvider extends Provider {
public void exec(DatasourceRequest datasourceRequest) throws DEException {
DatasourceSchemaDTO value = datasourceRequest.getDsList().entrySet().iterator().next().getValue();
datasourceRequest.setDatasource(value);
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getPreparedStatement(con.getConnection(), datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
if (CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
@ -474,12 +489,10 @@ public class CalciteProvider extends Provider {
public int executeUpdate(DatasourceRequest datasourceRequest) throws DEException {
DatasourceSchemaDTO value = datasourceRequest.getDsList().entrySet().iterator().next().getValue();
datasourceRequest.setDatasource(value);
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try (ConnectionObj con = getConnection(datasourceRequest.getDatasource()); Statement statement = getPreparedStatement(con.getConnection(), datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
@ -697,17 +710,13 @@ public class CalciteProvider extends Provider {
return tableField;
}
public Connection initConnection(Map<Long, DatasourceSchemaDTO> dsMap) {
Connection connection = getCalciteConnection();
public Connection initConnection(Map<Long, DatasourceSchemaDTO> dsMap) throws SQLException {
Connection connection = take();
CalciteConnection calciteConnection = null;
try {
calciteConnection = connection.unwrap(CalciteConnection.class);
} catch (Exception e) {
DEException.throwException(e);
}
calciteConnection = connection.unwrap(CalciteConnection.class);
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDsList(dsMap);
SchemaPlus rootSchema = buildSchema(datasourceRequest, calciteConnection);
buildSchema(datasourceRequest, calciteConnection);
return connection;
}
@ -897,10 +906,8 @@ public class CalciteProvider extends Provider {
rootSchema.add(ds.getSchemaAlias(), schema);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
@ -1245,11 +1252,8 @@ public class CalciteProvider extends Provider {
return null;
}
private Connection connection = null;
public static int capacity = 10;
public void initConnectionPool() {
LogUtil.info("Begin to init datasource pool...");
QueryWrapper<CoreDatasource> datasourceQueryWrapper = new QueryWrapper();
@ -1258,25 +1262,22 @@ public class CalciteProvider extends Provider {
if (engine != null) {
coreDatasources.add(engine);
}
Map<Long, DatasourceSchemaDTO> dsMap = new HashMap<>();
for (CoreDatasource coreDatasource : coreDatasources) {
Map<Long, DatasourceSchemaDTO> dsMap = new HashMap<>();
DatasourceSchemaDTO datasourceSchemaDTO = new DatasourceSchemaDTO();
BeanUtils.copyBean(datasourceSchemaDTO, coreDatasource);
datasourceSchemaDTO.setSchemaAlias(String.format(SQLConstants.SCHEMA, datasourceSchemaDTO.getId()));
dsMap.put(datasourceSchemaDTO.getId(), datasourceSchemaDTO);
}
LogUtil.info("dsMap size..." + dsMap.keySet().size());
try {
commonThreadPool.addTask(() -> {
try {
connection = initConnection(dsMap);
} catch (Exception e) {
} catch (Exception ignore) {
}
});
} catch (Exception e) {
}
LogUtil.info("dsMap size..." + coreDatasources.size());
}
public void update(DatasourceDTO datasourceDTO) throws DEException {
@ -1333,13 +1334,37 @@ public class CalciteProvider extends Provider {
Provider.getSessions().remove(datasource.getId());
}
public Connection take() {
if (connection == null) {
DEException.throwException("初始化连接池失败!");
if (connection == null) { // 第一次检查无需锁
synchronized (Connection.class) { // 同步块
if (connection == null) { // 第二次检查需要锁
connection = getCalciteConnection();
}
}
}
return connection;
}
private Connection getConnectionFromPool(Long dsId) throws SQLException {
Connection connection = take();
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
JdbcSchema jdbcSchema = rootSchema.getSubSchema(String.format(SQLConstants.SCHEMA, dsId)).unwrap(JdbcSchema.class);
BasicDataSource basicDataSource = (BasicDataSource) jdbcSchema.getDataSource();
return basicDataSource.getConnection();
}
public void exec(EngineRequest engineRequest) throws Exception {
DatasourceConfiguration configuration = JsonUtil.parseObject(engineRequest.getEngine().getConfiguration(), DatasourceConfiguration.class);
int queryTimeout = configuration.getQueryTimeout();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engineRequest.getEngine());
try (Connection connection = getConnectionFromPool(datasource.getId()); Statement stat = getStatement(connection, queryTimeout)) {
PreparedStatement preparedStatement = connection.prepareStatement(engineRequest.getQuery());
preparedStatement.setQueryTimeout(queryTimeout);
Boolean result = preparedStatement.execute();
} catch (Exception e) {
throw e;
}
}
}

View File

@ -10,7 +10,7 @@ import java.util.List;
* @Author gin
* @Date 2021/5/17 4:19 下午
*/
public abstract class EngineProvider extends CalciteProvider {
public abstract class EngineProvider {
public abstract String createView(String name, String viewSQL);
public abstract String dropTable(String name);
@ -23,6 +23,5 @@ public abstract class EngineProvider extends CalciteProvider {
public abstract String insertSql(String name, List<String[]> dataList, int page, int pageNumber);
public void exec(EngineRequest datasourceRequest) throws Exception {
}
}

View File

@ -5,6 +5,7 @@ import io.dataease.dataset.utils.TableUtils;
import io.dataease.datasource.dao.auto.entity.CoreDeEngine;
import io.dataease.datasource.request.EngineRequest;
import io.dataease.datasource.type.H2;
import io.dataease.datasource.type.Mysql;
import io.dataease.extensions.datasource.dto.ConnectionObj;
import io.dataease.extensions.datasource.dto.DatasourceDTO;
import io.dataease.extensions.datasource.dto.TableField;
@ -22,21 +23,6 @@ import java.util.List;
@Service("h2Engine")
public class H2EngineProvider extends EngineProvider {
public void exec(EngineRequest engineRequest) throws Exception {
DatasourceConfiguration configuration = JsonUtil.parseObject(engineRequest.getEngine().getConfiguration(), H2.class);
int queryTimeout = configuration.getQueryTimeout();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engineRequest.getEngine());
try (ConnectionObj connection = getConnection(datasource); Statement stat = getStatement(connection.getConnection(), queryTimeout)) {
PreparedStatement preparedStatement = connection.getConnection().prepareStatement(engineRequest.getQuery());
preparedStatement.setQueryTimeout(queryTimeout);
Boolean result = preparedStatement.execute();
} catch (Exception e) {
throw e;
}
}
private static final String creatTableSql =
"CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
"Column_Fields;";

View File

@ -26,21 +26,6 @@ import java.util.List;
@Service("mysqlEngine")
public class MysqlEngineProvider extends EngineProvider {
public void exec(EngineRequest engineRequest) throws Exception {
DatasourceConfiguration configuration = JsonUtil.parseObject(engineRequest.getEngine().getConfiguration(), Mysql.class);
int queryTimeout = configuration.getQueryTimeout();
DatasourceDTO datasource = new DatasourceDTO();
BeanUtils.copyBean(datasource, engineRequest.getEngine());
try (ConnectionObj connection = getConnection(datasource); Statement stat = getStatement(connection.getConnection(), queryTimeout)) {
PreparedStatement preparedStatement = connection.getConnection().prepareStatement(engineRequest.getQuery());
preparedStatement.setQueryTimeout(queryTimeout);
Boolean result = preparedStatement.execute();
} catch (Exception e) {
throw e;
}
}
private static final String creatTableSql =
"CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
"Column_Fields;";

View File

@ -133,15 +133,12 @@ public abstract class Provider {
}
public String transSqlDialect(String sql, Map<Long, DatasourceSchemaDTO> dsMap) throws DEException {
try {
DatasourceSchemaDTO value = dsMap.entrySet().iterator().next().getValue();
DatasourceSchemaDTO value = dsMap.entrySet().iterator().next().getValue();
try (ConnectionObj connection = getConnection(value)) {
// 获取数据库version
ConnectionObj connection = getConnection(value);
if (connection != null) {
value.setDsVersion(connection.getConnection().getMetaData().getDatabaseMajorVersion());
}
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT.withLex(Lex.JAVA));
SqlNode sqlNode = parser.parseStmt();
return sqlNode.toSqlString(getDialect(value)).toString();