rabbitMq.js 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. 'use strict';
  2. const Service = require('egg').Service;
  /**
   * RabbitMQ helper service.
   *
   * Subscribes to exchanges for message logging and consumes the
   * `mission/market` queue, dispatching each mission to another service method.
   * Every method reads the connection from `ctx.mq.conn` and logs an error
   * (without throwing) when `ctx.mq` is absent.
   */
  class RabbitmqService extends Service {
    /**
     * @param {object} ctx - Request context handed to the base Service.
     */
    constructor(ctx) {
      super(ctx);
      // Exchange settings used by receiveQueueMsg; may be overridden per instance.
      this.exType = 'topic';
      this.durable = true;
    }

    /**
     * Subscribe to an exchange through a fresh exclusive, server-named queue
     * and pass every delivery to `logMessage`.
     *
     * @param {string} ex - Name of the exchange to assert and bind to.
     * @returns {Promise<void>}
     * @throws Propagates any rejection from the channel operations.
     */
    async receiveQueueMsg(ex) {
      this.ctx.logger.info(`调用mq的${ex}`);
      const { mq } = this.ctx;
      if (!mq) {
        this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      const ch = await mq.conn.createChannel();
      // Use the instance settings rather than repeating the literals here.
      await ch.assertExchange(ex, this.exType, { durable: this.durable });
      const q = await ch.assertQueue('', { exclusive: true });
      await ch.bindQueue(q.queue, ex, '*');
      await ch.consume(q.queue, msg => this.logMessage(msg), { noAck: true });
    }

    /**
     * Log the body and headers of a delivered message at debug level.
     *
     * @param {object|null} msg - Delivery with `content` and `properties.headers`.
     * @returns {Promise<void>}
     */
    async logMessage(msg) {
      // NOTE(review): presumably an amqplib channel, which delivers `null` when
      // the consumer is cancelled — confirm against the MQ plugin in use.
      if (!msg) return;
      const result = msg.content.toString();
      const headers = msg.properties.headers;
      this.ctx.logger.debug('收到mq消息: %s headers: %j', result, headers);
    }

    /**
     * Start consuming the `mission/market` queue, handing each delivery to
     * `dealMission`.
     *
     * @returns {Promise<void>}
     * @throws Propagates a rejection from `createChannel`; queue assertion and
     *   consume failures are logged instead of thrown.
     */
    async mission() {
      const { mq } = this.ctx;
      if (!mq) {
        this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
        return;
      }
      const ch = await mq.conn.createChannel();
      const queue = 'mission/market';
      try {
        // Create the queue first: consuming from a queue that does not exist
        // would keep the application from starting.
        await ch.assertQueue(queue, { durable: false });
        await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
      } catch (error) {
        // Keep the underlying error so the real cause is visible in the log.
        this.ctx.logger.error('未找到订阅的队列', error);
      }
    }

    /**
     * Execute one mission. The message body must be a JSON object of the form
     * `{ service, method, ...args }`; `ctx.service[service][method]` is called
     * with the remaining fields as its single argument.
     *
     * Never rejects: malformed messages and handler failures are logged, so
     * the consume callback cannot produce an unhandled rejection.
     *
     * @param {object|null} bdata - Delivery whose `content` holds the JSON body.
     * @returns {Promise<void>}
     */
    async dealMission(bdata) {
      if (!bdata) {
        this.ctx.logger.error('mission队列中信息不存在');
        return;
      }
      let data;
      try {
        data = JSON.parse(bdata.content.toString());
      } catch (error) {
        this.ctx.logger.error('数据不是object', error);
        return;
      }
      // JSON.parse also accepts null and primitives, which cannot describe a mission.
      if (data === null || typeof data !== 'object') {
        this.ctx.logger.error('数据不是object');
        return;
      }
      const { service, method, ...others } = data;
      // A message that names no target is ignored silently.
      if (!service || !method) return;

      // NOTE(review): the dispatch target is chosen by the message content;
      // consider an allow-list if the queue can receive untrusted input.
      const target = this.ctx.service[service];
      if (!target || typeof target[method] !== 'function') {
        this.ctx.logger.error(`mission指定的service或method不存在: ${service}.${method}`);
        return;
      }
      try {
        await target[method](others);
      } catch (error) {
        this.ctx.logger.error(`mission执行失败: ${service}.${method}`, error);
      }
    }
  }
  58. module.exports = RabbitmqService;