mission.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. 'use strict';
  2. const assert = require('assert');
  3. const _ = require('lodash');
  4. const moment = require('moment');
  5. const { ObjectId } = require('mongoose').Types;
  6. const { CrudService } = require('naf-framework-mongoose/lib/service');
  7. const { BusinessError, ErrorCode } = require('naf-core').Error;
  8. class MissionService extends CrudService {
  9. constructor(ctx) {
  10. super(ctx, 'mission');
  11. this.model = this.ctx.model.Mission;
  12. this.mq = this.ctx.mq;
  13. }
  14. async create(data) {
  15. const res = await this.model.create(data);
  16. await this.toMq({ id: res.id });
  17. await this.start({ id: res.id });
  18. return res;
  19. }
  20. async update({ id }, { params, ...data }) {
  21. // 需要精确修改,所以分成2部分去更改,一部分是非params部分,一部分是params更新
  22. const obj = {};
  23. if (params) {
  24. const keys = Object.keys(params);
  25. for (const key of keys) {
  26. obj[`params.${key}`] = params[key];
  27. }
  28. }
  29. if (data) {
  30. const dks = Object.keys(data);
  31. for (const dk of dks) {
  32. obj[dk] = data[dk];
  33. }
  34. }
  35. if (obj.status === '2') obj.progress = '100';
  36. const mission = await this.model.updateOne({ _id: ObjectId(id) }, obj);
  37. await this.toMq({ id });
  38. return mission;
  39. }
  40. async delete({ id }) {
  41. const res = await this.model.findByIdAndDelete(id);
  42. await this.toMq({ id });
  43. return res;
  44. }
  45. async start({ id }) {
  46. const mission = await this.model.findById(id);
  47. if (!mission) {
  48. throw new BusinessError(
  49. ErrorCode.DATA_NOT_EXIST,
  50. '未找到指定任务,无法开始任务!'
  51. );
  52. }
  53. if (!mission.params) { throw new BusinessError(ErrorCode.DATA_INVALID, '任务信息缺少参数设置'); }
  54. const { project, service, method, body } = mission.params;
  55. assert(project, '任务信息中未找到需要执行的项目名称');
  56. assert(service, '任务信息中未找到需要访问的服务');
  57. assert(method, '任务信息中未找到要执行的方法');
  58. try {
  59. this.toQueue(
  60. project,
  61. JSON.stringify({ ...body, missionid: id, project, service, method })
  62. );
  63. } catch (error) {
  64. console.error(error.toString());
  65. await this.updateOne({ id }, { status: '3' });
  66. throw new BusinessError(ErrorCode.SERVICE_FAULT, '执行任务失败');
  67. }
  68. await this.updateOne({ id }, { status: '1', progress: undefined });
  69. }
  70. async updateProgress({ id, progress, status = '1', remark }) {
  71. if (id && progress) {
  72. await this.model.updateOne({ _id: ObjectId(id) }, { progress, status, remark });
  73. this.toMq({ id });
  74. }
  75. }
  76. async toMq({ id }) {
  77. if (this.mq) {
  78. const exchange = 'mission';
  79. const prefix = [];
  80. if (id) {
  81. const mission = await this.model.findById(id);
  82. if (mission) {
  83. const { user, params } = mission;
  84. const { project } = params;
  85. if (project) prefix.push(project);
  86. if (user)prefix.push(user);
  87. }
  88. }
  89. const routerKey = `${prefix.join('/')}/remind`;
  90. console.log(routerKey);
  91. const parm = { durable: true };
  92. await this.mq.fanout(exchange, routerKey, 'to refresh', parm);
  93. }
  94. }
  95. async toQueue(project, data) {
  96. const ch = await this.mq.conn.createChannel();
  97. const queue = `mission/${project}`;
  98. await ch.assertQueue(queue, { durable: false });
  99. await ch.sendToQueue(queue, Buffer.from(data));
  100. await ch.close();
  101. }
  102. }
  103. module.exports = MissionService;