rabbitMq.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  /**
   * Close an AMQP channel without letting a close failure escape.
   * close() rejects when the channel is already closed (for example after the
   * broker shut it down following a failed operation); during cleanup that
   * outcome is expected and carries no extra information.
   * @param {Object} [ch] channel to close; a missing channel is ignored
   */
  async function closeChannelQuietly(ch) {
    if (!ch) return;
    try {
      await ch.close();
    } catch (error) {
      // Deliberately ignored: the channel is already unusable at this point.
    }
  }

  /**
   * Walk a dotted service path (e.g. "a.b.c") starting from `root` and return
   * the service object that owns `method`.
   * @param {Object} root object to start from (the ctx.service tree)
   * @param {String} servicePath dot-separated path below `root`
   * @param {String} method name of the method expected on the resolved service
   * @return {Object|null} the resolved service, or null when the path or the method does not exist
   */
  function resolveServiceTarget(root, servicePath, method) {
    if (typeof servicePath !== 'string') return null;
    let target = root;
    for (const key of servicePath.split('.')) {
      if (target === null || target === undefined) return null;
      target = target[key];
    }
    if (target === null || target === undefined) return null;
    return typeof target[method] === 'function' ? target : null;
  }

  /**
   * Service wrapping the RabbitMQ connection exposed as `ctx.mq`:
   * delayed tasks (per-message TTL + dead-letter queue), the "mission" queue
   * consumer, and the shop system-message notification.
   *
   * NOTE(review): dealTask/dealMission invoke whatever `service`/`method` a
   * queue message names. Anyone able to publish to these queues can call any
   * service method; confirm the queues are only writable by trusted producers.
   */
  class RabbitmqService extends Service {
    constructor(ctx) {
      super(ctx);
      this.exType = 'topic';
      this.durable = true;
      // Delayed-task queue settings (ex, queue, deadEx, deadQueue, deadLetterRoutingKey)
      this.task = this.app.config.taskMqConfig;
      this.httpUtil = this.ctx.service.util.httpUtil;
    }

    /**
     * Notify a shop that it has a new system message.
     * The notification is not published through MQ: it is posted over HTTP to
     * the chat service, which is expected to forward it via websocket.
     * Failures are logged and never propagated to the caller.
     * @param shop_id shop id
     */
    async shopMsg(shop_id) {
      try {
        const prefix = _.get(this.app, 'config.httpPrefix.chat');
        const uri = '/sendWebSocket';
        // NOTE(review): 'recevier' is misspelled, but it is the key sent on the
        // wire; confirm with the chat service before renaming it.
        await this.httpUtil.cpost(`${prefix}${uri}`, { recevier: shop_id, type: 'shopMsg' });
      } catch (error) {
        console.error('mq--店铺系统消息--任务队列生成失败');
        console.error(error);
      }
    }

    /**
     * Initialise the dead-letter mechanism: the dead-letter queue (with its
     * consumer) first, then the task queue that dead-letters into it.
     * Both steps are always attempted; success is only logged when neither
     * step reported a failure.
     */
    async initDeadProcess() {
      let deadReady;
      let taskReady;
      try {
        deadReady = await this.initDeadQueue();
        taskReady = await this.initTaskQueue();
      } catch (error) {
        console.error('初始化死信机制失败');
        console.error(error);
        return;
      }
      // Both initialisers log their own error and report failure as `false`.
      // Only an explicit `false` counts as failure so that an override which
      // returns nothing keeps the previous behaviour.
      if (deadReady === false || taskReady === false) {
        console.error('初始化死信机制失败');
        return;
      }
      console.log('初始化:死信机制----成功');
    }

    /**
     * Declare the delayed-task exchange and queue, configuring the queue to
     * dead-letter into `task.deadEx` with `task.deadLetterRoutingKey`.
     * Errors are logged, not thrown.
     * @return {Promise<Boolean>} true when everything was declared, false otherwise
     */
    async initTaskQueue() {
      const { mq } = this.ctx;
      let ch;
      try {
        ch = await mq.conn.createChannel();
        // Declare the normal exchange
        await ch.assertExchange(this.task.ex, 'direct', { durable: true });
        // Declare the normal queue (with its dead-letter settings)
        const q = await ch.assertQueue(this.task.queue, {
          durable: true,
          exclusive: false,
          deadLetterExchange: this.task.deadEx,
          deadLetterRoutingKey: this.task.deadLetterRoutingKey,
        });
        // Bind the normal queue to the normal exchange
        await ch.bindQueue(q.queue, this.task.ex);
        return true;
      } catch (error) {
        console.error('mq---- 死信机制-任务队列生成失败');
        console.error(error);
        return false;
      } finally {
        // This channel is only used for declarations; the durable,
        // non-exclusive queue outlives it, so release it instead of leaking it.
        await closeChannelQuietly(ch);
      }
    }

    /**
     * Declare the dead-letter exchange and queue and start consuming the
     * queue; every dead-lettered message is handed to dealTask.
     * Errors are logged, not thrown.
     * @return {Promise<Boolean>} true when the consumer was started, false otherwise
     */
    async initDeadQueue() {
      const { mq } = this.ctx;
      let ch;
      try {
        ch = await mq.conn.createChannel();
        await ch.assertExchange(this.task.deadEx, 'direct', { durable: true });
        const qr = await ch.assertQueue(this.task.deadQueue, {
          durable: true,
          exclusive: false,
        });
        await ch.bindQueue(qr.queue, this.task.deadEx, this.task.deadLetterRoutingKey);
        // The channel stays open on success: it carries the consumer.
        await ch.consume(
          qr.queue,
          async msg => {
            console.log('in dead');
            await this.dealTask(msg);
          },
          { noAck: true }
        );
        return true;
      } catch (error) {
        console.error('mq---- 死信机制-死信队列生成失败');
        console.error(error);
        // No consumer was registered, so the channel has no further use.
        await closeChannelQuietly(ch);
        return false;
      }
    }

    /**
     * Execute one dead-lettered task.
     * The message body is JSON of the form `{ service, method, params }`;
     * `service` is a dotted path below ctx.service and `method` is called on
     * it with `params`. Every failure is logged and swallowed so that one bad
     * message cannot produce an unhandled rejection in the consumer.
     * @param {Object} msg message delivered by the consumer (may be empty)
     */
    async dealTask(msg) {
      try {
        if (!msg) {
          throw new Error('dead-letter consumer received an empty message');
        }
        const { service, method, params } = JSON.parse(msg.content.toString());
        const target = resolveServiceTarget(this.ctx.service, service, method);
        if (!target) {
          throw new Error(`task handler not found: ${service}.${method}`);
        }
        // Awaited so that a rejection of an async handler lands in the catch below.
        await target[method](params);
      } catch (error) {
        console.log(error);
      }
    }

    /**
     * Send a delayed message: the message is put on `queue` with a
     * per-message expiration. Used with the task queue declared by
     * initTaskQueue, an expired message is dead-lettered and then executed by
     * dealTask (assumes nothing consumes the task queue directly - confirm).
     * @param {String} queue queue name
     * @param {Any} data payload, JSON-serialised; dealTask expects `{ service, method, params }`
     * @param {Number} time delay in minutes (converted to milliseconds here); default 0.1 = 6 seconds
     */
    async makeTask(queue, data, time = '0.1') {
      // Convert minutes to milliseconds
      const expiration = this.ctx.multiply(time, 60 * 1000);
      const ch = await this.ctx.mq.conn.createChannel();
      try {
        await ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)), { expiration });
      } catch (error) {
        // Release the channel before reporting the original failure.
        await closeChannelQuietly(ch);
        throw error;
      }
      await ch.close();
    }

    /**
     * Start consuming the "mission/market" queue; every message is handed to
     * dealMission. Logs an error when the MQ plugin is not configured.
     */
    async mission() {
      const { mq } = this.ctx;
      if (!mq) {
        this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      const ch = await mq.conn.createChannel();
      const queue = 'mission/market';
      try {
        // Declare the queue first: per the original author, consuming a queue
        // that does not exist prevents the application from starting.
        await ch.assertQueue(queue, { durable: false });
        await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
      } catch (error) {
        this.ctx.logger.error('未找到订阅的队列');
        this.ctx.logger.error(error);
        // No consumer was registered, so the channel has no further use.
        await closeChannelQuietly(ch);
      }
    }

    /**
     * Execute one mission message.
     * The message body is JSON of the form `{ service, method, project, ...others }`;
     * `method` is called on the service found at the dotted path `service`
     * with `others` as its single argument. Every failure is logged and
     * swallowed so the consumer callback never rejects.
     * @param {Object} bdata message delivered by the consumer (may be empty)
     */
    async dealMission(bdata) {
      const { logger } = this.ctx;
      if (!bdata) {
        logger.error('mission队列中信息不存在');
        return;
      }
      let data;
      try {
        data = JSON.parse(bdata.content.toString());
      } catch (error) {
        logger.error('数据不是object');
        return;
      }
      if (data === null || typeof data !== 'object') {
        logger.error('数据不是object');
        return;
      }
      // `project` is pulled out only so that it is not forwarded in `others`.
      const { service, method, project, ...others } = data;
      const target = resolveServiceTarget(this.ctx.service, service, method);
      if (!target) {
        logger.error(`mission handler not found: ${service}.${method}`);
        return;
      }
      try {
        await target[method](others);
      } catch (error) {
        logger.error(error);
      }
    }
  }
  160. module.exports = RabbitmqService;