123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import { Processor, IProcessor } from '@midwayjs/bull';
- import { DataRecordService } from '../service/record/dataRecord.service';
- import { Inject } from '@midwayjs/decorator';
- import { difference, differenceBy, get, intersection, isObject } from 'lodash';
- import { ElasticsearchService } from '../service/elasticsearch';
- import { FORMAT } from '@midwayjs/core';
- @Processor('esSync', {
- repeat: {
- cron: FORMAT.CRONTAB.EVERY_MINUTE,
- },
- })
- export class EsSyncProcessor implements IProcessor {
- @Inject()
- dataRecord: DataRecordService;
- @Inject()
- es: ElasticsearchService;
- async execute() {
- // ...
- console.log('in es sync processor');
- const result = await this.dataRecord.noDealQuery({ es_sync: false });
- console.log(result.length);
- for (const i of result) {
- const origin_data = this.hasContext(get(i, 'origin_data'));
- const new_data = this.hasContext(get(i, 'new_data'));
- // 什么也没有直接下一个
- if (!origin_data && !new_data) continue;
- if (origin_data && !new_data) {
- // 有旧数据,但是没有新数据: 所有的旧数据都是要删除的数据
- const keys = Object.keys(origin_data);
- await this.allDeleteData(i, keys);
- } else if (!origin_data && new_data) {
- // 有新数据但是没有旧数据: 所有的新数据都是添加的
- const keys = Object.keys(new_data);
- await this.allCreateData(i, keys);
- } else {
- // 新旧数据都存在,那就需要拿出来一个一个对比:
- // 新旧数据都有的:修改
- // 有新数据,没有旧数据: 创建
- // 有旧数据,但是没有新数据: 删除
- const odkeys = Object.keys(origin_data);
- const ndkeys = Object.keys(new_data);
- // 比较下表有没有区别
- // d1: 以原数据为基础,新数据缺少的表.全都删除
- const d1 = difference(odkeys, ndkeys);
- // d2: 以新数据为基础,原数据缺少的表.全都创建
- const d2 = difference(ndkeys, odkeys);
- // d3: 两组数据共有的表,都是修改
- const d3 = intersection(odkeys, ndkeys);
- console.log(d1, d2, d3);
- if (d1.length > 0) {
- // 全部删除
- await this.allDeleteData(i, d1);
- }
- if (d2.length > 0) {
- // 全创建
- await this.allCreateData(i, d2);
- }
- if (d3.length > 0) {
- // 需要两组数据对比着进行数据甄别
- await this.updateData(i, d3);
- }
- }
- await this.dataRecord.updateOne(i._id, { es_sync: true });
- }
- }
- async updateData(searchResult, tables) {
- const ods = get(searchResult, 'origin_data');
- const nds = get(searchResult, 'new_data');
- for (const key of tables) {
- // 因为走upsert,所以只需要知道那些是删除就行
- const odList = ods[key];
- // ndList: upsert的数据
- const ndList = nds[key];
- // 以原数据组为基础,不在新数据组中的数据:删除
- const deleteList = differenceBy(odList, ndList, '_id');
- // 批量删除
- if (deleteList.length > 0) await this.es.deleteBat(key, deleteList);
- // 批量修改
- if (ndList.length > 0) await this.es.createAndUpdateBat(key, ndList);
- }
- }
- async allDeleteData(searchResult, tables) {
- const ds = get(searchResult, 'origin_data');
- for (const key of tables) {
- const datas = ds[key];
- if (datas.length <= 0) continue;
- // 批量删除
- await this.es.deleteBat(key, datas);
- }
- }
- async allCreateData(searchResult, tables) {
- const ds = get(searchResult, 'new_data');
- for (const key of tables) {
- const datas = ds[key];
- if (datas.length <= 0) continue;
- // 批量创建
- await this.es.createAndUpdateBat(key, datas);
- }
- }
- hasContext(obj) {
- if (!obj) return false;
- if (!isObject(obj)) return false;
- if (Object.keys(obj).length <= 0) return false;
- return obj;
- }
- }
|