'use strict';

const assert = require('assert');
const _ = require('lodash');
const moment = require('moment');
const { ObjectId } = require('mongoose').Types;
const { CrudService } = require('naf-framework-mongoose/lib/service');
const { BusinessError, ErrorCode } = require('naf-core').Error;

/**
 * Service for "mission" records: persists them through the Mission model,
 * hands their work off to a per-project message queue, and publishes a
 * refresh notification whenever a mission changes.
 */
class MissionService extends CrudService {
  constructor(ctx) {
    super(ctx, 'mission');
    this.model = this.ctx.model.Mission;
    // May be absent; toMq() checks for it, toQueue() requires it.
    this.mq = this.ctx.mq;
  }

  /**
   * Creates a mission, publishes a refresh notification and immediately starts it.
   *
   * @param {object} data - Document passed as-is to `model.create`.
   * @returns {Promise<object>} The created mission document.
   * @throws Whatever `start` throws; the document has already been created at that point.
   */
  async create(data) {
    const res = await this.model.create(data);
    await this.toMq();
    await this.start({ id: res.id });
    return res;
  }

  /**
   * Updates a mission by id and publishes a refresh notification.
   *
   * The update has to be precise, so it is built in two parts: entries of
   * `params` are written through dotted paths (`params.<key>`) so that only
   * the named keys change, and every other field is written at the top level.
   *
   * @param {{ id: string }} filter - Identifies the mission to update.
   * @param {object} update - Fields to change; `params` is merged key by key.
   * @returns {Promise<object>} The result of `model.update`.
   */
  async update({ id }, { params, ...data }) {
    const obj = {};
    if (params) {
      for (const [key, value] of Object.entries(params)) {
        obj[`params.${key}`] = value;
      }
    }
    // `data` is a rest object, so it always exists (possibly empty).
    for (const [key, value] of Object.entries(data)) {
      obj[key] = value;
    }
    // Status '2' forces progress to '100' (presumably "finished" — confirm against the status enum).
    if (obj.status === '2') obj.progress = '100';
    const mission = await this.model.update({ _id: ObjectId(id) }, obj);
    await this.toMq();
    return mission;
  }

  /**
   * Deletes a mission by id and publishes a refresh notification.
   *
   * @param {{ id: string }} filter - Identifies the mission to delete.
   * @returns {Promise<object|null>} The result of `model.findByIdAndDelete`.
   */
  async delete({ id }) {
    const res = await this.model.findByIdAndDelete(id);
    await this.toMq();
    return res;
  }

  /**
   * Starts a mission: validates its `params` and pushes a job message onto
   * the queue of the target project, then sets the mission status to '1'.
   *
   * @param {{ id: string }} filter - Identifies the mission to start.
   * @returns {Promise<void>}
   * @throws {BusinessError} DATA_NOT_EXIST when the mission is not found,
   *   DATA_INVALID when it has no `params`, SERVICE_FAULT when the job could
   *   not be queued (the mission status is set to '3' first).
   * @throws {AssertionError} When `project`, `service` or `method` is missing from `params`.
   */
  async start({ id }) {
    const mission = await this.model.findById(id);
    if (!mission) {
      throw new BusinessError(
        ErrorCode.DATA_NOT_EXIST,
        '未找到指定任务,无法开始任务!'
      );
    }
    if (!mission.params) {
      throw new BusinessError(ErrorCode.DATA_INVALID, '任务信息缺少参数设置');
    }
    const { project, service, method, body } = mission.params;
    assert(project, '任务信息中未找到需要执行的项目名称');
    assert(service, '任务信息中未找到需要访问的服务');
    assert(method, '任务信息中未找到要执行的方法');
    try {
      // Must be awaited: toQueue is async, so without `await` its failure
      // would bypass this catch and the mission would still be marked '1'.
      await this.toQueue(
        project,
        JSON.stringify({ ...body, missionid: id, project, service, method })
      );
    } catch (error) {
      console.error(error.toString());
      await this.update({ id }, { status: '3' });
      throw new BusinessError(ErrorCode.SERVICE_FAULT, '执行任务失败');
    }
    // NOTE(review): `progress: undefined` is presumably meant to reset the
    // progress; how an undefined value is applied depends on the model layer — confirm.
    await this.update({ id }, { status: '1', progress: undefined });
  }

  /**
   * Writes a new progress value for a mission and publishes a refresh
   * notification. Does nothing when `id` or `progress` is falsy.
   *
   * @param {{ id: string, progress: * }} payload - Mission id and the progress value to store.
   * @returns {Promise<void>}
   */
  async updateProgress({ id, progress }) {
    if (id && progress) {
      await this.model.update({ _id: ObjectId(id) }, { progress });
      // Awaited so that a publish failure surfaces to the caller instead of
      // becoming an unhandled rejection.
      await this.toMq();
    }
  }

  /**
   * Publishes a 'to refresh' message on the 'mission' exchange with routing
   * key 'remind'. No-op when no mq client is configured.
   *
   * @returns {Promise<void>}
   */
  async toMq() {
    if (this.mq) {
      const exchange = 'mission';
      const routerKey = 'remind';
      const parm = { durable: true };
      await this.mq.fanout(exchange, routerKey, 'to refresh', parm);
    }
  }

  /**
   * Sends one message to the `mission/<project>` queue over a short-lived channel.
   *
   * @param {string} project - Project name used to build the queue name.
   * @param {string} data - Message payload; converted with `Buffer.from`.
   * @returns {Promise<void>}
   * @throws When the mq client is missing or any channel operation fails.
   */
  async toQueue(project, data) {
    const ch = await this.mq.conn.createChannel();
    try {
      const queue = `mission/${project}`;
      await ch.assertQueue(queue, { durable: false });
      await ch.sendToQueue(queue, Buffer.from(data));
    } catch (error) {
      // Release the channel on failure too. Closing is best-effort here: the
      // channel may already be closed, and the original error is the one
      // the caller needs to see.
      try {
        await ch.close();
      } catch (closeError) {
        // ignored on purpose — see above
      }
      throw error;
    }
    await ch.close();
  }
}

module.exports = MissionService;