lrf 2 years ago
parent
commit
466cb6da34
4 changed files with 29 additions and 29 deletions
  1. 1 1
      app.js
  2. 4 5
      app/controller/home.js
  3. 1 0
      app/service/trade/order.js
  4. 23 23
      app/service/util/rabbitMq.js

+ 1 - 1
app.js

@@ -13,7 +13,7 @@ class AppBootHook {
     const ctx = await this.app.createAnonymousContext();
     ctx.service.util.install.init();
     // 初始化死信机制
-    // ctx.service.util.rabbitMq.initDeadProcess();
+    ctx.service.util.rabbitMq.initDeadProcess();
     // await ctx.service.util.rabbitMq.mission();
   }
 }

+ 4 - 5
app/controller/home.js

@@ -5,13 +5,12 @@ class HomeController extends Controller {
   async index() {
     const { ctx } = this;
     ctx.body = 'hi, egg';
-    await this.ctx.service.util.rabbitMq.initTestQueue();
-    await this.ctx.service.util.rabbitMq.toDead();
+    await this.ctx.service.util.rabbitMq.initDeadProcess();
   }
   async m1() {
-    const { taskMqConfig } = this.app.config;
-    const body = this.ctx.request.body;
-    await this.ctx.service.util.rabbitMq.makeTask(taskMqConfig.queue, body);
+    // const { taskMqConfig } = this.app.config;
+    // const body = this.ctx.request.body;
+    // await this.ctx.service.util.rabbitMq.makeTask(taskMqConfig.queue, body);
     this.ctx.ok();
   }
 

+ 1 - 0
app/service/trade/order.js

@@ -123,6 +123,7 @@ class OrderService extends CrudService {
    */
   async cancel({ order_id }) {
     try {
+      assert(order_id, '缺少订单信息');
       const order = await this.model.findById(order_id);
       if (!order) throw new BusinessError(ErrorCode.DATA_NOT_EXIST, '未找到订单信息');
       // 退单分为 2 部分, 涉及价格不需要管.因为这是交钱之前的的操作,不涉及退款

+ 23 - 23
app/service/util/rabbitMq.js

@@ -19,6 +19,7 @@ class RabbitmqService extends Service {
       await this.initTaskQueue();
     } catch (error) {
       console.error('初始化死信机制失败');
+      return;
     }
     console.log('初始化:死信机制----成功');
 
@@ -56,8 +57,9 @@ class RabbitmqService extends Service {
       await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
       await ch.consume(
         qr.queue,
-        msg => {
-          console.log('in dead', msg.content.toString());
+        async msg => {
+          console.log('in dead');
+          await this.dealTask(msg);
         },
         { noAck: true }
       );
@@ -66,6 +68,25 @@ class RabbitmqService extends Service {
       console.error(error);
     }
   }
+  async dealTask(msg) {
+    try {
+      const str = msg.content.toString();
+      const obj = JSON.parse(str);
+      const { service, method, params } = obj;
+      const arr = service.split('.');
+      let ser = this.ctx.service;
+      for (const s of arr) {
+        ser = ser[s];
+      }
+      const m = _.get(ser, method);
+      if (!_.isFunction(m)) return;
+      m(params);
+    } catch (error) {
+      console.log(error);
+    }
+  }
+
+
   /**
    * 发送定时消息
    * @param {String} queue 队列名
@@ -79,27 +100,6 @@ class RabbitmqService extends Service {
     await ch.close();
   }
 
-  // 接收消息
-  async receiveQueueMsg(ex) {
-    this.ctx.logger.info('调用mq的' + ex);
-    const self = this;
-    const { mq } = self.ctx;
-    if (mq) {
-      const ch = await mq.conn.createChannel();
-      await ch.assertExchange(ex, 'topic', { durable: true });
-      const q = await ch.assertQueue('', { exclusive: true });
-      await ch.bindQueue(q.queue, ex, '*');
-      await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
-    } else {
-      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
-    }
-  }
-
-  async logMessage(msg) {
-    const result = msg.content.toString();
-    const headers = msg.properties.headers;
-  }
-
   // mission队列处理
   async mission() {
     const { mq } = this.ctx;