import { App, Config, Init, Inject, Logger, Provide, Singleton } from '@midwayjs/decorator'; import { Application, Context } from '@midwayjs/koa'; import { InjectEntityModel } from '@midwayjs/typegoose'; import { ReturnModelType } from '@typegoose/typegoose'; import { get, head, last } from 'lodash'; import { DataLogs } from '../entityRecord/dataLogs.entity'; import { mongoose } from '@typegoose/typegoose'; import * as dayjs from 'dayjs'; import { Types } from 'mongoose'; import { DataRecordService } from './record/dataRecord.service'; import { GetModel } from 'free-midway-component'; import { ILogger } from '@midwayjs/core'; const ObjectId = Types.ObjectId; @Provide() @Singleton() export class DBService { @Config('dbName') dbName: string; @Logger() logger: ILogger; @Config('mongoose.dataSource') mongooseConfig: any; @InjectEntityModel(DataLogs) dataLogsModel: ReturnModelType; @Inject() dataRecordService: DataRecordService; conn: any; @App() app: Application; @Init() async init() { const conns = mongoose.connections; const conn = conns.find(f => f.name === this.dbName); console.log('change stream init'); conn.watch([], { fullDocument: 'updateLookup' }).on('change', async data => { const record_id: string = this.app.getAttr('record_id'); if (!record_id) return; // 查询日志数据,没有直接返回 const record: any = await this.dataRecordService.fetch(record_id); if (!record) return; const { operationType } = data; const modelName: string = get(data, 'ns.coll'); // 没有表名不能继续 if (!modelName) return; const model = GetModel(modelName); // 未找到model不能继续 if (!model) return; const allowOpera = ['insert', 'update', 'delete']; // 只对表数据变化做处理 if (!allowOpera.includes(operationType)) return; let data_id; if (get(data, 'documentKey._id') && ObjectId.isValid(get(data, 'documentKey._id'))) data_id = new ObjectId(get(data, 'documentKey._id')).toString(); else { console.log('id error'); return false; } const logsObj = { coll: modelName, time: dayjs().format('YYYY-MM-DD HH:mm:ss'), data_id, data: get(data, 'fullDocument'), method: operationType }; // 多个数据同步,不过根据logsObj去查询.不重复添加记录.一个数据同一个时间只允许记录一个,先来先记 const num = await this.dataLogsModel.count(logsObj); if (num > 0) { console.log('多个变更记录'); return; } let new_data, origin_data; if (operationType === 'insert') { // 无原数据 // 可以直接创建数据 await this.dataLogsModel.create(logsObj); new_data = logsObj.data; } else if (operationType === 'update') { // 可以直接创建数据并且取最后两条 await this.dataLogsModel.create(logsObj); const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(2).lean(); const newLogs = head(logs); const originLogs = last(logs); if (newLogs) new_data = get(newLogs, 'data'); if (originLogs) origin_data = get(originLogs, 'data'); } else if (operationType === 'delete') { // 无新数据 // 先取最后一条,再创建 const logs = await this.dataLogsModel.find({ coll: logsObj.coll, data_id: logsObj.data_id }).sort({ time: -1 }).limit(1).lean(); const originLogs = head(logs); if (originLogs) origin_data = get(originLogs, 'data'); await this.dataLogsModel.create(logsObj); } // 处理数据 if (origin_data) { const rod = get(record, 'origin_data', {}); if (!rod[modelName]) rod[modelName] = []; rod[modelName].push(origin_data); record.origin_data = rod; } if (new_data) { const nod = get(record, 'new_data', {}); if (!nod[modelName]) nod[modelName] = []; nod[modelName].push(new_data); record.new_data = nod; } // 更新记录 await this.dataRecordService.updateOne(record_id, record); this.logger.warn('record success') }); } }