1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- 'use strict';
- const Service = require('egg').Service;
- const _ = require('lodash');
- class RabbitmqService extends Service {
- // mission队列处理
- async mission() {
- const { mq } = this.ctx;
- const { queue } = this.ctx.app.config;
- if (mq && queue) {
- const ch = await mq.conn.createChannel();
- // const queue = 'freeAdmin/server-user';
- try {
- // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
- await ch.assertQueue(queue, { durable: false });
- await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
- } catch (error) {
- this.ctx.logger.error('未找到订阅的队列');
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- // 执行任务
- async dealMission(bdata) {
- // if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
- // let data = bdata.content.toString();
- // try {
- // data = JSON.parse(data);
- // } catch (error) {
- // this.ctx.logger.error('数据不是object');
- // }
- // const { service, method, project, ...others } = data;
- // const arr = service.split('.');
- // let s = this.ctx.service;
- // for (const key of arr) {
- // s = s[key];
- // }
- // s[method](others);
- }
- /**
- * 发送队列消息
- * @param {Any} data 消息队列数据
- * @param {String} queueKey 消息队列可以
- */
- async sendToMqQueue(data, queueKey) {
- const { mq } = this.ctx;
- const { sendQueue } = this.ctx.app.config;
- let queue;
- // 获取队列名称
- if (_.isObject(sendQueue)) {
- queue = sendQueue[queueKey];
- }
- if (mq && queue) {
- if (!_.isString(data)) data = JSON.stringify(data);
- const ch = await mq.conn.createChannel();
- try {
- // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
- // await ch.assertQueue(queue, { durable: false });
- await ch.sendToQueue(queue, Buffer.from(data));
- await ch.close();
- } catch (error) {
- console.error(error);
- this.ctx.logger.error('mq消息发送失败');
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- }
- module.exports = RabbitmqService;
|