|
@@ -0,0 +1,74 @@
|
|
|
+'use strict';
|
|
|
+
|
|
|
+const Service = require('egg').Service;
|
|
|
+const _ = require('lodash');
|
|
|
+
|
|
|
+class RabbitmqService extends Service {
|
|
|
+ // mission队列处理
|
|
|
+ async mission() {
|
|
|
+ const { mq } = this.ctx;
|
|
|
+ const { queue } = this.ctx.app.config;
|
|
|
+ if (mq && queue) {
|
|
|
+ const ch = await mq.conn.createChannel();
|
|
|
+ // const queue = 'freeAdmin/server-user';
|
|
|
+ try {
|
|
|
+ // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
|
|
|
+ await ch.assertQueue(queue, { durable: false });
|
|
|
+ await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
|
|
|
+ } catch (error) {
|
|
|
+ this.ctx.logger.error('未找到订阅的队列');
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 执行任务
|
|
|
+ async dealMission(bdata) {
|
|
|
+ // if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
|
|
|
+ // let data = bdata.content.toString();
|
|
|
+ // try {
|
|
|
+ // data = JSON.parse(data);
|
|
|
+ // } catch (error) {
|
|
|
+ // this.ctx.logger.error('数据不是object');
|
|
|
+ // }
|
|
|
+ // const { service, method, project, ...others } = data;
|
|
|
+ // const arr = service.split('.');
|
|
|
+ // let s = this.ctx.service;
|
|
|
+ // for (const key of arr) {
|
|
|
+ // s = s[key];
|
|
|
+ // }
|
|
|
+ // s[method](others);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送队列消息
|
|
|
+ * @param {Any} data 消息队列数据
|
|
|
+ * @param {String} queueKey 消息队列可以
|
|
|
+ */
|
|
|
+ async sendToMqQueue(data, queueKey) {
|
|
|
+ const { mq } = this.ctx;
|
|
|
+ const { sendQueue } = this.ctx.app.config;
|
|
|
+ let queue;
|
|
|
+ // 获取队列名称
|
|
|
+ if (_.isObject(sendQueue)) {
|
|
|
+ queue = sendQueue[queueKey];
|
|
|
+ }
|
|
|
+ if (mq && queue) {
|
|
|
+ if (!_.isString(data)) data = JSON.stringify(data);
|
|
|
+ const ch = await mq.conn.createChannel();
|
|
|
+ try {
|
|
|
+ // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
|
|
|
+ // await ch.assertQueue(queue, { durable: false });
|
|
|
+ await ch.sendToQueue(queue, Buffer.from(data));
|
|
|
+ await ch.close();
|
|
|
+ } catch (error) {
|
|
|
+ console.error(error);
|
|
|
+ this.ctx.logger.error('mq消息发送失败');
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = RabbitmqService;
|