'use strict';

// Base class for Egg.js services, and lodash for safe nested config lookups.
const Service = require('egg').Service;
const _ = require('lodash');
/**
 * Walks a dot-separated path (for example "a.b.c") down from `root`.
 *
 * @param {Object} root object the lookup starts from
 * @param {String} path dot-separated property path
 * @return {*} the value found at the path, or undefined when `path` is not a
 *   non-empty string or any intermediate segment is null/undefined
 */
function resolveServicePath(root, path) {
  if (typeof path !== 'string' || path === '') return undefined;
  let node = root;
  for (const key of path.split('.')) {
    if (node === null || node === undefined) return undefined;
    node = node[key];
  }
  return node;
}

/**
 * Closes a channel while ignoring close failures.
 * Only used on cleanup paths: after a failed operation the channel may already
 * be closed, in which case close() itself rejects and would otherwise mask the
 * original error.
 *
 * @param {Object} [ch] channel to close; nothing happens when it is missing
 */
async function closeChannelQuietly(ch) {
  if (!ch) return;
  try {
    await ch.close();
  } catch (error) {
    // Deliberately ignored: the failure that led here has already been reported.
  }
}

/**
 * Message-queue helpers: delayed tasks built on a dead-letter queue, the
 * "mission/market" consumer, and the shop system-message notification.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Not read anywhere inside this class.
    this.exType = 'topic';
    this.durable = true;
    // Delayed-task queue settings; this class reads `ex`, `queue`, `deadEx`,
    // `deadQueue` and `deadLetterRoutingKey` from it.
    this.task = this.app.config.taskMqConfig;
    this.httpUtil = this.ctx.service.util.httpUtil;
  }

  /**
   * Notifies a shop that it has a new system message.
   * Per the original author's note this is not sent through MQ: it is POSTed to
   * the chat service, which pushes it over WebSocket. Failures are logged, never thrown.
   *
   * @param shop_id shop id
   */
  async shopMsg(shop_id) {
    try {
      const prefix = _.get(this.app, 'config.httpPrefix.chat');
      const uri = '/sendWebSocket';
      // NOTE(review): the key is spelled "recevier"; kept as-is because the
      // receiving endpoint is not visible here - confirm before renaming.
      await this.httpUtil.cpost(`${prefix}${uri}`, { recevier: shop_id, type: 'shopMsg' });
    } catch (error) {
      console.error('mq--店铺系统消息--任务队列生成失败');
      console.error(error);
    }
  }

  /**
   * Sets up the dead-letter queue and then the task queue.
   * Both steps are always attempted; success is only logged when neither
   * reported a failure.
   */
  async initDeadProcess() {
    try {
      const deadReady = await this.initDeadQueue();
      const taskReady = await this.initTaskQueue();
      // Both steps log and swallow their own errors, so the returned flag is
      // the only way to learn that one of them failed.
      if (deadReady === false || taskReady === false) {
        console.error('初始化死信机制失败');
        return;
      }
    } catch (error) {
      console.error('初始化死信机制失败');
      console.error(error);
      return;
    }
    console.log('初始化:死信机制----成功');
  }

  /**
   * Declares the task exchange and the task queue (configured to dead-letter
   * into `task.deadEx` with `task.deadLetterRoutingKey`) and binds them.
   * Errors are logged, not thrown.
   *
   * @return {Promise<Boolean>} true when everything was declared, false otherwise
   */
  async initTaskQueue() {
    const { mq } = this.ctx;
    let ch;
    try {
      ch = await mq.conn.createChannel();
      // Regular exchange
      await ch.assertExchange(this.task.ex, 'direct', { durable: true });
      // Regular queue, with the dead-letter settings attached
      const q = await ch.assertQueue(this.task.queue, {
        durable: true,
        exclusive: false,
        deadLetterExchange: this.task.deadEx,
        deadLetterRoutingKey: this.task.deadLetterRoutingKey,
      });
      // Bind the regular queue to the regular exchange
      await ch.bindQueue(q.queue, this.task.ex);
      return true;
    } catch (error) {
      console.error('mq---- 死信机制-任务队列生成失败');
      console.error(error);
      return false;
    } finally {
      // The channel is only needed for the declarations above; nothing consumes on it.
      await closeChannelQuietly(ch);
    }
  }

  /**
   * Declares the dead-letter exchange and queue, binds them with
   * `task.deadLetterRoutingKey`, and starts a consumer that hands every
   * message to dealTask. Errors are logged, not thrown.
   *
   * @return {Promise<Boolean>} true when the consumer was registered, false otherwise
   */
  async initDeadQueue() {
    const { mq } = this.ctx;
    let ch;
    try {
      ch = await mq.conn.createChannel();
      await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
      const qr = await ch.assertQueue(this.task.deadQueue, {
        durable: true,
        exclusive: false,
      });
      await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
      await ch.consume(
        qr.queue,
        async msg => {
          console.log('in dead');
          await this.dealTask(msg);
        },
        { noAck: true }
      );
      // The channel is intentionally left open: the consumer lives on it.
      return true;
    } catch (error) {
      console.error('mq---- 死信机制-死信队列生成失败');
      console.error(error);
      // No consumer was registered, so the channel has no further use.
      await closeChannelQuietly(ch);
      return false;
    }
  }

  /**
   * Runs one task message. The message body is JSON of the form
   * `{ service, method, params }`: `service` is a dot-separated path below
   * `ctx.service`, and `method` is called on it with `params`.
   * Every failure (bad JSON, unknown target, handler error) is logged, not thrown.
   *
   * @param {Object} msg message whose `content` is converted with toString() and parsed as JSON
   */
  async dealTask(msg) {
    try {
      const { service, method, params } = JSON.parse(msg.content.toString());
      const target = resolveServicePath(this.ctx.service, service);
      if (!target || typeof target[method] !== 'function') {
        throw new Error(`task handler not found: ${service}.${method}`);
      }
      // Awaited so that a rejected handler lands in the catch below instead of
      // surfacing as an unhandled rejection.
      await target[method](params);
    } catch (error) {
      console.log(error);
    }
  }

  /**
   * Sends a message that expires after a delay.
   * NOTE(review): when `queue` is the task queue declared by initTaskQueue the
   * broker is expected to move the expired message to the dead-letter queue,
   * where dealTask runs it - confirm against the broker configuration.
   *
   * @param {String} queue queue name
   * @param {Any} data payload, JSON-serialised before sending
   * @param {Number} time delay in minutes (converted to milliseconds here); default 0.1, i.e. 6 seconds
   */
  async makeTask(queue, data, time = '0.1') {
    const expiration = this.ctx.multiply(time, 60 * 1000); // minutes -> milliseconds
    const ch = await this.ctx.mq.conn.createChannel();
    try {
      await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration });
    } catch (error) {
      // Do not leak the channel when sending fails; report the send error itself.
      await closeChannelQuietly(ch);
      throw error;
    }
    await ch.close();
  }

  /**
   * Starts consuming the "mission/market" queue, handing each message to
   * dealMission. Logs an error when no MQ client is present on the context.
   */
  async mission() {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    const queue = 'mission/market';
    try {
      // Declare the queue first: per the original note, consuming a queue that
      // does not exist prevents the program from starting.
      await ch.assertQueue(queue, { durable: false });
      await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
    } catch (error) {
      this.ctx.logger.error('未找到订阅的队列');
      this.ctx.logger.error(error);
      await closeChannelQuietly(ch);
    }
  }

  /**
   * Runs one mission message. The message body is JSON of the form
   * `{ service, method, project, ...others }`: `method` is called on the
   * service found at the dot-separated `service` path with `others` as its
   * only argument. Invalid messages and handler failures are logged, not
   * thrown, because the consumer callback has no caller to report to.
   *
   * @param {Object} bdata message whose `content` is converted with toString() and parsed as JSON
   */
  async dealMission(bdata) {
    if (!bdata) {
      this.ctx.logger.error('mission队列中信息不存在');
      return;
    }
    let data;
    try {
      data = JSON.parse(bdata.content.toString());
    } catch (error) {
      this.ctx.logger.error('数据不是object');
      return;
    }
    if (data === null || typeof data !== 'object') {
      this.ctx.logger.error('数据不是object');
      return;
    }
    // `project` is picked out only so that it is not forwarded to the handler.
    const { service, method, project, ...others } = data;
    const target = resolveServicePath(this.ctx.service, service);
    if (!target || typeof target[method] !== 'function') {
      this.ctx.logger.error(`mission handler not found: ${service}.${method}`);
      return;
    }
    try {
      await target[method](others);
    } catch (error) {
      this.ctx.logger.error(error);
    }
  }
}
// Egg.js loads the service class from this module's export.
module.exports = RabbitmqService;