'use strict';

const Service = require('egg').Service;
const _ = require('lodash');

// Name of the queue consumed by `RabbitmqService#mission`.
const MISSION_QUEUE = 'mission/market';

/**
 * Close an AMQP channel without letting a close failure escape.
 *
 * Used for channels that were opened only to declare topology. Closing a
 * channel that is already closed rejects, and that rejection must not mask
 * the result (or the original error) of the surrounding operation.
 *
 * @param {Object} [ch] channel returned by `conn.createChannel()`; may be
 *   undefined when the channel could not be created
 * @return {Promise<void>} resolves once the close attempt has finished
 */
async function closeChannelQuietly(ch) {
  if (!ch) return;
  try {
    await ch.close();
  } catch (error) {
    // Deliberately ignored: there is nothing left to release.
  }
}

/**
 * RabbitMQ helpers built on the AMQP connection exposed as `ctx.mq.conn`:
 * dead-letter topology for scheduled tasks, delayed messages, and consumers.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    this.exType = 'topic';
    this.durable = true;
    // Scheduled-task queue settings; the methods below read `ex`, `queue`,
    // `deadEx`, `deadQueue` and `deadLetterRoutingKey` from it.
    this.task = this.app.config.taskMqConfig;
  }

  /**
   * Initialise the dead-letter mechanism: the dead-letter queue first, then
   * the task queue that dead-letters into it.
   *
   * The success message is printed only when neither step reported a failure;
   * otherwise the failure message is printed. Both steps are always attempted
   * and this method never rejects.
   *
   * @return {Promise<void>} resolves after both steps have been attempted
   */
  async initDeadProcess() {
    try {
      const deadReady = await this.initDeadQueue();
      const taskReady = await this.initTaskQueue();
      // `!== false` keeps overrides that return nothing counted as success.
      if (deadReady !== false && taskReady !== false) {
        console.log('初始化:死信机制----成功');
        return;
      }
    } catch (error) {
      console.error(error);
    }
    console.error('初始化死信机制失败');
  }

  /**
   * Declare the scheduled-task exchange and queue, with the queue configured
   * to dead-letter into the dead-letter exchange.
   *
   * Errors are logged, not thrown. The channel is only needed for the
   * declarations, so it is closed before returning.
   *
   * @return {Promise<Boolean>} true when every declaration succeeded,
   *   false when an error was logged
   */
  async initTaskQueue() {
    const { mq } = this.ctx;
    let ch;
    try {
      ch = await mq.conn.createChannel();
      // Regular exchange.
      await ch.assertExchange(this.task.ex, 'direct', { durable: true });
      // Regular queue, carrying the dead-letter settings.
      const q = await ch.assertQueue(this.task.queue, {
        exclusive: false,
        deadLetterExchange: this.task.deadEx,
        deadLetterRoutingKey: this.task.deadLetterRoutingKey,
      });
      // Bind the regular queue to the regular exchange. The original call
      // omitted the binding key; the empty key is now spelled out.
      // NOTE(review): makeTask publishes with sendToQueue, so this binding is
      // not on that path — confirm whether a specific binding key is wanted.
      await ch.bindQueue(q.queue, this.task.ex, '');
      return true;
    } catch (error) {
      console.error('mq---- 死信机制-任务队列生成失败');
      console.error(error);
      return false;
    } finally {
      await closeChannelQuietly(ch);
    }
  }

  /**
   * Declare the dead-letter exchange and queue, bind them with the configured
   * dead-letter routing key, and start a consumer that prints each
   * dead-lettered message.
   *
   * Errors are logged, not thrown. The channel stays open because the
   * consumer is attached to it.
   *
   * @return {Promise<Boolean>} true when setup succeeded, false when an error
   *   was logged
   */
  async initDeadQueue() {
    const { mq } = this.ctx;
    try {
      const ch = await mq.conn.createChannel();
      await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
      const qr = await ch.assertQueue(this.task.deadQueue, {
        exclusive: false,
      });
      await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
      await ch.consume(
        qr.queue,
        msg => {
          // amqplib invokes the callback with null when the consumer is
          // cancelled; there is no content to print in that case.
          if (!msg) return;
          console.log('in dead', msg.content.toString());
        },
        { noAck: true }
      );
      return true;
    } catch (error) {
      console.error('mq---- 死信机制-死信队列生成失败');
      console.error(error);
      return false;
    }
  }

  /**
   * Send a message with a per-message expiration to a queue.
   *
   * @param {String} queue queue name
   * @param {Any} data payload; serialised with JSON.stringify
   * @param {Number|String} time lifetime in minutes (converted to
   *   milliseconds here); the default of 0.1 is 6 seconds
   * @return {Promise<void>} resolves once the channel has been closed
   */
  async makeTask(queue, data, time = '0.1') {
    // Minutes -> milliseconds, using the project's `ctx.multiply` helper.
    const expiration = this.ctx.multiply(time, 60 * 1000);
    // Serialise before opening a channel so a bad payload cannot leak one.
    const payload = Buffer.from(JSON.stringify(data));
    const ch = await this.ctx.mq.conn.createChannel();
    try {
      ch.sendToQueue(queue, payload, { expiration });
    } finally {
      await ch.close();
    }
  }

  /**
   * Subscribe to a topic exchange through a fresh exclusive, server-named
   * queue and pass every delivery to `logMessage`.
   *
   * @param {String} ex exchange name
   * @return {Promise<void>} resolves once the consumer is registered, or
   *   immediately after logging when `ctx.mq` is not available
   */
  async receiveQueueMsg(ex) {
    this.ctx.logger.info(`调用mq的${ex}`);
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    await ch.assertExchange(ex, 'topic', { durable: true });
    const q = await ch.assertQueue('', { exclusive: true });
    // NOTE(review): on a topic exchange '*' matches exactly one word; confirm
    // that '#' (any number of words) is not what was intended.
    await ch.bindQueue(q.queue, ex, '*');
    await ch.consume(
      q.queue,
      async msg => {
        try {
          await this.logMessage(msg);
        } catch (error) {
          // Keep a failing handler from becoming an unhandled rejection.
          this.ctx.logger.error(error);
        }
      },
      { noAck: true }
    );
  }

  /**
   * Decode a delivery into its text body and headers.
   *
   * @param {Object|null} msg delivery passed to the consume callback; null
   *   when the consumer has been cancelled
   * @return {Promise<Object|undefined>} `{ result, headers }`, or undefined
   *   when there is no message
   */
  async logMessage(msg) {
    if (!msg) return;
    const result = msg.content.toString();
    const headers = msg.properties.headers;
    // The original discarded both values; they are returned so the decoding
    // is usable. The consumer in receiveQueueMsg ignores the return value.
    return { result, headers };
  }

  /**
   * Consume the mission queue and hand every delivery to `dealMission`.
   *
   * @return {Promise<void>} resolves once the consumer is registered, or
   *   after logging when `ctx.mq` is missing or the queue setup fails
   */
  async mission() {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    try {
      // Declare the queue first: consuming from a queue that does not exist
      // prevents the application from starting.
      await ch.assertQueue(MISSION_QUEUE, { durable: false });
      await ch.consume(
        MISSION_QUEUE,
        async msg => {
          try {
            await this.dealMission(msg);
          } catch (error) {
            // Keep a failing mission from becoming an unhandled rejection.
            this.ctx.logger.error(error);
          }
        },
        { noAck: true }
      );
    } catch (error) {
      this.ctx.logger.error('未找到订阅的队列', error);
    }
  }

  /**
   * Run the task described by a mission message.
   *
   * The body is JSON of the form `{ service, method, project, ...args }`:
   * `service` is a dot-separated path under `ctx.service`, `method` is the
   * method called on it, and the remaining fields (without `project`) are
   * passed as its single argument. Invalid input is logged and skipped.
   *
   * NOTE(review): the service path and method name come straight from the
   * queue, so anyone able to publish to it chooses which service method
   * runs. Consider an allow-list if the queue is not fully trusted.
   *
   * @param {Object|null} bdata delivery from the mission queue
   * @return {Promise<void>} resolves when the method has finished; rejects
   *   with whatever error that method raises
   */
  async dealMission(bdata) {
    if (!bdata) {
      this.ctx.logger.error('mission队列中信息不存在');
      return;
    }

    let data;
    try {
      data = JSON.parse(bdata.content.toString());
    } catch (error) {
      this.ctx.logger.error('数据不是object');
      return;
    }
    // JSON.parse also accepts primitives and null, which carry no fields.
    if (data === null || typeof data !== 'object') {
      this.ctx.logger.error('数据不是object');
      return;
    }

    // `project` is pulled out only so it is not forwarded with the arguments.
    const { service, method, project, ...others } = data;
    if (typeof service !== 'string' || typeof method !== 'string') {
      this.ctx.logger.error('mission message must carry string "service" and "method" fields');
      return;
    }

    // Walk the dotted path, stopping safely at the first missing segment.
    let target = this.ctx.service;
    for (const key of service.split('.')) {
      target = target == null ? undefined : target[key];
    }
    if (target == null || typeof target[method] !== 'function') {
      this.ctx.logger.error(`mission target not found: ${service}.${method}`);
      return;
    }

    await target[method](others);
  }
}

module.exports = RabbitmqService;