rabbitmq.js 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. class RabbitmqService extends Service {
  4. constructor(ctx) {
  5. super(ctx);
  6. this.exType = 'topic';
  7. this.durable = true;
  8. }
  9. // 发送消息
  10. async sendQueueMsg(ex, routeKey, msg, parm) {
  11. const { mq } = this.ctx;
  12. if (mq) {
  13. await mq.topic(ex, routeKey, msg, parm);
  14. } else {
  15. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  16. }
  17. }
  18. // 接收消息
  19. async receiveQueueMsg(ex) {
  20. console.log(ex);
  21. const self = this;
  22. const { mq } = self.ctx;
  23. if (mq) {
  24. const ch = await mq.conn.createChannel();
  25. try {
  26. await ch.assertExchange(ex, self.exType, { durable: self.durable });
  27. const q = await ch.assertQueue('', { exclusive: true });
  28. console.log('==q=', q);
  29. // 队列绑定 exchange
  30. await ch.bindQueue(q.queue, ex, '*');
  31. ch.consume(q.queue, msg => {
  32. console.log('收到消息: ', msg);
  33. const result = msg.content.toString();
  34. const headers = msg.properties.headers;
  35. // 插入待办事项到数据库中。
  36. if (result != null) {
  37. console.log('headers: ', headers);
  38. const data = { userid: headers.userid, type: '0', status: '0' };
  39. const that = this;
  40. // 根据条件查询所有符合条件的待导入信息
  41. this.ctx.service.dataimp.query(data).then(function(dataimpResult) {
  42. for (const elem of dataimpResult) {
  43. // 取得当前信息下excel文件的数据
  44. that.ctx.service.excelimport.getImportXLSXData(elem).then(function(exceldata) {
  45. // 插入前数据格式校验
  46. that.ctx.service.excelimport.validatedata(elem, exceldata).then(function(errcount) {
  47. // 判断是否有数据错误,错误数为0时插入学生学籍库
  48. if (errcount === 0) {
  49. that.ctx.service.excelimport.dataimport(elem, exceldata);
  50. }
  51. });
  52. });
  53. }
  54. });
  55. }
  56. }, { noAck: true });
  57. } catch (e) {
  58. console.log('==e==', e);
  59. }
  60. } else {
  61. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  62. }
  63. }
  64. }
  65. module.exports = RabbitmqService;