esSync.queue.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import { Processor, IProcessor } from '@midwayjs/bull';
  2. import { DataRecordService } from '../service/record/dataRecord.service';
  3. import { Inject } from '@midwayjs/decorator';
  4. import { difference, differenceBy, get, intersection, isObject } from 'lodash';
  5. import { ElasticsearchService } from '../service/elasticsearch';
  6. import { FORMAT } from '@midwayjs/core';
  7. @Processor('esSync', {
  8. repeat: {
  9. cron: FORMAT.CRONTAB.EVERY_MINUTE,
  10. },
  11. })
  12. export class EsSyncProcessor implements IProcessor {
  13. @Inject()
  14. dataRecord: DataRecordService;
  15. @Inject()
  16. es: ElasticsearchService;
  17. async execute() {
  18. // ...
  19. console.log('in es sync processor');
  20. const result = await this.dataRecord.noDealQuery({ es_sync: false });
  21. console.log(result.length);
  22. for (const i of result) {
  23. const origin_data = this.hasContext(get(i, 'origin_data'));
  24. const new_data = this.hasContext(get(i, 'new_data'));
  25. // 什么也没有直接下一个
  26. if (!origin_data && !new_data) continue;
  27. if (origin_data && !new_data) {
  28. // 有旧数据,但是没有新数据: 所有的旧数据都是要删除的数据
  29. const keys = Object.keys(origin_data);
  30. await this.allDeleteData(i, keys);
  31. } else if (!origin_data && new_data) {
  32. // 有新数据但是没有旧数据: 所有的新数据都是添加的
  33. const keys = Object.keys(new_data);
  34. await this.allCreateData(i, keys);
  35. } else {
  36. // 新旧数据都存在,那就需要拿出来一个一个对比:
  37. // 新旧数据都有的:修改
  38. // 有新数据,没有旧数据: 创建
  39. // 有旧数据,但是没有新数据: 删除
  40. const odkeys = Object.keys(origin_data);
  41. const ndkeys = Object.keys(new_data);
  42. // 比较下表有没有区别
  43. // d1: 以原数据为基础,新数据缺少的表.全都删除
  44. const d1 = difference(odkeys, ndkeys);
  45. // d2: 以新数据为基础,原数据缺少的表.全都创建
  46. const d2 = difference(ndkeys, odkeys);
  47. // d3: 两组数据共有的表,都是修改
  48. const d3 = intersection(odkeys, ndkeys);
  49. console.log(d1, d2, d3);
  50. if (d1.length > 0) {
  51. // 全部删除
  52. await this.allDeleteData(i, d1);
  53. }
  54. if (d2.length > 0) {
  55. // 全创建
  56. await this.allCreateData(i, d2);
  57. }
  58. if (d3.length > 0) {
  59. // 需要两组数据对比着进行数据甄别
  60. await this.updateData(i, d3);
  61. }
  62. }
  63. await this.dataRecord.updateOne(i._id, { es_sync: true });
  64. }
  65. }
  66. async updateData(searchResult, tables) {
  67. const ods = get(searchResult, 'origin_data');
  68. const nds = get(searchResult, 'new_data');
  69. for (const key of tables) {
  70. // 因为走upsert,所以只需要知道那些是删除就行
  71. const odList = ods[key];
  72. // ndList: upsert的数据
  73. const ndList = nds[key];
  74. // 以原数据组为基础,不在新数据组中的数据:删除
  75. const deleteList = differenceBy(odList, ndList, '_id');
  76. // 批量删除
  77. if (deleteList.length > 0) await this.es.deleteBat(key, deleteList);
  78. // 批量修改
  79. if (ndList.length > 0) await this.es.createAndUpdateBat(key, ndList);
  80. }
  81. }
  82. async allDeleteData(searchResult, tables) {
  83. const ds = get(searchResult, 'origin_data');
  84. for (const key of tables) {
  85. const datas = ds[key];
  86. if (datas.length <= 0) continue;
  87. // 批量删除
  88. await this.es.deleteBat(key, datas);
  89. }
  90. }
  91. async allCreateData(searchResult, tables) {
  92. const ds = get(searchResult, 'new_data');
  93. for (const key of tables) {
  94. const datas = ds[key];
  95. if (datas.length <= 0) continue;
  96. // 批量创建
  97. await this.es.createAndUpdateBat(key, datas);
  98. }
  99. }
  100. hasContext(obj) {
  101. if (!obj) return false;
  102. if (!isObject(obj)) return false;
  103. if (Object.keys(obj).length <= 0) return false;
  104. return obj;
  105. }
  106. }