// rabbitmq.js (4.9 KB)
  1. 'use strict';
  2. const msgvalue = require('../util/constants');
  3. const Service = require('egg').Service;
  /**
   * RabbitMQ helper service.
   *
   * Publishes messages through the `ctx.mq` plugin and subscribes to exchanges,
   * turning each delivered message into either a stored message record or a
   * WeChat template message.
   *
   * All subscriptions use an exclusive, server-named queue and `noAck: true`,
   * so a message whose handler fails is logged and dropped, never redelivered.
   */
  class RabbitmqService extends Service {
    constructor(ctx) {
      super(ctx);
      // Exchange settings shared by every subscription created by this service.
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Publish a message to an exchange via `ctx.mq.topic`.
     *
     * Logs an error and does nothing when the MQ plugin is not configured.
     *
     * @param {string} ex - Exchange name.
     * @param {string} routeKey - Routing key.
     * @param {*} msg - Message body, passed through to `mq.topic` unchanged.
     * @param {*} parm - Extra argument passed through to `mq.topic` unchanged.
     * @return {Promise<void>} Resolves once `mq.topic` has settled.
     */
    async sendQueueMsg(ex, routeKey, msg, parm) {
      const { mq } = this.ctx;
      if (!mq) {
        this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      await mq.topic(ex, routeKey, msg, parm);
    }

    /**
     * Subscribe to an exchange and store every received message through
     * `service.message.create`.
     *
     * @param {string} ex - Exchange name.
     * @return {Promise<void>} Resolves once the consumer is registered.
     */
    async receiveQueueMsg(ex) {
      await this._subscribe(ex, async msg => {
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        // NOTE(review): this handler reads `headers.userId` while the WeChat
        // handlers read `headers.userid` — confirm which key publishers send.
        await this.service.message.create({
          userid: headers.userId,
          name: headers.name,
          createtime: headers.createtime,
          type: headers.type,
          content,
          remark: headers.remark,
        });
      });
    }

    /**
     * Subscribe to school-review messages and forward each one as a WeChat
     * template message to the enterprise identified by `headers.userid`.
     *
     * @param {string} ex - Exchange name.
     * @return {Promise<void>} Resolves once the consumer is registered.
     */
    async receiveMsgSendWxSch(ex) {
      await this._subscribe(ex, async msg => {
        const { ctx } = this;
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        const { baseDir, corpsDir } = ctx.app.config;

        // The request must be awaited: `ctx.curl` returns a Promise, and reading
        // fields from the pending Promise always yields `undefined`.
        const corp = await ctx.curl(`${baseDir}${corpsDir}${headers.userid}`, {
          method: 'GET',
          dataType: 'json',
        });

        // The parsed JSON body normally sits on `.data` (as the student lookup
        // in receiveMsgSendWxCorp already assumes); the top-level fallback keeps
        // the original `corp.openid` lookup working for non-standard responses.
        const openid = corp ? (corp.data && corp.data.openid) || corp.openid : undefined;
        if (!openid) {
          ctx.logger.warn('no openid found for enterprise %s, template message skipped', headers.userid);
          return;
        }

        await this.service.wechat.sendTemplateMsg(
          msgvalue.REVIEW_TEMPLATE_ID,
          openid,
          '您的申请已经得到批复',
          content,
          headers.name,
          '企业申请',
          headers.remark
        );
      });
    }

    /**
     * Subscribe to enterprise job-posting messages and forward each one as a
     * WeChat template message to every student returned for `headers.userid`.
     *
     * @param {string} ex - Exchange name.
     * @return {Promise<void>} Resolves once the consumer is registered.
     */
    async receiveMsgSendWxCorp(ex) {
      await this._subscribe(ex, async msg => {
        const { ctx } = this;
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        const { baseDir, stusDir, strDir } = ctx.app.config;
        const request = { method: 'GET', dataType: 'json' };

        const stus = await ctx.curl(`${baseDir}${stusDir}${headers.userid}`, request);
        // Tolerate a missing or non-list body instead of throwing in `for...of`.
        const students = stus && Array.isArray(stus.data) ? stus.data : [];

        for (const elem of students) {
          // Each student is handled in isolation so that one failed lookup or
          // send does not stop the remaining students from being notified.
          try {
            const stud = await ctx.curl(`${baseDir}${strDir}${elem.id}`, request);
            const openid = stud && stud.data ? stud.data.openid : undefined;
            if (!openid) {
              ctx.logger.warn('no openid found for student %s, template message skipped', elem.id);
              continue;
            }
            await this.service.wechat.sendTemplateMsg(
              msgvalue.REVIEW_TEMPLATE_ID,
              openid,
              '您订阅的企业已经有新职位发布',
              content,
              headers.name,
              '职位发布',
              headers.remark
            );
          } catch (err) {
            ctx.logger.error('==e==', err);
          }
        }
      });
    }

    /**
     * Shared subscription plumbing: open a channel, assert the exchange, bind
     * an exclusive server-named queue to it and start consuming.
     *
     * Setup failures are logged and the channel is closed; they are not
     * rethrown. Errors raised by `handler` are logged per message.
     *
     * @param {string} ex - Exchange name.
     * @param {function(Object): Promise<void>} handler - Invoked with every
     *   non-null delivered message.
     * @return {Promise<void>} Resolves once the consumer is registered, or once
     *   a setup failure has been logged.
     */
    async _subscribe(ex, handler) {
      const { ctx } = this;
      const { mq } = ctx;
      if (!mq) {
        ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }

      const ch = await mq.conn.createChannel();
      try {
        await ch.assertExchange(ex, this.exType, { durable: this.durable });
        const q = await ch.assertQueue('', { exclusive: true });
        ctx.logger.info('==q=', q);

        // Bind the queue to the exchange.
        // NOTE(review): on a topic exchange `*` matches exactly one word, so
        // dotted routing keys such as `a.b` are not delivered — use `#` if
        // those should be received too.
        await ch.bindQueue(q.queue, ex, '*');

        // Awaited so that a rejected subscription reaches the catch below.
        await ch.consume(q.queue, async msg => {
          // A null delivery signals that the broker cancelled this consumer
          // (amqplib behaviour — assumes `mq.conn` is an amqplib connection).
          if (!msg) {
            ctx.logger.warn('consumer on exchange %s was cancelled by the broker', ex);
            return;
          }
          ctx.logger.info('收到消息: ', msg);
          try {
            await handler(msg);
          } catch (err) {
            // The consume callback's return value is ignored by the client, so
            // an uncaught rejection here would surface as an unhandled rejection.
            ctx.logger.error('==e==', err);
          }
        }, { noAck: true });
      } catch (e) {
        ctx.logger.error('==e==', e);
        try {
          await ch.close();
        } catch (closeErr) {
          // Channel-level errors usually close the channel already; a failing
          // close must not replace the original, already-logged error.
          ctx.logger.warn('failed to close channel after error', closeErr);
        }
      }
    }
  }
  // Expose the service class as this module's export.
  131. module.exports = RabbitmqService;