rabbitmq.js 944 B

12345678910111213141516171819202122232425262728293031323334353637
  1. 'use strict';
  2. const Service = require('egg').Service;
  /**
   * Egg service that subscribes to a RabbitMQ exchange through the `mq`
   * object found on the request context and prints the headers of every
   * message it receives.
   *
   * NOTE(review): the channel calls used here (createChannel, assertExchange,
   * assertQueue, bindQueue, consume) match the amqplib promise API — confirm
   * that `ctx.mq.conn` is an amqplib connection.
   */
  class RabbitmqService extends Service {
    /**
     * @param {object} ctx - Egg request context, forwarded to the base Service.
     */
    constructor(ctx) {
      super(ctx);
      // Exchange settings applied by receiveQueueMsg when asserting the exchange.
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Receive messages: asserts exchange `ex`, binds a fresh exclusive queue to
     * it and starts consuming, handing every delivery to logMessage.
     *
     * When `ctx.mq` is not set, an error is logged and nothing else happens.
     *
     * @param {string} ex - Name of the exchange to subscribe to.
     * @return {Promise<void>} Settles once the consumer has been registered.
     */
    async receiveQueueMsg(ex) {
      const { ctx } = this;
      console.log(ex);
      ctx.logger.info(`调用mq的${ex}`);

      const { mq } = ctx;
      if (!mq) {
        ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }

      const ch = await mq.conn.createChannel();
      // Use the settings from the constructor instead of repeating the literals.
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      // An empty queue name asks for a generated one; the name actually used
      // comes back as `q.queue`.
      const q = await ch.assertQueue('', { exclusive: true });
      await ch.bindQueue(q.queue, ex, '*');
      await ch.consume(
        q.queue,
        msg => {
          // logMessage is async: attach a handler so a failure while handling
          // one message is logged rather than becoming an unhandled rejection.
          this.logMessage(msg).catch(err => ctx.logger.error(err));
        },
        { noAck: true }
      );
    }

    /**
     * Print the headers of a consumed message.
     *
     * @param {object|null} msg - Delivered message; read as
     *   `msg.properties.headers`. A null/undefined value is ignored
     *   (amqplib passes null when the broker cancels the consumer).
     * @return {Promise<void>}
     */
    async logMessage(msg) {
      if (!msg) {
        return;
      }
      const { headers } = msg.properties;
      console.log(headers);
    }
  }
  31. module.exports = RabbitmqService;