Browse Source

任务队列,每分钟同步一次日志数据变化到es中

lrf 1 year ago
parent
commit
bc40198048
1 changed files with 110 additions and 0 deletions
  1. 110 0
      src/queue/esSync.queue.ts

+ 110 - 0
src/queue/esSync.queue.ts

@@ -0,0 +1,110 @@
+import { Processor, IProcessor } from '@midwayjs/bull';
+import { DataRecordService } from '../service/record/dataRecord.service';
+import { Inject } from '@midwayjs/decorator';
+import { difference, differenceBy, get, intersection, isObject } from 'lodash';
+import { ElasticsearchService } from '../service/elasticsearch';
+import { FORMAT } from '@midwayjs/core';
+@Processor('esSync', {
+  repeat: {
+    cron: FORMAT.CRONTAB.EVERY_MINUTE,
+  },
+})
+export class EsSyncProcessor implements IProcessor {
+  @Inject()
+  dataRecord: DataRecordService;
+  @Inject()
+  es: ElasticsearchService;
+  async execute() {
+    // ...
+    console.log('in es sync processor');
+    const result = await this.dataRecord.noDealQuery({ es_sync: false });
+    console.log(result.length);
+    for (const i of result) {
+      const origin_data = this.hasContext(get(i, 'origin_data'));
+      const new_data = this.hasContext(get(i, 'new_data'));
+      // 什么也没有直接下一个
+      if (!origin_data && !new_data) continue;
+      if (origin_data && !new_data) {
+        // 有旧数据,但是没有新数据: 所有的旧数据都是要删除的数据
+        const keys = Object.keys(origin_data);
+        await this.allDeleteData(i, keys);
+      } else if (!origin_data && new_data) {
+        // 有新数据但是没有旧数据: 所有的新数据都是添加的
+        const keys = Object.keys(new_data);
+        await this.allCreateData(i, keys);
+      } else {
+        // 新旧数据都存在,那就需要拿出来一个一个对比:
+        // 新旧数据都有的:修改
+        // 有新数据,没有旧数据: 创建
+        // 有旧数据,但是没有新数据: 删除
+        const odkeys = Object.keys(origin_data);
+        const ndkeys = Object.keys(new_data);
+        // 比较下表有没有区别
+        // d1: 以原数据为基础,新数据缺少的表.全都删除
+        const d1 = difference(odkeys, ndkeys);
+        // d2: 以新数据为基础,原数据缺少的表.全都创建
+        const d2 = difference(ndkeys, odkeys);
+        // d3: 两组数据共有的表,都是修改
+        const d3 = intersection(odkeys, ndkeys);
+        console.log(d1, d2, d3);
+        if (d1.length > 0) {
+          // 全部删除
+          await this.allDeleteData(i, d1);
+        }
+        if (d2.length > 0) {
+          // 全创建
+          await this.allCreateData(i, d2);
+        }
+        if (d3.length > 0) {
+          // 需要两组数据对比着进行数据甄别
+          await this.updateData(i, d3);
+        }
+      }
+      await this.dataRecord.updateOne(i._id, { es_sync: true });
+    }
+  }
+
+  async updateData(searchResult, tables) {
+    const ods = get(searchResult, 'origin_data');
+    const nds = get(searchResult, 'new_data');
+    for (const key of tables) {
+      // 因为走upsert,所以只需要知道那些是删除就行
+      const odList = ods[key];
+      // ndList: upsert的数据
+      const ndList = nds[key];
+      // 以原数据组为基础,不在新数据组中的数据:删除
+      const deleteList = differenceBy(odList, ndList, '_id');
+      // 批量删除
+      if (deleteList.length > 0) await this.es.deleteBat(key, deleteList);
+      // 批量修改
+      if (ndList.length > 0) await this.es.createAndUpdateBat(key, ndList);
+    }
+  }
+
+  async allDeleteData(searchResult, tables) {
+    const ds = get(searchResult, 'origin_data');
+    for (const key of tables) {
+      const datas = ds[key];
+      if (datas.length <= 0) continue;
+      // 批量删除
+      await this.es.deleteBat(key, datas);
+    }
+  }
+
+  async allCreateData(searchResult, tables) {
+    const ds = get(searchResult, 'new_data');
+    for (const key of tables) {
+      const datas = ds[key];
+      if (datas.length <= 0) continue;
+      // 批量创建
+      await this.es.createAndUpdateBat(key, datas);
+    }
+  }
+
+  hasContext(obj) {
+    if (!obj) return false;
+    if (!isObject(obj)) return false;
+    if (Object.keys(obj).length <= 0) return false;
+    return obj;
+  }
+}