// rabbitmq.js
  'use strict';

  // Base class that the RabbitMQ service below extends.
  const Service = require('egg').Service;
  /**
   * Service wrapping the `mq` object found on the request context.
   *
   * Publishing is delegated to `ctx.mq.topic(...)`; consuming opens a channel
   * via `ctx.mq.conn.createChannel()` (presumably an amqplib connection —
   * confirm against the MQ plugin in use).
   */
  class RabbitmqService extends Service {
    /**
     * @param {object} ctx - Context passed through to the base Service.
     */
    constructor(ctx) {
      super(ctx);
      // Exchange settings used when asserting the exchange for consumers.
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Publish a message by delegating to `ctx.mq.topic`.
     * Logs an error and does nothing else when `ctx.mq` is not present.
     *
     * @param {string} ex - Exchange name.
     * @param {string} routeKey - Routing key for the message.
     * @param {*} msg - Message payload, forwarded unchanged.
     * @param {*} parm - Extra argument forwarded unchanged to `mq.topic`.
     * @return {Promise<void>} Resolves once `mq.topic` has settled.
     */
    async sendQueueMsg(ex, routeKey, msg, parm) {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      await mq.topic(ex, routeKey, msg, parm);
    }

    /**
     * Start consuming messages from an exclusive, server-named queue bound
     * to the given exchange. Received messages are only logged for now.
     *
     * Failures while setting up the consumer are logged and the channel is
     * closed; the error is not rethrown (same as before).
     *
     * @param {string} ex - Exchange name to assert and bind to.
     * @param {string} [bindingKey='*'] - Binding pattern for the queue.
     *   Defaults to the previously hard-coded '*'.
     * @return {Promise<void>} Resolves once the consumer is registered or
     *   setup has failed and been logged.
     */
    async receiveQueueMsg(ex, bindingKey = '*') {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }

      const ch = await mq.conn.createChannel();
      try {
        await ch.assertExchange(ex, this.exType, { durable: this.durable });
        const q = await ch.assertQueue('', { exclusive: true });
        logger.info('==q=', q);

        // Bind the queue to the exchange.
        await ch.bindQueue(q.queue, ex, bindingKey);

        // Await registration so a rejected consume() reaches the catch below
        // instead of becoming an unhandled rejection.
        await ch.consume(q.queue, msg => {
          // The callback can be invoked with null when the broker cancels the
          // consumer (amqplib behaviour); there is no content to read then.
          if (msg === null) {
            logger.error('RabbitMQ consumer was cancelled by the broker');
            return;
          }
          logger.info('收到消息: ', msg);
          const result = msg.content.toString();
          const headers = msg.properties.headers;
          // TODO: insert the to-do item into the database.
          logger.info(result);
          logger.info(headers);
        }, { noAck: true });
      } catch (e) {
        logger.error('==e==', e);
        try {
          await ch.close();
        } catch (closeErr) {
          // The channel may already be closed by the failure above; closing
          // it again must not mask the original error.
          logger.error('failed to close RabbitMQ channel', closeErr);
        }
      }
    }
  }
  // Expose the service class as this module's export.
  module.exports = RabbitmqService;