12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- 'use strict';
- const Service = require('egg').Service;
- const _ = require('lodash');
- class RabbitmqService extends Service {
- // mission队列处理
- async mission() {
- const { mq } = this.ctx;
- const { queue } = this.ctx.app.config;
- if (mq && queue) {
- const ch = await mq.conn.createChannel();
- try {
- // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
- await ch.assertQueue(queue, { durable: false });
- await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
- } catch (error) {
- this.ctx.logger.error('未找到订阅的队列');
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- // 执行任务
- async dealMission(bdata) {
- if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
- let data = bdata.content.toString();
- try {
- data = JSON.parse(data);
- } catch (error) {
- this.ctx.logger.error('数据转换错误');
- }
- // 因为这个地址只管添加日志,所以就直接走创建就行
- this.ctx.service.logs.create(data);
- }
- /**
- * 发送队列消息
- * @param {Any} data 消息队列数据
- * @param {String} queueKey 消息队列可以
- */
- async sendToMqQueue(data, queueKey) {
- const { mq } = this.ctx;
- const { sendQueue } = this.ctx.app.config;
- let queue;
- // 获取队列名称
- if (_.isObject(sendQueue)) {
- queue = sendQueue[queueKey];
- }
- if (mq && queue) {
- if (!_.isString(data)) data = JSON.stringify(data);
- const ch = await mq.conn.createChannel();
- try {
- // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
- await ch.assertQueue(queue, { durable: false });
- await ch.sendToQueue(queue, Buffer.from(data));
- await ch.close();
- } catch (error) {
- console.error(error);
- this.ctx.logger.error('未找到订阅的队列');
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- }
- module.exports = RabbitmqService;
|