123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- 'use strict';
- const Service = require('egg').Service;
- const _ = require('lodash');
/**
 * Egg service wrapping the RabbitMQ plugin exposed on `ctx.mq`.
 *
 * It offers two consumers:
 *  - `receiveQueueMsg(ex)`: subscribes an exclusive, server-named queue to an exchange.
 *  - `mission()`: consumes the `mission/market` queue and dispatches every
 *    message to the service method the message names.
 */
class RabbitmqService extends Service {
  /**
   * @param {object} ctx - request context handed to the base `Service`.
   */
  constructor(ctx) {
    super(ctx);
    // Exchange settings used when asserting exchanges in receiveQueueMsg.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Receive messages: bind a fresh exclusive queue to exchange `ex` with the
   * routing pattern `*` and hand every delivery to `logMessage`.
   *
   * Logs an error and does nothing else when `ctx.mq` is not available.
   *
   * @param {string} ex - name of the exchange to subscribe to.
   * @return {Promise<void>} resolves once the consumer is registered.
   */
  async receiveQueueMsg(ex) {
    const { logger, mq } = this.ctx;
    logger.info(`调用mq的${ex}`);
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    await ch.assertExchange(ex, this.exType, { durable: this.durable });
    // An empty name asks the broker to generate one; the queue is exclusive to this connection.
    const q = await ch.assertQueue('', { exclusive: true });
    await ch.bindQueue(q.queue, ex, '*');
    await ch.consume(q.queue, msg => this.logMessage(msg), { noAck: true });
  }

  /**
   * Write a received message (body and headers) to the debug log.
   *
   * @param {object|null} msg - delivery with `content` (Buffer) and
   *   `properties.headers`; a falsy value is ignored.
   * @return {Promise<void>} resolves after logging.
   */
  async logMessage(msg) {
    // amqplib-style clients invoke the consumer with null when the broker cancels it.
    if (!msg) return;
    const body = msg.content.toString();
    const headers = msg.properties ? msg.properties.headers : undefined;
    this.ctx.logger.debug('[rabbitmq] received message: %s headers: %j', body, headers);
  }

  /**
   * Mission queue handling: start consuming `mission/market` and pass every
   * delivery to `dealMission`.
   *
   * Logs an error and does nothing else when `ctx.mq` is not available.
   *
   * @return {Promise<void>} resolves once the consumer is registered (or the
   *   failure to register it has been logged).
   */
  async mission() {
    const { logger, mq } = this.ctx;
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    const queue = 'mission/market';
    try {
      // Create the queue first: consuming a queue that does not exist would
      // prevent the application from starting.
      await ch.assertQueue(queue, { durable: false });
      await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
    } catch (error) {
      // Keep the original message but attach the error so the real cause is visible.
      logger.error('未找到订阅的队列', error);
    }
  }

  /**
   * Execute one mission message.
   *
   * The message body must be a JSON object of the form
   * `{ service: 'a.b', method: 'name', project, ...args }`. The dotted
   * `service` path is resolved on `ctx.service` and `method` is called on the
   * result with the remaining fields (`args`) as its single argument.
   *
   * This runs as a queue consumer callback, so it never rejects: every
   * problem (missing message, invalid JSON, unknown target, failing method)
   * is logged and the message is dropped.
   *
   * @param {object|null} bdata - delivery whose `content` is a Buffer holding JSON.
   * @return {Promise<void>} resolves when the mission finished or was rejected.
   */
  async dealMission(bdata) {
    const { logger } = this.ctx;
    if (!bdata) {
      logger.error('mission队列中信息不存在');
      return;
    }

    let data;
    try {
      data = JSON.parse(bdata.content.toString());
    } catch (error) {
      logger.error('数据不是object');
      return;
    }
    // JSON.parse also accepts primitives and null, which cannot describe a mission.
    if (data === null || typeof data !== 'object') {
      logger.error('数据不是object');
      return;
    }

    // `project` is extracted only to keep it out of the arguments passed on.
    // eslint-disable-next-line no-unused-vars
    const { service, method, project, ...others } = data;
    if (typeof service !== 'string' || typeof method !== 'string' || !service || !method) {
      logger.error('mission任务缺少service或method');
      return;
    }

    // The path comes from the queue payload: never walk into prototype internals.
    const unsafeKeys = new Set([ '__proto__', 'constructor', 'prototype' ]);
    let target = this.ctx.service;
    for (const key of service.split('.')) {
      if (target === null || target === undefined || unsafeKeys.has(key)) {
        target = undefined;
        break;
      }
      target = target[key];
    }
    if (
      target === null ||
      target === undefined ||
      unsafeKeys.has(method) ||
      typeof target[method] !== 'function'
    ) {
      logger.error(`mission任务指定的服务方法不存在: ${service}.${method}`);
      return;
    }

    try {
      // Await so that a failing mission is logged instead of becoming an unhandled rejection.
      await target[method](others);
    } catch (error) {
      logger.error(`mission任务执行失败: ${service}.${method}`, error);
    }
  }
}
- module.exports = RabbitmqService;
|