|
@@ -0,0 +1,77 @@
|
|
|
|
+import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
|
|
|
|
+import * as amqp from 'amqp-connection-manager';
|
|
|
|
+
|
|
|
|
+@Autoload()
|
|
|
|
+@Provide()
|
|
|
|
+@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
|
|
|
|
+export class MqService {
|
|
|
|
+ private connection: amqp.AmqpConnectionManager;
|
|
|
|
+
|
|
|
|
+ private channelWrapper: amqp.ChannelWrapper;
|
|
|
|
+
|
|
|
|
+ @Config('rabbitmq.url')
|
|
|
|
+ mqUrl: string;
|
|
|
|
+
|
|
|
|
+ system = {
|
|
|
|
+ queue: 'system',
|
|
|
|
+ ex: 'systemEx',
|
|
|
|
+ exType: 'topic',
|
|
|
|
+ };
|
|
|
|
+ user = {
|
|
|
|
+ queue: 'user',
|
|
|
|
+ ex: 'userEx',
|
|
|
|
+ exType: 'direct',
|
|
|
|
+ };
|
|
|
|
+ /**
|
|
|
|
+ * 1.建立通道
|
|
|
|
+ * 2.声明交换机
|
|
|
|
+ * 3.声明队列
|
|
|
|
+ * 系统队列:system
|
|
|
|
+ * 用户队列:user
|
|
|
|
+ */
|
|
|
|
+ @Init()
|
|
|
|
+ async connect() {
|
|
|
|
+ // 创建连接,你可以把配置放在 Config 中,然后注入进来
|
|
|
|
+ this.connection = await amqp.connect(this.mqUrl);
|
|
|
|
+ // console.log(this.mqUrl);
|
|
|
|
+ this.connection.on('connect', () => {
|
|
|
|
+ console.log('connect');
|
|
|
|
+ });
|
|
|
|
+ this.connection.on('connectFailed', e => {
|
|
|
|
+ console.log(e);
|
|
|
|
+ console.log('connectFailed');
|
|
|
|
+ });
|
|
|
|
+ // 创建 channel
|
|
|
|
+ this.channelWrapper = this.connection.createChannel({
|
|
|
|
+ name: 'cxyy',
|
|
|
|
+ json: true,
|
|
|
|
+ setup: channel => {
|
|
|
|
+ return Promise.all([
|
|
|
|
+ channel.assertExchange(this.system.ex, this.system.exType, { durable: true }),
|
|
|
|
+ channel.assertQueue(this.system.queue, { durable: true }),
|
|
|
|
+ channel.bindQueue(this.system.queue, this.system.ex, '*'),
|
|
|
|
+ channel.assertExchange(this.user.ex, this.user.exType, { durable: true }),
|
|
|
|
+ channel.assertQueue(this.user.queue, { durable: true }),
|
|
|
|
+ channel.bindQueue(this.user.queue, this.user.ex, '*'),
|
|
|
|
+ ]);
|
|
|
|
+ },
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ async testSendMsg() {
|
|
|
|
+ const res = await this.channelWrapper.publish(this.system.ex, 'system', 'test');
|
|
|
|
+ // const res = await this.sendToQueue(this.system.queue, { test: 'test' });
|
|
|
|
+ console.log(res);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 发送消息
|
|
|
|
+ public async sendToQueue(queueName: string, data: any) {
|
|
|
|
+ return this.channelWrapper.sendToQueue(queueName, data);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Destroy()
|
|
|
|
+ async close() {
|
|
|
|
+ await this.channelWrapper.close();
|
|
|
|
+ await this.connection.close();
|
|
|
|
+ }
|
|
|
|
+}
|