'use strict';

const Service = require('egg').Service;

/**
 * Waits for the first delivery on `queue` and resolves with it.
 *
 * The promise settles at most once: later deliveries are ignored, and a
 * failure to register the consumer rejects instead of leaving the caller
 * waiting forever.
 *
 * @param {object} ch - Channel exposing `consume(queue, onMessage, options)`.
 * @param {string} queue - Name of the queue to consume from.
 * @returns {Promise<object|null>} The first value handed to the consumer
 *   callback (may be `null`; the caller checks for that).
 */
function consumeFirstMessage(ch, queue) {
  return new Promise((resolve, reject) => {
    let isSettled = false;
    const onMessage = msg => {
      if (isSettled) {
        return;
      }
      isSettled = true;
      resolve(msg);
    };
    // Promise.resolve() tolerates a client whose consume() is not thenable.
    Promise.resolve(ch.consume(queue, onMessage, { noAck: true })).catch(reject);
  });
}

/**
 * Egg service that publishes to, and reads a single message from, a
 * RabbitMQ exchange through the `ctx.mq` plugin.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Exchange settings used when asserting the exchange in receiveQueueMsg.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Send a message by delegating to `ctx.mq.topic`.
   *
   * Logs an error and does nothing when `ctx.mq` is not available.
   *
   * @param {string} ex - Exchange name.
   * @param {string} routeKey - Routing key.
   * @param {*} msg - Message payload, passed through unchanged.
   * @param {*} parm - Extra argument forwarded to `mq.topic` unchanged.
   * @returns {Promise<void>}
   */
  async sendQueueMsg(ex, routeKey, msg, parm) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    await mq.topic(ex, routeKey, msg, parm);
  }

  /**
   * Receive one message from exchange `ex` and trigger the import it refers to.
   *
   * Opens a channel, binds a fresh exclusive queue to `ex` with pattern `*`,
   * waits for the first delivery, then queries `dataimp` with the `userid`
   * header of that message and hands the result to `excelimport`.
   *
   * Failures are logged, not rethrown. The channel is always closed exactly
   * once before the method returns.
   *
   * @param {string} ex - Exchange name.
   * @returns {Promise<void>}
   */
  async receiveQueueMsg(ex) {
    const { ctx } = this;
    const { mq } = ctx;
    if (!mq) {
      ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();

    // Guarded closer: closing an already-closed channel is expected to fail,
    // which would otherwise mask the error that brought us here.
    let isChannelOpen = true;
    const closeChannel = async () => {
      if (!isChannelOpen) {
        return;
      }
      isChannelOpen = false;
      try {
        await ch.close();
      } catch (closeErr) {
        ctx.logger.warn('[rabbitmq] failed to close channel', closeErr);
      }
    };

    try {
      ctx.logger.debug('[rabbitmq] receiveQueueMsg exchange=%s', ex);
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });
      ctx.logger.debug('[rabbitmq] asserted queue %j', q);

      // Bind the queue to the exchange.
      await ch.bindQueue(q.queue, ex, '*');

      const resultMq = await consumeFirstMessage(ch, q.queue);
      // Only one message is wanted: stop deliveries before the import runs.
      await closeChannel();

      if (resultMq == null) {
        return;
      }

      const { headers } = resultMq.properties;
      const data = { userid: headers.userid, type: '0', status: '0' };
      const dataimpResult = await ctx.service.dataimp.query(data);
      await ctx.service.excelimport.getImportXLSXData(dataimpResult);
    } catch (e) {
      ctx.logger.error('[rabbitmq] receiveQueueMsg failed', e);
    } finally {
      await closeChannel();
    }
  }
}

module.exports = RabbitmqService;