|
@@ -0,0 +1,79 @@
|
|
|
+/* eslint-disable no-mixed-spaces-and-tabs */
|
|
|
+'use strict';
|
|
|
+
|
|
|
+const _ = require('lodash');
|
|
|
+const Service = require('egg').Service;
|
|
|
+const amqp = require('amqplib');
|
|
|
+
|
|
|
+class RabbitmqnewService extends Service {
|
|
|
+ constructor(ctx) {
|
|
|
+ super(ctx);
|
|
|
+
|
|
|
+ this.hosts = [{
|
|
|
+ hostname: '127.0.0.1',
|
|
|
+ port: '5672',
|
|
|
+ username: 'wy',
|
|
|
+ password: '1',
|
|
|
+ authMechanism: 'AMQPLAIN',
|
|
|
+ pathname: '/',
|
|
|
+ ssl: {
|
|
|
+ enabled: false,
|
|
|
+ },
|
|
|
+ }];
|
|
|
+ this.index = 0;
|
|
|
+ this.exType = 'topic';
|
|
|
+ this.durable = true;
|
|
|
+ this.autoDelete = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 发送消息
|
|
|
+ async sendQueueMsg(queueName, routeKey, msg) {
|
|
|
+ const self = this;
|
|
|
+ const conn = await amqp.connect(self.hosts[self.index]);
|
|
|
+ const ch = await conn.createConfirmChannel();
|
|
|
+ try {
|
|
|
+ await ch.assertExchange(queueName, this.exType, { durable: this.durable });
|
|
|
+ const result = await ch.publish(queueName, routeKey, Buffer.from(msg), {
|
|
|
+ persistent: true, // 消息持久化
|
|
|
+ mandatory: true,
|
|
|
+ });
|
|
|
+ console.log('==result==', result);
|
|
|
+ if (result) {
|
|
|
+ console.log('发送成功');
|
|
|
+ } else {
|
|
|
+ console.log('发送失败');
|
|
|
+ }
|
|
|
+ await ch.close();
|
|
|
+ } catch (e) {
|
|
|
+ console.log('==e==', e);
|
|
|
+ await ch.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 接收消息
|
|
|
+ async receiveQueueMsg(queueName, routeKey, receiveCallBack) {
|
|
|
+ const self = this;
|
|
|
+ const conn = await amqp.connect(self.hosts[self.index]);
|
|
|
+ const ch = await conn.createConfirmChannel();
|
|
|
+ try {
|
|
|
+ await ch.assertExchange(queueName, this.exType, { durable: this.durable });
|
|
|
+ const q = await ch.assertQueue('', { exclusive: false });
|
|
|
+ console.log('==q=', q);
|
|
|
+ // 队列绑定 exchange
|
|
|
+ await ch.bindQueue(q.queue, queueName, routeKey);
|
|
|
+ await ch.consume(q.queue, msg => {
|
|
|
+ console.log('收到消息: ', msg);
|
|
|
+ const data = msg.content.toString();
|
|
|
+ // 发送确认消息
|
|
|
+ ch.ack(msg);
|
|
|
+ receiveCallBack && receiveCallBack(data);
|
|
|
+ }, { noAck: false });
|
|
|
+ } catch (e) {
|
|
|
+ console.log('==e==', e);
|
|
|
+ await ch.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = RabbitmqnewService;
|