db.service.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import { App, Config, Init, Inject, Logger, Provide, Singleton } from '@midwayjs/decorator';
  2. import { Application, Context } from '@midwayjs/koa';
  3. import { InjectEntityModel } from '@midwayjs/typegoose';
  4. import { ReturnModelType } from '@typegoose/typegoose';
  5. import { get, head, last } from 'lodash';
  6. import { DataLogs } from '../entityRecord/dataLogs.entity';
  7. import { mongoose } from '@typegoose/typegoose';
  8. import * as dayjs from 'dayjs';
  9. import { Types } from 'mongoose';
  10. import { DataRecordService } from './record/dataRecord.service';
  11. import { GetModel } from 'free-midway-component';
  12. import { ILogger } from '@midwayjs/core';
  13. const ObjectId = Types.ObjectId;
  14. @Provide()
  15. @Singleton()
  16. export class DBService {
  17. @Config('dbName')
  18. dbName: string;
  19. @Logger()
  20. logger: ILogger;
  21. @Config('mongoose.dataSource')
  22. mongooseConfig: any;
  23. @InjectEntityModel(DataLogs)
  24. dataLogsModel: ReturnModelType<typeof DataLogs>;
  25. @Inject()
  26. dataRecordService: DataRecordService;
  27. conn: any;
  28. @App()
  29. app: Application;
  30. @Init()
  31. async init() {
  32. const conns = mongoose.connections;
  33. const conn = conns.find(f => f.name === this.dbName);
  34. console.log('change stream init');
  35. conn.watch([], { fullDocument: 'updateLookup' }).on('change', async data => {
  36. const record_id: string = this.app.getAttr('record_id');
  37. if (!record_id) return;
  38. // 查询日志数据,没有直接返回
  39. const record: any = await this.dataRecordService.fetch(record_id);
  40. if (!record) return;
  41. const { operationType } = data;
  42. const modelName: string = get(data, 'ns.coll');
  43. // 没有表名不能继续
  44. if (!modelName) return;
  45. const model = GetModel(modelName);
  46. // 未找到model不能继续
  47. if (!model) return;
  48. const allowOpera = ['insert', 'update', 'delete'];
  49. // 只对表数据变化做处理
  50. if (!allowOpera.includes(operationType)) return;
  51. let data_id;
  52. if (get(data, 'documentKey._id') && ObjectId.isValid(get(data, 'documentKey._id'))) data_id = new ObjectId(get(data, 'documentKey._id')).toString();
  53. else {
  54. console.log('id error');
  55. return false;
  56. }
  57. const logsObj = { coll: modelName, time: dayjs().format('YYYY-MM-DD HH:mm:ss'), data_id, data: get(data, 'fullDocument'), method: operationType };
  58. // 多个数据同步,不过根据logsObj去查询.不重复添加记录.一个数据同一个时间只允许记录一个,先来先记
  59. const num = await this.dataLogsModel.count(logsObj);
  60. if (num > 0) {
  61. console.log('多个变更记录');
  62. return;
  63. }
  64. let new_data, origin_data;
  65. if (operationType === 'insert') {
  66. // 无原数据
  67. // 可以直接创建数据
  68. await this.dataLogsModel.create(logsObj);
  69. new_data = logsObj.data;
  70. } else if (operationType === 'update') {
  71. // 可以直接创建数据并且取最后两条
  72. await this.dataLogsModel.create(logsObj);
  73. const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(2).lean();
  74. const newLogs = head(logs);
  75. const originLogs = last(logs);
  76. if (newLogs) new_data = get(newLogs, 'data');
  77. if (originLogs) origin_data = get(originLogs, 'data');
  78. } else if (operationType === 'delete') {
  79. // 无新数据
  80. // 先取最后一条,再创建
  81. const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(1).lean();
  82. const originLogs = head(logs);
  83. if (originLogs) origin_data = get(originLogs, 'data');
  84. await this.dataLogsModel.create(logsObj);
  85. }
  86. // 处理数据
  87. if (origin_data) {
  88. const rod = get(record, 'origin_data', {});
  89. if (!rod[modelName]) rod[modelName] = [];
  90. rod[modelName].push(origin_data);
  91. record.origin_data = rod;
  92. }
  93. if (new_data) {
  94. const nod = get(record, 'new_data', {});
  95. if (!nod[modelName]) nod[modelName] = [];
  96. nod[modelName].push(new_data);
  97. record.new_data = nod;
  98. }
  99. // 更新记录
  100. await this.dataRecordService.updateOne(record_id, record);
  101. this.logger.warn('record success')
  102. });
  103. }
  104. }