ソースを参照

新增websocket实现

zhouhao 8 年 前
コミット
d794364eb5
17 ファイル変更805 行追加0 行削除
  1. 54 0
      hsweb-message/hsweb-message-websocket/pom.xml
  2. 21 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/WebSocketCommand.java
  3. 22 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/WebSocketSessionListener.java
  4. 30 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandRequest.java
  5. 105 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandWebSocketMessageDispatcher.java
  6. 48 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/WebSocketUtils.java
  7. 111 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java
  8. 59 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessage.java
  9. 18 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java
  10. 42 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/DefaultWebSocketProcessorContainer.java
  11. 18 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/WebSocketProcessor.java
  12. 17 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/WebSocketProcessorContainer.java
  13. 80 0
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java
  14. 3 0
      hsweb-message/hsweb-message-websocket/src/main/resources/META-INF/spring.factories
  15. 112 0
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java
  16. 30 0
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTest.java
  17. 35 0
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java

+ 54 - 0
hsweb-message/hsweb-message-websocket/pom.xml

@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-message</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-message-websocket</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-redis</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-commons-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-authorization-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource</groupId>
+            <artifactId>sigar</artifactId>
+            <version>1.6.4</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+    </dependencies>
+</project>

+ 21 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/WebSocketCommand.java

@@ -0,0 +1,21 @@
+package org.hswebframework.web.socket;
+
+import org.hswebframework.web.authorization.Authentication;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.Map;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface WebSocketCommand {
+    String getCommand();
+
+    Authentication getAuthentication();
+
+    Map<String, Object> getParameters();
+
+    WebSocketSession getSession();
+}

+ 22 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/WebSocketSessionListener.java

@@ -0,0 +1,22 @@
+package org.hswebframework.web.socket;
+
+import org.springframework.web.socket.WebSocketSession;
+
+public interface WebSocketSessionListener {
+    /**
+     * 当session创建时,调用此方法
+     *
+     * @param session WebSocketSession 实例
+     * @throws Exception
+     */
+    void onSessionConnect(WebSocketSession session);
+
+    /**
+     * 当session关闭时,调用此方法
+     *
+     * @param session WebSocketSession 实例
+     * @throws Exception
+     */
+    void onSessionClose(WebSocketSession session);
+
+}

+ 30 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandRequest.java

@@ -0,0 +1,30 @@
+package org.hswebframework.web.socket.handler;
+
+import java.util.Map;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class CommandRequest {
+    private String command;
+
+    private Map<String, Object> parameters;
+
+    public String getCommand() {
+        return command;
+    }
+
+    public void setCommand(String command) {
+        this.command = command;
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+    }
+}

+ 105 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandWebSocketMessageDispatcher.java

