rabbitmq.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. 'use strict';
  2. const msgvalue = require('../util/constants');
  3. const sd = require('silly-datetime');
  4. const Service = require('egg').Service;
  /**
   * Egg service around the RabbitMQ plugin exposed on `ctx.mq`.
   *
   * It publishes topic messages and registers three long-lived consumers that
   * react to incoming messages (persisting a reminder, or pushing a WeChat
   * template message). Every consumer shares the same channel/queue set-up,
   * which lives in `_subscribe`.
   */
  class RabbitmqService extends Service {
    constructor(ctx) {
      super(ctx);
      // Exchange settings used whenever a consumer asserts its exchange.
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Publish a message through the MQ plugin.
     *
     * @param {string} ex - exchange name
     * @param {string} routeKey - routing key
     * @param {*} msg - message payload, forwarded untouched to `mq.topic`
     * @param {object} [parm] - extra publish options, forwarded to `mq.topic`
     * @returns {Promise<void>} resolves once `mq.topic` has resolved; when the
     *   plugin is missing an error is logged and nothing is sent
     */
    async sendQueueMsg(ex, routeKey, msg, parm) {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      await mq.topic(ex, routeKey, msg, parm);
    }

    /**
     * Consume messages from `ex` and store each one through
     * `service.message.create`.
     *
     * @param {string} ex - exchange name
     * @returns {Promise<void>} resolves once the consumer is registered
     */
    async receiveQueueMsg(ex) {
      await this._subscribe(ex, async msg => {
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        const createtime = sd.format(new Date(), 'YYYY-MM-DD HH:mm:ss');
        // NOTE(review): this consumer reads `headers.userId` while the two
        // WeChat consumers read `headers.userid` — confirm against the producer.
        await this.service.message.create({
          userid: headers.userId,
          name: headers.name,
          createtime,
          type: headers.type,
          content,
          remark: '别忘记了',
        });
      });
    }

    /**
     * Consume school-review messages from `ex` and push a WeChat template
     * message to the enterprise named by `headers.userid`.
     *
     * @param {string} ex - exchange name
     * @returns {Promise<void>} resolves once the consumer is registered
     */
    async receiveMsgSendWxSch(ex) {
      await this._subscribe(ex, async msg => {
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        const { baseDir, corpsDir } = this.ctx.app.config;
        // The lookup must be awaited: without it `corp` is a pending Promise
        // and no field of the response can ever be read.
        const corp = await this.ctx.curl(baseDir + corpsDir + headers.userid, {
          method: 'GET',
          dataType: 'json',
        });
        // The parsed body is read from `.data`, matching how the student
        // consumer reads `stud.data.openid`.
        // NOTE(review): confirm the corp endpoint returns `openid` at the top
        // level of its JSON body.
        if (corp == null || corp.data == null) {
          return;
        }
        await this.service.wechat.sendTemplateMsg(
          msgvalue.REVIEW_TEMPLATE_ID,
          corp.data.openid,
          '您的申请已经得到批复',
          content,
          headers.name,
          '企业申请',
          headers.remark
        );
      });
    }

    /**
     * Consume job-publication messages from `ex` and push a WeChat template
     * message to every student returned by the student/enterprise lookup.
     *
     * @param {string} ex - exchange name
     * @returns {Promise<void>} resolves once the consumer is registered
     */
    async receiveMsgSendWxCorp(ex) {
      await this._subscribe(ex, async msg => {
        const content = msg.content.toString();
        const headers = msg.properties.headers;
        const { baseDir, strDir } = this.ctx.app.config;
        // NOTE(review): hard-coded host, and no separator between the path and
        // the user id — kept as-is; confirm the intended URL (a config-based
        // variant existed here as commented-out code).
        const listUrl = 'http://10.16.5.15:8101/api/studentcorp' + headers.userid;
        const stus = await this.ctx.curl(listUrl, {
          method: 'GET',
          dataType: 'json',
        });
        // An empty or non-array body means nobody to notify.
        const students = stus != null && Array.isArray(stus.data) ? stus.data : [];
        for (const elem of students) {
          try {
            const stud = await this.ctx.curl(baseDir + strDir + elem.id, {
              method: 'GET',
              dataType: 'json',
            });
            if (stud == null || stud.data == null) {
              continue;
            }
            await this.service.wechat.sendTemplateMsg(
              msgvalue.REVIEW_TEMPLATE_ID,
              stud.data.openid,
              '您订阅的企业已经有新职位发布',
              content,
              headers.name,
              '职位发布',
              headers.remark
            );
          } catch (err) {
            // One failing student must not block the remaining notifications.
            this.ctx.logger.error('[rabbitmq] failed to notify student:', elem && elem.id, err);
          }
        }
      });
    }

    /**
     * Shared consumer set-up: open a channel, assert the exchange, bind a
     * server-named exclusive queue with binding key '*', and start consuming
     * with `noAck: true`.
     *
     * @param {string} ex - exchange name
     * @param {function(object): Promise<*>} handler - invoked for every
     *   delivered message; rejections are logged and never propagate
     * @returns {Promise<void>} resolves once the consumer is registered, or
     *   after a set-up failure has been logged and the channel closed
     */
    async _subscribe(ex, handler) {
      const { mq, logger } = this.ctx;
      if (!mq) {
        logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      const ch = await mq.conn.createChannel();
      try {
        await ch.assertExchange(ex, this.exType, { durable: this.durable });
        const q = await ch.assertQueue('', { exclusive: true });
        logger.info('==q=', q);
        // Bind the queue to the exchange.
        await ch.bindQueue(q.queue, ex, '*');
        // Awaited so that a rejected registration reaches the catch below.
        await ch.consume(q.queue, msg => {
          // A null message signals that the consumer was cancelled.
          if (msg == null) {
            logger.warn('[rabbitmq] consumer cancelled on exchange:', ex);
            return undefined;
          }
          logger.info('收到消息: ', msg);
          // Run the handler inside a promise chain so that synchronous throws
          // and rejections are both logged instead of going unhandled.
          return Promise.resolve()
            .then(() => handler(msg))
            .catch(err => {
              logger.error('[rabbitmq] message handler failed:', err);
            });
        }, { noAck: true });
      } catch (e) {
        logger.error('==e==', e);
        try {
          await ch.close();
        } catch (closeErr) {
          // The channel may already be closed by the failure itself; closing
          // again must not mask the original error.
          logger.error('[rabbitmq] failed to close channel:', closeErr);
        }
      }
    }
  }
  137. module.exports = RabbitmqService;