diff --git a/backend/src/main/java/io/dataease/commons/constants/RedisConstants.java b/backend/src/main/java/io/dataease/commons/constants/RedisConstants.java new file mode 100644 index 0000000000..96c7647efb --- /dev/null +++ b/backend/src/main/java/io/dataease/commons/constants/RedisConstants.java @@ -0,0 +1,12 @@ +package io.dataease.commons.constants; + +public class RedisConstants { + + public static final String GLOBAL_REDIS_TOPIC = "global_redis_topic"; + + public static final String PLUGIN_INSTALL_MSG = "pluginMsgService"; + + public static final String WEBSOCKET_MSG = "wsMsgService"; + + +} diff --git a/backend/src/main/java/io/dataease/commons/model/RedisMessage.java b/backend/src/main/java/io/dataease/commons/model/RedisMessage.java new file mode 100644 index 0000000000..1d4f54c894 --- /dev/null +++ b/backend/src/main/java/io/dataease/commons/model/RedisMessage.java @@ -0,0 +1,13 @@ +package io.dataease.commons.model; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class RedisMessage implements Serializable { + + private String type; + + private T data; +} diff --git a/backend/src/main/java/io/dataease/config/RedisConfig.java b/backend/src/main/java/io/dataease/config/RedisConfig.java index c3778545bc..765d6014c4 100644 --- a/backend/src/main/java/io/dataease/config/RedisConfig.java +++ b/backend/src/main/java/io/dataease/config/RedisConfig.java @@ -2,19 +2,24 @@ package io.dataease.config; import io.dataease.commons.condition.RedisStatusCondition; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; - +@Conditional({RedisStatusCondition.class}) @Configuration public class RedisConfig { - @Conditional({RedisStatusCondition.class}) + @Autowired + private RedisConnectionFactory redisConnectionFactory; + + @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { RedisTemplate redisTemplate = new RedisTemplate<>(); @@ -24,4 +29,11 @@ public class RedisConfig { return redisTemplate; } + @Bean + public RedisMessageListenerContainer redisContainer() { + final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(redisConnectionFactory); + return container; + } + } diff --git a/backend/src/main/java/io/dataease/listener/RedisMessageSubscriber.java b/backend/src/main/java/io/dataease/listener/RedisMessageSubscriber.java new file mode 100644 index 0000000000..94cb4cf3ae --- /dev/null +++ b/backend/src/main/java/io/dataease/listener/RedisMessageSubscriber.java @@ -0,0 +1,59 @@ +package io.dataease.listener; + +import com.google.gson.Gson; +import io.dataease.commons.condition.RedisStatusCondition; +import io.dataease.commons.constants.RedisConstants; +import io.dataease.commons.model.RedisMessage; +import io.dataease.commons.utils.CommonBeanFactory; +import io.dataease.commons.utils.LogUtil; +import io.dataease.service.redis.RedisMessageBroadcast; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.event.EventListener; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + + +@Conditional({RedisStatusCondition.class}) +@Service +public class RedisMessageSubscriber implements MessageListener { + + @Resource + private RedisMessageListenerContainer redisMessageListenerContainer; + + private static final Gson json = new Gson(); + + @Autowired + private RedisTemplate redisTemplate; + + /** + * 启动之后订阅 topic + */ + @EventListener + public void init(ApplicationReadyEvent event) { + String topic = RedisConstants.GLOBAL_REDIS_TOPIC; + LogUtil.info("Subscribe Topic: " + topic); + redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(this), new ChannelTopic(topic)); + } + + /** + * @param message 消息内容 + * @param pattern 暂时用不到 + */ + public void onMessage(final Message message, final byte[] pattern) { + + + RedisMessage redisMessage = json.fromJson(message.toString(), RedisMessage.class); + + RedisMessageBroadcast service = (RedisMessageBroadcast)CommonBeanFactory.getBean(redisMessage.getType()); + service.messageCallBack(redisMessage.getData()); + } +} diff --git a/backend/src/main/java/io/dataease/service/redis/RedisMessageBroadcast.java b/backend/src/main/java/io/dataease/service/redis/RedisMessageBroadcast.java new file mode 100644 index 0000000000..8885301888 --- /dev/null +++ b/backend/src/main/java/io/dataease/service/redis/RedisMessageBroadcast.java @@ -0,0 +1,6 @@ +package io.dataease.service.redis; + +public interface RedisMessageBroadcast { + + void messageCallBack(T arg); +} diff --git a/backend/src/main/java/io/dataease/service/redis/impl/PluginMsgService.java b/backend/src/main/java/io/dataease/service/redis/impl/PluginMsgService.java new file mode 100644 index 0000000000..05751316b0 --- /dev/null +++ b/backend/src/main/java/io/dataease/service/redis/impl/PluginMsgService.java @@ -0,0 +1,13 @@ +package io.dataease.service.redis.impl; + +import io.dataease.service.redis.RedisMessageBroadcast; +import org.springframework.stereotype.Service; + +@Service +public class PluginMsgService implements RedisMessageBroadcast { + + @Override + public void messageCallBack(Object arg) { + + } +} diff --git a/backend/src/main/java/io/dataease/service/redis/impl/WsMsgService.java b/backend/src/main/java/io/dataease/service/redis/impl/WsMsgService.java new file mode 100644 index 0000000000..8af98c3ce1 --- /dev/null +++ b/backend/src/main/java/io/dataease/service/redis/impl/WsMsgService.java @@ -0,0 +1,29 @@ +package io.dataease.service.redis.impl; + +import com.google.gson.Gson; +import io.dataease.service.redis.RedisMessageBroadcast; +import io.dataease.websocket.entity.WsMessage; +import io.dataease.websocket.service.impl.StandaloneWsService; +import io.dataease.websocket.util.WsUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Map; + +@Service +public class WsMsgService implements RedisMessageBroadcast { + + private static Gson json = new Gson(); + + @Autowired + private StandaloneWsService standaloneWsService; + + @Override + public void messageCallBack(Map arg) { + WsMessage message = json.fromJson(json.toJson(arg), WsMessage.class); + Long userId = message.getUserId(); + if (WsUtil.isOnLine(userId)) { + standaloneWsService.releaseMessage(message); + } + } +} diff --git a/backend/src/main/java/io/dataease/websocket/config/WsConfig.java b/backend/src/main/java/io/dataease/websocket/config/WsConfig.java index b182a1f25a..74d27cc644 100644 --- a/backend/src/main/java/io/dataease/websocket/config/WsConfig.java +++ b/backend/src/main/java/io/dataease/websocket/config/WsConfig.java @@ -1,5 +1,7 @@ package io.dataease.websocket.config; +import io.dataease.websocket.factory.DeWsHandlerFactory; +import io.dataease.websocket.handler.PrincipalHandshakeHandler; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; @@ -15,7 +17,10 @@ public class WsConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { - registry.addEndpoint("/websocket").setAllowedOriginPatterns("*").withSockJS(); + registry.addEndpoint("/websocket") + .setAllowedOriginPatterns("*") + .setHandshakeHandler(new PrincipalHandshakeHandler()) + .withSockJS(); } @Override @@ -26,6 +31,7 @@ public class WsConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { + registry.addDecoratorFactory(new DeWsHandlerFactory()); registry.setMessageSizeLimit(8192) //设置消息字节数大小 .setSendBufferSizeLimit(8192)//设置消息缓存大小 .setSendTimeLimit(10000); //设置消息发送时间限制毫秒 diff --git a/backend/src/main/java/io/dataease/websocket/entity/DePrincipal.java b/backend/src/main/java/io/dataease/websocket/entity/DePrincipal.java new file mode 100644 index 0000000000..500c24718c --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/entity/DePrincipal.java @@ -0,0 +1,17 @@ +package io.dataease.websocket.entity; + +import java.security.Principal; + +public class DePrincipal implements Principal { + + public DePrincipal(String name) { + this.name = name; + } + + private String name; + + @Override + public String getName() { + return name; + } +} diff --git a/backend/src/main/java/io/dataease/websocket/factory/DeWebSocketHandlerDecorator.java b/backend/src/main/java/io/dataease/websocket/factory/DeWebSocketHandlerDecorator.java new file mode 100644 index 0000000000..dfe5dad19a --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/factory/DeWebSocketHandlerDecorator.java @@ -0,0 +1,33 @@ +package io.dataease.websocket.factory; + +import io.dataease.websocket.util.WsUtil; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; + +public class DeWebSocketHandlerDecorator extends WebSocketHandlerDecorator { + + + public DeWebSocketHandlerDecorator(WebSocketHandler delegate) { + super(delegate); + } + + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + String name = session.getPrincipal().getName(); + Long userId = Long.parseLong(name); + WsUtil.onLine(userId); + super.afterConnectionEstablished(session); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + String name = session.getPrincipal().getName(); + Long userId = Long.parseLong(name); + WsUtil.offLine(userId); + super.afterConnectionClosed(session, closeStatus); + } + +} diff --git a/backend/src/main/java/io/dataease/websocket/factory/DeWsHandlerFactory.java b/backend/src/main/java/io/dataease/websocket/factory/DeWsHandlerFactory.java new file mode 100644 index 0000000000..cfe601d6c9 --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/factory/DeWsHandlerFactory.java @@ -0,0 +1,15 @@ +package io.dataease.websocket.factory; + +import org.springframework.web.socket.WebSocketHandler; + +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; +import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; + +public class DeWsHandlerFactory implements WebSocketHandlerDecoratorFactory { + + + @Override + public WebSocketHandler decorate(WebSocketHandler webSocketHandler) { + return new DeWebSocketHandlerDecorator(webSocketHandler); + } +} diff --git a/backend/src/main/java/io/dataease/websocket/handler/PrincipalHandshakeHandler.java b/backend/src/main/java/io/dataease/websocket/handler/PrincipalHandshakeHandler.java new file mode 100644 index 0000000000..1e456332bd --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/handler/PrincipalHandshakeHandler.java @@ -0,0 +1,30 @@ +package io.dataease.websocket.handler; + +import io.dataease.websocket.entity.DePrincipal; +import org.apache.commons.lang3.StringUtils; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServletServerHttpRequest; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.support.DefaultHandshakeHandler; + +import javax.servlet.http.HttpServletRequest; +import java.security.Principal; +import java.util.Map; + +public class PrincipalHandshakeHandler extends DefaultHandshakeHandler { + + @Override + protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map attributes) { + if (request instanceof ServletServerHttpRequest) { + ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; + HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest(); + final String userId = httpRequest.getParameter("userId"); + if (StringUtils.isEmpty(userId)) { + return null; + } + return new DePrincipal(userId); + } + return null; + //return super.determineUser(request, wsHandler, attributes); + } +} diff --git a/backend/src/main/java/io/dataease/websocket/service/WsService.java b/backend/src/main/java/io/dataease/websocket/service/WsService.java index d9e7bac1b8..922e61de96 100644 --- a/backend/src/main/java/io/dataease/websocket/service/WsService.java +++ b/backend/src/main/java/io/dataease/websocket/service/WsService.java @@ -1,39 +1,11 @@ package io.dataease.websocket.service; import io.dataease.websocket.entity.WsMessage; -import org.apache.commons.lang3.ObjectUtils; -import org.springframework.messaging.simp.SimpMessagingTemplate; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.List; -import java.util.Optional; - -@Component -public class WsService { +public interface WsService { - @Resource - private SimpMessagingTemplate messagingTemplate; - - - - - public void releaseMessage (List wsMessages) { - Optional.ofNullable(wsMessages).ifPresent(messages -> { - messages.forEach(this::releaseMessage); - }); - } - - public void releaseMessage(WsMessage wsMessage){ - if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return; - - - messagingTemplate.convertAndSendToUser(String.valueOf(wsMessage.getUserId()), wsMessage.getTopic(),wsMessage.getData()); - - - } + void releaseMessage(WsMessage wsMessage); } diff --git a/backend/src/main/java/io/dataease/websocket/service/impl/DistributedWsService.java b/backend/src/main/java/io/dataease/websocket/service/impl/DistributedWsService.java new file mode 100644 index 0000000000..b2c8a98100 --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/service/impl/DistributedWsService.java @@ -0,0 +1,34 @@ +package io.dataease.websocket.service.impl; + +import io.dataease.commons.condition.RedisStatusCondition; +import io.dataease.commons.constants.RedisConstants; +import io.dataease.commons.model.RedisMessage; +import io.dataease.websocket.entity.WsMessage; +import io.dataease.websocket.service.WsService; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Primary; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +@Component +@Conditional({RedisStatusCondition.class}) +@Primary +public class DistributedWsService implements WsService { + + @Autowired + private RedisTemplate redisTemplate; + + + public void releaseMessage(WsMessage wsMessage){ + if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return; + + RedisMessage msg = new RedisMessage(); + msg.setType(RedisConstants.WEBSOCKET_MSG); + msg.setData(wsMessage); + redisTemplate.convertAndSend(RedisConstants.GLOBAL_REDIS_TOPIC, msg); + } + + +} diff --git a/backend/src/main/java/io/dataease/websocket/service/impl/StandaloneWsService.java b/backend/src/main/java/io/dataease/websocket/service/impl/StandaloneWsService.java new file mode 100644 index 0000000000..1e37477a70 --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/service/impl/StandaloneWsService.java @@ -0,0 +1,21 @@ +package io.dataease.websocket.service.impl; + +import io.dataease.websocket.entity.WsMessage; +import io.dataease.websocket.service.WsService; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class StandaloneWsService implements WsService { + + @Resource + private SimpMessagingTemplate messagingTemplate; + + public void releaseMessage(WsMessage wsMessage){ + if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return; + messagingTemplate.convertAndSendToUser(String.valueOf(wsMessage.getUserId()), wsMessage.getTopic(),wsMessage.getData()); + } +} diff --git a/backend/src/main/java/io/dataease/websocket/util/WsUtil.java b/backend/src/main/java/io/dataease/websocket/util/WsUtil.java new file mode 100644 index 0000000000..877bb97d6d --- /dev/null +++ b/backend/src/main/java/io/dataease/websocket/util/WsUtil.java @@ -0,0 +1,46 @@ +package io.dataease.websocket.util; + +import io.dataease.auth.api.dto.CurrentUserDto; +import io.dataease.commons.utils.AuthUtils; + +import org.apache.commons.lang3.ObjectUtils; + +import java.util.concurrent.CopyOnWriteArraySet; + +public class WsUtil { + + private static final CopyOnWriteArraySet ONLINE_USERS = new CopyOnWriteArraySet(); + + public static boolean onLine() { + CurrentUserDto user = AuthUtils.getUser(); + if (ObjectUtils.isNotEmpty(user) && ObjectUtils.isNotEmpty(user.getUserId())) + return onLine(user.getUserId()); + return false; + } + + public static boolean onLine(Long userId) { + return ONLINE_USERS.add(userId); + } + + public static boolean offLine() { + CurrentUserDto user = AuthUtils.getUser(); + if (ObjectUtils.isNotEmpty(user) && ObjectUtils.isNotEmpty(user.getUserId())) + return offLine(user.getUserId()); + return false; + } + + public static boolean offLine(Long userId) { + return ONLINE_USERS.remove(userId); + } + + public static boolean isOnLine(Long userId) { + return ONLINE_USERS.contains(userId); + } + + /*public static void releaseMessage(WsMessage wsMessage){ + if(ObjectUtils.isEmpty(wsMessage) || ObjectUtils.isEmpty(wsMessage.getUserId()) || ObjectUtils.isEmpty(wsMessage.getTopic())) return; + CommonBeanFactory.getBean() + }*/ + + +} diff --git a/frontend/src/components/Notification/index.vue b/frontend/src/components/Notification/index.vue index dd3a32a8b2..1d70e611c3 100644 --- a/frontend/src/components/Notification/index.vue +++ b/frontend/src/components/Notification/index.vue @@ -48,7 +48,7 @@
- + diff --git a/frontend/src/websocket/index.js b/frontend/src/websocket/index.js index 7787e1ba58..25c3254b28 100644 --- a/frontend/src/websocket/index.js +++ b/frontend/src/websocket/index.js @@ -46,11 +46,12 @@ class DeWebsocket { } connection() { - const socket = new SockJS(this.ws_url) - /* const socket = new SockJS('http://localhost:8081' + this.ws_url) */ if (!this.isLoginStatu()) { return } + const socket = new SockJS(this.ws_url + '?userId=' + store.state.user.user.userId) + /* const socket = new SockJS('http://localhost:8081' + this.ws_url) */ + this.client = Stomp.over(socket) const heads = { /* Authorization: '', */