|
@@ -0,0 +1,67 @@
|
|
|
+'use strict';
|
|
|
+
|
|
|
+const Service = require('egg').Service;
|
|
|
+const _ = require('lodash');
|
|
|
+
|
|
|
+class RabbitmqService extends Service {
|
|
|
+ // mission队列处理
|
|
|
+ async mission() {
|
|
|
+ const { mq } = this.ctx;
|
|
|
+ const { queue } = this.ctx.app.config;
|
|
|
+ if (mq && queue) {
|
|
|
+ const ch = await mq.conn.createChannel();
|
|
|
+ try {
|
|
|
+ // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
|
|
|
+ await ch.assertQueue(queue, { durable: false });
|
|
|
+ await ch.consume(queue, msg => this.dealMission(msg), { noAck: true });
|
|
|
+ } catch (error) {
|
|
|
+ this.ctx.logger.error('未找到订阅的队列');
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 执行任务
|
|
|
+ async dealMission(bdata) {
|
|
|
+ if (!bdata) this.ctx.logger.error('mission队列中信息不存在');
|
|
|
+ let data = bdata.content.toString();
|
|
|
+ try {
|
|
|
+ data = JSON.parse(data);
|
|
|
+ } catch (error) {
|
|
|
+ this.ctx.logger.error('数据转换错误');
|
|
|
+ }
|
|
|
+ // 因为这个地址只管添加日志,所以就直接走创建就行
|
|
|
+ this.ctx.service.logs.create(data);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 发送队列消息
|
|
|
+ * @param {Any} data 消息队列数据
|
|
|
+ * @param {String} queueKey 消息队列可以
|
|
|
+ */
|
|
|
+ async sendToMqQueue(data, queueKey) {
|
|
|
+ const { mq } = this.ctx;
|
|
|
+ const { sendQueue } = this.ctx.app.config;
|
|
|
+ let queue;
|
|
|
+ // 获取队列名称
|
|
|
+ if (_.isObject(sendQueue)) {
|
|
|
+ queue = sendQueue[queueKey];
|
|
|
+ }
|
|
|
+ if (mq && queue) {
|
|
|
+ if (!_.isString(data)) data = JSON.stringify(data);
|
|
|
+ const ch = await mq.conn.createChannel();
|
|
|
+ try {
|
|
|
+ // 创建队列:在没有队列的情况,直接获取会导致程序无法启动
|
|
|
+ await ch.assertQueue(queue, { durable: false });
|
|
|
+ await ch.sendToQueue(queue, Buffer.from(data));
|
|
|
+ await ch.close();
|
|
|
+ } catch (error) {
|
|
|
+ console.error(error);
|
|
|
+ this.ctx.logger.error('未找到订阅的队列');
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = RabbitmqService;
|