'use strict'; const Service = require('egg').Service; const _ = require('lodash'); class RabbitmqService extends Service { // mission队列处理 async mission() { const { mq } = this.ctx; const { queue } = this.ctx.app.config; if (mq && queue) { const ch = await mq.conn.createChannel(); try { // 创建队列:在没有队列的情况,直接获取会导致程序无法启动 await ch.assertQueue(queue, { durable: false }); await ch.consume(queue, msg => this.dealMission(msg), { noAck: true }); } catch (error) { this.ctx.logger.error('未找到订阅的队列'); } } else { this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!'); } } // 执行任务 async dealMission(bdata) { if (!bdata) this.ctx.logger.error('mission队列中信息不存在'); let data = bdata.content.toString(); try { data = JSON.parse(data); } catch (error) { this.ctx.logger.error('数据转换错误'); } // 因为这个地址只管添加日志,所以就直接走创建就行 this.ctx.service.logs.create(data); } /** * 发送队列消息 * @param {Any} data 消息队列数据 * @param {String} queueKey 消息队列可以 */ async sendToMqQueue(data, queueKey) { const { mq } = this.ctx; const { sendQueue } = this.ctx.app.config; let queue; // 获取队列名称 if (_.isObject(sendQueue)) { queue = sendQueue[queueKey]; } if (mq && queue) { if (!_.isString(data)) data = JSON.stringify(data); const ch = await mq.conn.createChannel(); try { // 创建队列:在没有队列的情况,直接获取会导致程序无法启动 await ch.assertQueue(queue, { durable: false }); await ch.sendToQueue(queue, Buffer.from(data)); await ch.close(); } catch (error) { console.error(error); this.ctx.logger.error('未找到订阅的队列'); } } else { this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!'); } } } module.exports = RabbitmqService;