Bläddra i källkod

Merge pull request #6114 from EightMonth/master

rocketmq-starter应用举例
JEECG 11 månader sedan
förälder
incheckning
7548f3aa60

+ 6 - 0
jeecg-server-cloud/jeecg-system-cloud-start/pom.xml

@@ -49,6 +49,12 @@
             <artifactId>jeecg-cloud-test-rabbitmq</artifactId>
             <version>${jeecgboot.version}</version>
         </dependency>-->
+        <!-- rocketmq例子-->
+        <!--<dependency>
+            <groupId>org.jeecgframework.boot</groupId>
+            <artifactId>jeecg-cloud-test-rocketmq</artifactId>
+            <version>${jeecgboot.version}</version>
+        </dependency>-->
         <!-- 分布式事务例子
        <dependency>
            <groupId>org.jeecgframework.boot</groupId>

+ 22 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/pom.xml

@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.jeecgframework.boot</groupId>
+        <artifactId>jeecg-cloud-test</artifactId>
+        <version>3.6.3</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <description>消息队列测试模块</description>
+    <artifactId>jeecg-cloud-test-rocketmq</artifactId>
+
+    <dependencies>
+        <!-- rocketmq消息队列-->
+        <dependency>
+            <groupId>org.jeecgframework.boot</groupId>
+            <artifactId>jeecg-boot-starter-rocketmq</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

+ 28 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/constant/CloudConstant.java

@@ -0,0 +1,28 @@
+package org.jeecg.modules.test.rocketmq.constant;
+
+/**
+ * 微服务单元测试常量定义
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+public interface CloudConstant {
+
+
+    /**
+     * MQ测试队列名字
+     */
+    public final static String MQ_JEECG_PLACE_ORDER = "jeecg_place_order";
+    public final static String MQ_JEECG_PLACE_ORDER_TIME = "jeecg_place_order_time";
+
+    /**
+     * MQ测试消息总线
+     */
+    public final static String MQ_DEMO_BUS_EVENT = "demoBusEvent";
+
+    /**
+     * 分布式锁lock key
+     */
+    public final static String REDISSON_DEMO_LOCK_KEY1 = "demoLockKey1";
+    public final static String REDISSON_DEMO_LOCK_KEY2 = "demoLockKey2";
+
+}

+ 61 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/controller/JeecgMqTestController.java

@@ -0,0 +1,61 @@
+package org.jeecg.modules.test.rocketmq.controller;
+
+
+import cn.hutool.core.util.RandomUtil;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.jeecg.boot.starter.rabbitmq.client.RabbitMqClient;
+import org.jeecg.common.api.vo.Result;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+
+
+/**
+ * RocketMqClient发送消息
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@RestController
+@RequestMapping("/sys/test")
+@Api(tags = "【微服务】MQ单元测试")
+public class JeecgMqTestController {
+
+    @Autowired
+    private RabbitMqClient rabbitMqClient;
+
+
+    /**
+     * 测试方法:快速点击发送MQ消息
+     *  观察三个接受者如何分配处理消息:HelloReceiver1、HelloReceiver2、HelloReceiver3,会均衡分配
+     *
+     * @param req
+     * @return
+     */
+    @GetMapping(value = "/rocketmq")
+    @ApiOperation(value = "测试rocketmq", notes = "测试rocketmq")
+    public Result<?> rabbitMqClientTest(HttpServletRequest req) {
+        //rabbitmq消息队列测试
+        BaseMap map = new BaseMap();
+        map.put("orderId", RandomUtil.randomNumbers(10));
+        rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map);
+        rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,2);
+        return Result.OK("MQ发送消息成功");
+    }
+
+    @GetMapping(value = "/rocketmq2")
+    @ApiOperation(value = "rocketmq消息总线测试", notes = "rocketmq消息总线测试")
+    public Result<?> rabbitmq2(HttpServletRequest req) {
+
+        //rabbitmq消息总线测试
+        BaseMap params = new BaseMap();
+        params.put("orderId", "123456");
+        rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params);
+        return Result.OK("MQ发送消息成功");
+    }
+}

