rabbitmq.js 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. /* eslint-disable no-mixed-spaces-and-tabs */
  2. 'use strict';
  3. const _ = require('lodash');
  4. const Service = require('egg').Service;
  5. const amqp = require('amqplib');
  6. class RabbitmqnewService extends Service {
  7. constructor(ctx) {
  8. super(ctx);
  9. this.hosts = [{
  10. hostname: '127.0.0.1',
  11. port: '5672',
  12. username: 'wy',
  13. password: '1',
  14. authMechanism: 'AMQPLAIN',
  15. pathname: '/',
  16. ssl: {
  17. enabled: false,
  18. },
  19. }];
  20. this.index = 0;
  21. this.exType = 'topic';
  22. this.durable = true;
  23. this.autoDelete = true;
  24. }
  25. // 发送消息
  26. async sendQueueMsg(queueName, routeKey, msg) {
  27. const self = this;
  28. const conn = await amqp.connect(self.hosts[self.index]);
  29. const ch = await conn.createConfirmChannel();
  30. try {
  31. await ch.assertExchange(queueName, this.exType, { durable: this.durable });
  32. const result = await ch.publish(queueName, routeKey, Buffer.from(msg), {
  33. persistent: true, // 消息持久化
  34. mandatory: true,
  35. });
  36. console.log('==result==', result);
  37. if (result) {
  38. console.log('发送成功');
  39. } else {
  40. console.log('发送失败');
  41. }
  42. await ch.close();
  43. } catch (e) {
  44. console.log('==e==', e);
  45. await ch.close();
  46. }
  47. }
  48. // 接收消息
  49. async receiveQueueMsg(queueName, routeKey, receiveCallBack) {
  50. const self = this;
  51. const conn = await amqp.connect(self.hosts[self.index]);
  52. const ch = await conn.createConfirmChannel();
  53. try {
  54. await ch.assertExchange(queueName, this.exType, { durable: this.durable });
  55. const q = await ch.assertQueue('', { exclusive: false });
  56. console.log('==q=', q);
  57. // 队列绑定 exchange
  58. await ch.bindQueue(q.queue, queueName, routeKey);
  59. await ch.consume(q.queue, msg => {
  60. console.log('收到消息: ', msg);
  61. const data = msg.content.toString();
  62. // 发送确认消息
  63. ch.ack(msg);
  64. receiveCallBack && receiveCallBack(data);
  65. }, { noAck: false });
  66. } catch (e) {
  67. console.log('==e==', e);
  68. await ch.close();
  69. }
  70. }
  71. }
  72. module.exports = RabbitmqnewService;