Browse Source

完善消息模块

zhouhao 8 years ago
parent
commit
9cc9ecafa1
18 changed files with 427 additions and 48 deletions
  1. 0 8
      hsweb-message/hsweb-message-api/src/test/java/org/hswebframework/web/message/SimpleMessageSubscribe.java
  2. 26 0
      hsweb-message/hsweb-message-memory/pom.xml
  3. 110 0
      hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryMessager.java
  4. 66 0
      hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryPublish.java
  5. 31 0
      hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryTopic.java
  6. 36 0
      hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryTopicSubscribe.java
  7. 21 0
      hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/starter/MemoryMessagerAutoConfiguration.java
  8. 3 0
      hsweb-message/hsweb-message-memory/src/main/resources/META-INF/spring.factories
  9. 52 0
      hsweb-message/hsweb-message-memory/src/test/java/org/hswebframework/web/message/memory/MemoryMessagerTest.java
  10. 3 0
      hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java
  11. 0 2
      hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java
  12. 13 0
      hsweb-message/hsweb-message-websocket/pom.xml
  13. 5 1
      hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandWebSocketMessageDispatcher.java
  14. 16 17
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java
  15. 13 14
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTest.java
  16. 21 6
      hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java
  17. 10 0
      hsweb-message/hsweb-message-websocket/src/test/resources/application.yml
  18. 1 0
      hsweb-message/pom.xml

+ 0 - 8
hsweb-message/hsweb-message-api/src/test/java/org/hswebframework/web/message/SimpleMessageSubscribe.java

@@ -29,8 +29,6 @@ import java.util.function.Consumer;
  * @author zhouhao
  * @author zhouhao
  */
  */
 public class SimpleMessageSubscribe<T extends Message> implements MessageSubscribe<T> {
 public class SimpleMessageSubscribe<T extends Message> implements MessageSubscribe<T> {
-    MessageSubject im;
-
     MessageSubject subject;
     MessageSubject subject;
 
 
     final List<Consumer<T>> consumers = new ArrayList<>();
     final List<Consumer<T>> consumers = new ArrayList<>();
@@ -46,12 +44,6 @@ public class SimpleMessageSubscribe<T extends Message> implements MessageSubscri
         this.queue = queue;
         this.queue = queue;
     }
     }
 
 
