|
@@ -1,106 +0,0 @@
|
|
|
-import { App, Config, Init, Inject, Logger, Provide, Singleton } from '@midwayjs/decorator';
|
|
|
-import { Application, Context } from '@midwayjs/koa';
|
|
|
-import { InjectEntityModel } from '@midwayjs/typegoose';
|
|
|
-import { ReturnModelType } from '@typegoose/typegoose';
|
|
|
-import { get, head, last } from 'lodash';
|
|
|
-import { DataLogs } from '../entityRecord/dataLogs.entity';
|
|
|
-import { mongoose } from '@typegoose/typegoose';
|
|
|
-import * as dayjs from 'dayjs';
|
|
|
-import { Types } from 'mongoose';
|
|
|
-import { DataRecordService } from './record/dataRecord.service';
|
|
|
-import { GetModel } from 'free-midway-component';
|
|
|
-import { ILogger } from '@midwayjs/core';
|
|
|
-const ObjectId = Types.ObjectId;
|
|
|
-
|
|
|
-@Provide()
|
|
|
-@Singleton()
|
|
|
-export class DBService {
|
|
|
- @Config('dbName')
|
|
|
- dbName: string;
|
|
|
- @Logger()
|
|
|
- logger: ILogger;
|
|
|
- @Config('mongoose.dataSource')
|
|
|
- mongooseConfig: any;
|
|
|
- @InjectEntityModel(DataLogs)
|
|
|
- dataLogsModel: ReturnModelType<typeof DataLogs>;
|
|
|
- @Inject()
|
|
|
- dataRecordService: DataRecordService;
|
|
|
-
|
|
|
- conn: any;
|
|
|
- @App()
|
|
|
- app: Application;
|
|
|
- @Init()
|
|
|
- async init() {
|
|
|
- const conns = mongoose.connections;
|
|
|
- const conn = conns.find(f => f.name === this.dbName);
|
|
|
- console.log('change stream init');
|
|
|
- conn.watch([], { fullDocument: 'updateLookup' }).on('change', async data => {
|
|
|
- const record_id: string = this.app.getAttr('record_id');
|
|
|
- if (!record_id) return;
|
|
|
- // 查询日志数据,没有直接返回
|
|
|
- const record: any = await this.dataRecordService.fetch(record_id);
|
|
|
- if (!record) return;
|
|
|
- const { operationType } = data;
|
|
|
- const modelName: string = get(data, 'ns.coll');
|
|
|
- // 没有表名不能继续
|
|
|
- if (!modelName) return;
|
|
|
- const model = GetModel(modelName);
|
|
|
- // 未找到model不能继续
|
|
|
- if (!model) return;
|
|
|
- const allowOpera = ['insert', 'update', 'delete'];
|
|
|
- // 只对表数据变化做处理
|
|
|
- if (!allowOpera.includes(operationType)) return;
|
|
|
- let data_id;
|
|
|
- if (get(data, 'documentKey._id') && ObjectId.isValid(get(data, 'documentKey._id'))) data_id = new ObjectId(get(data, 'documentKey._id')).toString();
|
|
|
- else {
|
|
|
- console.log('id error');
|
|
|
- return false;
|
|
|
- }
|
|
|
- const logsObj = { coll: modelName, time: dayjs().format('YYYY-MM-DD HH:mm:ss'), data_id, data: get(data, 'fullDocument'), method: operationType };
|
|
|
- // 多个数据同步,不过根据logsObj去查询.不重复添加记录.一个数据同一个时间只允许记录一个,先来先记
|
|
|
- const num = await this.dataLogsModel.count(logsObj);
|
|
|
- if (num > 0) {
|
|
|
- console.log('多个变更记录');
|
|
|
- return;
|
|
|
- }
|
|
|
- let new_data, origin_data;
|
|
|
- if (operationType === 'insert') {
|
|
|
- // 无原数据
|
|
|
- // 可以直接创建数据
|
|
|
- await this.dataLogsModel.create(logsObj);
|
|
|
- new_data = logsObj.data;
|
|
|
- } else if (operationType === 'update') {
|
|
|
- // 可以直接创建数据并且取最后两条
|
|
|
- await this.dataLogsModel.create(logsObj);
|
|
|
- const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(2).lean();
|
|
|
- const newLogs = head(logs);
|
|
|
- const originLogs = last(logs);
|
|
|
- if (newLogs) new_data = get(newLogs, 'data');
|
|
|
- if (originLogs) origin_data = get(originLogs, 'data');
|
|
|
- } else if (operationType === 'delete') {
|
|
|
- // 无新数据
|
|
|
- // 先取最后一条,再创建
|
|
|
- const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(1).lean();
|
|
|
- const originLogs = head(logs);
|
|
|
- if (originLogs) origin_data = get(originLogs, 'data');
|
|
|
- await this.dataLogsModel.create(logsObj);
|
|
|
- }
|
|
|
- // 处理数据
|
|
|
- if (origin_data) {
|
|
|
- const rod = get(record, 'origin_data', {});
|
|
|
- if (!rod[modelName]) rod[modelName] = [];
|
|
|
- rod[modelName].push(origin_data);
|
|
|
- record.origin_data = rod;
|
|
|
- }
|
|
|
- if (new_data) {
|
|
|
- const nod = get(record, 'new_data', {});
|
|
|
- if (!nod[modelName]) nod[modelName] = [];
|
|
|
- nod[modelName].push(new_data);
|
|
|
- record.new_data = nod;
|
|
|
- }
|
|
|
- // 更新记录
|
|
|
- await this.dataRecordService.updateOne(record_id, record);
|
|
|
- this.logger.warn('record success')
|
|
|
- });
|
|
|
- }
|
|
|
-}
|