Explorar o código

优化message

zhouhao %!s(int64=8) %!d(string=hai) anos
pai
achega
4312f03be8

+ 15 - 3
hsweb-message/hsweb-message-websocket/pom.xml

@@ -19,20 +19,32 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.hswebframework.web</groupId>-->
+            <!--<artifactId>hsweb-message-memory</artifactId>-->
+            <!--<version>${project.version}</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
+
         <dependency>
             <groupId>org.hswebframework.web</groupId>
-            <artifactId>hsweb-message-memory</artifactId>
+            <artifactId>hsweb-message-jms</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.hswebframework.web</groupId>
-            <artifactId>hsweb-message-jms</artifactId>
+            <artifactId>hsweb-concurrent-counter-redis</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-concurrent-counter-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.hswebframework.web</groupId>
             <artifactId>hsweb-commons-utils</artifactId>

+ 59 - 14
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/DefaultWebSocketMessager.java

@@ -1,5 +1,8 @@
 package org.hswebframework.web.socket.message;
 
+import org.hswebframework.web.concurrent.counter.Counter;
+import org.hswebframework.web.concurrent.counter.CounterManager;
+import org.hswebframework.web.concurrent.counter.SimpleCounterManager;
 import org.hswebframework.web.message.MessageSubscribe;
 import org.hswebframework.web.message.Messager;
 import org.hswebframework.web.message.support.ObjectMessage;
@@ -7,8 +10,11 @@ import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
 import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.*;
