12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- /* eslint-disable no-mixed-spaces-and-tabs */
- 'use strict';
- const _ = require('lodash');
- const Service = require('egg').Service;
- const amqp = require('amqplib');
/**
 * Egg service that publishes to and consumes from a RabbitMQ exchange
 * through amqplib.
 *
 * Every call opens its own connection to `this.hosts[this.index]`.
 */
class RabbitmqnewService extends Service {
  /**
   * @param {object} ctx - Context forwarded to the base `Service` constructor.
   */
  constructor(ctx) {
    super(ctx);
    // NOTE(review): broker address and credentials are hard-coded here;
    // consider loading them from application configuration instead.
    this.hosts = [{
      hostname: '127.0.0.1',
      port: '5672',
      username: 'wy',
      password: '1',
      authMechanism: 'AMQPLAIN',
      pathname: '/',
      ssl: {
        enabled: false,
      },
    }];
    // Index of the entry in `this.hosts` used when connecting.
    this.index = 0;
    // Type and durability passed to `assertExchange`.
    this.exType = 'topic';
    this.durable = true;
    // Passed to `assertQueue` for the consumer's server-named queue.
    this.autoDelete = true;
  }

  /**
   * Publish one persistent, mandatory message and wait for the broker's
   * publisher confirm.
   *
   * @param {string} queueName - Name of the exchange to assert and publish to.
   * @param {string} routeKey - Routing key for the message.
   * @param {string|Buffer} msg - Payload, converted with `Buffer.from`.
   * @return {Promise<boolean>} `true` when the broker confirmed the message
   *   and did not return it as unroutable; `false` when publishing failed
   *   (the error is logged, not thrown).
   * @throws Rejects only when the connection or channel cannot be opened.
   */
  async sendQueueMsg(queueName, routeKey, msg) {
    const { conn, ch } = await this._openChannel(true);

    // With `mandatory: true` an unroutable message is handed back through the
    // channel's 'return' event, which is delivered before its confirm.
    let returned = false;
    ch.on('return', () => {
      returned = true;
    });

    try {
      await ch.assertExchange(queueName, this.exType, { durable: this.durable });
      // The boolean returned by publish() is only write-buffer back-pressure;
      // delivery is established by waitForConfirms() below.
      const result = ch.publish(queueName, routeKey, Buffer.from(msg), {
        persistent: true, // persist the message
        mandatory: true,
      });
      console.log('==result==', result);
      await ch.waitForConfirms();
      if (returned) {
        console.log('发送失败');
        return false;
      }
      console.log('发送成功');
      return true;
    } catch (e) {
      console.log('==e==', e);
      console.log('发送失败');
      return false;
    } finally {
      // Always release both the channel and the connection.
      await this._release(ch, conn);
    }
  }

  /**
   * Bind a fresh server-named queue to the exchange and start consuming.
   *
   * Each message is acknowledged and then handed to `receiveCallBack` as a
   * string. Errors thrown (or rejected) by the callback are logged so they
   * cannot break the consumer.
   *
   * @param {string} queueName - Name of the exchange to assert and bind to.
   * @param {string} routeKey - Binding pattern for the queue.
   * @param {function(string): *} [receiveCallBack] - Called with each message body.
   * @return {Promise<{consumerTag: string, close: function(): Promise<void>}|null>}
   *   A handle whose `close()` releases the channel and connection, or `null`
   *   when setup failed (the error is logged, not thrown).
   * @throws Rejects only when the connection or channel cannot be opened.
   */
  async receiveQueueMsg(queueName, routeKey, receiveCallBack) {
    const { conn, ch } = await this._openChannel(false);
    try {
      await ch.assertExchange(queueName, this.exType, { durable: this.durable });
      // The queue name is generated by the broker and never stored, so let the
      // broker drop the queue once its consumer goes away.
      const q = await ch.assertQueue('', {
        exclusive: false,
        autoDelete: this.autoDelete,
      });
      console.log('==q=', q);
      // Bind the queue to the exchange.
      await ch.bindQueue(q.queue, queueName, routeKey);
      const reply = await ch.consume(q.queue, msg => {
        console.log('收到消息: ', msg);
        // amqplib passes null when the broker cancels the consumer.
        if (msg === null) {
          return;
        }
        const data = msg.content.toString();
        // Acknowledge the message before handing it to the callback.
        ch.ack(msg);
        this._invokeCallback(receiveCallBack, data);
      }, { noAck: false });
      return {
        consumerTag: reply.consumerTag,
        close: () => this._release(ch, conn),
      };
    } catch (e) {
      console.log('==e==', e);
      await this._release(ch, conn);
      return null;
    }
  }

  /**
   * Open a connection plus one channel, with 'error' listeners attached so a
   * broker-side failure is logged instead of surfacing as an unhandled event.
   *
   * @param {boolean} useConfirms - Create a confirm channel when true.
   * @return {Promise<{conn: object, ch: object}>} The open connection and channel.
   */
  async _openChannel(useConfirms) {
    const logError = e => console.log('==e==', e);
    const conn = await amqp.connect(this.hosts[this.index]);
    conn.on('error', logError);
    try {
      const ch = useConfirms
        ? await conn.createConfirmChannel()
        : await conn.createChannel();
      ch.on('error', logError);
      return { conn, ch };
    } catch (e) {
      // Do not leak the connection when the channel cannot be created.
      await this._closeQuietly(conn);
      throw e;
    }
  }

  /**
   * Close the channel, then the connection, logging any close failure.
   *
   * @param {object} ch - Channel to close.
   * @param {object} conn - Connection to close.
   * @return {Promise<void>}
   */
  async _release(ch, conn) {
    await this._closeQuietly(ch);
    await this._closeQuietly(conn);
  }

  /**
   * Call `close()` on a channel or connection; a failure (for example because
   * it is already closed) is logged rather than thrown.
   *
   * @param {object} resource - Object exposing `close()`.
   * @return {Promise<void>}
   */
  async _closeQuietly(resource) {
    try {
      await resource.close();
    } catch (e) {
      console.log('==e==', e);
    }
  }

  /**
   * Run the consumer callback, logging synchronous throws and rejected
   * promises.
   *
   * @param {Function} [callback] - Consumer callback; ignored unless a function.
   * @param {string} data - Message body.
   */
  _invokeCallback(callback, data) {
    if (typeof callback !== 'function') {
      return;
    }
    const logError = e => console.log('==e==', e);
    try {
      const pending = callback(data);
      if (pending && typeof pending.catch === 'function') {
        pending.catch(logError);
      }
    } catch (e) {
      logError(e);
    }
  }
}
- module.exports = RabbitmqnewService;
|