@@ -0,0 +1,105 @@
+package org.hswebframework.web.socket.handler;
+
+import com.alibaba.fastjson.JSON;
+import org.hswebframework.web.authorization.Authentication;
+import org.hswebframework.web.authorization.container.AuthenticationContainer;
+import org.hswebframework.web.socket.WebSocketCommand;
+import org.hswebframework.web.socket.WebSocketSessionListener;
+import org.hswebframework.web.socket.message.WebSocketMessage;
+import org.hswebframework.web.socket.processor.WebSocketProcessor;
+import org.hswebframework.web.socket.processor.WebSocketProcessorContainer;
+import org.springframework.util.StringUtils;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author zhouhao
+ */
+public class CommandWebSocketMessageDispatcher extends TextWebSocketHandler {
+
+    private WebSocketProcessorContainer processorContainer;
+
+    private AuthenticationContainer authenticationContainer;
+
+    private List<WebSocketSessionListener> webSocketSessionListeners;
+
+    public void setWebSocketSessionListeners(List<WebSocketSessionListener> webSocketSessionListeners) {
+        this.webSocketSessionListeners = webSocketSessionListeners;
+    }
+
+    public void setAuthenticationContainer(AuthenticationContainer authenticationContainer) {
+        this.authenticationContainer = authenticationContainer;
+    }
+
+    public void setProcessorContainer(WebSocketProcessorContainer processorContainer) {
+        this.processorContainer = processorContainer;
+    }
+
+    private static final TextMessage requestFormatErrorMessage = new TextMessage(new WebSocketMessage(400, "message format error!").toString());
+
+    private static final TextMessage commandNotFoundMessage = new TextMessage(new WebSocketMessage(404, "command not found!").toString());
+
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        String payload = message.getPayload();
+        if (StringUtils.isEmpty(payload)) return;
+        try {
+            CommandRequest request = JSON.parseObject(payload, CommandRequest.class);
+            WebSocketCommand command = buildCommand(request, session);
+            WebSocketProcessor processor = processorContainer.getProcessor(command.getCommand());
+            if (processor != null) {
+                processor.execute(command);
+            } else {
+                session.sendMessage(commandNotFoundMessage);
+            }
+        } catch (Exception e) {
+            session.sendMessage(requestFormatErrorMessage);
+        }
+    }
+
+    private Authentication getAuthenticationFromSession(WebSocketSession socketSession) {
+        if (null == authenticationContainer) return null;
+        return WebSocketUtils.getAuthentication(authenticationContainer, socketSession);
+    }
+
+    private WebSocketCommand buildCommand(CommandRequest request, WebSocketSession socketSession) {
+        return new WebSocketCommand() {
+            @Override
+            public String getCommand() {
+                return request.getCommand();
+            }
+
+            @Override
+            public Authentication getAuthentication() {
+                return getAuthenticationFromSession(socketSession);
+            }
+
+            @Override
+            public Map<String, Object> getParameters() {
+                return request.getParameters();
+            }
+
+            @Override
+            public WebSocketSession getSession() {
+                return socketSession;
+            }
+        };
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        if (webSocketSessionListeners != null) webSocketSessionListeners.forEach(webSocketSessionListener ->
+                webSocketSessionListener.onSessionConnect(session));
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        if (webSocketSessionListeners != null) webSocketSessionListeners.forEach(webSocketSessionListener ->
+                webSocketSessionListener.onSessionClose(session));
+    }
+}

+ 48 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/WebSocketUtils.java

@@ -0,0 +1,48 @@
+package org.hswebframework.web.socket.handler;
+
+import org.hswebframework.web.authorization.Authentication;
+import org.hswebframework.web.authorization.container.AuthenticationContainer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.*;
+import java.util.function.Function;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class WebSocketUtils {
+
+
+    public static Authentication getAuthentication(AuthenticationContainer container, WebSocketSession session) {
+        Authentication authentication = Authentication
+                .current()
+                .orElseGet(() -> ((Authentication) session.getAttributes().get(Authentication.class.getName())));
+
+        if (authentication != null) return authentication;
+        HttpHeaders headers = session.getHandshakeHeaders();
+        List<String> cookies = headers.get("Cookie");
+        if (cookies == null || cookies.isEmpty()) {
+            return null;
+        }
+        String[] cookie = cookies.get(0).split("[;]");
+        Map<String, Set<String>> sessionId = new HashMap<>();
+        for (String aCookie : cookie) {
+            String[] tmp = aCookie.split("[=]");
+            if (tmp.length == 2)
+                sessionId.computeIfAbsent(tmp[0].trim(), k -> new HashSet<>())
+                        .add(tmp[1].trim());
+        }
+
+        Function<Set<String>, Optional<Authentication>> userGetter = set ->
+                set == null ? Optional.empty() : set.stream()
+                        .map(container::getAuthenticationBySessionId)
+                        .filter(Objects::nonNull).findFirst();
+
+        return userGetter.apply(sessionId.get("SESSION"))
+                .orElseGet(() -> userGetter.apply(sessionId.get("JSESSIONID")).orElse(null));
+
+    }
+}

+ 111 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java

