rabbitMq.js 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. constructor(ctx) {
  6. super(ctx);
  7. this.exType = 'topic';
  8. this.durable = true;
  9. }
  10. // 接收消息
  11. async receiveQueueMsg(ex) {
  12. this.ctx.logger.info('调用mq的' + ex);
  13. const self = this;
  14. const { mq } = self.ctx;
  15. if (mq) {
  16. const ch = await mq.conn.createChannel();
  17. await ch.assertExchange(ex, 'topic', { durable: true });
  18. const q = await ch.assertQueue('', { exclusive: true });
  19. await ch.bindQueue(q.queue, ex, '*');
  20. await ch.consume(q.queue, msg => this.logMessage(msg, this), { noAck: true });
  21. } else {
  22. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  23. }
  24. }
  25. async logMessage(msg) {
  26. const result = msg.content.toString();
  27. const headers = msg.properties.headers;
  28. }
  29. // mission队列处理
  30. async mission() {
  31. const { mq } = this.ctx;
  32. if (mq) {
  33. const ch = await mq.conn.createChannel();
  34. const queue = 'mission/market';
  35. try {
  36. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  37. await ch.assertQueue(queue, { durable: false });
  38. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  39. } catch (error) {
  40. this.ctx.logger.error('未找到订阅的队列');
  41. }
  42. } else {
  43. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  44. }
  45. }
  46. // 执行任务
  47. async dealMission(bdata) {
  48. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  49. let data = bdata.content.toString();
  50. try {
  51. data = JSON.parse(data);
  52. } catch (error) {
  53. this.ctx.logger.error('数据不是object');
  54. }
  55. const { service, method, project, ...others } = data;
  56. const arr = service.split('.');
  57. let s = this.ctx.service;
  58. for (const key of arr) {
  59. s = s[key];
  60. }
  61. s[method](others);
  62. }
  63. }
  64. module.exports = RabbitmqService;