rabbitMq.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. // 定时任务队列
  10. this.task = this.app.config.taskMqConfig;
  11. }
  12. // 初始化死信机制
  13. async initDeadProcess() {
  14. try {
  15. await this.initDeadQueue();
  16. await this.initTaskQueue();
  17. } catch (error) {
  18. console.error('初始化死信机制失败');
  19. return;
  20. }
  21. console.log('初始化:死信机制----成功');
  22. }
  23. /**
  24. * 初始化定时任务队列并设置死信机制
  25. */
  26. async initTaskQueue() {
  27. const { mq } = this.ctx;
  28. try {
  29. const ch = await mq.conn.createChannel();
  30. // 声明正常交换器
  31. await ch.assertExchange(this.task.ex, 'direct', { durable: true });
  32. // 声明正常队列(配置死信队列设置)
  33. const q = await ch.assertQueue(this.task.queue, { exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey });
  34. // 正常队里绑定至正常交换器
  35. await ch.bindQueue(q.queue, this.task.ex);
  36. } catch (error) {
  37. console.error('mq---- 死信机制-任务队列生成失败');
  38. console.error(error);
  39. }
  40. }
  41. /**
  42. * 初始化定时任务死信机制队列
  43. */
  44. async initDeadQueue() {
  45. const { mq } = this.ctx;
  46. try {
  47. const ch = await mq.conn.createChannel();
  48. await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
  49. const qr = await ch.assertQueue(this.task.deadQueue, {
  50. exclusive: false,
  51. });
  52. await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
  53. await ch.consume(
  54. qr.queue,
  55. async msg => {
  56. console.log('in dead');
  57. await this.dealTask(msg);
  58. },
  59. { noAck: true }
  60. );
  61. } catch (error) {
  62. console.error('mq---- 死信机制-死信队列生成失败');
  63. console.error(error);
  64. }
  65. }
  66. async dealTask(msg) {
  67. try {
  68. const str = msg.content.toString();
  69. const obj = JSON.parse(str);
  70. const { service, method, params } = obj;
  71. const arr = service.split('.');
  72. let ser = this.ctx.service;
  73. for (const s of arr) {
  74. ser = ser[s];
  75. }
  76. ser[method](params);
  77. } catch (error) {
  78. console.log(error);
  79. }
  80. }
  81. /**
  82. * 发送定时消息
  83. * @param {String} queue 队列名
  84. * @param {Any} data 数据
  85. * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒
  86. */
  87. async makeTask(queue, data, time = '0.1') {
  88. time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒
  89. const ch = await this.ctx.mq.conn.createChannel();
  90. await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time });
  91. await ch.close();
  92. }
  93. // mission队列处理
  94. async mission() {
  95. const { mq } = this.ctx;
  96. if (mq) {
  97. const ch = await mq.conn.createChannel();
  98. const queue = 'mission/market';
  99. try {
  100. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  101. await ch.assertQueue(queue, { durable: false });
  102. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  103. } catch (error) {
  104. this.ctx.logger.error('未找到订阅的队列');
  105. }
  106. } else {
  107. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  108. }
  109. }
  110. // 执行任务
  111. async dealMission(bdata) {
  112. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  113. let data = bdata.content.toString();
  114. try {
  115. data = JSON.parse(data);
  116. } catch (error) {
  117. this.ctx.logger.error('数据不是object');
  118. }
  119. const { service, method, project, ...others } = data;
  120. const arr = service.split('.');
  121. let s = this.ctx.service;
  122. for (const key of arr) {
  123. s = s[key];
  124. }
  125. s[method](others);
  126. }
  127. }
  128. module.exports = RabbitmqService;