+ 29 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/event/DemoBusEvent.java

@@ -0,0 +1,29 @@
+package org.jeecg.modules.test.rocketmq.event;
+
+import cn.hutool.core.util.ObjectUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.boot.starter.rabbitmq.event.EventObj;
+import org.jeecg.boot.starter.rabbitmq.event.JeecgBusEventHandler;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.stereotype.Component;
+
+/**
+ * 消息处理器【发布订阅】
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@Slf4j
+@Component(CloudConstant.MQ_DEMO_BUS_EVENT)
+public class DemoBusEvent implements JeecgBusEventHandler {
+
+
+    @Override
+    public void onMessage(EventObj obj) {
+        if (ObjectUtil.isNotEmpty(obj)) {
+            BaseMap baseMap = obj.getBaseMap();
+            String orderId = baseMap.get("orderId");
+            log.info("业务处理----订单ID:" + orderId);
+        }
+    }
+}

+ 27 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver1.java

@@ -0,0 +1,27 @@
+package org.jeecg.modules.test.rocketmq.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
+ *
+ * RabbitMq接受者1
+ * (@RabbitListener声明类上,一个类只能监听一个队列)
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
+public class HelloReceiver1 implements RocketMQListener<BaseMap> {
+
+    public void onMessage(BaseMap baseMap) {
+        log.info("helloReceiver1接收消息:" + baseMap);
+    }
+
+}

+ 27 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver2.java

@@ -0,0 +1,27 @@
+package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
+ *
+ * RabbitMq接受者2
+ * (@RabbitListener声明类上,一个类只能监听一个队列)
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver2")
+public class HelloReceiver2 implements RocketMQListener<BaseMap> {
+
+    public void onMessage(BaseMap baseMap) {
+        log.info("helloReceiver2接收消息:" + baseMap);
+    }
+
+}

+ 27 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver3.java

@@ -0,0 +1,27 @@
+package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
+ *
+ * RabbitMq接受者3【我是处理人3】
+ * (@RabbitListener声明类方法上,一个类可以多监听多个队列)
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver3")
+public class HelloReceiver3 implements RocketMQListener<BaseMap> {
+
+    public void onMessage(BaseMap baseMap) {
+        log.info("helloReceiver3接收消息:" + baseMap);
+    }
+
+}

+ 24 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloTimeReceiver.java

@@ -0,0 +1,24 @@
+package org.jeecg.modules.test.rocketmq.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.jeecg.common.base.BaseMap;
+import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
+ * @author: zyf
+ * @date: 2022/04/21
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, consumerGroup = "helloTimeReceiver")
+public class HelloTimeReceiver implements RocketMQListener<BaseMap> {
+
+    public void onMessage(BaseMap baseMap) {
+        log.info("helloTimeReceiver接收消息:" + baseMap);
+    }
+
+}

+ 1 - 0
jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml

@@ -24,5 +24,6 @@
         <module>jeecg-cloud-test-more</module>
         <module>jeecg-cloud-test-rabbitmq</module>
         <module>jeecg-cloud-test-seata</module>
+        <module>jeecg-cloud-test-rocketmq</module>
     </modules>
 </project>

+ 6 - 0
pom.xml

@@ -225,6 +225,12 @@
 				<artifactId>jeecg-boot-starter-rabbitmq</artifactId>
 				<version>${jeecgboot.version}</version>
 			</dependency>
+			<!--rocketmq-->
+			<dependency>
+				<groupId>org.jeecgframework.boot</groupId>
+				<artifactId>jeecg-boot-starter-rocketmq</artifactId>
+				<version>${jeecgboot.version}</version>
+			</dependency>
 			<!--分库分表shardingsphere-->
 			<dependency>
 				<groupId>org.jeecgframework.boot</groupId>