'use strict';

const assert = require('assert');
const _ = require('lodash');
const moment = require('moment');
const { ObjectId } = require('mongoose').Types;
const { CrudService } = require('naf-framework-mongoose/lib/service');
const { BusinessError, ErrorCode } = require('naf-core').Error;

/**
 * Service for "mission" records: persists them through `ctx.model.Mission`,
 * pushes the work description onto a per-project queue and broadcasts a
 * refresh notification on the `mission` exchange whenever a record changes.
 */
class MissionService extends CrudService {
  /**
   * @param {object} ctx - request context; `ctx.model.Mission` and `ctx.mq` are read from it.
   */
  constructor(ctx) {
    super(ctx, 'mission');
    this.model = this.ctx.model.Mission;
    this.mq = this.ctx.mq;
  }

  /**
   * Stores a new mission, sends a refresh notification and starts it right away.
   *
   * @param {object} data - mission document to store.
   * @returns {Promise<object>} the created document.
   * @throws whatever `start` throws (the document has already been stored by then).
   */
  async create(data) {
    const res = await this.model.create(data);
    await this.toMq({ id: res.id });
    await this.start({ id: res.id });
    return res;
  }

  /**
   * Updates a mission field by field.
   *
   * The update is split in two parts so that only the given keys are touched:
   * entries of `params` are written as `params.<key>` paths (the rest of the
   * stored `params` object is left alone), every other field is written as is.
   * When the new status is `'2'` the progress is set to `'100'`.
   *
   * @param {{id: string}} filter - id of the mission to update.
   * @param {object} update - fields to write; `params` is merged key by key.
   * @returns {Promise<object>} the result of `model.updateOne`.
   */
  async update({ id }, { params, ...data }) {
    const obj = {};
    if (params) {
      for (const key of Object.keys(params)) {
        obj[`params.${key}`] = params[key];
      }
    }
    // `data` is a rest object and therefore always defined; no guard needed.
    for (const key of Object.keys(data)) {
      obj[key] = data[key];
    }
    if (obj.status === '2') obj.progress = '100';

    const mission = await this.model.updateOne({ _id: ObjectId(id) }, obj);
    await this.toMq({ id });
    return mission;
  }

  /**
   * Deletes a mission and sends a refresh notification.
   *
   * @param {{id: string}} filter - id of the mission to delete.
   * @returns {Promise<object|null>} the result of `findByIdAndDelete`.
   */
  async delete({ id }) {
    const res = await this.model.findByIdAndDelete(id);
    // NOTE(review): the record no longer exists when toMq looks it up, so the
    // routing key is built without the project/user prefix - confirm this is intended.
    await this.toMq({ id });
    return res;
  }

  /**
   * Puts the mission on its project queue and records the outcome in `status`.
   *
   * The queued message is the JSON of `params.body` merged with
   * `missionid`, `project`, `service` and `method`.
   * Status `'1'` is written after a successful enqueue, `'3'` when enqueueing fails.
   *
   * @param {{id: string}} filter - id of the mission to start.
   * @returns {Promise<void>}
   * @throws {BusinessError} DATA_NOT_EXIST when the mission is missing,
   *   DATA_INVALID when it has no `params`, SERVICE_FAULT when enqueueing fails.
   * @throws {AssertionError} when `params.project`, `params.service` or `params.method` is missing.
   */
  async start({ id }) {
    const mission = await this.model.findById(id);
    if (!mission) {
      throw new BusinessError(
        ErrorCode.DATA_NOT_EXIST,
        '未找到指定任务,无法开始任务!'
      );
    }
    if (!mission.params) {
      throw new BusinessError(ErrorCode.DATA_INVALID, '任务信息缺少参数设置');
    }

    const { project, service, method, body } = mission.params;
    assert(project, '任务信息中未找到需要执行的项目名称');
    assert(service, '任务信息中未找到需要访问的服务');
    assert(method, '任务信息中未找到要执行的方法');

    // NOTE(review): `updateOne` is not defined in this class; it is assumed to be
    // inherited from CrudService - confirm, otherwise `this.update` was probably intended.
    try {
      // Must be awaited: without it a failed enqueue escapes this try/catch as an
      // unhandled rejection and the mission is still marked as started.
      await this.toQueue(
        project,
        JSON.stringify({ ...body, missionid: id, project, service, method })
      );
    } catch (error) {
      console.error(error.toString());
      await this.updateOne({ id }, { status: '3' });
      throw new BusinessError(ErrorCode.SERVICE_FAULT, '执行任务失败');
    }
    await this.updateOne({ id }, { status: '1', progress: undefined });
  }

  /**
   * Writes progress, status and remark of a mission and sends a refresh notification.
   * Does nothing when `id` or `progress` is falsy.
   *
   * @param {object} info
   * @param {string} info.id - mission id.
   * @param {string} info.progress - new progress value.
   * @param {string} [info.status='1'] - new status.
   * @param {string} [info.remark] - remark to store.
   * @returns {Promise<void>}
   */
  async updateProgress({ id, progress, status = '1', remark }) {
    // NOTE(review): a numeric progress of 0 is falsy and is skipped here - confirm
    // callers always pass progress as a non-empty string.
    if (id && progress) {
      await this.model.updateOne({ _id: ObjectId(id) }, { progress, status, remark });
      // Awaited so that a notification failure reaches the caller instead of
      // becoming an unhandled rejection.
      await this.toMq({ id });
    }
  }

  /**
   * Broadcasts a `'to refresh'` message on the `mission` exchange.
   *
   * The routing key is `<project>/<user>/remind`; project and user are taken from
   * the stored mission and left out when unavailable. No-op when `ctx.mq` is not set.
   *
   * @param {{id: string}} target - id of the mission the notification is about.
   * @returns {Promise<void>}
   */
  async toMq({ id }) {
    if (!this.mq) return;

    const exchange = 'mission';
    const prefix = [];
    if (id) {
      const mission = await this.model.findById(id);
      if (mission) {
        const { user, params } = mission;
        // A mission may have been stored without params (start() rejects it later);
        // the notification must not crash on it.
        const { project } = params || {};
        if (project) prefix.push(project);
        if (user) prefix.push(user);
      }
    }
    const routerKey = `${prefix.join('/')}/remind`;
    console.log(routerKey);
    const parm = { durable: true };
    await this.mq.fanout(exchange, routerKey, 'to refresh', parm);
  }

  /**
   * Sends `data` to the non-durable queue `mission/<project>` on a short-lived channel.
   *
   * @param {string} project - project name, used as queue suffix.
   * @param {string} data - message payload (already serialized).
   * @returns {Promise<void>}
   */
  async toQueue(project, data) {
    const ch = await this.mq.conn.createChannel();
    const queue = `mission/${project}`;
    try {
      await ch.assertQueue(queue, { durable: false });
      await ch.sendToQueue(queue, Buffer.from(data));
    } catch (error) {
      // Release the channel on failure as well; a close error (the channel may
      // already be gone) must not hide the original one.
      try {
        await ch.close();
      } catch (closeError) {
        console.error(closeError.toString());
      }
      throw error;
    }
    await ch.close();
  }
}

module.exports = MissionService;