'use strict'; const Service = require('egg').Service; const _ = require('lodash'); class RabbitmqService extends Service { // mission队列处理 async mission() { const { mq } = this.ctx; const { queue } = this.ctx.app.config; if (mq && queue) { const ch = await mq.conn.createChannel(); // const queue = 'freeAdmin/server-user'; try { // 创建队列:在没有队列的情况,直接获取会导致程序无法启动 await ch.assertQueue(queue, { durable: false }); await ch.consume(queue, msg => this.dealMission(msg), { noAck: true }); } catch (error) { this.ctx.logger.error('未找到订阅的队列'); } } else { this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!'); } } // 执行任务 async dealMission(bdata) { // if (!bdata) this.ctx.logger.error('mission队列中信息不存在'); // let data = bdata.content.toString(); // try { // data = JSON.parse(data); // } catch (error) { // this.ctx.logger.error('数据不是object'); // } // const { service, method, project, ...others } = data; // const arr = service.split('.'); // let s = this.ctx.service; // for (const key of arr) { // s = s[key]; // } // s[method](others); } /** * 发送队列消息 * @param {Any} data 消息队列数据 * @param {String} queueKey 消息队列可以 */ async sendToMqQueue(data, queueKey) { const { mq } = this.ctx; const { sendQueue } = this.ctx.app.config; let queue; // 获取队列名称 if (_.isObject(sendQueue)) { queue = sendQueue[queueKey]; } if (mq && queue) { if (!_.isString(data)) data = JSON.stringify(data); const ch = await mq.conn.createChannel(); try { // 创建队列:在没有队列的情况,直接获取会导致程序无法启动 // await ch.assertQueue(queue, { durable: false }); await ch.sendToQueue(queue, Buffer.from(data)); await ch.close(); } catch (error) { console.error(error); this.ctx.logger.error('mq消息发送失败'); } } else { this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!'); } } } module.exports = RabbitmqService;