rabbitmq.js 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. 'use strict';
  2. const msgvalue = require('../util/constants');
  3. const Service = require('egg').Service;
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. }
  10. // 发送消息
  11. async sendQueueMsg(ex, routeKey, msg, parm) {
  12. const { mq } = this.ctx;
  13. if (mq) {
  14. await mq.topic(ex, routeKey, msg, parm);
  15. } else {
  16. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  17. }
  18. }
  19. // 接收消息
  20. async receiveQueueMsg(ex) {
  21. const self = this;
  22. const { mq } = self.ctx;
  23. if (mq) {
  24. const ch = await mq.conn.createChannel();
  25. try {
  26. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  27. const q = await ch.assertQueue('', { exclusive: true });
  28. console.log('==q=', q);
  29. // 队列绑定 exchange
  30. await ch.bindQueue(q.queue, ex, '*');
  31. ch.consume(q.queue, msg => {
  32. console.log('收到消息: ', msg);
  33. const result = msg.content.toString();
  34. const headers = msg.properties.headers;
  35. // 插入待办事项到数据库中。
  36. if (result != null) {
  37. this.service.message.create({ userid: headers.userId, name: headers.name, createtime: headers.createtime, type: headers.type, content: result, remark: headers.remark });
  38. }
  39. }, { noAck: true });
  40. } catch (e) {
  41. console.log('==e==', e);
  42. await ch.close();
  43. }
  44. } else {
  45. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  46. }
  47. }
  48. // 接收学校审核消息并发送模板消息到企业微信
  49. async receiveMsgSendWxSch(ex) {
  50. const self = this;
  51. const { mq } = self.ctx;
  52. if (mq) {
  53. const ch = await mq.conn.createChannel();
  54. try {
  55. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  56. const q = await ch.assertQueue('', { exclusive: true });
  57. console.log('==q=', q);
  58. // 队列绑定 exchange
  59. await ch.bindQueue(q.queue, ex, '*');
  60. ch.consume(q.queue, msg => {
  61. console.log('收到消息: ', msg);
  62. const result = msg.content.toString();
  63. // const fields = msg.fields;
  64. // const properties = msg.properties;
  65. const headers = msg.properties.headers;
  66. // 插入待办事项到数据库中。
  67. const path = self.ctx.app.config.baseDirMq + self.ctx.app.config.corpsDirMq + headers.userId + '/users';
  68. self.ctx.curl(path, {
  69. method: 'GET',
  70. dataType: 'json',
  71. }).then(
  72. function(corpUser) {
  73. if (corpUser != null) {
  74. const _datas = corpUser.data.data;
  75. const _self = self;
  76. for (let i = 0; i < _datas.length; i++) {
  77. const _data = _datas[i];
  78. self.service.wechat.sendTemplateMsg(_self.ctx.app.config.REVIEW_TEMPLATE_ID, _data.openid, '您的申请已经得到批复', result, headers.name, '企业申请', headers.remark);
  79. }
  80. }
  81. });
  82. }, { noAck: true });
  83. } catch (e) {
  84. console.log('==e==', e);
  85. await ch.close();
  86. }
  87. } else {
  88. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  89. }
  90. }
  91. // 接收企业职位发布消息并发送模板消息到学生微信
  92. async receiveMsgSendWxCorp(ex) {
  93. const self = this;
  94. const { mq } = self.ctx;
  95. if (mq) {
  96. const ch = await mq.conn.createChannel();
  97. try {
  98. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  99. const q = await ch.assertQueue('', { exclusive: true });
  100. console.log('==q=', q);
  101. // 队列绑定 exchange
  102. await ch.bindQueue(q.queue, ex, '*');
  103. ch.consume(q.queue, msg => {
  104. console.log('收到消息: ', msg);
  105. const result = msg.content.toString();
  106. // const fields = msg.fields;
  107. // const properties = msg.properties;
  108. const headers = msg.properties.headers;
  109. // 插入待办事项到数据库中。
  110. const path = self.ctx.app.config.baseDirMq + self.ctx.app.config.stusDirMq + '?corpid=' + headers.userId;
  111. // const path = 'http://10.16.5.15:8101/api/studentcorp' + headers.userid;
  112. self.ctx.curl(path, {
  113. method: 'GET',
  114. dataType: 'json',
  115. }).then(
  116. function(stus) {
  117. if (stus != null) {
  118. for (const elem of stus.data.data) {
  119. const pathStu = self.ctx.app.config.baseDirMq + self.ctx.app.config.strDirMq + elem.studid;
  120. self.ctx.curl(pathStu, {
  121. method: 'GET',
  122. dataType: 'json',
  123. }).then(
  124. // eslint-disable-next-line no-loop-func
  125. function(stud) {
  126. if (stud != null) {
  127. self.service.wechat.sendTemplateMsg(self.ctx.app.config.REVIEW_TEMPLATE_ID, stud.data.data.openid, '您订阅的企业已经有新职位发布', result, headers.name, '职位发布', headers.remark);
  128. }
  129. });
  130. }
  131. }
  132. });
  133. }, { noAck: true });
  134. } catch (e) {
  135. console.log('==e==', e);
  136. await ch.close();
  137. }
  138. } else {
  139. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  140. }
  141. }
  142. // 接收企业发送面试邀请发送给学生消息
  143. async receiveQueueMsgOffer(ex) {
  144. const self = this;
  145. const { mq } = self.ctx;
  146. if (mq) {
  147. const ch = await mq.conn.createChannel();
  148. try {
  149. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  150. const q = await ch.assertQueue('', { exclusive: true });
  151. console.log('==q=', q);
  152. // 队列绑定 exchange
  153. await ch.bindQueue(q.queue, ex, '*');
  154. ch.consume(q.queue, msg => {
  155. console.log('收到消息: ', msg);
  156. const result = msg.content.toString();
  157. // const fields = msg.fields;
  158. // const properties = msg.properties;
  159. const headers = msg.properties.headers;
  160. // 插入待办事项到数据库中。
  161. const pathStu = self.ctx.app.config.baseDirMq + self.ctx.app.config.strDirMq + headers.userId;
  162. self.ctx.curl(pathStu, {
  163. method: 'GET',
  164. dataType: 'json',
  165. }).then(
  166. // eslint-disable-next-line no-loop-func
  167. function(stud) {
  168. if (stud != null) {
  169. self.service.wechat.sendTemplateMsg(self.ctx.app.config.REVIEW_TEMPLATE_ID, stud.data.data.openid, '您有新的面试邀请,请及时查看。', result, headers.name, '面试邀请', headers.remark);
  170. }
  171. });
  172. }, { noAck: true });
  173. } catch (e) {
  174. console.log('==e==', e);
  175. await ch.close();
  176. }
  177. } else {
  178. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  179. }
  180. }
  181. }
  182. module.exports = RabbitmqService;