mission.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. 'use strict';
  2. const assert = require('assert');
  3. const _ = require('lodash');
  4. const moment = require('moment');
  5. const { ObjectId } = require('mongoose').Types;
  6. const { CrudService } = require('naf-framework-mongoose/lib/service');
  7. const { BusinessError, ErrorCode } = require('naf-core').Error;
  8. class MissionService extends CrudService {
  9. constructor(ctx) {
  10. super(ctx, 'mission');
  11. this.model = this.ctx.model.Mission;
  12. this.mq = this.ctx.mq;
  13. }
  14. async create(data) {
  15. const res = await this.model.create(data);
  16. await this.toMq({ id: res.id });
  17. await this.start({ id: res.id });
  18. return res;
  19. }
  20. async update({ id }, { params, ...data }) {
  21. // 需要精确修改,所以分成2部分去更改,一部分是非params部分,一部分是params更新
  22. const obj = {};
  23. if (params) {
  24. const keys = Object.keys(params);
  25. for (const key of keys) {
  26. obj[`params.${key}`] = params[key];
  27. }
  28. }
  29. if (data) {
  30. const dks = Object.keys(data);
  31. for (const dk of dks) {
  32. obj[dk] = data[dk];
  33. }
  34. }
  35. if (obj.status === '2') obj.progress = '100';
  36. const mission = await this.model.updateOne({ _id: ObjectId(id) }, obj);
  37. await this.toMq({ id });
  38. return mission;
  39. }
  40. async delete({ id }) {
  41. const res = await this.model.findByIdAndDelete(id);
  42. await this.toMq({ id });
  43. return res;
  44. }
  45. async start({ id }) {
  46. const mission = await this.model.findById(id);
  47. if (!mission) {
  48. throw new BusinessError(
  49. ErrorCode.DATA_NOT_EXIST,
  50. '未找到指定任务,无法开始任务!'
  51. );
  52. }
  53. if (!mission.params) { throw new BusinessError(ErrorCode.DATA_INVALID, '任务信息缺少参数设置'); }
  54. const { project, service, method, body, query } = mission.params;
  55. assert(project, '任务信息中未找到需要执行的项目名称');
  56. assert(service, '任务信息中未找到需要访问的服务');
  57. assert(method, '任务信息中未找到要执行的方法');
  58. try {
  59. this.toQueue(project, JSON.stringify({ ...body, missionid: id, project, service, method, query }));
  60. } catch (error) {
  61. console.error(error.toString());
  62. await this.model.updateOne({ id }, { status: '3' });
  63. throw new BusinessError(ErrorCode.SERVICE_FAULT, '执行任务失败');
  64. }
  65. await this.model.updateOne({ id }, { status: '1', progress: undefined });
  66. }
  67. async updateProgress(data) {
  68. const { id, progress, status = '1', ...others } = data;
  69. if (id && progress) {
  70. await this.model.updateOne({ _id: ObjectId(id) }, { progress, status, ...others });
  71. this.toMq({ id });
  72. }
  73. }
  74. async toMq({ id }) {
  75. if (this.mq) {
  76. const exchange = 'mission';
  77. const prefix = [];
  78. if (id) {
  79. const mission = await this.model.findById(id);
  80. if (mission) {
  81. const { user, params } = mission;
  82. const { project } = params;
  83. if (project) prefix.push(project);
  84. if (user)prefix.push(user);
  85. }
  86. }
  87. const routerKey = `${prefix.join('/')}/remind`;
  88. console.log(routerKey);
  89. const parm = { durable: true };
  90. await this.mq.fanout(exchange, routerKey, 'to refresh', parm);
  91. }
  92. }
  93. async toQueue(project, data) {
  94. const ch = await this.mq.conn.createChannel();
  95. const queue = `mission/${project}`;
  96. await ch.assertQueue(queue, { durable: false });
  97. await ch.sendToQueue(queue, Buffer.from(data));
  98. await ch.close();
  99. }
  100. }
  101. module.exports = MissionService;