Bladeren bron

增加redis实现

zhouhao 8 jaren geleden
bovenliggende
commit
2f6cc31fa4

+ 26 - 0
hsweb-message/hsweb-message-redis/pom.xml

@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-message</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-message-redis</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-message-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson</artifactId>
+        </dependency>
+    </dependencies>
+</project>

+ 82 - 0
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissionMessageSubscribe.java

@@ -0,0 +1,82 @@
+package org.hswebframework.web.message.redis;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessageSubject;
+import org.hswebframework.web.message.MessageSubscribe;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+import org.hswebframework.web.message.support.UserMessageSubject;
+import org.redisson.api.*;
+import org.redisson.codec.SerializationCodec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * @author zhouhao
+ */
+public class RedissionMessageSubscribe<M extends Message> implements MessageSubscribe<M> {
+    private MessageSubject iam;
+    private RedissonClient redisson;
+
+    private boolean running = false;
+
+    private List<Consumer<M>> consumers = new ArrayList<>();
+
+    public RedissionMessageSubscribe(MessageSubject iam, RedissonClient redisson) {
+        this.iam = iam;
+        this.redisson = redisson;
+    }
+
+    public RedissionMessageSubscribe(RedissonClient redisson) {
+        this.redisson = redisson;
+    }
+
+    @Override
+    public MessageSubscribe<M> iam(MessageSubject iam) {
+        this.iam = iam;
+        return this;
+    }
+
+    @Override
+    public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
+        consumers.add(consumer);
+        if (!running) {
+            doRun();
+        }
+        return this;
+    }
+
+    private static SerializationCodec codec = new SerializationCodec();
+
+    private void doRun() {
+        if (iam instanceof UserMessageSubject) {
+            RQueue<M> queue = redisson
+                    .getQueue("queue_user_" + ((UserMessageSubject) iam).getUserId(), codec);
+            RCountDownLatch countDownLatch = redisson.getCountDownLatch("cdl_user_" + ((UserMessageSubject) iam).getUserId());
+            Thread thread = new Thread(() -> {
+                while (running) {
+                    try {
+                        countDownLatch.trySetCount(1);
+                        countDownLatch.await();
+                        consumers.forEach(cons -> cons.accept(queue.peek()));
+                    } catch (InterruptedException e) {
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
+                }
+            });
+            running = true;
+            thread.start();
+            return;
+        }
+        if (iam instanceof TopicMessageSubject) {
+            RTopic<M> topic = redisson.getTopic("topic_" + ((TopicMessageSubject) iam).getTopic(), codec);
+            topic.addListener((channel, msg) -> consumers.forEach(cons -> cons.accept(msg)));
+        }
+        running = true;
+    }
+}

+ 83 - 0
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessagePublish.java

@@ -0,0 +1,83 @@
+package org.hswebframework.web.message.redis;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessagePublish;
+import org.hswebframework.web.message.MessageSubject;
+import org.hswebframework.web.message.support.MultipleUserMessageSubject;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+import org.hswebframework.web.message.support.UserMessageSubject;
+import org.redisson.api.RCountDownLatch;
+import org.redisson.api.RQueue;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.redisson.codec.SerializationCodec;
+
+import java.util.function.Consumer;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class RedissonMessagePublish implements MessagePublish {
+    private MessageSubject from;
+    private MessageSubject to;
+    private RedissonClient redissonClient;
+    private Message        message;
+
+    public RedissonMessagePublish(RedissonClient redissonClient, Message message) {
+        this.redissonClient = redissonClient;
+        this.message = message;
+    }
+
+    @Override
+    public MessagePublish from(MessageSubject subject) {
+        this.from = subject;
+        return this;
+    }
+
+    @Override
+    public MessagePublish to(MessageSubject subject) {
+        this.to = subject;
+        return this;
+    }
+
+    @Override
+    public MessagePublish deleteOnTimeout(long timeOutSecond) {
+        return null;
+    }
+
+    private boolean useQueue() {
+        return to instanceof UserMessageSubject || to instanceof MultipleUserMessageSubject;
+    }
+
+    private SerializationCodec codec = new SerializationCodec();
+
+    private Consumer<String> queueConsumer = id -> {
+        RQueue<Message> queue = redissonClient.getQueue("queue_user_" + id, codec);
+        RCountDownLatch downLatch = redissonClient.getCountDownLatch("cdl_user_" + id);
+        queue.add(message);
+        downLatch.countDown();
+    };
+
+    @Override
+    public <T> T send() {
+        if (to instanceof UserMessageSubject) {
+            queueConsumer.accept(((UserMessageSubject) to).getUserId());
+        }
+        if (to instanceof MultipleUserMessageSubject) {
+            ((MultipleUserMessageSubject) to).getUserIdList().forEach(queueConsumer);
+        }
+        if (to instanceof TopicMessageSubject) {
+            RTopic<Message> topic = redissonClient.getTopic("topic_" + ((TopicMessageSubject) to).getTopic(), codec);
+            topic.publish(message);
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> void send(Consumer<T> responseConsumer) {
+        responseConsumer.accept(send());
+    }
+
+}

+ 30 - 0
hsweb-message/hsweb-message-redis/src/main/java/org/hswebframework/web/message/redis/RedissonMessager.java

@@ -0,0 +1,30 @@
+package org.hswebframework.web.message.redis;
+
+import org.hswebframework.web.message.*;
+import org.redisson.api.RedissonClient;
+
+/**
+ * @author zhouhao
+ */
+public class RedissonMessager implements Messager {
+
+    private RedissonClient redisson;
+
+    public RedissonMessager(RedissonClient redisson) {
+        this.redisson = redisson;
+    }
+
+    public void setRedisson(RedissonClient redisson) {
+        this.redisson = redisson;
+    }
+
+    @Override
+    public MessagePublish publish(Message message) {
+        return new RedissonMessagePublish(redisson, message);
+    }
+
+    @Override
+    public <M extends Message> MessageSubscribe<M> subscribe(MessageSubject subscribe) {
+        return new RedissionMessageSubscribe<>(subscribe, redisson);
+    }
+}

+ 48 - 0
hsweb-message/hsweb-message-redis/src/test/java/org/hswebframework/web/message/redis/RedissonMessagerTest.java

@@ -0,0 +1,48 @@
+package org.hswebframework.web.message.redis;
+
+import org.hswebframework.web.message.Messager;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class RedissonMessagerTest {
+
+
+    public void testSimple() {
+
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        Config config = new Config();
+        config.useSingleServer().setAddress("127.0.0.1:6379");
+        RedissonClient redisson = Redisson.create(config);
+        Messager messager = new RedissonMessager(redisson);
+
+        byte[] stat = new byte[1];
+
+//        new Thread(() -> {
+//            for (int i = 0; i < 1000; i++) {
+//                try {
+//                    Thread.sleep(1000);
+//                } catch (InterruptedException e) {
+//                    e.printStackTrace();
+//                }
+//                messager.publish(text("hello2"))
+//                        .to(topic("test"))
+//                        .from(user("admin"))
+//                        .send();
+//            }
+//        }).start();
+        messager.subscribe(topic("test"))
+                .onMessage(System.out::println)
+                .onMessage(msg -> stat[0] = 1);
+        //redisson.shutdown();
+    }
+}