1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- 'use strict';
- const Service = require('egg').Service;
- class RabbitmqService extends Service {
- constructor(ctx) {
- super(ctx);
- }
- // mission队列处理
- async mission() {
- const { mq } = this.ctx;
- if (mq) {
- const ch = await mq.conn.createChannel();
- // TODO 需要写成匹配的形式
- const queue = 'mission/center';
- try {
- // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
- await ch.assertQueue(queue, { durable: false });
- await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
- } catch (error) {
- this.ctx.logger.error('未找到订阅的队列');
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- // 执行任务
- async dealMission(bdata) {
- if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
- let data = bdata.content.toString();
- try {
- data = JSON.parse(data);
- } catch (error) {
- this.ctx.logger.error('数据不是object');
- }
- const { service, method, ...others } = data;
- if (service && method) this.ctx.service[service][method](others);
- }
- }
- module.exports = RabbitmqService;
|