rabbitMq.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. // 定时任务队列
  10. this.task = this.app.config.taskMqConfig;
  11. }
  12. // 初始化死信机制
  13. async initDeadProcess() {
  14. try {
  15. await this.initDeadQueue();
  16. await this.initTaskQueue();
  17. } catch (error) {
  18. console.error('初始化死信机制失败');
  19. }
  20. console.log('初始化:死信机制----成功');
  21. }
  22. /**
  23. * 初始化定时任务队列并设置死信机制
  24. */
  25. async initTaskQueue() {
  26. const { mq } = this.ctx;
  27. try {
  28. const ch = await mq.conn.createChannel();
  29. // 声明正常交换器
  30. await ch.assertExchange(this.task.ex, 'direct', { durable: true });
  31. // 声明正常队列(配置死信队列设置)
  32. const q = await ch.assertQueue(this.task.queue, { exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey });
  33. // 正常队里绑定至正常交换器
  34. await ch.bindQueue(q.queue, this.task.ex);
  35. } catch (error) {
  36. console.error('mq---- 死信机制-任务队列生成失败');
  37. console.error(error);
  38. }
  39. }
  40. /**
  41. * 初始化定时任务死信机制队列
  42. */
  43. async initDeadQueue() {
  44. const { mq } = this.ctx;
  45. try {
  46. const ch = await mq.conn.createChannel();
  47. await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
  48. const qr = await ch.assertQueue(this.task.deadQueue, {
  49. exclusive: false,
  50. });
  51. await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
  52. await ch.consume(
  53. qr.queue,
  54. msg => {
  55. console.log('in dead', msg.content.toString());
  56. },
  57. { noAck: true }
  58. );
  59. } catch (error) {
  60. console.error('mq---- 死信机制-死信队列生成失败');
  61. console.error(error);
  62. }
  63. }
  64. /**
  65. * 发送定时消息
  66. * @param {String} queue 队列名
  67. * @param {Any} data 数据
  68. * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒
  69. */
  70. async makeTask(queue, data, time = '0.1') {
  71. time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒
  72. const ch = await this.ctx.mq.conn.createChannel();
  73. await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time });
  74. await ch.close();
  75. }
  76. // 接收消息
  77. async receiveQueueMsg(ex) {
  78. this.ctx.logger.info('调用mq的' + ex);
  79. const self = this;
  80. const { mq } = self.ctx;
  81. if (mq) {
  82. const ch = await mq.conn.createChannel();
  83. await ch.assertExchange(ex, 'topic', { durable: true });
  84. const q = await ch.assertQueue('', { exclusive: true });
  85. await ch.bindQueue(q.queue, ex, '*');
  86. await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
  87. } else {
  88. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  89. }
  90. }
  91. async logMessage(msg) {
  92. const result = msg.content.toString();
  93. const headers = msg.properties.headers;
  94. }
  95. // mission队列处理
  96. async mission() {
  97. const { mq } = this.ctx;
  98. if (mq) {
  99. const ch = await mq.conn.createChannel();
  100. const queue = 'mission/market';
  101. try {
  102. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  103. await ch.assertQueue(queue, { durable: false });
  104. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  105. } catch (error) {
  106. this.ctx.logger.error('未找到订阅的队列');
  107. }
  108. } else {
  109. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  110. }
  111. }
  112. // 执行任务
  113. async dealMission(bdata) {
  114. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  115. let data = bdata.content.toString();
  116. try {
  117. data = JSON.parse(data);
  118. } catch (error) {
  119. this.ctx.logger.error('数据不是object');
  120. }
  121. const { service, method, project, ...others } = data;
  122. const arr = service.split('.');
  123. let s = this.ctx.service;
  124. for (const key of arr) {
  125. s = s[key];
  126. }
  127. s[method](others);
  128. }
  129. }
  130. module.exports = RabbitmqService;