rabbitMq.js 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. // mission队列处理
  6. async mission() {
  7. const { mq } = this.ctx;
  8. const { queue } = this.ctx.app.config;
  9. if (mq && queue) {
  10. const ch = await mq.conn.createChannel();
  11. // const queue = 'freeAdmin/server-user';
  12. try {
  13. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  14. await ch.assertQueue(queue, { durable: false });
  15. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  16. } catch (error) {
  17. this.ctx.logger.error('未找到订阅的队列');
  18. }
  19. } else {
  20. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  21. }
  22. }
  23. // 执行任务
  24. async dealMission(bdata) {
  25. // if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  26. // let data = bdata.content.toString();
  27. // try {
  28. // data = JSON.parse(data);
  29. // } catch (error) {
  30. // this.ctx.logger.error('数据不是object');
  31. // }
  32. // const { service, method, project, ...others } = data;
  33. // const arr = service.split('.');
  34. // let s = this.ctx.service;
  35. // for (const key of arr) {
  36. // s = s[key];
  37. // }
  38. // s[method](others);
  39. }
  40. /**
  41. * 发送队列消息
  42. * @param {Any} data 消息队列数据
  43. * @param {String} queueKey 消息队列可以
  44. */
  45. async sendToMqQueue(data, queueKey) {
  46. const { mq } = this.ctx;
  47. const { sendQueue } = this.ctx.app.config;
  48. let queue;
  49. // 获取队列名称
  50. if (_.isObject(sendQueue)) {
  51. queue = sendQueue[queueKey];
  52. }
  53. if (mq && queue) {
  54. if (!_.isString(data)) data = JSON.stringify(data);
  55. const ch = await mq.conn.createChannel();
  56. try {
  57. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  58. // await ch.assertQueue(queue, { durable: false });
  59. await ch.sendToQueue(queue, Buffer.from(data));
  60. await ch.close();
  61. } catch (error) {
  62. console.error(error);
  63. this.ctx.logger.error('mq消息发送失败');
  64. }
  65. } else {
  66. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  67. }
  68. }
  69. }
  70. module.exports = RabbitmqService;