lrf 2 年之前
父节点
当前提交
a2f8fca5be
共有 9 个文件被更改,包括 207 次插入0 次删除
  1. 2 0
      app.js
  2. 8 0
      app/controller/home.js
  3. 1 0
      app/router.js
  4. 9 0
      app/service/trade/order.js
  5. 139 0
      app/service/util/rabbitMq.js
  6. 22 0
      config/config.default.js
  7. 20 0
      config/config.prod.js
  8. 5 0
      config/plugin.js
  9. 1 0
      package.json

+ 2 - 0
app.js

@@ -12,6 +12,8 @@ class AppBootHook {
     // 应用已经启动完毕
     const ctx = await this.app.createAnonymousContext();
     ctx.service.util.install.init();
+    // 初始化死信机制
+    ctx.service.util.rabbitMq.initDeadProcess();
     // await ctx.service.util.rabbitMq.mission();
   }
 }

+ 8 - 0
app/controller/home.js

@@ -5,6 +5,14 @@ class HomeController extends Controller {
   async index() {
     const { ctx } = this;
     ctx.body = 'hi, egg';
+    await this.ctx.service.util.rabbitMq.initTestQueue();
+    await this.ctx.service.util.rabbitMq.toDead();
+  }
+  async m1() {
+    const { taskMqConfig } = this.app.config;
+    const body = this.ctx.request.body;
+    await this.ctx.service.util.rabbitMq.makeTask(taskMqConfig.queue, body);
+    this.ctx.ok();
   }
 
   async makeCoupons() {

+ 1 - 0
app/router.js

@@ -6,6 +6,7 @@
 module.exports = app => {
   const { router, controller } = app;
   router.get('/', controller.home.index);
+  router.get('/m1', controller.home.m1);
   require('./z_router/dev/index')(app); // 开发者使用部分
   require('./z_router/user/index')(app); // 用户部分
   require('./z_router/system/index')(app); // 系统部分

+ 9 - 0
app/service/trade/order.js

@@ -103,6 +103,8 @@ class OrderService extends CrudService {
       // 处理优惠券,改为使用过
       if (coupon.length > 1) await this.ctx.service.user.userCoupon.useCoupon(coupon, this.tran);
       await this.tran.run();
+      // 创建定时任务(mq死信机制任务)
+      await this.toMakeTask(order_id);
       return order_id;
     } catch (error) {
       await this.tran.rollback();
@@ -313,6 +315,13 @@ class OrderService extends CrudService {
     }
     return data;
   }
+
+
+  async toMakeTask(order_id) {
+    const { taskMqConfig } = this.app.config;
+    const data = { service: 'trade.order', method: 'cancel', params: { order_id } };
+    await this.ctx.service.util.rabbitMq.makeTask(taskMqConfig.queue, data, 1);
+  }
 }
 
 module.exports = OrderService;

+ 139 - 0
app/service/util/rabbitMq.js

@@ -0,0 +1,139 @@
+'use strict';
+
+const Service = require('egg').Service;
+const _ = require('lodash');
+
+class RabbitmqService extends Service {
+  constructor(ctx) {
+    super(ctx);
+    this.exType = 'topic';
+    this.durable = true;
+    // 定时任务队列
+    this.task = this.app.config.taskMqConfig;
+  }
+
+  // 初始化死信机制
+  async initDeadProcess() {
+    try {
+      await this.initDeadQueue();
+      await this.initTaskQueue();
+    } catch (error) {
+      console.error('初始化死信机制失败');
+    }
+    console.log('初始化:死信机制----成功');
+
+  }
+
+  /**
+   * 初始化定时任务队列并设置死信机制
+   */
+  async initTaskQueue() {
+    const { mq } = this.ctx;
+    try {
+      const ch = await mq.conn.createChannel();
+      // 声明正常交换器
+      await ch.assertExchange(this.task.ex, 'direct', { durable: true });
+      // 声明正常队列(配置死信队列设置)
+      const q = await ch.assertQueue(this.task.queue, { exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey });
+      // 正常队里绑定至正常交换器
+      await ch.bindQueue(q.queue, this.task.ex);
+    } catch (error) {
+      console.error('mq---- 死信机制-任务队列生成失败');
+      console.error(error);
+    }
+  }
+  /**
+   * 初始化定时任务死信机制队列
+   */
+  async initDeadQueue() {
+    const { mq } = this.ctx;
+    try {
+      const ch = await mq.conn.createChannel();
+      await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
+      const qr = await ch.assertQueue(this.task.deadQueue, {
+        exclusive: false,
+      });
+      await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
+      await ch.consume(
+        qr.queue,
+        msg => {
+          console.log('in dead', msg.content.toString());
+        },
+        { noAck: true }
+      );
+    } catch (error) {
+      console.error('mq---- 死信机制-死信队列生成失败');
+      console.error(error);
+    }
+  }
+  /**
+   * 发送定时消息
+   * @param {String} queue 队列名
+   * @param {Any} data 数据
+   * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒
+   */
+  async makeTask(queue, data, time = '0.1') {
+    time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒
+    const ch = await this.ctx.mq.conn.createChannel();
+    await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time });
+    await ch.close();
+  }
+
+  // 接收消息
+  async receiveQueueMsg(ex) {
+    this.ctx.logger.info('调用mq的' + ex);
+    const self = this;
+    const { mq } = self.ctx;
+    if (mq) {
+      const ch = await mq.conn.createChannel();
+      await ch.assertExchange(ex, 'topic', { durable: true });
+      const q = await ch.assertQueue('', { exclusive: true });
+      await ch.bindQueue(q.queue, ex, '*');
+      await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
+    } else {
+      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
+    }
+  }
+
+  async logMessage(msg) {
+    const result = msg.content.toString();
+    const headers = msg.properties.headers;
+  }
+
+  // mission队列处理
+  async mission() {
+    const { mq } = this.ctx;
+    if (mq) {
+      const ch = await mq.conn.createChannel();
+      const queue = 'mission/market';
+      try {
+        // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
+        await ch.assertQueue(queue, { durable: false });
+        await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
+      } catch (error) {
+        this.ctx.logger.error('未找到订阅的队列');
+      }
+    } else {
+      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
+    }
+  }
+  // 执行任务
+  async dealMission(bdata) {
+    if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
+    let data = bdata.content.toString();
+    try {
+      data = JSON.parse(data);
+    } catch (error) {
+      this.ctx.logger.error('数据不是object');
+    }
+    const { service, method, project, ...others } = data;
+    const arr = service.split('.');
+    let s = this.ctx.service;
+    for (const key of arr) {
+      s = s[key];
+    }
+    s[method](others);
+  }
+}
+
+module.exports = RabbitmqService;