@@ -0,0 +1,111 @@
+package org.hswebframework.web.socket.message;
+
+import org.hswebframework.web.message.MessageSubscribe;
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.support.ObjectMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.*;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class DefaultWebSocketMessager implements WebSocketMessager {
+
+    private Messager messager;
+
+    public DefaultWebSocketMessager(Messager messager) {
+        this.messager = messager;
+    }
+
+    // command,   userId,     sessionId
+    private final Map<String, Map<String, Map<String, MessageSubscribeSession>>> store = new ConcurrentHashMap<>(32);
+
+    @Override
+    public void onSessionConnect(WebSocketSession session) {
+
+    }
+
+    @Override
+    public void onSessionClose(WebSocketSession session) {
+
+    }
+
+    @Override
+    public void publish(String toUser, WebSocketMessage message) {
+        messager.publish(object(message))
+                .to(user(toUser))
+                .send();
+    }
+
+    private Map<String, MessageSubscribeSession> getSubSession(String command, String userId) {
+        return store.computeIfAbsent(command, cmd -> new ConcurrentHashMap<>(128))
+                .computeIfAbsent(userId, uid -> new ConcurrentHashMap<>());
+    }
+
+    @Override
+    public boolean subscribe(String command, String userId, WebSocketSession socketSession) {
+        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, userId);
+        subscribeSessionStore.computeIfAbsent(socketSession.getId(), sessionId -> {
+            MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe = messager.subscribe(user(userId));
+            subscribe.onMessage(message -> {
+                try {
+                    if (!socketSession.isOpen()) {
+                        deSubscribe(command, userId, socketSession);
+                    }
+                    socketSession.sendMessage(new TextMessage(((ObjectMessage) message).getObject().toString()));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            });
+            return new MessageSubscribeSession(subscribe, socketSession);
+        });
+        return false;
+    }
+
+    @Override
+    public boolean deSubscribe(String command, String userId, WebSocketSession socketSession) {
+        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, userId);
+        MessageSubscribeSession subscribeSession = subscribeSessionStore.get(socketSession.getId());
+        if (null != subscribeSession) {
+            subscribeSession.getSubscribe().cancel();
+            subscribeSessionStore.remove(socketSession.getId());
+        }
+        return false;
+    }
+
+    public class MessageSubscribeSession {
+        private MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe;
+
+        private WebSocketSession session;
+
+        public MessageSubscribeSession(MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe, WebSocketSession session) {
+            this.subscribe = subscribe;
+            this.session = session;
+        }
+
+        public MessageSubscribe<ObjectMessage<WebSocketMessage>> getSubscribe() {
+            return subscribe;
+        }
+
+        public void setSubscribe(MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe) {
+            this.subscribe = subscribe;
+        }
+
+        public WebSocketSession getSession() {
+            return session;
+        }
+
+        public void setSession(WebSocketSession session) {
+            this.session = session;
+        }
+    }
+}

+ 59 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessage.java

@@ -0,0 +1,59 @@
+package org.hswebframework.web.socket.message;
+
+import com.alibaba.fastjson.JSON;
+
+import java.io.Serializable;
+
+/**
+ * @author zhouhao
+ */
+public class WebSocketMessage implements Serializable {
+    private int code;
+
+    private String message;
+
+    private Object data;
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        this.data = data;
+    }
+
+    @Override
+    public String toString() {
+        return JSON.toJSONString(this);
+    }
+
+    public WebSocketMessage() {
+    }
+
+    public WebSocketMessage(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    public WebSocketMessage(int code, String message, Object data) {
+        this.code = code;
+        this.message = message;
+        this.data = data;
+    }
+}

+ 18 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java

@@ -0,0 +1,18 @@
+package org.hswebframework.web.socket.message;
+
+import org.hswebframework.web.socket.WebSocketSessionListener;
+import org.springframework.web.socket.WebSocketSession;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface WebSocketMessager extends WebSocketSessionListener {
+    void publish(String toUser, WebSocketMessage message);
+
+    boolean subscribe(String command, String userId, WebSocketSession socketSession);
+
+    boolean deSubscribe(String command, String userId, WebSocketSession socketSession);
+
+}

+ 42 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/DefaultWebSocketProcessorContainer.java

@@ -0,0 +1,42 @@
+package org.hswebframework.web.socket.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author zhouhao
+ */
+public class DefaultWebSocketProcessorContainer implements WebSocketProcessorContainer {
+
+    private final ConcurrentMap<String, WebSocketProcessor> processorStore = new ConcurrentHashMap<>();
+
+    @Override
+    public WebSocketProcessor install(WebSocketProcessor command) {
+        command.init();
+        return processorStore.put(command.getName(), command);
+    }
+
+    @Override
+    public WebSocketProcessor uninstall(String name) {
+        WebSocketProcessor processor = processorStore.remove(name);
+        if (null != processor) processor.destroy();
+        return processor;
+    }
+
+    public void destroy() {
+        getAllProcessor().forEach(WebSocketProcessor::destroy);
+        processorStore.clear();
+    }
+
+    @Override
+    public WebSocketProcessor getProcessor(String name) {
+        return processorStore.get(name);
+    }
+
+    @Override
+    public List<WebSocketProcessor> getAllProcessor() {
+        return new ArrayList<>(processorStore.values());
+    }
+}

