'use strict';

const Service = require('egg').Service;

/**
 * Egg service wrapping the RabbitMQ client exposed on `ctx.mq`.
 *
 * Both methods log an error and return without throwing when `ctx.mq` is
 * absent (i.e. the MQ plugin is not configured).
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Exchange type and durability used when asserting the exchange in
    // receiveQueueMsg().
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Send a message by delegating to `ctx.mq.topic`.
   *
   * All arguments are forwarded unchanged, in order, to `mq.topic`.
   *
   * @param {string} ex - Exchange name (presumably; forwarded as first argument).
   * @param {string} routeKey - Routing key (presumably; forwarded as second argument).
   * @param {*} msg - Message payload, forwarded as-is.
   * @param {*} parm - Extra argument forwarded as-is; meaning is defined by the MQ plugin.
   * @return {Promise<void>} Resolves once `mq.topic` has settled.
   */
  async sendQueueMsg(ex, routeKey, msg, parm) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    await mq.topic(ex, routeKey, msg, parm);
  }

  /**
   * Start consuming messages from an exchange.
   *
   * Creates a channel on `ctx.mq.conn`, asserts the exchange with this
   * service's type/durability, creates an exclusive server-named queue, binds
   * it to the exchange and registers a consumer with `noAck: true`.
   *
   * A failure during setup is logged and the channel is closed on a
   * best-effort basis; the error is not rethrown. A failure to create the
   * channel itself propagates to the caller.
   *
   * @param {string} ex - Name of the exchange to consume from.
   * @return {Promise<void>} Resolves once the consumer is registered or setup has failed.
   */
  async receiveQueueMsg(ex) {
    const { mq, logger } = this.ctx;
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();
    try {
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });

      // Bind the queue to the exchange.
      // NOTE(review): on a topic exchange the binding key '*' matches routing
      // keys made of exactly one word; '#' would match every key. Confirm that
      // '*' is the intended pattern.
      await ch.bindQueue(q.queue, ex, '*');

      // Await the registration so a rejected consume() is handled by the
      // catch below instead of becoming an unhandled rejection.
      await ch.consume(q.queue, msg => {
        // A null message signals that the consumer was cancelled by the
        // broker; there is nothing to process in that case.
        if (!msg) {
          return;
        }
        // TODO: insert the to-do item into the database. The payload is
        // available as `msg.content.toString()` and the headers as
        // `msg.properties.headers`.
      }, { noAck: true });
    } catch (e) {
      // Surface the real cause instead of dropping it silently.
      logger.error(e);
      try {
        await ch.close();
      } catch (closeErr) {
        // The channel may already be closed as a consequence of the failure
        // above; do not let that mask the original error.
        logger.error(closeErr);
      }
    }
  }
}

module.exports = RabbitmqService;