import { Processor, IProcessor } from '@midwayjs/bull'; import { DataRecordService } from '../service/record/dataRecord.service'; import { Inject } from '@midwayjs/decorator'; import { difference, differenceBy, get, intersection, isObject } from 'lodash'; import { ElasticsearchService } from '../service/elasticsearch'; import { FORMAT } from '@midwayjs/core'; @Processor('esSync', { repeat: { cron: FORMAT.CRONTAB.EVERY_MINUTE, }, }) export class EsSyncProcessor implements IProcessor { @Inject() dataRecord: DataRecordService; @Inject() es: ElasticsearchService; async execute() { // ... console.log('in es sync processor'); const result = await this.dataRecord.noDealQuery({ es_sync: false }); console.log(result.length); for (const i of result) { const origin_data = this.hasContext(get(i, 'origin_data')); const new_data = this.hasContext(get(i, 'new_data')); // 什么也没有直接下一个 if (!origin_data && !new_data) continue; if (origin_data && !new_data) { // 有旧数据,但是没有新数据: 所有的旧数据都是要删除的数据 const keys = Object.keys(origin_data); await this.allDeleteData(i, keys); } else if (!origin_data && new_data) { // 有新数据但是没有旧数据: 所有的新数据都是添加的 const keys = Object.keys(new_data); await this.allCreateData(i, keys); } else { // 新旧数据都存在,那就需要拿出来一个一个对比: // 新旧数据都有的:修改 // 有新数据,没有旧数据: 创建 // 有旧数据,但是没有新数据: 删除 const odkeys = Object.keys(origin_data); const ndkeys = Object.keys(new_data); // 比较下表有没有区别 // d1: 以原数据为基础,新数据缺少的表.全都删除 const d1 = difference(odkeys, ndkeys); // d2: 以新数据为基础,原数据缺少的表.全都创建 const d2 = difference(ndkeys, odkeys); // d3: 两组数据共有的表,都是修改 const d3 = intersection(odkeys, ndkeys); console.log(d1, d2, d3); if (d1.length > 0) { // 全部删除 await this.allDeleteData(i, d1); } if (d2.length > 0) { // 全创建 await this.allCreateData(i, d2); } if (d3.length > 0) { // 需要两组数据对比着进行数据甄别 await this.updateData(i, d3); } } await this.dataRecord.updateOne(i._id, { es_sync: true }); } } async updateData(searchResult, tables) { const ods = get(searchResult, 'origin_data'); const nds = get(searchResult, 'new_data'); for (const key of tables) { // 因为走upsert,所以只需要知道那些是删除就行 const odList = ods[key]; // ndList: upsert的数据 const ndList = nds[key]; // 以原数据组为基础,不在新数据组中的数据:删除 const deleteList = differenceBy(odList, ndList, '_id'); // 批量删除 if (deleteList.length > 0) await this.es.deleteBat(key, deleteList); // 批量修改 if (ndList.length > 0) await this.es.createAndUpdateBat(key, ndList); } } async allDeleteData(searchResult, tables) { const ds = get(searchResult, 'origin_data'); for (const key of tables) { const datas = ds[key]; if (datas.length <= 0) continue; // 批量删除 await this.es.deleteBat(key, datas); } } async allCreateData(searchResult, tables) { const ds = get(searchResult, 'new_data'); for (const key of tables) { const datas = ds[key]; if (datas.length <= 0) continue; // 批量创建 await this.es.createAndUpdateBat(key, datas); } } hasContext(obj) { if (!obj) return false; if (!isObject(obj)) return false; if (Object.keys(obj).length <= 0) return false; return obj; } }