|
@@ -1,81 +1,54 @@
|
|
|
-/* eslint-disable no-mixed-spaces-and-tabs */
|
|
|
'use strict';
|
|
|
|
|
|
const _ = require('lodash');
|
|
|
const Service = require('egg').Service;
|
|
|
-const amqp = require('amqplib');
|
|
|
|
|
|
-class RabbitmqnewService extends Service {
|
|
|
+class RabbitmqService extends Service {
|
|
|
+
|
|
|
constructor(ctx) {
|
|
|
super(ctx);
|
|
|
-
|
|
|
- this.hosts = [{
|
|
|
- hostname: '127.0.0.1',
|
|
|
- port: '5672',
|
|
|
- username: 'wy',
|
|
|
- password: '1',
|
|
|
- authMechanism: 'AMQPLAIN',
|
|
|
- pathname: '/',
|
|
|
- ssl: {
|
|
|
- enabled: false,
|
|
|
- },
|
|
|
- }];
|
|
|
- this.index = 0;
|
|
|
this.exType = 'topic';
|
|
|
this.durable = true;
|
|
|
- this.autoDelete = true;
|
|
|
}
|
|
|
|
|
|
// 发送消息
|
|
|
- async sendQueueMsg(queueName, routeKey, msg, sendCallBack) {
|
|
|
+ async sendQueueMsg(ex, routeKey, msg) {
|
|
|
const self = this;
|
|
|
- const conn = await amqp.connect(self.hosts[self.index]);
|
|
|
- const ch = await conn.createConfirmChannel();
|
|
|
- try {
|
|
|
- await ch.assertExchange(queueName, this.exType, { durable: this.durable });
|
|
|
- const result = await ch.publish(queueName, routeKey, Buffer.from(msg), {
|
|
|
- persistent: true, // 消息持久化
|
|
|
- mandatory: true,
|
|
|
- });
|
|
|
- console.log('==result==', result);
|
|
|
- if (result) {
|
|
|
- console.log('发送成功');
|
|
|
- sendCallBack && sendCallBack(true);
|
|
|
- } else {
|
|
|
- console.log('发送失败');
|
|
|
- sendCallBack && sendCallBack(false);
|
|
|
- }
|
|
|
- await ch.close();
|
|
|
- } catch (e) {
|
|
|
- console.log('==e==', e);
|
|
|
- await ch.close();
|
|
|
+ const { mq } = self.ctx;
|
|
|
+ if (mq) {
|
|
|
+ await mq.topic(ex, msg, routeKey, { durable: self.durable });
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 接收消息
|
|
|
- async receiveQueueMsg(queueName, routeKey, receiveCallBack) {
|
|
|
+ async receiveQueueMsg(ex, routeKey, receiveCallBack) {
|
|
|
const self = this;
|
|
|
- const conn = await amqp.connect(self.hosts[self.index]);
|
|
|
- const ch = await conn.createConfirmChannel();
|
|
|
- try {
|
|
|
- await ch.assertExchange(queueName, this.exType, { durable: this.durable });
|
|
|
- const q = await ch.assertQueue('', { exclusive: false });
|
|
|
- console.log('==q=', q);
|
|
|
- // 队列绑定 exchange
|
|
|
- await ch.bindQueue(q.queue, queueName, routeKey);
|
|
|
- await ch.consume(q.queue, msg => {
|
|
|
- console.log('收到消息: ', msg);
|
|
|
- const data = msg.content.toString();
|
|
|
- // 发送确认消息
|
|
|
- ch.ack(msg);
|
|
|
- receiveCallBack && receiveCallBack(data);
|
|
|
- }, { noAck: false });
|
|
|
- } catch (e) {
|
|
|
- console.log('==e==', e);
|
|
|
- await ch.close();
|
|
|
+ const { mq } = self.ctx;
|
|
|
+ if (mq) {
|
|
|
+ const ch = await mq.conn.createChannel();
|
|
|
+ try {
|
|
|
+ await ch.assertExchange(ex, self.exType, { durable: self.durable });
|
|
|
+ const q = await ch.assertQueue('', { exclusive: false });
|
|
|
+ console.log('==q=', q);
|
|
|
+ // 队列绑定 exchange
|
|
|
+ await ch.bindQueue(q.queue, ex, routeKey);
|
|
|
+ await ch.consume(q.queue, msg => {
|
|
|
+ console.log('收到消息: ', msg);
|
|
|
+ // 发送确认消息
|
|
|
+ ch.ack(msg);
|
|
|
+ receiveCallBack && receiveCallBack(msg);
|
|
|
+ }, { noAck: false });
|
|
|
+ } catch (e) {
|
|
|
+ console.log('==e==', e);
|
|
|
+ await ch.close();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
-module.exports = RabbitmqnewService;
|
|
|
+module.exports = RabbitmqService;
|