rabbitmq.js 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. 'use strict';
  2. const _ = require('lodash');
  3. const Service = require('egg').Service;
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. }
  10. // 发送消息
  11. async sendQueueMsg(ex, routeKey, msg) {
  12. const self = this;
  13. const { mq } = self.ctx;
  14. if (mq) {
  15. await mq.topic(ex, msg, routeKey, { durable: self.durable });
  16. } else {
  17. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  18. }
  19. }
  20. // 接收消息
  21. async receiveQueueMsg(ex, routeKey, receiveCallBack) {
  22. const self = this;
  23. const { mq } = self.ctx;
  24. if (mq) {
  25. const ch = await mq.conn.createChannel();
  26. try {
  27. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  28. const q = await ch.assertQueue('', { exclusive: false });
  29. console.log('==q=', q);
  30. // 队列绑定 exchange
  31. await ch.bindQueue(q.queue, ex, routeKey);
  32. await ch.consume(q.queue, msg => {
  33. console.log('收到消息: ', msg);
  34. // 发送确认消息
  35. ch.ack(msg);
  36. receiveCallBack && receiveCallBack(msg);
  37. }, { noAck: false });
  38. } catch (e) {
  39. console.log('==e==', e);
  40. await ch.close();
  41. }
  42. } else {
  43. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  44. }
  45. }
  46. }
  47. module.exports = RabbitmqService;