소스 검색

更新进度使用mq

lrf 3 년 전
부모
커밋
7f8ceb26c3
2개의 변경된 파일82개의 추가작업 그리고 0개의 파일을 삭제
  1. 16 0
      app.js
  2. 66 0
      app/service/rabbitMq.js

+ 16 - 0
app.js

@@ -0,0 +1,16 @@
+'use strict';
+class AppBootHook {
+  constructor(app) {
+    this.app = app;
+  }
+
+  async serverDidReady() {
+    // 应用已经启动完毕
+    const ctx = await this.app.createAnonymousContext();
+    // 检查种子
+    // await ctx.service.install.index();
+    await ctx.service.rabbitMq.mission();
+  }
+
+}
+module.exports = AppBootHook;

+ 66 - 0
app/service/rabbitMq.js

@@ -0,0 +1,66 @@
+'use strict';
+
+const Service = require('egg').Service;
+const _ = require('lodash');
+
+class RabbitmqService extends Service {
+
+  constructor(ctx) {
+    super(ctx);
+    this.exType = 'topic';
+    this.durable = true;
+  }
+
+  // 接收消息
+  async receiveQueueMsg(ex) {
+    this.ctx.logger.info('调用mq的' + ex);
+    const self = this;
+    const { mq } = self.ctx;
+    if (mq) {
+      const ch = await mq.conn.createChannel();
+      await ch.assertExchange(ex, 'topic', { durable: true });
+      const q = await ch.assertQueue('', { exclusive: true });
+      await ch.bindQueue(q.queue, ex, '*');
+      await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
+    } else {
+      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
+    }
+  }
+
+  async logMessage(msg) {
+    const result = msg.content.toString();
+    const headers = msg.properties.headers;
+  }
+
+  // mission队列处理
+  async mission() {
+    const { mq } = this.ctx;
+    if (mq) {
+      const ch = await mq.conn.createChannel();
+      const queue = 'mission/return';
+      try {
+        // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
+        await ch.assertQueue(queue, { durable: false });
+        await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
+      } catch (error) {
+        this.ctx.logger.error('未找到订阅的队列');
+      }
+    } else {
+      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
+    }
+  }
+  // 执行任务
+  async dealMission(bdata) {
+    if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
+    let data = bdata.content.toString();
+    try {
+      data = JSON.parse(data);
+    } catch (error) {
+      this.ctx.logger.error('数据不是object');
+    }
+    console.log(data);
+    this.ctx.service.mission.updateProgress(data);
+  }
+}
+
+module.exports = RabbitmqService;