+ 22 - 0
config/config.default.js

@@ -62,6 +62,28 @@ module.exports = appInfo => {
       db: 1,
     },
   };
+  // mq设置
+  config.amqp = {
+    client: {
+      hostname: 'broadcast.waityou24.cn',
+      username: 'tehq',
+      password: 'tehq',
+      vhost: 'tehq',
+    },
+    app: true,
+    agent: true,
+  };
+  // 定时任务机制设置
+  config.taskMqConfig = {
+    ex: 'task',
+    queue: 'task',
+    routingKey: 'tr',
+    deadEx: 'deadTask',
+    deadQueue: 'deadTaskQueue',
+    deadLetterRoutingKey: 'deadTr',
+  };
+
+
   // 路由设置
   config.routePrefix = '/point/v1/api';
   // 支付路由回调设置

+ 20 - 0
config/config.prod.js

@@ -14,6 +14,26 @@ module.exports = appInfo => {
       useFindAndModify: true,
     },
   };
+  // mq设置
+  config.amqp = {
+    client: {
+      hostname: '127.0.0.1',
+      username: 'tehq',
+      password: 'tehq',
+      vhost: 'tehq',
+    },
+    app: true,
+    agent: true,
+  };
+  // redis设置
+  config.redis = {
+    client: {
+      port: 6379, // Redis port
+      host: '127.0.0.1', // Redis host
+      password: '123456',
+      db: 1,
+    },
+  };
   config.emailConfig = {
     config: 'tehq',
   };

+ 5 - 0
config/plugin.js

@@ -3,3 +3,8 @@ exports.redis = {
   enable: true,
   package: 'egg-redis',
 };
+
+exports.amqp = {
+  enable: true,
+  package: 'egg-naf-amqp',
+};

+ 1 - 0
package.json

@@ -9,6 +9,7 @@
   "dependencies": {
     "decimal.js": "^10.4.1",
     "egg": "^2",
+    "egg-naf-amqp": "^0.0.13",
     "egg-redis": "^2.4.0",
     "egg-scripts": "^2",
     "jsonwebtoken": "^8.5.1",