+ 18 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/WebSocketProcessor.java

@@ -0,0 +1,18 @@
+package org.hswebframework.web.socket.processor;
+
+import org.hswebframework.web.socket.WebSocketCommand;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface WebSocketProcessor {
+    String getName();
+
+    void execute(WebSocketCommand command);
+
+    void init();
+
+    void destroy();
+}

+ 17 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/processor/WebSocketProcessorContainer.java

@@ -0,0 +1,17 @@
+package org.hswebframework.web.socket.processor;
+
+
+import java.util.List;
+
+/**
+ * @author zhouhao
+ */
+public interface WebSocketProcessorContainer {
+    WebSocketProcessor install(WebSocketProcessor command);
+
+    WebSocketProcessor uninstall(String name);
+
+    WebSocketProcessor getProcessor(String name);
+
+    List<WebSocketProcessor> getAllProcessor();
+}

+ 80 - 0
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java

@@ -0,0 +1,80 @@
+package org.hswebframework.web.socket.starter;
+
+import org.hswebframework.web.authorization.container.AuthenticationContainer;
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.socket.WebSocketSessionListener;
+import org.hswebframework.web.socket.handler.CommandWebSocketMessageDispatcher;
+import org.hswebframework.web.socket.message.DefaultWebSocketMessager;
+import org.hswebframework.web.socket.message.WebSocketMessager;
+import org.hswebframework.web.socket.processor.DefaultWebSocketProcessorContainer;
+import org.hswebframework.web.socket.processor.WebSocketProcessor;
+import org.hswebframework.web.socket.processor.WebSocketProcessorContainer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurationSupport;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import java.util.List;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+@Configuration
+public class CommandWebSocketAutoConfiguration {
+
+    @Configuration
+    @ConditionalOnMissingBean(WebSocketProcessorContainer.class)
+    public static class WebSocketProcessorContainerConfiguration {
+        @Autowired(required = false)
+        private List<WebSocketProcessor> webSocketProcessors;
+
+        @Bean(destroyMethod = "destroy")
+        public DefaultWebSocketProcessorContainer defaultWebSocketProcessorContainer() {
+            DefaultWebSocketProcessorContainer container = new DefaultWebSocketProcessorContainer();
+            if (webSocketProcessors != null) {
+                webSocketProcessors.forEach(container::install);
+            }
+            return container;
+        }
+    }
+
+    @Configuration
+    @ConditionalOnBean(Messager.class)
+    @ConditionalOnMissingBean(WebSocketMessager.class)
+    public static class WebSocketMessagerConfiguration {
+        @Bean
+        public WebSocketMessager webSocketMessager(Messager messager) {
+            return new DefaultWebSocketMessager(messager);
+        }
+    }
+
+    @Configuration
+    public static class HandlerConfigruation extends WebSocketConfigurationSupport {
+        @Autowired(required = false)
+        private AuthenticationContainer authenticationContainer;
+
+        @Autowired(required = false)
+        private List<WebSocketSessionListener> webSocketSessionListeners;
+
+        @Autowired
+        private WebSocketProcessorContainer webSocketProcessorContainer;
+
+        @Override
+        protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+            CommandWebSocketMessageDispatcher dispatcher = new CommandWebSocketMessageDispatcher();
+            dispatcher.setProcessorContainer(webSocketProcessorContainer);
+            dispatcher.setAuthenticationContainer(authenticationContainer);
+            dispatcher.setWebSocketSessionListeners(webSocketSessionListeners);
+            registry.addHandler(dispatcher, "/sockjs")
+                    .withSockJS()
+                    .setSessionCookieNeeded(true);
+            registry.addHandler(dispatcher, "/socket");
+        }
+    }
+
+}

