rabbitMq.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. // 定时任务队列
  10. this.task = this.app.config.taskMqConfig;
  11. }
  12. /**
  13. * 店铺系统消息
  14. * @param shop_id 店铺id
  15. */
  16. async shopMsg(shop_id) {
  17. const { mq } = this.ctx;
  18. const ex = 'shopMsg';
  19. try {
  20. const ch = await mq.conn.createChannel();
  21. await ch.assertExchange(ex, 'direct', { durable: true });
  22. await ch.publish(ex, shop_id, Buffer.from('check msg'));
  23. await ch.close();
  24. } catch (error) {
  25. console.error('mq--店铺系统消息--任务队列生成失败');
  26. console.error(error);
  27. }
  28. }
  29. // 初始化死信机制
  30. async initDeadProcess() {
  31. try {
  32. await this.initDeadQueue();
  33. await this.initTaskQueue();
  34. } catch (error) {
  35. console.error('初始化死信机制失败');
  36. return;
  37. }
  38. console.log('初始化:死信机制----成功');
  39. }
  40. /**
  41. * 初始化定时任务队列并设置死信机制
  42. */
  43. async initTaskQueue() {
  44. const { mq } = this.ctx;
  45. try {
  46. const ch = await mq.conn.createChannel();
  47. // 声明正常交换器
  48. await ch.assertExchange(this.task.ex, 'direct', { durable: true });
  49. // 声明正常队列(配置死信队列设置)
  50. const q = await ch.assertQueue(this.task.queue, { durable: true, exclusive: false, deadLetterExchange: this.task.deadEx, deadLetterRoutingKey: this.task.deadLetterRoutingKey });
  51. // 正常队里绑定至正常交换器
  52. await ch.bindQueue(q.queue, this.task.ex);
  53. } catch (error) {
  54. console.error('mq---- 死信机制-任务队列生成失败');
  55. console.error(error);
  56. }
  57. }
  58. /**
  59. * 初始化定时任务死信机制队列
  60. */
  61. async initDeadQueue() {
  62. const { mq } = this.ctx;
  63. try {
  64. const ch = await mq.conn.createChannel();
  65. await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
  66. const qr = await ch.assertQueue(this.task.deadQueue, {
  67. durable: true,
  68. exclusive: false,
  69. });
  70. await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
  71. await ch.consume(
  72. qr.queue,
  73. async msg => {
  74. console.log('in dead');
  75. await this.dealTask(msg);
  76. },
  77. { noAck: true }
  78. );
  79. } catch (error) {
  80. console.error('mq---- 死信机制-死信队列生成失败');
  81. console.error(error);
  82. }
  83. }
  84. async dealTask(msg) {
  85. try {
  86. const str = msg.content.toString();
  87. const obj = JSON.parse(str);
  88. const { service, method, params } = obj;
  89. const arr = service.split('.');
  90. let ser = this.ctx.service;
  91. for (const s of arr) {
  92. ser = ser[s];
  93. }
  94. ser[method](params);
  95. } catch (error) {
  96. console.log(error);
  97. }
  98. }
  99. /**
  100. * 发送定时消息
  101. * @param {String} queue 队列名
  102. * @param {Any} data 数据
  103. * @param {Number} time 时间:分(函数内需要转为毫秒) 默认 6秒
  104. */
  105. async makeTask(queue, data, time = '0.1') {
  106. time = this.ctx.multiply(time, 60 * 1000); // 转换为毫秒
  107. const ch = await this.ctx.mq.conn.createChannel();
  108. await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration: time });
  109. await ch.close();
  110. }
  111. // mission队列处理
  112. async mission() {
  113. const { mq } = this.ctx;
  114. if (mq) {
  115. const ch = await mq.conn.createChannel();
  116. const queue = 'mission/market';
  117. try {
  118. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  119. await ch.assertQueue(queue, { durable: false });
  120. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  121. } catch (error) {
  122. this.ctx.logger.error('未找到订阅的队列');
  123. }
  124. } else {
  125. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  126. }
  127. }
  128. // 执行任务
  129. async dealMission(bdata) {
  130. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  131. let data = bdata.content.toString();
  132. try {
  133. data = JSON.parse(data);
  134. } catch (error) {
  135. this.ctx.logger.error('数据不是object');
  136. }
  137. const { service, method, project, ...others } = data;
  138. const arr = service.split('.');
  139. let s = this.ctx.service;
  140. for (const key of arr) {
  141. s = s[key];
  142. }
  143. s[method](others);
  144. }
  145. }
  146. module.exports = RabbitmqService;