rabbitmq.js 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. 'use strict';
  2. const Service = require('egg').Service;
  /**
   * Egg service that publishes to and consumes from RabbitMQ through the
   * plugin object exposed on `ctx.mq`.
   *
   * Exchanges are declared with type `this.exType` and durability
   * `this.durable`, both set in the constructor.
   */
  class RabbitmqService extends Service {
    /**
     * @param {object} ctx - context forwarded to the base `Service`; this class
     *   reads `ctx.mq` and `ctx.logger` from it.
     */
    constructor(ctx) {
      super(ctx);
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Publish a message through `ctx.mq.topic`.
     *
     * When `ctx.mq` is missing, an error is logged and nothing is sent.
     *
     * @param {string} ex - exchange name.
     * @param {string} routeKey - routing key.
     * @param {*} msg - payload, forwarded unchanged to `mq.topic`.
     * @returns {Promise<void>}
     */
    async sendQueueMsg(ex, routeKey, msg) {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      await mq.topic(ex, msg, routeKey, { durable: this.durable });
    }

    /**
     * Subscribe to messages routed from an exchange.
     *
     * Opens a channel on `ctx.mq.conn`, declares the exchange and a
     * server-named, non-exclusive queue, binds the queue with `routeKey`, and
     * starts consuming with manual acknowledgements. Each message is acked
     * before `receiveCallBack` is invoked with it.
     *
     * Setup failures are logged and the channel is closed; they are not
     * rethrown. When `ctx.mq` is missing, an error is logged and nothing else
     * happens.
     *
     * @param {string} ex - exchange name.
     * @param {string} routeKey - binding key for the queue.
     * @param {Function} [receiveCallBack] - called with each delivered message.
     * @returns {Promise<void>}
     */
    async receiveQueueMsg(ex, routeKey, receiveCallBack) {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }

      // NOTE(review): `mq.conn` is presumably an amqplib connection — confirm.
      const ch = await mq.conn.createChannel();
      try {
        await ch.assertExchange(ex, this.exType, { durable: this.durable });
        const q = await ch.assertQueue('', { exclusive: false });
        logger.debug('==q=', q);

        // Bind the queue to the exchange.
        await ch.bindQueue(q.queue, ex, routeKey);

        // Awaited so that a rejected subscription lands in the catch below
        // instead of becoming an unhandled rejection.
        await ch.consume(q.queue, msg => {
          // amqplib-style channels deliver `null` when the broker cancels the
          // consumer; there is nothing to ack or hand to the callback.
          if (msg === null) {
            logger.warn('MQ consumer was cancelled by the broker');
            return;
          }
          logger.info('收到消息: ', msg);
          // Acknowledge the delivery before handing it to the callback.
          ch.ack(msg);
          if (typeof receiveCallBack === 'function') {
            receiveCallBack(msg);
          }
        }, { noAck: false });
      } catch (e) {
        logger.error('==e==', e);
        try {
          await ch.close();
        } catch (closeErr) {
          // The channel may already be closed by the failure above; closing it
          // again must not hide the original error.
          logger.error('==e==', closeErr);
        }
      }
    }
  }
  48. module.exports = RabbitmqService;