1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- 'use strict';
- const Service = require('egg').Service;
/**
 * Egg service wrapping the application's RabbitMQ plugin (`ctx.mq`).
 *
 * It publishes messages through the plugin's `topic` helper and consumes
 * messages from an exchange; every consumed message triggers processing of
 * the pending Excel data-import records of the user named in its headers.
 */
class RabbitmqService extends Service {
  /**
   * @param {object} ctx - Context object, passed through to the base Service.
   */
  constructor(ctx) {
    super(ctx);
    // Settings used when asserting the exchange in `receiveQueueMsg`.
    this.exType = 'topic';
    this.durable = true;
  }

  /**
   * Send a message.
   *
   * All arguments are forwarded unchanged to `ctx.mq.topic`. When the MQ
   * plugin is not available on the context, an error is logged and nothing
   * is sent.
   *
   * @param {string} ex - Exchange name.
   * @param {string} routeKey - Routing key.
   * @param {*} msg - Message payload.
   * @param {*} parm - Extra argument handed to the plugin as-is
   *   (presumably publish options/headers — confirm against the plugin).
   * @return {Promise<void>} Settles when `ctx.mq.topic` settles.
   */
  async sendQueueMsg(ex, routeKey, msg, parm) {
    const { mq } = this.ctx;
    if (!mq) {
      this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }
    await mq.topic(ex, routeKey, msg, parm);
  }

  /**
   * Receive messages.
   *
   * Opens a channel, asserts the exchange, binds a server-named exclusive
   * queue to it and registers a consumer in `noAck` mode. Failures during
   * this setup are logged and the channel is closed; they are not rethrown.
   * A failure to create the channel itself still rejects the returned
   * promise.
   *
   * @param {string} ex - Exchange name to consume from.
   * @return {Promise<void>} Resolves once the consumer is registered (or
   *   setup failed and was logged).
   */
  async receiveQueueMsg(ex) {
    const { ctx } = this;
    ctx.logger.debug(ex);

    const { mq } = ctx;
    if (!mq) {
      ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
      return;
    }

    const ch = await mq.conn.createChannel();
    try {
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });
      ctx.logger.debug('==q=', q);

      // Bind the queue to the exchange.
      // NOTE(review): on a topic exchange '*' matches exactly one word, so
      // dotted routing keys will not be delivered here — confirm whether '#'
      // was intended.
      await ch.bindQueue(q.queue, ex, '*');

      // Awaited so that a failed consumer registration reaches the catch
      // block below instead of becoming an unhandled rejection.
      await ch.consume(q.queue, msg => {
        // The consumer callback's return value is ignored by the client
        // library, so rejections must be handled right here.
        this._handleImportMessage(msg).catch(err => {
          ctx.logger.error('[RabbitmqService] failed to process message from exchange %s:', ex, err);
        });
      }, { noAck: true });
    } catch (e) {
      ctx.logger.error('[RabbitmqService] failed to start consuming from exchange %s:', ex, e);
      // Setup failed, so nothing will ever use this channel: release it.
      try {
        await ch.close();
      } catch (closeErr) {
        // The broker may already have closed the channel as part of the
        // failure; that is not worth more than a warning.
        ctx.logger.warn('[RabbitmqService] closing channel after failed setup:', closeErr);
      }
    }
  }

  /**
   * Process one consumed message: look up the data-import records that match
   * the user id from the message headers and run the import for each.
   *
   * Only the headers are used; the message body is not read.
   *
   * @param {object|null} msg - Delivered message, or `null` when the broker
   *   cancelled the consumer.
   * @return {Promise<void>} Resolves when every matching record was handled.
   */
  async _handleImportMessage(msg) {
    const { ctx } = this;

    if (msg === null) {
      ctx.logger.warn('[RabbitmqService] consumer was cancelled by the broker');
      return;
    }
    ctx.logger.debug('收到消息: ', msg);

    const headers = msg.properties.headers;
    if (!headers) {
      // Without headers there is no user id to look records up with.
      ctx.logger.warn('[RabbitmqService] ignoring message without headers');
      return;
    }
    ctx.logger.debug('headers: ', headers);

    // Query all import records matching this user.
    // NOTE(review): type '0' / status '0' presumably select records that are
    // still waiting to be imported — confirm against the dataimp service.
    const data = { userid: headers.userid, type: '0', status: '0' };
    const pending = await ctx.service.dataimp.query(data);

    // Records are started together, as before; each one handles its own
    // failure so a bad file does not hide the outcome of the others.
    const jobs = [];
    for (const elem of pending) {
      jobs.push(this._importRecord(elem));
    }
    await Promise.all(jobs);
  }

  /**
   * Load, validate and import the Excel data of a single import record.
   * Errors are logged instead of thrown.
   *
   * @param {object} elem - One record returned by `dataimp.query`.
   * @return {Promise<void>} Always resolves.
   */
  async _importRecord(elem) {
    const { ctx } = this;
    const { excelimport } = ctx.service;
    try {
      // Fetch the data of the Excel file that belongs to this record.
      const exceldata = await excelimport.getImportXLSXData(elem);
      // Validate the data format before inserting anything.
      const errcount = await excelimport.validatedata(elem, exceldata);
      // Import (into the student enrollment records) only when validation
      // reported zero errors.
      if (errcount === 0) {
        await excelimport.dataimport(elem, exceldata);
      }
    } catch (err) {
      ctx.logger.error('[RabbitmqService] Excel import failed:', err);
    }
  }
}
- module.exports = RabbitmqService;
|