'use strict'; const Service = require('egg').Service; const _ = require('lodash'); class RabbitmqService extends Service { constructor(ctx) { super(ctx); this.exType = 'topic'; this.durable = true; // 定时任务队列 this.task = this.app.config.taskMqConfig; this.httpUtil = this.ctx.service.util.httpUtil; } /** * 店铺系统消息 * @param shop_id 店铺id */ async shopMsg(shop_id) { // const { mq } = this.ctx; // const ex = _.get(this, 'app.config.msgEx'); try { // 不用mq发消息,用http到聊天那边使用websocket发消息 const prefix = _.get(this.app, 'config.httpPrefix.chat'); const uri = '/sendWebSocket'; await this.httpUtil.cpost(`${prefix}${uri}`, { recevier: shop_id, type: 'shopMsg' }); // const ch = await mq.conn.createChannel(); // await ch.assertExchange(ex, 'direct', { durable: true }); // console.log(ex, shop_id); // await ch.assertQueue(shop_id, { durable: false, exclusive: false }); // await ch.bindQueue(shop_id, ex); // await ch.publish(ex, shop_id, Buffer.from(JSON.stringify({ type: 'shopMsg' }))); // // await ch.sendToQueue(shop_id, Buffer.from(JSON.stringify({ type: 'shopMsg' }))); // await ch.close(); } catch (error) { console.error('mq--店铺系统消息--任务队列生成失败'); console.error(error); } } // 初始化死信机制 async initDeadProcess() { try { await this.initDeadQueue(); await this.initTaskQueue(); } catch (error) { console.error('初始化死信机制失败'); return; } console.log('初始化:死信机制----成功'); } /** * 初始化定时任务队列并设置死信机制 */ async initTaskQueue() { const { mq } = this.ctx; try { const ch = await mq.conn.createChannel(); // 声明正常交换器 await ch.assertExchange(this.task.ex, 'direct', { durable: true }); // 声明正常队列(配置死信队列设置) const q = await ch.assertQueue(this.task.queue, { durable: true, exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey, }); // 正常队里绑定至正常交换器 await ch.bindQueue(q.queue, this.task.ex); } catch (error) { console.error('mq---- 死信机制-任务队列生成失败'); console.error(error); } } /** * 初始化定时任务死信机制队列 */ async initDeadQueue() { const { mq } = this.ctx; try { const ch = await mq.conn.createChannel(); await ch.assertExchange(this.task.deadEx, 'direct', { durable: true }); const qr = await ch.assertQueue(this.task.deadQueue, { durable: true, exclusive: false, }); await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey); await ch.consume( qr.queue, async msg => { console.log('in dead'); await this.dealTask(msg); }, { noAck: true } ); } catch (error) { console.error('mq---- 死信机制-死信队列生成失败'); console.error(error); } } async dealTask(msg) { try { const str = msg.content.toString(); const obj = JSON.parse(str); const { service, method, params } = obj; const arr = service.split('.'); let ser = this.ctx.service; for (const s of arr) { ser = ser[s]; } ser[method](params); } catch (error) { console.log(error); } } /** * 发送定时消息 * @param {String} queue 队列名 * @param {Any} data 数据 * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒 */ async makeTask(queue, data, time = '0.1') { time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒 const ch = await this.ctx.mq.conn.createChannel(); await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time }); await ch.close(); } // mission队列处理 async mission() { const { mq } = this.ctx; if (mq) { const ch = await mq.conn.createChannel(); const queue = 'mission/market'; try { // 创建队列:在没有队列的情况,直接获取会导致程序无法启动 await ch.assertQueue(queue, { durable: false }); await ch.consume(queue, msg => this.dealMission(msg), { noAck: true }); } catch (error) { this.ctx.logger.error('未找到订阅的队列'); } } else { this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!'); } } // 执行任务 async dealMission(bdata) { if (!bdata) this.ctx.logger.error('mission队列中信息不存在'); let data = bdata.content.toString(); try { data = JSON.parse(data); } catch (error) { this.ctx.logger.error('数据不是object'); } const { service, method, project, ...others } = data; const arr = service.split('.'); let s = this.ctx.service; for (const key of arr) { s = s[key]; } s[method](others); } } module.exports = RabbitmqService;