+ 3 - 0
hsweb-message/hsweb-message-websocket/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Auto Configure
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.hswebframework.web.socket.starter.CommandWebSocketAutoConfiguration

+ 112 - 0
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java

@@ -0,0 +1,112 @@
+package org.hswebframework.web.socket;
+
+import org.hswebframework.web.message.MessageSubscribe;
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.builder.StaticMessageBuilder;
+import org.hswebframework.web.message.builder.StaticMessageSubjectBuilder;
+import org.hswebframework.web.message.support.ObjectMessage;
+import org.hswebframework.web.socket.message.WebSocketMessage;
+import org.hswebframework.web.socket.message.WebSocketMessager;
+import org.hswebframework.web.socket.processor.WebSocketProcessor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class TestProcessor implements WebSocketProcessor, WebSocketSessionListener {
+
+    @Autowired
+    private Messager messager;
+    Map<String, MessageSubscribe<ObjectMessage<WebSocketMessage>>>
+            store = new ConcurrentHashMap<>();
+
+    @Override
+    public String getName() {
+        return "test";
+    }
+
+    public void sub(WebSocketSession socketSession) {
+        MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
+                store.get(socketSession.getId());
+        if (subscribe != null) return;
+        store.put(socketSession.getId(), messager.<ObjectMessage<WebSocketMessage>>subscribe(topic("test"))
+                .onMessage(message -> {
+                    try {
+                        if (!socketSession.isOpen()) {
+                            desub(socketSession);
+                            return;
+                        }
+                        socketSession.sendMessage(new TextMessage(message.getObject().toString()));
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }));
+    }
+
+    public void desub(WebSocketSession socketSession) {
+        MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
+                store.get(socketSession.getId());
+        if (subscribe == null) return;
+        subscribe.cancel();
+        store.remove(socketSession.getId());
+    }
+
+    @Override
+    public void execute(WebSocketCommand command) {
+        String type = String.valueOf(command.getParameters().get("type"));
+        switch (type) {
+            case "conn":
+                sub(command.getSession());
+                break;
+            case "close": {
+                desub(command.getSession());
+            }
+        }
+    }
+
+    @Override
+    public void init() {
+        new Thread(() -> {
+            long total = 0;
+            while (true) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                messager.publish(object(new WebSocketMessage(200, "hello" + total++)))
+                        .to(topic("test"))
+                        .send();
+                System.out.println(total);
+            }
+        }).start();
+    }
+
+    @Override
+    public void destroy() {
+        store.values().forEach(MessageSubscribe::cancel);
+        store.clear();
+    }
+
+    @Override
+    public void onSessionConnect(WebSocketSession session) {
+
+    }
+
+    @Override
+    public void onSessionClose(WebSocketSession session) {
+        desub(session);
+    }
+}

+ 30 - 0
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTest.java

@@ -0,0 +1,30 @@
+package org.hswebframework.web.socket;
+
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.client.WebSocketClient;
+import org.springframework.web.socket.client.standard.StandardWebSocketClient;
+import org.springframework.web.socket.handler.AbstractWebSocketHandler;
+
+public class WebSocketClientTest {
+
+    public static void main(String[] args) throws Exception {
+        WebSocketClient client = new StandardWebSocketClient();
+        String url = "ws://localhost:8080/socket";
+        client.doHandshake(new AbstractWebSocketHandler() {
+            @Override
+            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+                //链接成功后发送消息
+                System.out.println("发送消息");
+                session.sendMessage(new TextMessage("{\"command\":\"test\",\"parameters\":{\"type\":\"conn\"}}"));
+            }
+
+            @Override
+            public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
+                System.out.println(message.getPayload());
+            }
+        }, url);
+        System.in.read();
+    }
+}

+ 35 - 0
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java

@@ -0,0 +1,35 @@
+package org.hswebframework.web.socket;
+
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+@Configuration
+@EnableAutoConfiguration
+public class WebSocketServerTests {
+
+    @Bean
+    public TestProcessor testProcessor() {
+        return new TestProcessor();
+    }
+
+    @Bean(destroyMethod = "shutdown")
+    public RedissonClient redissonClient() {
+        Config config = new Config();
+        config.useSingleServer().setAddress("127.0.0.1:6379");
+        return Redisson.create(config);
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(WebSocketServerTests.class);
+    }
+}