rabbitmq.js 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. class RabbitmqService extends Service {
  4. constructor(ctx) {
  5. super(ctx);
  6. }
  7. // mission队列处理
  8. async mission() {
  9. const { mq } = this.ctx;
  10. if (mq) {
  11. const ch = await mq.conn.createChannel();
  12. // TODO 需要写成匹配的形式
  13. const queue = 'mission/center';
  14. try {
  15. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  16. await ch.assertQueue(queue, { durable: false });
  17. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  18. } catch (error) {
  19. this.ctx.logger.error('未找到订阅的队列');
  20. }
  21. } else {
  22. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  23. }
  24. }
  25. // 执行任务
  26. async dealMission(bdata) {
  27. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  28. let data = bdata.content.toString();
  29. try {
  30. data = JSON.parse(data);
  31. } catch (error) {
  32. this.ctx.logger.error('数据不是object');
  33. }
  34. const { service, method, ...others } = data;
  35. if (service && method) this.ctx.service[service][method](others);
  36. }
  37. }
  38. module.exports = RabbitmqService;