소스 검색

优化redis实现

zhouhao 8 년 전
부모
커밋
24594d1613

+ 24 - 24
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java

@@ -3,10 +3,16 @@ package org.hswebframework.web.message.redis;
 import org.hswebframework.web.message.Message;
 import org.hswebframework.web.message.MessageSubject;
 import org.hswebframework.web.message.MessageSubscribe;
+import org.hswebframework.web.message.support.QueueMessageSubject;
 import org.hswebframework.web.message.support.TopicMessageSubject;
-import org.hswebframework.web.message.support.UserMessageSubject;
-import org.redisson.api.*;
-import org.redisson.codec.SerializationCodec;
+import org.redisson.api.RCountDownLatch;
+import org.redisson.api.RQueue;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.redisson.client.codec.Codec;
+import org.redisson.codec.JsonJacksonCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -16,15 +22,17 @@ import java.util.function.Consumer;
  * @author zhouhao
  */
 public class RedissionMessageSubscribe<M extends Message> implements MessageSubscribe<M> {
-    private MessageSubject iam;
+    private MessageSubject subject;
     private RedissonClient redisson;
     private boolean           running    = false;
     private int               listenerId = 0;
     private List<Consumer<M>> consumers  = new ArrayList<>();
     private RTopic<M> topic;
 
-    public RedissionMessageSubscribe(MessageSubject iam, RedissonClient redisson) {
-        this.iam = iam;
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    public RedissionMessageSubscribe(MessageSubject subject, RedissonClient redisson) {
+        this.subject = subject;
         this.redisson = redisson;
     }
 
@@ -32,12 +40,6 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
         this.redisson = redisson;
     }
 
-    @Override
-    public MessageSubscribe<M> iam(MessageSubject iam) {
-        this.iam = iam;
-        return this;
-    }
-
     @Override
     public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
         consumers.add(consumer);
@@ -57,13 +59,13 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
         consumers.clear();
     }
 
-    private static SerializationCodec codec = new SerializationCodec();
+    private static Codec codec = JsonJacksonCodec.INSTANCE;
 
     private void doRun() {
-        if (iam instanceof UserMessageSubject) {
-            RQueue<M> queue = redisson
-                    .getQueue("queue_user_" + ((UserMessageSubject) iam).getUserId(), codec);
-            RCountDownLatch countDownLatch = redisson.getCountDownLatch("cdl_user_" + ((UserMessageSubject) iam).getUserId());
+        if (subject instanceof QueueMessageSubject) {
+            String queueName = ((QueueMessageSubject) subject).getQueueName();
+            RQueue<M> queue = redisson.getQueue(queueName, codec);
+            RCountDownLatch countDownLatch = redisson.getCountDownLatch("cdl_" + queueName);
             Thread thread = new Thread(() -> {
                 while (running) {
                     try {
@@ -75,11 +77,9 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
                                 cons.accept(message);
                         });
                     } catch (InterruptedException e) {
-                        try {
-                            Thread.sleep(1000);
-                        } catch (InterruptedException e1) {
-                            e1.printStackTrace();
-                        }
+                        running = false;
+                        logger.error("queue consumer thread interrupted", e);
+                        Thread.currentThread().interrupt();
                     }
                 }
             });
@@ -87,8 +87,8 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
             thread.start();
             return;
         }
-        if (iam instanceof TopicMessageSubject) {
-            topic = redisson.getTopic("topic_" + ((TopicMessageSubject) iam).getTopic(), codec);
+        if (subject instanceof TopicMessageSubject) {
+            topic = redisson.getTopic("topic_" + ((TopicMessageSubject) subject).getTopic(), codec);
             listenerId = topic.addListener((channel, msg) -> consumers.forEach(cons -> cons.accept(msg)));
         }
         running = true;

+ 13 - 11
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java

@@ -3,13 +3,15 @@ package org.hswebframework.web.message.redis;
 import org.hswebframework.web.message.Message;
 import org.hswebframework.web.message.MessagePublish;
 import org.hswebframework.web.message.MessageSubject;
-import org.hswebframework.web.message.support.MultipleUserMessageSubject;
+import org.hswebframework.web.message.support.MultipleQueueMessageSubject;
+import org.hswebframework.web.message.support.QueueMessageSubject;
 import org.hswebframework.web.message.support.TopicMessageSubject;
-import org.hswebframework.web.message.support.UserMessageSubject;
 import org.redisson.api.RCountDownLatch;
 import org.redisson.api.RQueue;
 import org.redisson.api.RTopic;
 import org.redisson.api.RedissonClient;
+import org.redisson.client.codec.Codec;
+import org.redisson.codec.JsonJacksonCodec;
 import org.redisson.codec.SerializationCodec;
 
 import java.util.function.Consumer;
@@ -36,14 +38,14 @@ public class RedissonMessagePublish implements MessagePublish {
     }
 
     private boolean useQueue() {
-        return to instanceof UserMessageSubject || to instanceof MultipleUserMessageSubject;
+        return to instanceof QueueMessageSubject || to instanceof MultipleQueueMessageSubject;
     }
 
-    private SerializationCodec codec = new SerializationCodec();
+    private static Codec codec = JsonJacksonCodec.INSTANCE;
 
-    private Consumer<String> queueConsumer = id -> {
-        RQueue<Message> queue = redissonClient.getQueue("queue_user_" + id, codec);
-        RCountDownLatch downLatch = redissonClient.getCountDownLatch("cdl_user_" + id);
+    private Consumer<String> queueConsumer = queueName -> {
+        RQueue<Message> queue = redissonClient.getQueue(queueName, codec);
+        RCountDownLatch downLatch = redissonClient.getCountDownLatch("cdl_" + queueName);
         queue.add(message);
         downLatch.countDown();
     };
@@ -53,11 +55,11 @@ public class RedissonMessagePublish implements MessagePublish {
         if (redissonClient.isShutdown() || redissonClient.isShuttingDown()) {
             return;
         }
-        if (to instanceof UserMessageSubject) {
-            queueConsumer.accept(((UserMessageSubject) to).getUserId());
+        if (to instanceof QueueMessageSubject) {
+            queueConsumer.accept(((QueueMessageSubject) to).getQueueName());
         }
-        if (to instanceof MultipleUserMessageSubject) {
-            ((MultipleUserMessageSubject) to).getUserIdList().forEach(queueConsumer);
+        if (to instanceof MultipleQueueMessageSubject) {
+            ((MultipleQueueMessageSubject) to).getQueueName().forEach(queueConsumer);
         }
         if (to instanceof TopicMessageSubject) {
             RTopic<Message> topic = redissonClient.getTopic("topic_" + ((TopicMessageSubject) to).getTopic(), codec);

+ 0 - 2
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/starter/RedissonMessagerAutoConfiguration.java

@@ -9,8 +9,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 /**
- * TODO 完成注释
- *
  * @author zhouhao
  */
 @Configuration