/* eslint-disable no-mixed-spaces-and-tabs */ 'use strict'; const _ = require('lodash'); const Service = require('egg').Service; const amqp = require('amqplib'); class RabbitmqnewService extends Service { constructor(ctx) { super(ctx); this.hosts = [{ hostname: '127.0.0.1', port: '5672', username: 'wy', password: '1', authMechanism: 'AMQPLAIN', pathname: '/', ssl: { enabled: false, }, }]; this.index = 0; this.exType = 'topic'; this.durable = true; this.autoDelete = true; } // 发送消息 async sendQueueMsg(queueName, routeKey, msg) { const self = this; const conn = await amqp.connect(self.hosts[self.index]); const ch = await conn.createConfirmChannel(); try { await ch.assertExchange(queueName, this.exType, { durable: this.durable }); const result = await ch.publish(queueName, routeKey, Buffer.from(msg), { persistent: true, // 消息持久化 mandatory: true, }); console.log('==result==', result); if (result) { console.log('发送成功'); } else { console.log('发送失败'); } await ch.close(); } catch (e) { console.log('==e==', e); await ch.close(); } } // 接收消息 async receiveQueueMsg(queueName, routeKey, receiveCallBack) { const self = this; const conn = await amqp.connect(self.hosts[self.index]); const ch = await conn.createConfirmChannel(); try { await ch.assertExchange(queueName, this.exType, { durable: this.durable }); const q = await ch.assertQueue('', { exclusive: false }); console.log('==q=', q); // 队列绑定 exchange await ch.bindQueue(q.queue, queueName, routeKey); await ch.consume(q.queue, msg => { console.log('收到消息: ', msg); const data = msg.content.toString(); // 发送确认消息 ch.ack(msg); receiveCallBack && receiveCallBack(data); }, { noAck: false }); } catch (e) { console.log('==e==', e); await ch.close(); } } } module.exports = RabbitmqnewService;