Browse Source

新增jms支持

zhouhao 8 năm trước cách đây
mục cha
commit
0d195f0739

+ 50 - 0
hsweb-message/hsweb-message-jms/src/main/java/org/hswebframework/web/message/jms/JmsMessagePublish.java

@@ -0,0 +1,50 @@
+package org.hswebframework.web.message.jms;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessagePublish;
+import org.hswebframework.web.message.MessageSubject;
+import org.hswebframework.web.message.support.MultipleQueueMessageSubject;
+import org.hswebframework.web.message.support.QueueMessageSubject;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+import org.springframework.jms.core.JmsTemplate;
+
+import java.util.Set;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class JmsMessagePublish implements MessagePublish {
+    private MessageSubject subject;
+
+    private JmsTemplate queueTemplate;
+    private JmsTemplate topicTemplate;
+    private Message     message;
+
+    public JmsMessagePublish(JmsTemplate queueTemplate, JmsTemplate topicTemplate, Message message) {
+        this.queueTemplate = queueTemplate;
+        this.topicTemplate = topicTemplate;
+        this.message = message;
+    }
+
+    @Override
+    public MessagePublish to(MessageSubject subject) {
+        this.subject = subject;
+        return this;
+    }
+
+    @Override
+    public void send() {
+        if (subject instanceof QueueMessageSubject) {
+            queueTemplate.convertAndSend(((QueueMessageSubject) subject).getQueueName(), message);
+        }
+        if (subject instanceof MultipleQueueMessageSubject) {
+            Set<String> queueNames = ((MultipleQueueMessageSubject) subject).getQueueName();
+            queueNames.forEach(name -> queueTemplate.convertAndSend(name, message));
+        }
+        if (subject instanceof TopicMessageSubject) {
+            topicTemplate.convertAndSend(((TopicMessageSubject) subject).getTopic(), message);
+        }
+    }
+}

+ 70 - 0
hsweb-message/hsweb-message-jms/src/main/java/org/hswebframework/web/message/jms/JmsMessageSubscribe.java

@@ -0,0 +1,70 @@
+package org.hswebframework.web.message.jms;
+
+import org.hswebframework.web.message.Message;
+import org.hswebframework.web.message.MessageSubscribe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.UncategorizedJmsException;
+import org.springframework.jms.core.JmsTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class JmsMessageSubscribe<M extends Message> implements MessageSubscribe<M> {
+    private List<Consumer<M>> consumers = new ArrayList<>();
+    private JmsTemplate jmsTemplate;
+
+    private Executor executor;
+
+    private String subjectName;
+
+    private volatile boolean running = false;
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    public JmsMessageSubscribe(JmsTemplate jmsTemplate, Executor executor, String subject) {
+        this.jmsTemplate = jmsTemplate;
+        this.executor = executor;
+        this.subjectName = subject;
+    }
+
+    @Override
+    public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
+        consumers.add(consumer);
+        if (!running) {
+            run();
+        }
+        return this;
+    }
+
+    private void run() {
+        running = true;
+        executor.execute(() -> {
+            while (running) {
+                try {
+                    M message = (M) jmsTemplate.receiveAndConvert(subjectName);
+                    consumers.forEach(con -> con.accept(message));
+                } catch (UncategorizedJmsException e) {
+                    logger.error("stop subscribe", e);
+                    running = false;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    // running = false;
+                }
+
+            }
+        });
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}

+ 59 - 0
hsweb-message/hsweb-message-jms/src/main/java/org/hswebframework/web/message/jms/JmsMessager.java

@@ -0,0 +1,59 @@
+package org.hswebframework.web.message.jms;
+
+import org.hswebframework.web.message.*;
+import org.hswebframework.web.message.support.QueueMessageSubject;
+import org.hswebframework.web.message.support.TopicMessageSubject;
+import org.springframework.jms.core.JmsTemplate;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public class JmsMessager implements Messager {
+
+    private JmsTemplate queueTemplate;
+
+    private JmsTemplate topicTemplate;
+
+    private Executor executor;
+
+    public JmsMessager(JmsTemplate template) {
+        this(template, Executors.newCachedThreadPool());
+    }
+
+    public JmsMessager(JmsTemplate template, Executor executor) {
+        queueTemplate = new JmsTemplate(template.getConnectionFactory());
+        queueTemplate.setDestinationResolver(template.getDestinationResolver());
+        queueTemplate.setPubSubDomain(false);
+        topicTemplate = new JmsTemplate(template.getConnectionFactory());
+        topicTemplate.setDestinationResolver(template.getDestinationResolver());
+        topicTemplate.setPubSubDomain(true);
+        this.executor = executor;
+    }
+
+    @Override
+    public MessagePublish publish(Message message) {
+        return new JmsMessagePublish(queueTemplate, topicTemplate, message);
+    }
+
+    @Override
+    public <M extends Message> MessageSubscribe<M> subscribe(MessageSubject subject) {
+        String subjectName = null;
+        JmsTemplate template = null;
+        if (subject instanceof QueueMessageSubject) {
+            subjectName = ((QueueMessageSubject) subject).getQueueName();
+            template = queueTemplate;
+        } else if (subject instanceof TopicMessageSubject) {
+            subjectName = ((TopicMessageSubject) subject).getTopic();
+            template = topicTemplate;
+        }
+        if (null == subjectName) {
+            throw new UnsupportedOperationException(subject.getClass().getName());
+        }
+        return new JmsMessageSubscribe<>(template, executor, subjectName);
+    }
+}

+ 0 - 55
hsweb-message/hsweb-message-jms/src/test/java/org/hswebframework/web/message/jms/AmqpTests.java

