123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- 'use strict';
- const msgvalue = require('../util/constants');
- const Service = require('egg').Service;
/**
 * RabbitMQ helper service.
 *
 * Publishes messages through the `ctx.mq` plugin and registers consumers that
 * either persist incoming messages or forward them as WeChat template
 * messages. Every subscription uses a topic exchange and a server-named,
 * exclusive queue bound with the `*` pattern, consumed with `noAck: true`.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Exchange settings shared by every subscription made by this service.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Publish a message via the MQ plugin.
   *
   * @param {string} ex - exchange name
   * @param {string} routeKey - routing key
   * @param {*} msg - message body, passed through to `mq.topic`
   * @param {*} parm - extra argument passed through to `mq.topic`
   * @return {Promise<void>} resolves once `mq.topic` settles; only logs an
   *   error when the MQ plugin is not configured
   */
  async sendQueueMsg(ex, routeKey, msg, parm) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    await mq.topic(ex, routeKey, msg, parm);
  }

  /**
   * Subscribe to `ex` and store every received message via `logMessage`.
   *
   * @param {string} ex - exchange name
   * @return {Promise<void>} resolves once the consumer is registered (or the
   *   setup failure has been logged)
   */
  async receiveQueueMsg(ex) {
    await this._subscribe(ex, msg => this.logMessage(msg));
  }

  /**
   * Consumer handler: persist the message through `service.message.create`.
   *
   * @param {object} msg - delivered message (`content` buffer plus
   *   `properties.headers` carrying userId/name/createtime/type/remark)
   * @return {Promise<void>} resolves once the record has been created
   */
  async logMessage(msg) {
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    await this.service.message.create({
      userid: headers.userId,
      name: headers.name,
      createtime: headers.createtime,
      type: headers.type,
      content,
      remark: headers.remark,
    });
  }

  /**
   * Subscribe to school-review messages and forward them as WeChat template
   * messages (see `logMessageSendWxSch`).
   *
   * @param {string} ex - exchange name
   * @return {Promise<void>} resolves once the consumer is registered (or the
   *   setup failure has been logged)
   */
  async receiveMsgSendWxSch(ex) {
    await this._subscribe(ex, msg => this.logMessageSendWxSch(msg));
  }

  /**
   * Consumer handler: look up the users of the corp named by
   * `headers.userId` and send each of them the review template message.
   *
   * @param {object} msg - delivered message
   * @return {Promise<void>} resolves after every user has been notified
   */
  async logMessageSendWxSch(msg) {
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const { baseDirMq, corpsDirMq, REVIEW_TEMPLATE_ID } = this.ctx.app.config;
    // The id comes from message headers, so encode it before it enters a URL.
    const url = `${baseDirMq}${corpsDirMq}${encodeURIComponent(headers.userId)}/users`;
    const users = this._payloadOf(await this._getJson(url));
    if (!Array.isArray(users)) {
      this.ctx.logger.warn('[rabbitmq] no corp user list returned by %s', url);
      return;
    }
    for (const user of users) {
      await this.service.wechat.sendTemplateMsg(REVIEW_TEMPLATE_ID, user.openid, '您的申请已经得到批复', content, headers.name, '企业申请', headers.remark);
    }
  }

  /**
   * Subscribe to corp job-posting messages and forward them to the students
   * of that corp (see `logMessageSendWxCorp`).
   *
   * @param {string} ex - exchange name
   * @return {Promise<void>} resolves once the consumer is registered (or the
   *   setup failure has been logged)
   */
  async receiveMsgSendWxCorp(ex) {
    await this._subscribe(ex, msg => this.logMessageSendWxCorp(msg));
  }

  /**
   * Consumer handler: list the student relations of the corp named by
   * `headers.userId`, resolve each student and send the template message.
   * Students that cannot be resolved are skipped.
   *
   * @param {object} msg - delivered message
   * @return {Promise<void>} resolves after every resolvable student has been
   *   notified
   */
  async logMessageSendWxCorp(msg) {
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const { baseDirMq, stusDirMq, REVIEW_TEMPLATE_ID } = this.ctx.app.config;
    const url = `${baseDirMq}${stusDirMq}?corpid=${encodeURIComponent(headers.userId)}`;
    const relations = this._payloadOf(await this._getJson(url));
    if (!Array.isArray(relations)) {
      this.ctx.logger.warn('[rabbitmq] no student list returned by %s', url);
      return;
    }
    for (const relation of relations) {
      const student = await this._fetchStudent(relation.studid);
      if (!student) {
        this.ctx.logger.warn('[rabbitmq] student %s not found, skipped', relation.studid);
        continue;
      }
      await this.service.wechat.sendTemplateMsg(REVIEW_TEMPLATE_ID, student.openid, '您订阅的企业已经有新职位发布', content, headers.name, '职位发布', headers.remark);
    }
  }

  /**
   * Subscribe to interview-invitation messages and forward them to the
   * invited student (see `logMessageSendOffer`).
   *
   * @param {string} ex - exchange name
   * @return {Promise<void>} resolves once the consumer is registered (or the
   *   setup failure has been logged)
   */
  async receiveQueueMsgOffer(ex) {
    await this._subscribe(ex, msg => this.logMessageSendOffer(msg));
  }

  /**
   * Consumer handler: resolve the student named by `headers.userId` and send
   * the offer template message.
   *
   * @param {object} msg - delivered message
   * @return {Promise<void>} resolves after the student has been notified, or
   *   after a warning has been logged when the student cannot be resolved
   */
  async logMessageSendOffer(msg) {
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const student = await this._fetchStudent(headers.userId);
    if (!student) {
      this.ctx.logger.warn('[rabbitmq] student %s not found, offer not sent', headers.userId);
      return;
    }
    await this.service.wechat.sendTemplateMsg(this.ctx.app.config.OFFER_TEMPLATE_ID, student.openid, '您有新的面试邀请,请及时查看。', content, headers.name, headers.createtime, headers.remark);
  }

  // Declare the exchange, bind a fresh exclusive queue to it and start
  // consuming with `handler`; setup failures are logged, not rethrown.
  async _subscribe(ex, handler) {
    const { mq, logger } = this.ctx;
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    const ch = await mq.conn.createChannel();
    try {
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });
      logger.info('==q=', q);
      // Bind the queue to the exchange.
      await ch.bindQueue(q.queue, ex, '*');
      await ch.consume(q.queue, msg => this._dispatch(handler, msg), { noAck: true });
    } catch (e) {
      logger.error('==e==', e);
      await this._closeQuietly(ch);
    }
  }

  // Run `handler` for one delivery; never rejects, so a failing handler
  // cannot surface as an unhandled promise rejection in the consumer.
  async _dispatch(handler, msg) {
    // A null delivery is the broker's signal that the consumer was cancelled.
    if (msg == null) {
      this.ctx.logger.warn('[rabbitmq] consumer cancelled by the broker');
      return;
    }
    try {
      await handler(msg);
    } catch (e) {
      this.ctx.logger.error('[rabbitmq] message handler failed', e);
    }
  }

  // Close a channel, tolerating the case where it is already closed.
  async _closeQuietly(ch) {
    try {
      await ch.close();
    } catch (e) {
      this.ctx.logger.warn('[rabbitmq] failed to close channel', e);
    }
  }

  // Issue a JSON GET request through ctx.curl.
  _getJson(url) {
    return this.ctx.curl(url, { method: 'GET', dataType: 'json' });
  }

  // Extract the `data.data` payload of a curl response, or undefined.
  _payloadOf(response) {
    return response && response.data ? response.data.data : undefined;
  }

  // Fetch one student record by id; resolves to a falsy value when absent.
  async _fetchStudent(studentId) {
    const { baseDirMq, strDirMq } = this.ctx.app.config;
    const url = `${baseDirMq}${strDirMq}${encodeURIComponent(studentId)}`;
    return this._payloadOf(await this._getJson(url));
  }
}
- module.exports = RabbitmqService;
|