-    @Override
-    public MessageSubscribe<T> iam(MessageSubject iam) {
-        im = iam;
-        return this;
-    }
-
     @Override
     @Override
     public MessageSubscribe<T> onMessage(Consumer<T> consumer) {
     public MessageSubscribe<T> onMessage(Consumer<T> consumer) {
         synchronized (consumers) {
         synchronized (consumers) {

+ 26 - 0
hsweb-message/hsweb-message-memory/pom.xml

@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-message</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-message-memory</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

+ 110 - 0
hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryMessager.java

@@ -0,0 +1,110 @@
+package org.hswebframework.web.message.memory;
+
+import org.hswebframework.web.message.*;
+import org.hswebframework.web.message.support.QueueMessageSubject;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class MemoryMessager implements Messager {
+
+    private Map<String, MemoryTopic<? extends Message>> topicStore = new ConcurrentHashMap<>(256);
+
+    private Map<String, QueueConsumer<? extends Message>> queueStore = new ConcurrentHashMap<>(512);
+
+    private Executor executor;
+
+    public MemoryMessager(Executor executor) {
+        this.executor = executor;
+    }
+
+    public MemoryMessager() {
+        this(Executors.newCachedThreadPool());
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public MessagePublish publish(Message message) {
+        return new MemoryPublish(this::getQueue, this::getTopic, message);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <M extends Message> QueueConsumer<M> getQueue(String name) {
+        return (QueueConsumer) queueStore
+                .computeIfAbsent(name, queueName -> new QueueConsumer<>());
+    }
+
+    @SuppressWarnings("unchecked")
+    public <M extends Message> MemoryTopic<M> getTopic(String name) {
+        return (MemoryTopic) topicStore
+                .computeIfAbsent(name, topic -> new MemoryTopic<>());
+    }
+
+    @Override
+    public <M extends Message> MessageSubscribe<M> subscribe(MessageSubject subject) {
+        if (subject instanceof QueueMessageSubject) {
+            QueueConsumer<M> queue = getQueue(((QueueMessageSubject) subject).getQueueName());
+            return new MessageSubscribe<M>() {
+                private List<Consumer<M>> consumers = new ArrayList<>();
+                private Consumer<M> consumer = m -> consumers.forEach(cons -> cons.accept(m));
+
+                {
+                    queue.lock.writeLock().lock();
+                    try {
+                        queue.consumers.add(consumer);
+                    } finally {
+                        queue.lock.writeLock().unlock();
+                    }
+                }
+
+                @Override
+                public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
+                    consumers.add(consumer);
+                    return this;
+                }
+
+                @Override
+                public void cancel() {
+                    boolean lockSuccess = true;
+                    try {
+                        queue.lock.writeLock().tryLock(5, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        lockSuccess = false;
+                    }
+                    try {
+                        queue.consumers.remove(consumer);
+                    } finally {
+                        try {
+                            queue.lock.writeLock().unlock();
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+            };
+        } else if (subject instanceof TopicMessageSubject) {
+            return new MemoryTopicSubscribe<>(topicStore
+                    .computeIfAbsent(((TopicMessageSubject) subject).getTopic(), topic -> new MemoryTopic<>()));
+        }
+        throw new UnsupportedOperationException(subject.getClass().getName());
+    }
+
+    class QueueConsumer<M extends Message> {
+        final List<Consumer<M>> consumers = new ArrayList<>();
+        final ReadWriteLock     lock      = new ReentrantReadWriteLock();
+    }
+}

+ 66 - 0
hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryPublish.java

@@ -0,0 +1,66 @@
+package org.hswebframework.web.message.memory;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessagePublish;
+import org.hswebframework.web.message.MessageSubject;
+import org.hswebframework.web.message.support.MultipleQueueMessageSubject;
+import org.hswebframework.web.message.support.QueueMessageSubject;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class MemoryPublish implements MessagePublish {
+    private MessageSubject subject;
+
+    private Function<String, MemoryMessager.QueueConsumer<Message>> queueGetter;
+    private Function<String, MemoryTopic<Message>>                  topicGetter;
+    private Message                                                 message;
+
+    public MemoryPublish(Function<String, MemoryMessager.QueueConsumer<Message>> queueGetter
+            , Function<String, MemoryTopic<Message>> topicGetter, Message message) {
+        this.queueGetter = queueGetter;
+        this.topicGetter = topicGetter;
+        this.message = message;
+    }
+
+    @Override
+    public MessagePublish to(MessageSubject subject) {
+        this.subject = subject;
+        return this;
+    }
+
+    private static Random random = new Random();
+
+    private void pubQueue(String name) {
+        final MemoryMessager.QueueConsumer<Message> queueConsumer = queueGetter.apply(name);
+        queueConsumer.lock.readLock().lock();
+        try {
+            int size = queueConsumer.consumers.size();
+            queueConsumer.consumers.get(random.nextInt(size)).accept(message);
+        } finally {
+            queueConsumer.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void send() {
+        Objects.requireNonNull(subject);
+        if (subject instanceof QueueMessageSubject) {
+            pubQueue(((QueueMessageSubject) subject).getQueueName());
+        }
+        if (subject instanceof MultipleQueueMessageSubject) {
+            ((MultipleQueueMessageSubject) subject).getQueueName().forEach(this::pubQueue);
+        }
+        if (subject instanceof TopicMessageSubject) {
+            topicGetter.apply(((TopicMessageSubject) subject).getTopic()).publish(message);
+        }
+    }
+}

+ 31 - 0
hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryTopic.java

@@ -0,0 +1,31 @@
+package org.hswebframework.web.message.memory;
+
+import org.hswebframework.web.message.Message;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+class MemoryTopic<M extends Message> {
+    private Map<String, List<Consumer<M>>> consumers = new ConcurrentHashMap<>();
+
+    public void remove(String id) {
+        consumers.remove(id);
+    }
+
+    public void subscribe(String id, Consumer<M> consumer) {
+        consumers.computeIfAbsent(id, i -> new ArrayList<>())
+                .add(consumer);
+    }
+
+    public void publish(M message) {
+        consumers.values().stream().flatMap(List::stream).forEach(consumer -> consumer.accept(message));
+    }
+}

+ 36 - 0
hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/MemoryTopicSubscribe.java

@@ -0,0 +1,36 @@
+package org.hswebframework.web.message.memory;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessageSubscribe;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class MemoryTopicSubscribe<M extends Message> implements MessageSubscribe<M> {
+
+    private MemoryTopic topic;
+
+    private String id;
+
+    public MemoryTopicSubscribe(MemoryTopic topic) {
+        this.topic = topic;
+        id = UUID.randomUUID().toString();
+    }
+
+    @Override
+    public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
+        topic.subscribe(id, consumer);
+        return this;
+    }
+
+    @Override
+    public void cancel() {
+        topic.remove(id);
+    }
+
+}

+ 21 - 0
hsweb-message/hsweb-message-memory/src/main/java/org/hswebframework/web/message/memory/starter/MemoryMessagerAutoConfiguration.java

@@ -0,0 +1,21 @@
+package org.hswebframework.web.message.memory.starter;
+
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.memory.MemoryMessager;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+@Configuration
+@ConditionalOnMissingBean(Messager.class)
+public class MemoryMessagerAutoConfiguration {
+    @Bean
+    public Messager messager() {
+        return new MemoryMessager();
+    }
+}

+ 3 - 0
hsweb-message/hsweb-message-memory/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Auto Configure
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.hswebframework.web.message.memory.starter.MemoryMessagerAutoConfiguration

+ 52 - 0
hsweb-message/hsweb-message-memory/src/test/java/org/hswebframework/web/message/memory/MemoryMessagerTest.java

@@ -0,0 +1,52 @@
+package org.hswebframework.web.message.memory;
+
+import org.hswebframework.web.message.MessageSubscribe;
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.support.TextMessage;
+import org.junit.Test;
+
+import static org.hswebframework.web.message.builder.StaticMessageBuilder.text;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.queue;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class MemoryMessagerTest {
+
+    private Messager messager = new MemoryMessager();
+
+    @Test
+    public void testQueue() throws InterruptedException {
+        for (int i = 0; i < 5; i++) {
+            int x = i;
+            MessageSubscribe<TextMessage> sub = messager.subscribe(queue("test"));
+            sub.onMessage(msg -> System.out.println(x + msg.getMessage()));
+        }
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(200);
+            messager.publish(text("hello queue" + i))
+                    .to(queue("test")).send();
+        }
+
+        Thread.sleep(1000);
+    }
+
+    @Test
+    public void testTopic() throws InterruptedException {
+        for (int i = 0; i < 5; i++) {
+            int x = i;
+            MessageSubscribe<TextMessage> sub = messager.subscribe(topic("test"));
+            sub.onMessage(msg -> System.out.println(x + msg.getMessage()));
+        }
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(200);
+            messager.publish(text("hello queue" + i))
+                    .to(topic("test")).send();
+        }
+
+        Thread.sleep(1000);
+    }
+}

+ 3 - 0
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java

@@ -69,6 +69,9 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
             Thread thread = new Thread(() -> {
             Thread thread = new Thread(() -> {
                 while (running) {
                 while (running) {
                     try {
                     try {
+                        if (redisson.isShutdown() || redisson.isShuttingDown()) {
+                            return;
+                        }
                         countDownLatch.trySetCount(1);
                         countDownLatch.trySetCount(1);
                         countDownLatch.await();
                         countDownLatch.await();
                         consumers.forEach(cons -> {
                         consumers.forEach(cons -> {

+ 0 - 2
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java

@@ -66,6 +66,4 @@ public class RedissonMessagePublish implements MessagePublish {
             topic.publish(message);
             topic.publish(message);
         }
         }
     }
     }
-
-
 }
 }

+ 13 - 0
hsweb-message/hsweb-message-websocket/pom.xml

@@ -19,6 +19,19 @@
             <version>${project.version}</version>
             <version>${project.version}</version>
             <scope>test</scope>
             <scope>test</scope>
         </dependency>
         </dependency>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-memory</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-jms</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
 
         <dependency>
         <dependency>
             <groupId>org.hswebframework.web</groupId>
             <groupId>org.hswebframework.web</groupId>

+ 5 - 1
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/handler/CommandWebSocketMessageDispatcher.java

@@ -1,6 +1,7 @@
 package org.hswebframework.web.socket.handler;
 package org.hswebframework.web.socket.handler;
 
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSON;
+import com.fasterxml.jackson.core.JsonParseException;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.container.AuthenticationContainer;
 import org.hswebframework.web.authorization.container.AuthenticationContainer;
 import org.hswebframework.web.socket.WebSocketCommand;
 import org.hswebframework.web.socket.WebSocketCommand;
@@ -57,8 +58,11 @@ public class CommandWebSocketMessageDispatcher extends TextWebSocketHandler {
             } else {
             } else {
                 session.sendMessage(commandNotFoundMessage);
                 session.sendMessage(commandNotFoundMessage);
             }
             }
-        } catch (Exception e) {
+        } catch (JsonParseException e) {
             session.sendMessage(requestFormatErrorMessage);
             session.sendMessage(requestFormatErrorMessage);
+        } catch (Exception e) {
+            e.printStackTrace();
+            session.sendMessage(new TextMessage(new WebSocketMessage(500, "error!" + e.getMessage()).toString()));
         }
         }
     }
     }
 
 

+ 16 - 17
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java

@@ -2,14 +2,10 @@ package org.hswebframework.web.socket;
 
 
 import org.hswebframework.web.message.MessageSubscribe;
 import org.hswebframework.web.message.MessageSubscribe;
 import org.hswebframework.web.message.Messager;
 import org.hswebframework.web.message.Messager;
-import org.hswebframework.web.message.builder.StaticMessageBuilder;
-import org.hswebframework.web.message.builder.StaticMessageSubjectBuilder;
 import org.hswebframework.web.message.support.ObjectMessage;
 import org.hswebframework.web.message.support.ObjectMessage;
 import org.hswebframework.web.socket.message.WebSocketMessage;
 import org.hswebframework.web.socket.message.WebSocketMessage;
-import org.hswebframework.web.socket.message.WebSocketMessager;
 import org.hswebframework.web.socket.processor.WebSocketProcessor;
 import org.hswebframework.web.socket.processor.WebSocketProcessor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 import org.springframework.web.socket.WebSocketSession;
 
 
@@ -18,7 +14,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 
 import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
 import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
-import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.queue;
 
 
 /**
 /**
  * TODO 完成注释
  * TODO 完成注释
@@ -29,23 +25,24 @@ public class TestProcessor implements WebSocketProcessor, WebSocketSessionListen
 
 
     @Autowired
     @Autowired
     private Messager messager;
     private Messager messager;
-    Map<String, MessageSubscribe<ObjectMessage<WebSocketMessage>>>
-            store = new ConcurrentHashMap<>();
+
+    private final Map<String, MessageSubscribe<ObjectMessage<WebSocketMessage>>> store = new ConcurrentHashMap<>();
 
 
     @Override
     @Override
     public String getName() {
     public String getName() {
         return "test";
         return "test";
     }
     }
 
 
-    public void sub(WebSocketSession socketSession) {
+    private void sub(WebSocketSession socketSession) {
         MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
         MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
                 store.get(socketSession.getId());
                 store.get(socketSession.getId());
         if (subscribe != null) return;
         if (subscribe != null) return;
-        store.put(socketSession.getId(), messager.<ObjectMessage<WebSocketMessage>>subscribe(topic("test"))
+        store.put(socketSession.getId(), messager
+                .<ObjectMessage<WebSocketMessage>>subscribe(queue("test")) //订阅 queue队列
                 .onMessage(message -> {
                 .onMessage(message -> {
                     try {
                     try {
                         if (!socketSession.isOpen()) {
                         if (!socketSession.isOpen()) {
-                            desub(socketSession);
+                            deSub(socketSession);
                             return;
                             return;
                         }
                         }
                         socketSession.sendMessage(new TextMessage(message.getObject().toString()));
                         socketSession.sendMessage(new TextMessage(message.getObject().toString()));
@@ -55,7 +52,7 @@ public class TestProcessor implements WebSocketProcessor, WebSocketSessionListen
                 }));
                 }));
     }
     }
 
 
-    public void desub(WebSocketSession socketSession) {
+    private void deSub(WebSocketSession socketSession) {
         MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
         MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
                 store.get(socketSession.getId());
                 store.get(socketSession.getId());
         if (subscribe == null) return;
         if (subscribe == null) return;
@@ -71,7 +68,7 @@ public class TestProcessor implements WebSocketProcessor, WebSocketSessionListen
                 sub(command.getSession());
                 sub(command.getSession());
                 break;
                 break;
             case "close": {
             case "close": {
-                desub(command.getSession());
+                deSub(command.getSession());
             }
             }
         }
         }
     }
     }
@@ -86,10 +83,12 @@ public class TestProcessor implements WebSocketProcessor, WebSocketSessionListen
                 } catch (InterruptedException e) {
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                     e.printStackTrace();
                 }
                 }
-                messager.publish(object(new WebSocketMessage(200, "hello" + total++)))
-                        .to(topic("test"))
-                        .send();
-                System.out.println(total);
+                if (store.size() > 0) {
+                    messager.publish(object(new WebSocketMessage(200, "hello" + total++)))
+                            .to(queue("test")) //向队列发送消息
+                            .send();
+                    System.out.println(total);
+                }
             }
             }
         }).start();
         }).start();
     }
     }
@@ -107,6 +106,6 @@ public class TestProcessor implements WebSocketProcessor, WebSocketSessionListen
 
 
     @Override
     @Override
     public void onSessionClose(WebSocketSession session) {
     public void onSessionClose(WebSocketSession session) {
-        desub(session);
+        deSub(session);
     }
     }
 }
 }

+ 13 - 14
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTest.java

@@ -1,5 +1,6 @@
 package org.hswebframework.web.socket;
 package org.hswebframework.web.socket;
 
 
+import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketMessage;
 import org.springframework.web.socket.WebSocketMessage;
 import org.springframework.web.socket.WebSocketSession;
 import org.springframework.web.socket.WebSocketSession;
@@ -10,21 +11,19 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 public class WebSocketClientTest {
 public class WebSocketClientTest {
 
 
     public static void main(String[] args) throws Exception {
     public static void main(String[] args) throws Exception {
-        WebSocketClient client = new StandardWebSocketClient();
-        String url = "ws://localhost:8080/socket";
-        client.doHandshake(new AbstractWebSocketHandler() {
-            @Override
-            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
-                //链接成功后发送消息
-                System.out.println("发送消息");
-                session.sendMessage(new TextMessage("{\"command\":\"test\",\"parameters\":{\"type\":\"conn\"}}"));
-            }
+        for (int i = 0; i < 10; i++) {
+            WebSocketClient client = new StandardWebSocketClient();
+            String url = "ws://localhost:8080/socket";
+            ListenableFuture<WebSocketSession> future = client.doHandshake(new AbstractWebSocketHandler() {
+                @Override
+                public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
+                    System.out.println(message.getPayload());
+                }
+            }, url);
 
 
-            @Override
-            public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
-                System.out.println(message.getPayload());
-            }
-        }, url);
+            WebSocketSession socketSession = future.get();
+            socketSession.sendMessage(new TextMessage("{\"command\":\"test\",\"parameters\":{\"type\":\"conn\"}}"));
+        }
         System.in.read();
         System.in.read();
     }
     }
 }
 }

+ 21 - 6
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java

@@ -1,5 +1,7 @@
 package org.hswebframework.web.socket;
 package org.hswebframework.web.socket;
 
 
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.jms.JmsMessager;
 import org.redisson.Redisson;
 import org.redisson.Redisson;
 import org.redisson.api.RedissonClient;
 import org.redisson.api.RedissonClient;
 import org.redisson.config.Config;
 import org.redisson.config.Config;
@@ -7,6 +9,8 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.annotation.EnableJms;
+import org.springframework.jms.core.JmsTemplate;
 
 
 /**
 /**
  * TODO 完成注释
  * TODO 完成注释
@@ -15,19 +19,30 @@ import org.springframework.context.annotation.Configuration;
  */
  */
 @Configuration
 @Configuration
 @EnableAutoConfiguration
 @EnableAutoConfiguration
+@EnableJms
 public class WebSocketServerTests {
 public class WebSocketServerTests {
 
 
+    static {
+        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
+    }
+
+    @Bean
+    public Messager messager(JmsTemplate template) {
+        return new JmsMessager(template);
+    }
+
     @Bean
     @Bean
     public TestProcessor testProcessor() {
     public TestProcessor testProcessor() {
         return new TestProcessor();
         return new TestProcessor();
     }
     }
 
 
-    @Bean(destroyMethod = "shutdown")
-    public RedissonClient redissonClient() {
-        Config config = new Config();
-        config.useSingleServer().setAddress("127.0.0.1:6379");
-        return Redisson.create(config);
-    }
+// 使用redis
+//    @Bean(destroyMethod = "shutdown")
+//    public RedissonClient redissonClient() {
+//        Config config = new Config();
+//        config.useSingleServer().setAddress("127.0.0.1:6379");
+//        return Redisson.create(config);
+//    }
 
 
     public static void main(String[] args) {
     public static void main(String[] args) {
         SpringApplication.run(WebSocketServerTests.class);
         SpringApplication.run(WebSocketServerTests.class);

+ 10 - 0
hsweb-message/hsweb-message-websocket/src/test/resources/application.yml

@@ -0,0 +1,10 @@
+spring:
+    activemq:
+        in-memory: true
+    jms:
+      pub-sub-domain: true
+
+hsweb:
+    app:
+      name: websocket测试
+      version: 3.0.0

+ 1 - 0
hsweb-message/pom.xml

@@ -16,6 +16,7 @@
         <module>hsweb-message-jms</module>
         <module>hsweb-message-jms</module>
         <module>hsweb-message-redis</module>
         <module>hsweb-message-redis</module>
         <module>hsweb-message-websocket</module>
         <module>hsweb-message-websocket</module>
+        <module>hsweb-message-memory</module>
     </modules>
     </modules>