feat: 数据源连接池

This commit is contained in:
taojinlong 2021-04-14 14:15:11 +08:00
parent 230ff9e5a5
commit e5226cfab2
3 changed files with 116 additions and 23 deletions

View File

@ -19,7 +19,7 @@ public abstract class DatasourceProvider {
abstract public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception;
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception{
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
return new ArrayList<>();
};
@ -27,7 +27,7 @@ public abstract class DatasourceProvider {
getData(datasourceRequest);
}
abstract public Long count(DatasourceRequest datasourceRequest)throws Exception;
abstract public Long count(DatasourceRequest datasourceRequest) throws Exception;
abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;
@ -35,4 +35,5 @@ public abstract class DatasourceProvider {
abstract public List<TableFiled> fetchResultField(ResultSet rs) throws Exception;
abstract public void initConnectionPool(DatasourceRequest datasourceRequest) throws Exception;
}

View File

@ -13,24 +13,28 @@ import org.springframework.stereotype.Service;
import java.sql.*;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
@Service("jdbc")
public class JdbcProvider extends DatasourceProvider {
private static Map<String, ArrayBlockingQueue<Connection>> jdbcConnection = new HashMap<>();
private static int poolSize = 20;
@Override
public List<String[]> getData(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery())
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery());
list = fetchResult(rs);
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@ -38,14 +42,18 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception {
ResultSet rs;
Connection connection = null;
try {
Connection connection = getConnection(datasourceRequest);
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
rs = stat.executeQuery(datasourceRequest.getQuery());
returnSource(connection, datasourceRequest.getDatasource().getId());
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return rs;
}
@ -53,16 +61,19 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()))
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()));
returnSource(connection, datasourceRequest.getDatasource().getId());
list = fetchResult(rs);
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@ -112,23 +123,28 @@ public class JdbcProvider extends DatasourceProvider {
public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception {
List<String> tables = new ArrayList<>();
String queryStr = getTablesSql(datasourceRequest);
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnectionFromPool(datasourceRequest);
Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(queryStr);
while (resultSet.next()) {
tables.add(resultSet.getString(1));
}
return tables;
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
returnSource(con, datasourceRequest.getDatasource().getId());
}
return tables;
}
@Override
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
List<TableFiled> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet resultSet = databaseMetaData.getColumns(null, "%", datasourceRequest.getTable().toUpperCase(), "%");
while (resultSet.next()) {
@ -152,6 +168,8 @@ public class JdbcProvider extends DatasourceProvider {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@ -161,27 +179,73 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public void test(DatasourceRequest datasourceRequest) throws Exception {
String queryStr = getTablesSql(datasourceRequest);
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnection(datasourceRequest);
Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(queryStr);
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
con.close();
}
}
public Long count(DatasourceRequest datasourceRequest) throws Exception {
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnectionFromPool(datasourceRequest); Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(datasourceRequest.getQuery());
while (resultSet.next()) {
return resultSet.getLong(1);
}
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
returnSource(con, datasourceRequest.getDatasource().getId());
}
return 0L;
}
private Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
private void returnSource(Connection connection, String dataSourceId) throws Exception{
if(connection != null && !connection.isClosed()){
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(dataSourceId);
connections.put(connection);
}
}
private Connection getConnectionFromPool(DatasourceRequest datasourceRequest)throws Exception {
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
if (connections == null) {
initConnectionPool(datasourceRequest);
}
connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
Connection co = connections.take();
return co;
}
@Override
public void initConnectionPool(DatasourceRequest datasourceRequest)throws Exception{
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
if (connections == null) {
connections = new ArrayBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize ; i++) {
Connection connection = getConnection(datasourceRequest);
connections.add(connection);
}
jdbcConnection.put(datasourceRequest.getDatasource().getId(), connections);
}else {
for (int i = 0; i < poolSize ; i++) {
Connection connection = connections.take();
connection.close();
connection = getConnection(datasourceRequest);
connections.add(connection);
}
}
}
private static Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
String username = null;
String password = null;
String driver = null;

View File

@ -5,6 +5,7 @@ import io.dataease.base.mapper.*;
import io.dataease.base.mapper.ext.ExtDataSourceMapper;
import io.dataease.base.mapper.ext.query.GridExample;
import io.dataease.commons.exception.DEException;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.controller.sys.base.BaseGridRequest;
import io.dataease.datasource.provider.DatasourceProvider;
import io.dataease.datasource.provider.ProviderFactory;
@ -24,7 +25,8 @@ public class DatasourceService {
@Resource
private DatasourceMapper datasourceMapper;
@Resource
private CommonThreadPool commonThreadPool;
@Resource
private ExtDataSourceMapper extDataSourceMapper;
@ -39,6 +41,7 @@ public class DatasourceService {
datasource.setUpdateTime(currentTimeMillis);
datasource.setCreateTime(currentTimeMillis);
datasourceMapper.insertSelective(datasource);
initConnectionPool(datasource);
return datasource;
}
@ -68,6 +71,7 @@ public class DatasourceService {
datasource.setCreateTime(null);
datasource.setUpdateTime(System.currentTimeMillis());
datasourceMapper.updateByPrimaryKeySelective(datasource);
initConnectionPool(datasource);
}
public void validate(Datasource datasource) throws Exception {
@ -89,4 +93,28 @@ public class DatasourceService {
return datasourceMapper.selectByPrimaryKey(id);
}
private void initConnectionPool(Datasource datasource){
commonThreadPool.addTask(() ->{
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.initConnectionPool(datasourceRequest);
}catch (Exception e){}
});
}
public void initAllDataSourceConnectionPool(){
List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
datasources.forEach(datasource -> {
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.initConnectionPool(datasourceRequest);
}catch (Exception e){
e.printStackTrace();
}
});
}
}