123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- 'use strict';
- const _ = require('lodash');
- const Service = require('egg').Service;
- class RabbitmqService extends Service {
- constructor(ctx) {
- super(ctx);
- this.exType = 'topic';
- this.durable = true;
- }
- // 发送消息
- async sendQueueMsg(ex, routeKey, msg) {
- const self = this;
- const { mq } = self.ctx;
- if (mq) {
- await mq.topic(ex, msg, routeKey, { durable: self.durable });
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- // 接收消息
- async receiveQueueMsg(ex, routeKey, receiveCallBack) {
- const self = this;
- const { mq } = self.ctx;
- if (mq) {
- const ch = await mq.conn.createChannel();
- try {
- await ch.assertExchange(ex, self.exType, { durable: self.durable });
- const q = await ch.assertQueue('', { exclusive: false });
- console.log('==q=', q);
- // 队列绑定 exchange
- await ch.bindQueue(q.queue, ex, routeKey);
- await ch.consume(q.queue, msg => {
- console.log('收到消息: ', msg);
- // 发送确认消息
- ch.ack(msg);
- receiveCallBack && receiveCallBack(msg);
- }, { noAck: false });
- } catch (e) {
- console.log('==e==', e);
- await ch.close();
- }
- } else {
- this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
- }
- }
- }
- module.exports = RabbitmqService;
|