@@ -1,55 +0,0 @@
-package org.hswebframework.web.message.jms;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.jms.annotation.EnableJms;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.test.context.junit4.SpringRunner;
-
-
-/**
- * @author zhouhao
- */
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = SampleActiveMQApplication.class, properties = "application.yml")
-@EnableJms
-public class AmqpTests {
-
-    @Autowired
-    private JmsTemplate template;
-
-    @Test
-    public void testSend() throws InterruptedException {
-        new Thread(() -> {
-            while (true) {
-                try {
-                    Object obj = template.receiveAndConvert("test4");
-                    System.out.println("----" + obj);
-                } catch (Exception e) {
-                    break;
-                }
-            }
-        }).start();
-        new Thread(() -> {
-            while (true) {
-                try {
-                    Object obj = template.receiveAndConvert("test4");
-                    System.out.println("----222" + obj);
-                } catch (Exception e) {
-                    break;
-                }
-            }
-        }).start();
-        Thread.sleep(100);
-        int i = 0;
-        while (i < 10) {
-            template.convertAndSend("test4", "aaa" + i++);
-            Thread.sleep(1000);
-        }
-        Thread.sleep(1000);
-
-    }
-
-}

+ 0 - 26
hsweb-message/hsweb-message-jms/src/test/java/org/hswebframework/web/message/jms/Consumer.java

@@ -1,26 +0,0 @@
-package org.hswebframework.web.message.jms;
-
-import org.springframework.jms.annotation.JmsListener;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Consumer {
-
-    @JmsListener(destination = "test")
-    public void receiveQueue(String text) {
-        System.out.println("3:" + text);
-    }
-
-    @JmsListener(destination = "test")
-    public void receiveQueue4(String text) {
-        System.out.println("4:" + text);
-    }
-
-    @JmsListener(destination = "test2", subscription = "test2")
-    public void receiveQueue3(String text) {
-        System.out.println("1:" + text);
-    }
-
-
-}

+ 0 - 15
hsweb-message/hsweb-message-jms/src/test/java/org/hswebframework/web/message/jms/Consumer2.java

@@ -1,15 +0,0 @@
-package org.hswebframework.web.message.jms;
-
-import org.springframework.jms.annotation.JmsListener;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Consumer2 {
-
-
-    @JmsListener(destination = "test2", subscription = "test2")
-    public void receiveQueue4(String text) {
-        System.out.println("2:" + text);
-    }
-
-}

+ 77 - 0
hsweb-message/hsweb-message-jms/src/test/java/org/hswebframework/web/message/jms/JmsMessagerTest.java

@@ -0,0 +1,77 @@
+package org.hswebframework.web.message.jms;
+
+import org.hswebframework.web.message.Messager;
+import org.hswebframework.web.message.builder.StaticMessageBuilder;
+import org.hswebframework.web.message.builder.StaticMessageSubjectBuilder;
+import org.hswebframework.web.message.support.TextMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jms.annotation.EnableJms;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.hswebframework.web.message.builder.StaticMessageBuilder.*;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.queue;
+import static org.hswebframework.web.message.builder.StaticMessageSubjectBuilder.topic;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = SampleActiveMQApplication.class, properties = "application.yml")
+@EnableJms
+public class JmsMessagerTest {
+    @Autowired
+    private JmsTemplate jmsTemplate;
+
+    private Messager messager;
+
+    @Before
+    public void setup() {
+        messager = new JmsMessager(jmsTemplate);
+    }
+
+    static {
+        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
+    }
+
+    @Test
+    public void testQueue() throws InterruptedException {
+
+        messager.<TextMessage>subscribe(queue("test"))
+                .onMessage(textMessage -> System.out.println(textMessage.getMessage() + " sub1"));
+        messager.<TextMessage>subscribe(queue("test"))
+                .onMessage(textMessage -> System.out.println(textMessage.getMessage() + " sub2"));
+
+
+        for (int i = 0; i < 100; i++) {
+            Thread.sleep(1000);
+            messager.publish(text("hello jms"))
+                    .to(queue("test"))
+                    .send();
+        }
+    }
+
+    @Test
+    public void testTopic() throws InterruptedException {
+
+        messager.<TextMessage>subscribe(topic("test"))
+                .onMessage(textMessage -> System.out.println(textMessage.getMessage() + " topic1"));
+        messager.<TextMessage>subscribe(topic("test"))
+                .onMessage(textMessage -> System.out.println(textMessage.getMessage() + " topic2"));
+
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            messager.publish(text("hello jms"))
+                    .to(topic("test"))
+                    .send();
+        }
+        
+    }
+}

+ 0 - 21
hsweb-message/hsweb-message-jms/src/test/java/org/hswebframework/web/message/jms/SampleActiveMQApplication.java

@@ -1,31 +1,10 @@
 package org.hswebframework.web.message.jms;
 
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
-import org.springframework.context.annotation.Bean;
 import org.springframework.jms.annotation.EnableJms;
-import org.springframework.jms.listener.MessageListenerContainer;
-import org.springframework.jms.listener.SimpleMessageListenerContainer;
-import org.springframework.jms.listener.adapter.MessageListenerAdapter;
-
-import javax.jms.*;
 
 @SpringBootApplication
 @EnableJms
 public class SampleActiveMQApplication {
 
-
-//    @Bean
-//    public Queue queue() {
-//        return new ActiveMQQueue("test");
-//    }
-//
-//    @Bean
-//    public Topic topic() {
-//        ActiveMQTopic topic = new ActiveMQTopic("test2");
-//        return topic;
-//    }
-
 }

+ 1 - 0
hsweb-message/hsweb-message-jms/src/test/resources/application.yml

@@ -9,6 +9,7 @@ spring:
        driver-class-name : org.h2.Driver
     jms:
       pub-sub-domain: true
+
 hsweb:
     app:
       name: jms测试