|
@@ -0,0 +1,52 @@
|
|
|
+import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config } from '@midwayjs/core';
|
|
|
+import * as amqp from 'amqp-connection-manager';
|
|
|
+
|
|
|
+@Autoload()
|
|
|
+@Provide()
|
|
|
+@Scope(ScopeEnum.Singleton)
|
|
|
+export class ChatMqService {
|
|
|
+ private connection: amqp.AmqpConnectionManager;
|
|
|
+ private channelWrapper;
|
|
|
+ chName = 'chat';
|
|
|
+ exName = 'chatEx';
|
|
|
+
|
|
|
+ @Config('rabbitmq')
|
|
|
+ mqConfig;
|
|
|
+ @Init()
|
|
|
+ async connect() {
|
|
|
+ // 创建连接,你可以把配置放在 Config 中,然后注入进来
|
|
|
+ this.connection = await amqp.connect(this.mqConfig.url);
|
|
|
+ // 创建 channel
|
|
|
+ this.channelWrapper = this.connection.createChannel({
|
|
|
+ json: true,
|
|
|
+ setup: channel =>
|
|
|
+ Promise.all([
|
|
|
+ // 创建队列&交换机; 绑定他俩
|
|
|
+ channel.assertQueue(this.chName, { durable: true }),
|
|
|
+ channel.assertExchange(this.exName, 'direct', { durable: true }),
|
|
|
+ channel.bindQueue(this.chName, this.exName, '*'),
|
|
|
+ // channel.consume(this.chName, msg => this.reciveMsg(msg), { noAck: true }),
|
|
|
+ ]),
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // 发送消息
|
|
|
+ public async sendToQueue(queueName: string, data: any) {
|
|
|
+ return this.channelWrapper.sendToQueue(queueName, data);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 向交换机发送消息
|
|
|
+ async sendExMsg(routingKey: string, data: any) {
|
|
|
+ this.channelWrapper.publish(this.exName, routingKey, Buffer.from(data));
|
|
|
+ }
|
|
|
+ // 测试,自产自销,前端使用前,连消费者也产生出来,看看好不好使,好使了在注释掉.
|
|
|
+ async reciveMsg(msg) {
|
|
|
+ console.log(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Destroy()
|
|
|
+ async close() {
|
|
|
+ await this.channelWrapper.close();
|
|
|
+ await this.connection.close();
|
|
|
+ }
|
|
|
+}
|