feat: 集群环境websocket消息

This commit is contained in:
fit2cloud-chenyw 2022-04-13 18:30:31 +08:00
parent d7521f8861
commit 0c16ca38e4
18 changed files with 355 additions and 36 deletions

View File

@ -0,0 +1,12 @@
package io.dataease.commons.constants;
public class RedisConstants {
public static final String GLOBAL_REDIS_TOPIC = "global_redis_topic";
public static final String PLUGIN_INSTALL_MSG = "pluginMsgService";
public static final String WEBSOCKET_MSG = "wsMsgService";
}

View File

@ -0,0 +1,13 @@
package io.dataease.commons.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class RedisMessage<T> implements Serializable {
private String type;
private T data;
}

View File

@ -2,19 +2,24 @@ package io.dataease.config;
import io.dataease.commons.condition.RedisStatusCondition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Conditional({RedisStatusCondition.class})
@Configuration
public class RedisConfig {
@Conditional({RedisStatusCondition.class})
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
@ -24,4 +29,11 @@ public class RedisConfig {
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
}

View File

@ -0,0 +1,59 @@
package io.dataease.listener;
import com.google.gson.Gson;
import io.dataease.commons.condition.RedisStatusCondition;
import io.dataease.commons.constants.RedisConstants;
import io.dataease.commons.model.RedisMessage;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.LogUtil;
import io.dataease.service.redis.RedisMessageBroadcast;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Conditional({RedisStatusCondition.class})
@Service
public class RedisMessageSubscriber implements MessageListener {
@Resource
private RedisMessageListenerContainer redisMessageListenerContainer;
private static final Gson json = new Gson();
@Autowired
private RedisTemplate redisTemplate;
/**
* 启动之后订阅 topic
*/
@EventListener
public void init(ApplicationReadyEvent event) {
String topic = RedisConstants.GLOBAL_REDIS_TOPIC;
LogUtil.info("Subscribe Topic: " + topic);
redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(this), new ChannelTopic(topic));
}
/**
* @param message 消息内容
* @param pattern 暂时用不到
*/
public void onMessage(final Message message, final byte[] pattern) {
RedisMessage redisMessage = json.fromJson(message.toString(), RedisMessage.class);
RedisMessageBroadcast service = (RedisMessageBroadcast)CommonBeanFactory.getBean(redisMessage.getType());
service.messageCallBack(redisMessage.getData());
}
}

View File

@ -0,0 +1,6 @@
package io.dataease.service.redis;
public interface RedisMessageBroadcast<T> {
void messageCallBack(T arg);
}

View File

@ -0,0 +1,13 @@
package io.dataease.service.redis.impl;
import io.dataease.service.redis.RedisMessageBroadcast;
import org.springframework.stereotype.Service;
@Service
public class PluginMsgService implements RedisMessageBroadcast {
@Override
public void messageCallBack(Object arg) {
}
}

View File

@ -0,0 +1,29 @@
package io.dataease.service.redis.impl;
import com.google.gson.Gson;
import io.dataease.service.redis.RedisMessageBroadcast;
import io.dataease.websocket.entity.WsMessage;
import io.dataease.websocket.service.impl.StandaloneWsService;
import io.dataease.websocket.util.WsUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class WsMsgService implements RedisMessageBroadcast<Map> {
private static Gson json = new Gson();
@Autowired
private StandaloneWsService standaloneWsService;
@Override
public void messageCallBack(Map arg) {
WsMessage message = json.fromJson(json.toJson(arg), WsMessage.class);
Long userId = message.getUserId();
if (WsUtil.isOnLine(userId)) {
standaloneWsService.releaseMessage(message);
}
}
}

View File

@ -1,5 +1,7 @@
package io.dataease.websocket.config;
import io.dataease.websocket.factory.DeWsHandlerFactory;
import io.dataease.websocket.handler.PrincipalHandshakeHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
@ -15,7 +17,10 @@ public class WsConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOriginPatterns("*").withSockJS();
registry.addEndpoint("/websocket")
.setAllowedOriginPatterns("*")
.setHandshakeHandler(new PrincipalHandshakeHandler())
.withSockJS();
}
@Override
@ -26,6 +31,7 @@ public class WsConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.addDecoratorFactory(new DeWsHandlerFactory());
registry.setMessageSizeLimit(8192) //设置消息字节数大小
.setSendBufferSizeLimit(8192)//设置消息缓存大小
.setSendTimeLimit(10000); //设置消息发送时间限制毫秒

View File

@ -0,0 +1,17 @@
package io.dataease.websocket.entity;
import java.security.Principal;
public class DePrincipal implements Principal {
public DePrincipal(String name) {
this.name = name;
}
private String name;
@Override
public String getName() {
return name;
}
}

View File

@ -0,0 +1,33 @@
package io.dataease.websocket.factory;
import io.dataease.websocket.util.WsUtil;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
public class DeWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
public DeWebSocketHandlerDecorator(WebSocketHandler delegate) {
super(delegate);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String name = session.getPrincipal().getName();
Long userId = Long.parseLong(name);
WsUtil.onLine(userId);
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
String name = session.getPrincipal().getName();
Long userId = Long.parseLong(name);
WsUtil.offLine(userId);
super.afterConnectionClosed(session, closeStatus);
}
}

