rabbitMq.js 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. 'use strict';
  2. const Service = require('egg').Service;
  3. const _ = require('lodash');
  4. class RabbitmqService extends Service {
  5. // mission队列处理
  6. async mission() {
  7. const { mq } = this.ctx;
  8. const { queue } = this.ctx.app.config;
  9. if (mq && queue) {
  10. const ch = await mq.conn.createChannel();
  11. try {
  12. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  13. await ch.assertQueue(queue, { durable: false });
  14. await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
  15. } catch (error) {
  16. this.ctx.logger.error('未找到订阅的队列');
  17. }
  18. } else {
  19. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  20. }
  21. }
  22. // 执行任务
  23. async dealMission(bdata) {
  24. if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
  25. let data = bdata.content.toString();
  26. try {
  27. data = JSON.parse(data);
  28. } catch (error) {
  29. this.ctx.logger.error('数据转换错误');
  30. }
  31. // 因为这个地址只管添加日志,所以就直接走创建就行
  32. this.ctx.service.logs.create(data);
  33. }
  34. /**
  35. * 发送队列消息
  36. * @param {Any} data 消息队列数据
  37. * @param {String} queueKey 消息队列可以
  38. */
  39. async sendToMqQueue(data, queueKey) {
  40. const { mq } = this.ctx;
  41. const { sendQueue } = this.ctx.app.config;
  42. let queue;
  43. // 获取队列名称
  44. if (_.isObject(sendQueue)) {
  45. queue = sendQueue[queueKey];
  46. }
  47. if (mq && queue) {
  48. if (!_.isString(data)) data = JSON.stringify(data);
  49. const ch = await mq.conn.createChannel();
  50. try {
  51. // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
  52. await ch.assertQueue(queue, { durable: false });
  53. await ch.sendToQueue(queue, Buffer.from(data));
  54. await ch.close();
  55. } catch (error) {
  56. console.error(error);
  57. this.ctx.logger.error('未找到订阅的队列');
  58. }
  59. } else {
  60. this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
  61. }
  62. }
  63. }
  64. module.exports = RabbitmqService;