'use strict';

// NOTE(review): `msgvalue` is not referenced anywhere in this file; kept because
// removing a file-level import is outside the scope of this change.
const msgvalue = require('../util/constants');
const Service = require('egg').Service;

/**
 * RabbitMQ helper service.
 *
 * Publishes messages through the `ctx.mq` plugin and registers consumers that
 * either persist incoming messages or forward them as WeChat template messages.
 */
class RabbitmqService extends Service {
  constructor(ctx) {
    super(ctx);
    // Exchange settings shared by every consumer registered from this service.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Publish a message to a topic exchange.
   *
   * @param {string} ex - exchange name
   * @param {string} routeKey - routing key
   * @param {*} msg - message payload, passed through to `mq.topic`
   * @param {object} [parm] - extra publish parameters, passed through to `mq.topic`
   */
  async sendQueueMsg(ex, routeKey, msg, parm) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    await mq.topic(ex, routeKey, msg, parm);
  }

  /**
   * Shared consumer setup used by all `receive*` methods.
   *
   * Asserts the exchange, creates an exclusive server-named queue, binds it to
   * the exchange and starts consuming with `noAck: true`. Setup failures are
   * logged and the channel is closed; they are not rethrown (same best-effort
   * behaviour the individual methods had before).
   *
   * @param {string} ex - exchange name
   * @param {function(object): Promise<*>} handler - invoked for every delivered message
   */
  async _subscribe(ex, handler) {
    const { mq, logger } = this.ctx;
    if (!mq) {
      logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();

    const onMessage = msg => {
      // The client library passes `null` when the broker cancels the consumer.
      if (msg === null) {
        logger.warn('[rabbitmq] consumer on exchange %s was cancelled by the broker', ex);
        return;
      }
      // The handler is async and nobody awaits the consume callback, so a
      // failure must be caught here or it becomes an unhandled rejection.
      Promise.resolve()
        .then(() => handler(msg))
        .catch(err => {
          logger.error('[rabbitmq] failed to handle message from exchange %s', ex, err);
        });
    };

    try {
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });
      logger.info('[rabbitmq] consuming exchange %s through queue %s', ex, q.queue);
      // Bind the queue to the exchange.
      // NOTE(review): on a topic exchange '*' matches exactly one word, so
      // routing keys containing dots will not be delivered — confirm whether
      // '#' was intended.
      await ch.bindQueue(q.queue, ex, '*');
      await ch.consume(q.queue, onMessage, { noAck: true });
    } catch (e) {
      logger.error('[rabbitmq] failed to start consumer on exchange %s', ex, e);
      try {
        await ch.close();
      } catch (closeErr) {
        // The channel may already be closed by the error that brought us here.
        logger.error('[rabbitmq] failed to close channel for exchange %s', ex, closeErr);
      }
    }
  }

  /**
   * Consume messages from an exchange and store each one via `logMessage`.
   *
   * @param {string} ex - exchange name
   */
  async receiveQueueMsg(ex) {
    await this._subscribe(ex, msg => this.logMessage(msg, this));
  }

  /**
   * Message callback: insert the message as a record through `service.message`.
   *
   * @param {object} msg - delivered message (`content` buffer plus `properties.headers`)
   */
  async logMessage(msg) {
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    await this.service.message.create({
      userid: headers.userId,
      name: headers.name,
      createtime: headers.createtime,
      type: headers.type,
      content,
      remark: headers.remark,
    });
  }

  /**
   * Consume school-review messages and forward them as WeChat template
   * messages to the users of the company.
   *
   * @param {string} ex - exchange name
   */
  async receiveMsgSendWxSch(ex) {
    await this._subscribe(ex, msg => this.logMessageSendWxSch(msg, this));
  }

  /**
   * Message callback: look up the users of the company named in
   * `headers.userId` and send each of them the review template message.
   *
   * @param {object} msg - delivered message (`content` buffer plus `properties.headers`)
   */
  async logMessageSendWxSch(msg) {
    const { config } = this.ctx.app;
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const path = config.baseDirMq + config.corpsDirMq + headers.userId + '/users';

    const corpUser = await this.ctx.curl(path, {
      method: 'GET',
      dataType: 'json',
    });

    const users = corpUser && corpUser.data && corpUser.data.data;
    if (!Array.isArray(users)) {
      this.ctx.logger.warn('[rabbitmq] no user list returned by %s', path);
      return;
    }

    // Sent one at a time, in the order returned by the lookup.
    for (const user of users) {
      await this.service.wechat.sendTemplateMsg(
        config.REVIEW_TEMPLATE_ID,
        user.openid,
        '您的申请已经得到批复',
        content,
        headers.name,
        '企业申请',
        headers.remark
      );
    }
  }

  /**
   * Consume company job-posting messages and forward them as WeChat template
   * messages to the students linked to that company.
   *
   * @param {string} ex - exchange name
   */
  async receiveMsgSendWxCorp(ex) {
    await this._subscribe(ex, msg => this.logMessageSendWxCorp(msg, this));
  }

  /**
   * Message callback: fetch the students linked to the company in
   * `headers.userId`, resolve each student's details and send the
   * job-posting template message.
   *
   * @param {object} msg - delivered message (`content` buffer plus `properties.headers`)
   */
  async logMessageSendWxCorp(msg) {
    const { config } = this.ctx.app;
    const { logger } = this.ctx;
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const path = config.baseDirMq + config.stusDirMq + '?corpid=' + headers.userId;

    const stus = await this.ctx.curl(path, {
      method: 'GET',
      dataType: 'json',
    });

    const links = stus && stus.data && stus.data.data;
    if (!Array.isArray(links)) {
      logger.warn('[rabbitmq] no student list returned by %s', path);
      return;
    }

    for (const link of links) {
      const pathStu = config.baseDirMq + config.strDirMq + link.studid;
      // One student failing must not prevent the remaining ones from being notified.
      try {
        const stud = await this.ctx.curl(pathStu, {
          method: 'GET',
          dataType: 'json',
        });
        const student = stud && stud.data && stud.data.data;
        if (!student) {
          logger.warn('[rabbitmq] no student returned by %s', pathStu);
          continue;
        }
        await this.service.wechat.sendTemplateMsg(
          config.REVIEW_TEMPLATE_ID,
          student.openid,
          '您订阅的企业已经有新职位发布',
          content,
          headers.name,
          '职位发布',
          headers.remark
        );
      } catch (err) {
        logger.error('[rabbitmq] failed to notify student via %s', pathStu, err);
      }
    }
  }

  /**
   * Consume interview invitations sent by companies and forward them to the
   * invited student.
   *
   * @param {string} ex - exchange name
   */
  async receiveQueueMsgOffer(ex) {
    await this._subscribe(ex, msg => this.logMessageSendOffer(msg, this));
  }

  /**
   * Message callback: resolve the student in `headers.userId` and send the
   * interview-invitation template message.
   *
   * @param {object} msg - delivered message (`content` buffer plus `properties.headers`)
   */
  async logMessageSendOffer(msg) {
    const { config } = this.ctx.app;
    const content = msg.content.toString();
    const headers = msg.properties.headers;
    const pathStu = config.baseDirMq + config.strDirMq + headers.userId;

    const stud = await this.ctx.curl(pathStu, {
      method: 'GET',
      dataType: 'json',
    });

    const student = stud && stud.data && stud.data.data;
    if (!student) {
      this.ctx.logger.warn('[rabbitmq] no student returned by %s', pathStu);
      return;
    }

    await this.service.wechat.sendTemplateMsg(
      config.OFFER_TEMPLATE_ID,
      student.openid,
      '您有新的面试邀请,请及时查看。',
      content,
      headers.name,
      headers.createtime,
      headers.remark
    );
  }
}

module.exports = RabbitmqService;