123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import { App, Config, Init, Inject, Logger, Provide, Singleton } from '@midwayjs/decorator';
- import { Application, Context } from '@midwayjs/koa';
- import { InjectEntityModel } from '@midwayjs/typegoose';
- import { ReturnModelType } from '@typegoose/typegoose';
- import { get, head, last } from 'lodash';
- import { DataLogs } from '../entityRecord/dataLogs.entity';
- import { mongoose } from '@typegoose/typegoose';
- import * as dayjs from 'dayjs';
- import { Types } from 'mongoose';
- import { DataRecordService } from './record/dataRecord.service';
- import { GetModel } from 'free-midway-component';
- import { ILogger } from '@midwayjs/core';
- const ObjectId = Types.ObjectId;
- @Provide()
- @Singleton()
- export class DBService {
- @Config('dbName')
- dbName: string;
- @Logger()
- logger: ILogger;
- @Config('mongoose.dataSource')
- mongooseConfig: any;
- @InjectEntityModel(DataLogs)
- dataLogsModel: ReturnModelType<typeof DataLogs>;
- @Inject()
- dataRecordService: DataRecordService;
- conn: any;
- @App()
- app: Application;
- @Init()
- async init() {
- const conns = mongoose.connections;
- const conn = conns.find(f => f.name === this.dbName);
- console.log('change stream init');
- conn.watch([], { fullDocument: 'updateLookup' }).on('change', async data => {
- const record_id: string = this.app.getAttr('record_id');
- if (!record_id) return;
- // 查询日志数据,没有直接返回
- const record: any = await this.dataRecordService.fetch(record_id);
- if (!record) return;
- const { operationType } = data;
- const modelName: string = get(data, 'ns.coll');
- // 没有表名不能继续
- if (!modelName) return;
- const model = GetModel(modelName);
- // 未找到model不能继续
- if (!model) return;
- const allowOpera = ['insert', 'update', 'delete'];
- // 只对表数据变化做处理
- if (!allowOpera.includes(operationType)) return;
- let data_id;
- if (get(data, 'documentKey._id') && ObjectId.isValid(get(data, 'documentKey._id'))) data_id = new ObjectId(get(data, 'documentKey._id')).toString();
- else {
- console.log('id error');
- return false;
- }
- const logsObj = { coll: modelName, time: dayjs().format('YYYY-MM-DD HH:mm:ss'), data_id, data: get(data, 'fullDocument'), method: operationType };
- // 多个数据同步,不过根据logsObj去查询.不重复添加记录.一个数据同一个时间只允许记录一个,先来先记
- const num = await this.dataLogsModel.count(logsObj);
- if (num > 0) {
- console.log('多个变更记录');
- return;
- }
- let new_data, origin_data;
- if (operationType === 'insert') {
- // 无原数据
- // 可以直接创建数据
- await this.dataLogsModel.create(logsObj);
- new_data = logsObj.data;
- } else if (operationType === 'update') {
- // 可以直接创建数据并且取最后两条
- await this.dataLogsModel.create(logsObj);
- const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(2).lean();
- const newLogs = head(logs);
- const originLogs = last(logs);
- if (newLogs) new_data = get(newLogs, 'data');
- if (originLogs) origin_data = get(originLogs, 'data');
- } else if (operationType === 'delete') {
- // 无新数据
- // 先取最后一条,再创建
- const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(1).lean();
- const originLogs = head(logs);
- if (originLogs) origin_data = get(originLogs, 'data');
- await this.dataLogsModel.create(logsObj);
- }
- // 处理数据
- if (origin_data) {
- const rod = get(record, 'origin_data', {});
- if (!rod[modelName]) rod[modelName] = [];
- rod[modelName].push(origin_data);
- record.origin_data = rod;
- }
- if (new_data) {
- const nod = get(record, 'new_data', {});
- if (!nod[modelName]) nod[modelName] = [];
- nod[modelName].push(new_data);
- record.new_data = nod;
- }
- // 更新记录
- await this.dataRecordService.updateOne(record_id, record);
- this.logger.warn('record success')
- });
- }
- }
|