'use strict';

const Service = require('egg').Service;

/**
 * Egg service that wires RabbitMQ consumers through the `mq` object exposed
 * on the request context (`ctx.mq`).
 *
 * NOTE(review): `mq.conn` is used with the amqplib channel API
 * (createChannel / assertExchange / assertQueue / bindQueue / consume);
 * confirm that the MQ plugin really exposes an amqplib connection.
 */
class RabbitmqService extends Service {
  /**
   * @param {object} ctx - Egg request context, forwarded to the base Service.
   */
  constructor(ctx) {
    super(ctx);
    // Exchange settings stored on the instance. The methods in this class
    // pass the same values as literals and do not read these fields.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Subscribes to a topic exchange through a fresh exclusive, server-named
   * queue bound with the routing pattern '*', and forwards every delivery to
   * `logMessage`. Messages are consumed with `noAck: true`.
   *
   * Logs an error and returns when `ctx.mq` is not available. Errors raised
   * while creating the channel, exchange, queue or consumer propagate to the
   * caller.
   *
   * @param {string} ex - Name of the exchange to assert and bind to.
   * @returns {Promise<void>}
   */
  async receiveQueueMsg(ex) {
    this.ctx.logger.info(`调用mq的${ex}`);

    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();
    await ch.assertExchange(ex, 'topic', { durable: true });
    const q = await ch.assertQueue('', { exclusive: true });
    await ch.bindQueue(q.queue, ex, '*');
    await ch.consume(q.queue, msg => this.logMessage(msg), { noAck: true });
  }

  /**
   * Decodes a consumed message and writes its body and headers to the debug
   * log.
   *
   * @param {object|null} msg - Delivery object with `content` (Buffer-like)
   *   and `properties.headers`. amqplib passes `null` when the broker cancels
   *   the consumer, so a falsy value is reported and skipped.
   * @returns {Promise<void>}
   */
  async logMessage(msg) {
    if (!msg) {
      this.ctx.logger.warn('mq消费者收到空消息,消费者可能已被取消');
      return;
    }

    const result = msg.content.toString();
    const headers = msg.properties.headers;
    this.ctx.logger.debug('mq message: %s headers: %j', result, headers);
  }

  /**
   * Starts consuming the 'mission/market' queue and hands every delivery to
   * `dealMission`. Messages are consumed with `noAck: true`.
   *
   * Logs an error and returns when `ctx.mq` is not available. Failures while
   * asserting the queue or registering the consumer are logged (with the
   * underlying error) and not rethrown.
   *
   * @returns {Promise<void>}
   */
  async mission() {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();
    const queue = 'mission/market';
    try {
      // Declare the queue first: consuming from a queue that does not exist
      // would prevent the application from starting.
      await ch.assertQueue(queue, { durable: false });
      await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
    } catch (error) {
      // Keep the underlying error in the log so the real cause is visible.
      this.ctx.logger.error('未找到订阅的队列', error);
    }
  }

  /**
   * Executes one mission message. The message body must be a JSON object of
   * the form `{ service, method, ...args }`; the remaining keys are passed as
   * a single object to `ctx.service[service][method]`.
   *
   * Every failure path is logged and swallowed here, because the consumer
   * callback in `mission` does not await this promise and a rejection would
   * otherwise surface as an unhandled rejection.
   *
   * NOTE(review): the service and method names come straight from the
   * message, so any producer able to publish to the queue can invoke any
   * method of any service. Restrict this with an allow-list if producers are
   * not fully trusted.
   *
   * @param {object|null} bdata - Delivery object whose `content` holds the
   *   JSON payload; a falsy value is reported and ignored.
   * @returns {Promise<void>}
   */
  async dealMission(bdata) {
    if (!bdata) {
      this.ctx.logger.error('mission队列中信息不存在');
      return;
    }

    let data;
    try {
      data = JSON.parse(bdata.content.toString());
    } catch (error) {
      this.ctx.logger.error('数据不是object');
      return;
    }
    // JSON.parse also accepts null and primitives, which carry no mission.
    if (data === null || typeof data !== 'object') {
      this.ctx.logger.error('数据不是object');
      return;
    }

    const { service, method, ...others } = data;
    if (!service || !method) return;

    const target = this.ctx.service[service];
    if (!target || typeof target[method] !== 'function') {
      this.ctx.logger.error(`mission指定的service或method不存在: ${service}.${method}`);
      return;
    }

    try {
      await target[method](others);
    } catch (error) {
      this.ctx.logger.error(`mission执行失败: ${service}.${method}`, error);
    }
  }
}

module.exports = RabbitmqService;