View File

@ -0,0 +1,15 @@
package io.dataease.websocket.factory;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;
public class DeWsHandlerFactory implements WebSocketHandlerDecoratorFactory {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new DeWebSocketHandlerDecorator(webSocketHandler);
}
}

View File

@ -0,0 +1,30 @@
package io.dataease.websocket.handler;
import io.dataease.websocket.entity.DePrincipal;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;
public class PrincipalHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();
final String userId = httpRequest.getParameter("userId");
if (StringUtils.isEmpty(userId)) {
return null;
}
return new DePrincipal(userId);
}
return null;
//return super.determineUser(request, wsHandler, attributes);
}
}

View File

@ -1,39 +1,11 @@
package io.dataease.websocket.service;
import io.dataease.websocket.entity.WsMessage;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
@Component
public class WsService {
public interface WsService {
@Resource
private SimpMessagingTemplate messagingTemplate;
public void releaseMessage (List<WsMessage> wsMessages) {
Optional.ofNullable(wsMessages).ifPresent(messages -> {
messages.forEach(this::releaseMessage);
});
}
public void releaseMessage(WsMessage wsMessage){
if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return;
messagingTemplate.convertAndSendToUser(String.valueOf(wsMessage.getUserId()), wsMessage.getTopic(),wsMessage.getData());
}
void releaseMessage(WsMessage wsMessage);
}

View File

@ -0,0 +1,34 @@
package io.dataease.websocket.service.impl;
import io.dataease.commons.condition.RedisStatusCondition;
import io.dataease.commons.constants.RedisConstants;
import io.dataease.commons.model.RedisMessage;
import io.dataease.websocket.entity.WsMessage;
import io.dataease.websocket.service.WsService;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Conditional({RedisStatusCondition.class})
@Primary
public class DistributedWsService implements WsService {
@Autowired
private RedisTemplate redisTemplate;
public void releaseMessage(WsMessage wsMessage){
if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return;
RedisMessage<WsMessage> msg = new RedisMessage();
msg.setType(RedisConstants.WEBSOCKET_MSG);
msg.setData(wsMessage);
redisTemplate.convertAndSend(RedisConstants.GLOBAL_REDIS_TOPIC, msg);
}
}

View File

@ -0,0 +1,21 @@
package io.dataease.websocket.service.impl;
import io.dataease.websocket.entity.WsMessage;
import io.dataease.websocket.service.WsService;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class StandaloneWsService implements WsService {
@Resource
private SimpMessagingTemplate messagingTemplate;
public void releaseMessage(WsMessage wsMessage){
if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return;
messagingTemplate.convertAndSendToUser(String.valueOf(wsMessage.getUserId()), wsMessage.getTopic(),wsMessage.getData());
}
}

View File

@ -0,0 +1,46 @@
package io.dataease.websocket.util;
import io.dataease.auth.api.dto.CurrentUserDto;
import io.dataease.commons.utils.AuthUtils;
import org.apache.commons.lang3.ObjectUtils;
import java.util.concurrent.CopyOnWriteArraySet;
public class WsUtil {
private static final CopyOnWriteArraySet<Long> ONLINE_USERS = new CopyOnWriteArraySet();
public static boolean onLine() {
CurrentUserDto user = AuthUtils.getUser();
if (ObjectUtils.isNotEmpty(user) && ObjectUtils.isNotEmpty(user.getUserId()))
return onLine(user.getUserId());
return false;
}
public static boolean onLine(Long userId) {
return ONLINE_USERS.add(userId);
}
public static boolean offLine() {
CurrentUserDto user = AuthUtils.getUser();
if (ObjectUtils.isNotEmpty(user) && ObjectUtils.isNotEmpty(user.getUserId()))
return offLine(user.getUserId());
return false;
}
public static boolean offLine(Long userId) {
return ONLINE_USERS.remove(userId);
}
public static boolean isOnLine(Long userId) {
return ONLINE_USERS.contains(userId);
}
/*public static void releaseMessage(WsMessage wsMessage){
if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return;
CommonBeanFactory.getBean()
}*/
}

View File

@ -48,7 +48,7 @@
</div>
</div>
<div slot="reference">
<el-badge :value="visible ? paginationConfig.total : count" :hidden="!count && !paginationConfig.total" :max="99" class="item">
<el-badge :value="visible && !loading ? paginationConfig.total : count" :hidden="!count && !paginationConfig.total" :max="99" class="item">
<svg-icon class-name="notification" icon-class="notification" />
</el-badge>

View File

@ -46,11 +46,12 @@ class DeWebsocket {
}
connection() {
const socket = new SockJS(this.ws_url)
/* const socket = new SockJS('http://localhost:8081' + this.ws_url) */
if (!this.isLoginStatu()) {
return
}
const socket = new SockJS(this.ws_url + '?userId=' + store.state.user.user.userId)
/* const socket = new SockJS('http://localhost:8081' + this.ws_url) */
this.client = Stomp.over(socket)
const heads = {
/* Authorization: '', */