'use strict';

const _ = require('lodash');
const Service = require('egg').Service;

/**
 * Egg service that publishes to and consumes from RabbitMQ through the `mq`
 * object exposed on the request context (`ctx.mq`).
 *
 * When `ctx.mq` is missing, both methods log an error and return without
 * doing anything else.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Exchange type passed to `assertExchange` by the consumer side.
    this.exType = 'topic';
    // `durable` option used for both publishing and asserting the exchange.
    this.durable = true;
  }

  /**
   * Send a message.
   *
   * Delegates to `ctx.mq.topic(ex, msg, routeKey, { durable })`.
   * NOTE(review): the exact semantics of `mq.topic` are defined by the MQ
   * plugin, not by this file - presumably a topic-exchange publish; confirm.
   *
   * @param {string} ex - Exchange name.
   * @param {string} routeKey - Routing key for the message.
   * @param {*} msg - Message payload, forwarded to the plugin unchanged.
   * @returns {Promise<void>}
   */
  async sendQueueMsg(ex, routeKey, msg) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    await mq.topic(ex, msg, routeKey, { durable: this.durable });
  }

  /**
   * Receive messages.
   *
   * Creates a channel on `ctx.mq.conn`, asserts the exchange, asserts a queue
   * with an empty name and `exclusive: false`, binds it to the exchange with
   * `routeKey`, and starts consuming with manual acknowledgement.
   *
   * Each delivered message is acknowledged BEFORE `receiveCallBack` runs, so a
   * failing callback does not cause redelivery. Errors thrown (or promises
   * rejected) by the callback are logged and not rethrown.
   *
   * If any setup step fails, the error is logged, the channel is closed on a
   * best-effort basis, and the method resolves normally (it does not reject).
   *
   * @param {string} ex - Exchange name.
   * @param {string} routeKey - Binding key used to bind the queue.
   * @param {Function} [receiveCallBack] - Called with each delivered message.
   * @returns {Promise<void>}
   */
  async receiveQueueMsg(ex, routeKey, receiveCallBack) {
    const { mq, logger } = this.ctx;
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();
    try {
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: false });
      logger.debug('==q=', q);

      // Bind the queue to the exchange.
      await ch.bindQueue(q.queue, ex, routeKey);

      await ch.consume(q.queue, async msg => {
        // The consumer is invoked with a null message when the broker cancels
        // it; there is nothing to ack or hand to the callback in that case.
        if (msg === null || msg === undefined) {
          logger.warn('[RabbitmqService] consumer cancelled by broker, queue: %s', q.queue);
          return;
        }

        logger.debug('收到消息: ', msg);
        // Send the acknowledgement first (at-most-once delivery).
        ch.ack(msg);

        if (!receiveCallBack) {
          return;
        }
        try {
          // `await` also captures rejections from an async callback.
          await receiveCallBack(msg);
        } catch (callbackErr) {
          logger.error('[RabbitmqService] receiveCallBack failed', callbackErr);
        }
      }, { noAck: false });
    } catch (e) {
      logger.error('==e==', e);
      try {
        await ch.close();
      } catch (closeErr) {
        // A channel-level failure may already have closed the channel, in
        // which case close() rejects; that must not mask the original error.
        logger.warn('[RabbitmqService] channel close failed', closeErr);
      }
    }
  }
}

module.exports = RabbitmqService;