@@ -23,61 +29,96 @@ public class DefaultWebSocketMessager implements WebSocketMessager {
     private Messager messager;
 
     public DefaultWebSocketMessager(Messager messager) {
+        this(messager, new SimpleCounterManager());
+    }
+
+    public DefaultWebSocketMessager(Messager messager, CounterManager counterManager) {
         this.messager = messager;
+        this.counterManager = counterManager == null ? new SimpleCounterManager() : counterManager;
     }
 
-    // command,   userId,     sessionId
+    //              command,   type,     sessionId
     private final Map<String, Map<String, Map<String, MessageSubscribeSession>>> store = new ConcurrentHashMap<>(32);
 
+    private CounterManager counterManager = new SimpleCounterManager();
+
+
     @Override
     public void onSessionConnect(WebSocketSession session) {
 
     }
 
+    private String getSubTotalKey(String command, String type) {
+        return "sub_".concat(command)
+                .concat("_")
+                .concat(type)
+                .concat("_total");
+    }
+
     @Override
-    public void onSessionClose(WebSocketSession session) {
+    public int getSubscribeTotal(String command, String type) {
+        return (int) counterManager.getCounter(getSubTotalKey(command, type)).get();
+    }
 
+    @Override
+    public void onSessionClose(WebSocketSession session) {
+        store.values()  //command
+                .stream().map(Map::values).flatMap(Collection::stream)
+                .map(sessionStore -> sessionStore.get(session.getId()))
+                .filter(Objects::nonNull)
+                .forEach(MessageSubscribeSession::cancel);
     }
 
     @Override
-    public void publish(String toUser, WebSocketMessage message) {
+    public void publish(String command, String type, WebSocketMessage message) {
         messager.publish(object(message))
-                .to(user(toUser))
+                .to(TYPE_QUEUE.equals(type) ? queue("queue_" + command) : topic("topic_" + command))
                 .send();
     }
 
-    private Map<String, MessageSubscribeSession> getSubSession(String command, String userId) {
+    private Map<String, MessageSubscribeSession> getSubSession(String command, String type) {
         return store.computeIfAbsent(command, cmd -> new ConcurrentHashMap<>(128))
-                .computeIfAbsent(userId, uid -> new ConcurrentHashMap<>());
+                .computeIfAbsent(type, t -> new ConcurrentHashMap<>());
     }
 
     @Override
-    public boolean subscribe(String command, String userId, WebSocketSession socketSession) {
-        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, userId);
+    public boolean subscribe(String command, String type, WebSocketSession socketSession) {
+        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, type);
         subscribeSessionStore.computeIfAbsent(socketSession.getId(), sessionId -> {
-            MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe = messager.subscribe(user(userId));
+            MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe = messager
+                    .subscribe(TYPE_QUEUE.equals(type) ? queue("queue_" + command) : topic("topic_" + command));
             subscribe.onMessage(message -> {
                 try {
                     if (!socketSession.isOpen()) {
-                        deSubscribe(command, userId, socketSession);
+                        deSubscribe(command, type, socketSession);
+                        return;
                     }
                     socketSession.sendMessage(new TextMessage(((ObjectMessage) message).getObject().toString()));
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             });
-            return new MessageSubscribeSession(subscribe, socketSession);
+            return new MessageSubscribeSession(subscribe, socketSession) {
+                @Override
+                public void cancel() {
+                    super.cancel();
+                    counterManager.getCounter(getSubTotalKey(command, type)).decrement();
+                }
+            };
         });
-        return false;
+        counterManager.getCounter(getSubTotalKey(command, type)).increment();
+        return true;
     }
 
     @Override
-    public boolean deSubscribe(String command, String userId, WebSocketSession socketSession) {
-        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, userId);
+    public boolean deSubscribe(String command, String type, WebSocketSession socketSession) {
+        Map<String, MessageSubscribeSession> subscribeSessionStore = getSubSession(command, type);
         MessageSubscribeSession subscribeSession = subscribeSessionStore.get(socketSession.getId());
         if (null != subscribeSession) {
             subscribeSession.getSubscribe().cancel();
             subscribeSessionStore.remove(socketSession.getId());
+            counterManager.getCounter(getSubTotalKey(command, type)).decrement();
+            return true;
         }
         return false;
     }
@@ -107,5 +148,9 @@ public class DefaultWebSocketMessager implements WebSocketMessager {
         public void setSession(WebSocketSession session) {
             this.session = session;
         }
+
+        public void cancel() {
+            subscribe.cancel();
+        }
     }
 }

+ 33 - 3
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/message/WebSocketMessager.java

@@ -9,10 +9,40 @@ import org.springframework.web.socket.WebSocketSession;
  * @author zhouhao
  */
 public interface WebSocketMessager extends WebSocketSessionListener {
-    void publish(String toUser, WebSocketMessage message);
 
-    boolean subscribe(String command, String userId, WebSocketSession socketSession);
+    String TYPE_QUEUE = "queue";
 
-    boolean deSubscribe(String command, String userId, WebSocketSession socketSession);
+    String TYPE_TOPIC = "topic";
 
+    default void publishQueue(String command, WebSocketMessage message) {
+        publish(command, TYPE_QUEUE, message);
+    }
+
+    default void publishTopic(String command, WebSocketMessage message) {
+        publish(command, TYPE_TOPIC, message);
+    }
+
+    void publish(String command, String type, WebSocketMessage message);
+
+    int getSubscribeTotal(String command, String type);
+
+    boolean subscribe(String command, String type, WebSocketSession socketSession);
+
+    default boolean subscribeQueue(String command, WebSocketSession socketSession) {
+        return subscribe(command, TYPE_QUEUE, socketSession);
+    }
+
+    default boolean subscribeTopic(String command, WebSocketSession socketSession) {
+        return subscribe(command, TYPE_TOPIC, socketSession);
+    }
+
+    boolean deSubscribe(String command, String type, WebSocketSession socketSession);
+
+    default boolean deSubscribeQueue(String command, WebSocketSession socketSession) {
+        return deSubscribe(command, TYPE_QUEUE, socketSession);
+    }
+
+    default boolean deSubscribeTopic(String command, WebSocketSession socketSession) {
+        return deSubscribe(command, TYPE_TOPIC, socketSession);
+    }
 }

+ 5 - 1
hsweb-message/hsweb-message-websocket/src/main/java/org/hswebframework/web/socket/starter/CommandWebSocketAutoConfiguration.java

@@ -1,6 +1,7 @@
 package org.hswebframework.web.socket.starter;
 
 import org.hswebframework.web.authorization.container.AuthenticationContainer;
+import org.hswebframework.web.concurrent.counter.CounterManager;
 import org.hswebframework.web.message.Messager;
 import org.hswebframework.web.socket.WebSocketSessionListener;
 import org.hswebframework.web.socket.handler.CommandWebSocketMessageDispatcher;
@@ -47,9 +48,12 @@ public class CommandWebSocketAutoConfiguration {
     @ConditionalOnBean(Messager.class)
     @ConditionalOnMissingBean(WebSocketMessager.class)
     public static class WebSocketMessagerConfiguration {
+        @Autowired(required = false)
+        private CounterManager counterManager;
+
         @Bean
         public WebSocketMessager webSocketMessager(Messager messager) {
-            return new DefaultWebSocketMessager(messager);
+            return new DefaultWebSocketMessager(messager,counterManager);
         }
     }
 

+ 6 - 41
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/TestProcessor.java

@@ -1,21 +1,11 @@
 package org.hswebframework.web.socket;
 
-import org.hswebframework.web.message.MessageSubscribe;
-import org.hswebframework.web.message.Messager;
-import org.hswebframework.web.message.support.ObjectMessage;
 import org.hswebframework.web.socket.message.WebSocketMessage;
+import org.hswebframework.web.socket.message.WebSocketMessager;
 import org.hswebframework.web.socket.processor.CommandProcessor;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.hswebframework.web.message.builder.StaticMessageBuilder.object;
-import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.queue;
-
 /**
  * TODO 完成注释
  *
@@ -24,9 +14,7 @@ import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder
 public class TestProcessor implements CommandProcessor, WebSocketSessionListener {
 
     @Autowired
-    private Messager messager;
-
-    private final Map<String, MessageSubscribe<ObjectMessage<WebSocketMessage>>> store = new ConcurrentHashMap<>();
+    private WebSocketMessager messager;
 
     @Override
     public String getName() {
@@ -34,30 +22,11 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener
     }
 
     private void sub(WebSocketSession socketSession) {
-        MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
-                store.get(socketSession.getId());
-        if (subscribe != null) return;
-        store.put(socketSession.getId(), messager
-                .<ObjectMessage<WebSocketMessage>>subscribe(queue("test")) //订阅 queue队列
-                .onMessage(message -> {
-                    try {
-                        if (!socketSession.isOpen()) {
-                            deSub(socketSession);
-                            return;
-                        }
-                        socketSession.sendMessage(new TextMessage(message.getObject().toString()));
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                }));
+        messager.subscribeQueue(getName(), socketSession);
     }
 
     private void deSub(WebSocketSession socketSession) {
-        MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
-                store.get(socketSession.getId());
-        if (subscribe == null) return;
-        subscribe.cancel();
-        store.remove(socketSession.getId());
+        messager.deSubscribeQueue(getName(), socketSession);
     }
 
     @Override
@@ -83,10 +52,8 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
-                if (store.size() > 0) {
-                    messager.publish(object(new WebSocketMessage(200, "hello" + total++)))
-                            .to(queue("test")) //向队列发送消息
-                            .send();
+                if (messager.getSubscribeTotal(getName(), WebSocketMessager.TYPE_QUEUE) > 0) {
+                    messager.publishQueue(getName(), new WebSocketMessage(200, "hello" + total++));
                     System.out.println(total);
                 }
             }
@@ -95,8 +62,6 @@ public class TestProcessor implements CommandProcessor, WebSocketSessionListener
 
     @Override
     public void destroy() {
-        store.values().forEach(MessageSubscribe::cancel);
-        store.clear();
     }
 
     @Override

+ 3 - 3
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketClientTests.java

@@ -10,9 +10,9 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 
 public class WebSocketClientTests {
     public static void main(String[] args) throws Exception {
-        for (int i = 0; i < 10; i++) {
+//        for (int i = 0; i < 10; i++) {
             WebSocketClient client = new StandardWebSocketClient();
-            String url = "ws://localhost:8080/socket";
+            String url = "ws://localhost:8081/socket";
             ListenableFuture<WebSocketSession> future = client.doHandshake(new AbstractWebSocketHandler() {
                 @Override
                 public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
@@ -22,7 +22,7 @@ public class WebSocketClientTests {
 
             WebSocketSession socketSession = future.get();
             socketSession.sendMessage(new TextMessage("{\"command\":\"test\",\"parameters\":{\"type\":\"conn\"}}"));
-        }
+//        }
         System.in.read();
     }
 }

+ 12 - 1
hsweb-message/hsweb-message-websocket/src/test/java/org/hswebframework/web/socket/WebSocketServerTests.java

@@ -1,5 +1,8 @@
 package org.hswebframework.web.socket;
 
+import org.hswebframework.web.concurrent.counter.Counter;
+import org.hswebframework.web.concurrent.counter.CounterManager;
+import org.hswebframework.web.counter.redis.RedissonCounterManager;
 import org.hswebframework.web.message.Messager;
 import org.hswebframework.web.message.jms.JmsMessager;
 import org.redisson.Redisson;
@@ -36,7 +39,15 @@ public class WebSocketServerTests {
         return new TestProcessor();
     }
 
-// 使用redis
+    @Bean
+    public CounterManager counterManager() {
+        Config config = new Config();
+        config.useSingleServer().setAddress("127.0.0.1:6379");
+        RedissonClient client = Redisson.create(config);
+        return new RedissonCounterManager(client);
+    }
+
+//    // 使用redis
 //    @Bean(destroyMethod = "shutdown")
 //    public RedissonClient redissonClient() {
 //        Config config = new Config();

+ 5 - 2
hsweb-message/hsweb-message-websocket/src/test/resources/application.yml

@@ -1,10 +1,13 @@
 spring:
     activemq:
-        in-memory: true
+        broker-url: tcp://localhost:61616
+        in-memory: false
     jms:
       pub-sub-domain: true
 
 hsweb:
     app:
       name: websocket测试
-      version: 3.0.0
+      version: 3.0.0
+server:
+  port: 8081