rabbitMq.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. // 定时任务队列
  10. this.task = this.app.config.taskMqConfig;
  11. }
  12. // 初始化死信机制
  13. async initDeadProcess() {
  14. try {
  15. await this.initDeadQueue();
  16. await this.initTaskQueue();
  17. } catch (error) {
  18. console.error('初始化死信机制失败');
  19. return;
  20. }
  21. console.log('初始化:死信机制----成功');
  22. }
  23. /**
  24. * 初始化定时任务队列并设置死信机制
  25. */
  26. async initTaskQueue() {
  27. const { mq } = this.ctx;
  28. try {
  29. const ch = await mq.conn.createChannel();
  30. // 声明正常交换器
  31. await ch.assertExchange(this.task.ex, 'direct', { durable: true });
  32. // 声明正常队列(配置死信队列设置)
  33. const q = await ch.assertQueue(this.task.queue, { durable: true, exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey });
  34. // 正常队里绑定至正常交换器
  35. await ch.bindQueue(q.queue, this.task.ex);
  36. } catch (error) {
  37. console.error('mq---- 死信机制-任务队列生成失败');
  38. console.error(error);
  39. }
  40. }
  41. /**
  42. * 初始化定时任务死信机制队列
  43. */
  44. async initDeadQueue() {
  45. const { mq } = this.ctx;
  46. try {
  47. const ch = await mq.conn.createChannel();
  48. await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
  49. const qr = await ch.assertQueue(this.task.deadQueue, {
  50. durable: true,
  51. exclusive: false,
  52. });
  53. await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
  54. await ch.consume(
  55. qr.queue,
  56. async msg => {
  57. console.log('in dead');
  58. await this.dealTask(msg);
  59. },
  60. { noAck: true }
  61. );
  62. } catch (error) {
  63. console.error('mq---- 死信机制-死信队列生成失败');
  64. console.error(error);
  65. }
  66. }
  67. async dealTask(msg) {
  68. try {
  69. const str = msg.content.toString();
  70. const obj = JSON.parse(str);
  71. const { service, method, params } = obj;
  72. const arr = service.split('.');
  73. let ser = this.ctx.service;
  74. for (const s of arr) {
  75. ser = ser[s];
  76. }
  77. ser[method](params);
  78. } catch (error) {
  79. console.log(error);
  80. }
  81. }
  82. /**
  83. * 发送定时消息
  84. * @param {String} queue 队列名
  85. * @param {Any} data 数据
  86. * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒
  87. */
  88. async makeTask(queue, data, time = '0.1') {
  89. time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒
  90. const ch = await this.ctx.mq.conn.createChannel();
  91. await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time });
  92. await ch.close();
  93. }
  94. // mission队列处理
  95. async mission() {
  96. const { mq } = this.ctx;
  97. if (mq) {
  98. const ch = await mq.conn.createChannel();
  99. const queue = 'mission/market';
  100. try {
  101. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  102. await ch.assertQueue(queue, { durable: false });
  103. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  104. } catch (error) {
  105. this.ctx.logger.error('未找到订阅的队列');
  106. }
  107. } else {
  108. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  109. }
  110. }
  111. // 执行任务
  112. async dealMission(bdata) {
  113. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  114. let data = bdata.content.toString();
  115. try {
  116. data = JSON.parse(data);
  117. } catch (error) {
  118. this.ctx.logger.error('数据不是object');
  119. }
  120. const { service, method, project, ...others } = data;
  121. const arr = service.split('.');
  122. let s = this.ctx.service;
  123. for (const key of arr) {
  124. s = s[key];
  125. }
  126. s[method](others);
  127. }
  128. }
  129. module.exports = RabbitmqService;