rabbitmq.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. 'use strict';
  2. const msgvalue = require('../util/constants');
  3. const Service = require('egg').Service;
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. }
  10. // 发送消息
  11. async sendQueueMsg(ex, routeKey, msg, parm) {
  12. const { mq } = this.ctx;
  13. if (mq) {
  14. await mq.topic(ex, routeKey, msg, parm);
  15. } else {
  16. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  17. }
  18. }
  19. // 接收消息
  20. async receiveQueueMsg(ex) {
  21. const self = this;
  22. const { mq } = self.ctx;
  23. if (mq) {
  24. const ch = await mq.conn.createChannel();
  25. try {
  26. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  27. const q = await ch.assertQueue('', { exclusive: true });
  28. console.log('==q=', q);
  29. // 队列绑定 exchange
  30. await ch.bindQueue(q.queue, ex, '*');
  31. await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
  32. } catch (e) {
  33. console.log('==e==', e);
  34. await ch.close();
  35. }
  36. } else {
  37. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  38. }
  39. }
  40. // 消息回调方法
  41. async logMessage(msg) {
  42. const result = msg.content.toString();
  43. const headers = msg.properties.headers;
  44. // 插入待办事项到数据库中。
  45. if (result != null) {
  46. await this.service.message.create({ userid: headers.userId, name: headers.name, createtime: headers.createtime, type: headers.type, content: result, remark: headers.remark });
  47. }
  48. }
  49. // 接收学校审核消息并发送模板消息到企业微信
  50. async receiveMsgSendWxSch(ex) {
  51. const self = this;
  52. const { mq } = self.ctx;
  53. if (mq) {
  54. const ch = await mq.conn.createChannel();
  55. try {
  56. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  57. const q = await ch.assertQueue('', { exclusive: true });
  58. console.log('==q=', q);
  59. // 队列绑定 exchange
  60. await ch.bindQueue(q.queue, ex, '*');
  61. await ch.consume(q.queue, msg => this.logMessageSendWxSch(msg, this), { noAck: true });
  62. } catch (e) {
  63. console.log('==e==', e);
  64. await ch.close();
  65. }
  66. } else {
  67. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  68. }
  69. }
  70. // 消息回调方法
  71. async logMessageSendWxSch(msg) {
  72. const result = msg.content.toString();
  73. const headers = msg.properties.headers;
  74. const path = this.ctx.app.config.baseDirMq + this.ctx.app.config.corpsDirMq + headers.userId + '/users';
  75. const corpUser = await this.ctx.curl(path, {
  76. method: 'GET',
  77. dataType: 'json',
  78. });
  79. if (corpUser != null) {
  80. const _datas = corpUser.data.data;
  81. for (const elm of _datas) {
  82. await this.service.wechat.sendTemplateMsg(this.ctx.app.config.REVIEW_TEMPLATE_ID, elm.openid, '您的申请已经得到批复', result, headers.name, '企业申请', headers.remark);
  83. }
  84. }
  85. }
  86. // 接收企业职位发布消息并发送模板消息到学生微信
  87. async receiveMsgSendWxCorp(ex) {
  88. const self = this;
  89. const { mq } = self.ctx;
  90. if (mq) {
  91. const ch = await mq.conn.createChannel();
  92. try {
  93. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  94. const q = await ch.assertQueue('', { exclusive: true });
  95. console.log('==q=', q);
  96. // 队列绑定 exchange
  97. await ch.bindQueue(q.queue, ex, '*');
  98. await ch.consume(q.queue, msg => this.logMessageSendWxCorp(msg, this), { noAck: true });
  99. } catch (e) {
  100. console.log('==e==', e);
  101. await ch.close();
  102. }
  103. } else {
  104. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  105. }
  106. }
  107. // 消息回调方法
  108. async logMessageSendWxCorp(msg) {
  109. const result = msg.content.toString();
  110. const headers = msg.properties.headers;
  111. const path = this.ctx.app.config.baseDirMq + this.ctx.app.config.stusDirMq + '?corpid=' + headers.userId;
  112. // const path = 'http://10.16.5.15:8101/api/studentcorp' + headers.userid;
  113. const stus = await this.ctx.curl(path, {
  114. method: 'GET',
  115. dataType: 'json',
  116. });
  117. for (const elem of stus.data.data) {
  118. const pathStu = this.ctx.app.config.baseDirMq + this.ctx.app.config.strDirMq + elem.studid;
  119. const stud = await this.ctx.curl(pathStu, {
  120. method: 'GET',
  121. dataType: 'json',
  122. });
  123. if (stud != null) {
  124. await this.service.wechat.sendTemplateMsg(this.ctx.app.config.REVIEW_TEMPLATE_ID, stud.data.data.openid, '您订阅的企业已经有新职位发布', result, headers.name, '职位发布', headers.remark);
  125. }
  126. }
  127. }
  128. // 接收企业发送面试邀请发送给学生消息
  129. async receiveQueueMsgOffer(ex) {
  130. const self = this;
  131. const { mq } = self.ctx;
  132. if (mq) {
  133. const ch = await mq.conn.createChannel();
  134. try {
  135. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  136. const q = await ch.assertQueue('', { exclusive: true });
  137. console.log('==q=', q);
  138. // 队列绑定 exchange
  139. await ch.bindQueue(q.queue, ex, '*');
  140. await ch.consume(q.queue, msg => this.logMessageSendOffer(msg, this), { noAck: true });
  141. } catch (e) {
  142. console.log('==e==', e);
  143. await ch.close();
  144. }
  145. } else {
  146. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  147. }
  148. }
  149. // 消息回调方法
  150. async logMessageSendOffer(msg) {
  151. const result = msg.content.toString();
  152. const headers = msg.properties.headers;
  153. const pathStu = this.ctx.app.config.baseDirMq + this.ctx.app.config.strDirMq + headers.userId;
  154. const stud = await this.ctx.curl(pathStu, {
  155. method: 'GET',
  156. dataType: 'json',
  157. });
  158. if (stud != null) {
  159. await this.service.wechat.sendTemplateMsg(this.ctx.app.config.OFFER_TEMPLATE_ID, stud.data.data.openid, '您有新的面试邀请,请及时查看。', result, headers.name, headers.createtime, headers.remark);
  160. }
  161. }
  162. }
  163. module.exports = RabbitmqService;