fix: 集群模式下,更新数据源信息

This commit is contained in:
taojinlong 2022-05-06 18:45:09 +08:00
parent e23fff0dcd
commit 586f4b0307
3 changed files with 91 additions and 1 deletions

View File

@ -8,5 +8,7 @@ public class RedisConstants {
public static final String WEBSOCKET_MSG = "wsMsgService";
public static final String DS_REDIS_TOPIC = "ds_redis_topic";
}

View File

@ -0,0 +1,61 @@
package io.dataease.listener;
import com.google.gson.Gson;
import io.dataease.commons.condition.RedisStatusCondition;
import io.dataease.commons.constants.RedisConstants;
import io.dataease.commons.utils.LogUtil;
import io.dataease.service.datasource.DatasourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Conditional({RedisStatusCondition.class})
@Service
public class RedisMessageDataSourceSubscriber implements MessageListener {
@Resource
private RedisMessageListenerContainer redisMessageListenerContainer;
private static final Gson json = new Gson();
@Autowired
private RedisTemplate redisTemplate;
@Resource
private DatasourceService datasourceService;
/**
* 启动之后订阅 topic
*/
@EventListener
public void init(ApplicationReadyEvent event) {
String topic = RedisConstants.DS_REDIS_TOPIC;
LogUtil.info("Subscribe Topic: " + topic);
redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(this), new ChannelTopic(topic));
}
/**
* @param message 消息内容
* @param pattern 暂时用不到
*/
public void onMessage(final Message message, final byte[] pattern) {
try {
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object o = redisTemplate.getValueSerializer().deserialize(messageBody);
datasourceService.handleConnectionPool(o.toString(), "edit");
}catch (Exception e){
LogUtil.error(e);
}
}
}

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import io.dataease.auth.annotation.DeCleaner;
import io.dataease.commons.constants.RedisConstants;
import io.dataease.ext.ExtDataSourceMapper;
import io.dataease.ext.query.GridExample;
import io.dataease.commons.constants.DePermissionType;
@ -40,6 +41,10 @@ import io.dataease.service.sys.SysAuthService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -63,6 +68,8 @@ public class DatasourceService {
private CommonThreadPool commonThreadPool;
@Resource
private SysAuthService sysAuthService;
@Resource
private Environment env;
public Collection<DataSourceType>types(){
Collection<DataSourceType> types = new ArrayList<>();
@ -91,6 +98,15 @@ public class DatasourceService {
return datasource;
}
public void handleConnectionPool(String datasourceId, String type) {
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasourceId);
if(datasource == null){
return;
}
handleConnectionPool(datasource, type);
}
public void handleConnectionPool(Datasource datasource, String type) {
commonThreadPool.addTask(() -> {
try {
@ -193,7 +209,18 @@ public class DatasourceService {
DatasourceExample example = new DatasourceExample();
example.createCriteria().andIdEqualTo(updataDsRequest.getId());
datasourceMapper.updateByExampleSelective(datasource, example);
handleConnectionPool(datasource, "edit");
handleConnectionPool(updataDsRequest.getId());
}
private void handleConnectionPool(String datasourceId){
String cacheType = env.getProperty("spring.cache.type");
if(cacheType != null && cacheType.equalsIgnoreCase("redis")){
handleConnectionPool(datasourceId, "delete");
RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class);
redisTemplate.convertAndSend(RedisConstants.DS_REDIS_TOPIC, datasourceId);
}else {
handleConnectionPool(datasourceId, "edit");
}
}
public ResultHolder validate(DatasourceDTO datasource) throws Exception {