Procházet zdrojové kódy

数据变化日志;es同步

lrf před 1 rokem
rodič
revize
3a4a553c1c

+ 1 - 0
package.json

@@ -6,6 +6,7 @@
   "dependencies": {
     "@elastic/elasticsearch": "^8.12.2",
     "@midwayjs/bootstrap": "^3.12.0",
+    "@midwayjs/bull": "3",
     "@midwayjs/core": "^3.12.0",
     "@midwayjs/decorator": "^3.12.0",
     "@midwayjs/i18n": "3",

+ 3 - 0
pnpm-lock.yaml

@@ -11,6 +11,9 @@ dependencies:
   '@midwayjs/bootstrap':
     specifier: ^3.12.0
     version: 3.15.6
+  '@midwayjs/bull':
+    specifier: '3'
+    version: 3.15.6
   '@midwayjs/core':
     specifier: ^3.12.0
     version: 3.15.6

+ 18 - 4
src/config/config.local.ts

@@ -21,6 +21,13 @@ export default {
     secret: 'Ziyouyanfa!@#',
     expiresIn: 3600, // 3600
   },
+  elasticsearch: {
+    node: 'http://192.168.1.197:9200',
+    auth: {
+      username: 'elastic',
+      password: 'NAjqFz_7tS2DkdpU7p*x',
+    },
+  },
   mongoose: {
     dataSource: {
       default: {
@@ -53,11 +60,18 @@ export default {
       db: redisDB,
     },
   },
+  bull: {
+    // 默认的队列配置
+    defaultQueueOptions: {
+      redis: {
+        port: 6379,
+        host: redisHost,
+        password: redisPwd,
+        db: redisDB,
+      },
+    },
+  },
   upload: {
     whitelist: null,
   },
-  weixinConfig: {
-    appid: 'wxe8b4c4d5e87a7d31',
-    secret: '60cffc9d8766e8b169db970d2422a647',
-  },
 } as MidwayConfig;

+ 20 - 9
src/config/config.prod.ts

@@ -2,9 +2,9 @@ import { MidwayConfig } from '@midwayjs/core';
 const ip = 'host.docker.internal';
 const redisHost = ip;
 const redisPwd = '123456';
-const redisDB = 6;
+const redisDB = 1;
 const projectDB = 'vue3js-template-test';
-const loginSign = 'tsFrame';
+const loginSign = 'information_platform';
 export default {
   // use for cookie sign key, should change to your own and keep security
   keys: '1697684406848_4978',
@@ -13,13 +13,17 @@ export default {
     port: 9700,
     globalPrefix: '/ts/frame/api',
   },
-  swagger: {
-    swaggerPath: '/doc/api',
-  },
   jwt: {
     secret: 'Ziyouyanfa!@#',
     expiresIn: 3600, // 3600
   },
+  elasticsearch: {
+    node: 'http://localhost:9200',
+    auth: {
+      username: 'elastic',
+      password: 'NAjqFz_7tS2DkdpU7p*x',
+    },
+  },
   mongoose: {
     dataSource: {
       default: {
@@ -42,11 +46,18 @@ export default {
       db: redisDB,
     },
   },
+  bull: {
+    // 默认的队列配置
+    defaultQueueOptions: {
+      redis: {
+        port: 6379,
+        host: redisHost,
+        password: redisPwd,
+        db: redisDB,
+      },
+    },
+  },
   upload: {
     whitelist: null,
   },
-  weixinConfig: {
-    appid: 'wxe8b4c4d5e87a7d31',
-    secret: '60cffc9d8766e8b169db970d2422a647',
-  },
 } as MidwayConfig;

+ 10 - 2
src/configuration.ts

@@ -1,4 +1,4 @@
-import { Configuration, App, Inject, MidwayDecoratorService } from '@midwayjs/core';
+import { Configuration, App, Inject, MidwayDecoratorService, IMidwayContainer, IMidwayApplication } from '@midwayjs/core';
 import * as koa from '@midwayjs/koa';
 import * as validate from '@midwayjs/validate';
 import * as info from '@midwayjs/info';
@@ -7,13 +7,14 @@ import * as FreeFrame from 'free-midway-component';
 import * as jwt from '@midwayjs/jwt';
 import { VerifyTokenInit } from './decorator/verifyToken.decorator';
 import { CheckPermissionCodeInit } from './decorator/checkPermissionCode';
-// import { CheckTokenMiddleware } from './middleware/checkToken.middleware';
 import * as swagger from '@midwayjs/swagger';
 import * as redis from '@midwayjs/redis';
 import { CheckOnePointLoginMiddleware } from './middleware/checkOnePointLogin.middleware';
 import * as i18n from '@midwayjs/i18n';
 import { SetLocaleToCtxMiddleware } from './middleware/setLocaleToCtx.middleware';
 import { DataRecordInit } from './decorator/dataRecord';
+import * as bull from '@midwayjs/bull';
+import { ElasticsearchService } from './service/elasticsearch';
 @Configuration({
   imports: [
     koa,
@@ -22,6 +23,7 @@ import { DataRecordInit } from './decorator/dataRecord';
     jwt,
     redis,
     i18n,
+    bull,
     {
       component: info,
       enabledEnvironment: ['local'],
@@ -47,4 +49,10 @@ export class MainConfiguration {
     CheckPermissionCodeInit(this.decoratorService);
     DataRecordInit(this.decoratorService)
   }
+  // 应用服务已启动后执行
+  async onServerReady?(container: IMidwayContainer, app: IMidwayApplication) {
+    // 初始化es
+    const esService = await container.getAsync(ElasticsearchService);
+    await esService.preparMapping();
+  }
 }

+ 2 - 0
src/entityRecord/dataRecord.entity.ts

@@ -24,4 +24,6 @@ export class DataRecord extends BaseModel {
   device: string;
   @prop({ required: false, index: false, zh: '状态', remark: '0:成功;-1失败' })
   status: string;
+  @prop({ required: false, index: false, zh: 'es是否同步', default: false })
+  es_sync: boolean;
 }

+ 14 - 0
src/error/elasticsearch.error.ts

@@ -0,0 +1,14 @@
+import { MidwayError, registerErrorCode } from '@midwayjs/core';
+
+export enum ErrorCode {
+  ES_ERROR = '-3000',
+  ES_INDEX_NOT_FOUND = '-3001',
+  ES_DATA_NOT_FOUND = '-3404'
+}
+export const EsErrorEnum = registerErrorCode('EsError', ErrorCode);
+
+export class ElasticSearchError extends MidwayError {
+  constructor(str: string, errcode: string = ErrorCode.ES_ERROR) {
+    super(str, errcode);
+  }
+}

+ 2 - 0
src/locales/en_us/errors.ts

@@ -14,4 +14,6 @@ export default {
   '-101': 'not login',
   '-102': 'bad password',
   '-500': 'request falut',
+  '-3000': 'ElasticSearch error',
+  '-3001': 'ElasticSearch error, index not found',
 };

+ 2 - 0
src/locales/zh_cn/errors.ts

@@ -14,4 +14,6 @@ export default {
   '-101': '未登录',
   '-102': '密码错误',
   '-500': '请求错误',
+  '-3000': 'ElasticSearch发生错误',
+  '-3001': 'ElasticSearch发生错误,未找到索引',
 };

+ 0 - 33
src/middleware/checkToken.middleware.ts

@@ -1,33 +0,0 @@
-import { IMiddleware } from '@midwayjs/core';
-import { Middleware, Inject } from '@midwayjs/decorator';
-import { NextFunction, Context } from '@midwayjs/koa';
-import get = require('lodash/get');
-import { JwtService } from '@midwayjs/jwt';
-
-@Middleware()
-export class CheckTokenMiddleware
-  implements IMiddleware<Context, NextFunction>
-{
-  @Inject()
-  jwtService: JwtService;
-  resolve() {
-    return async (ctx: Context, next: NextFunction) => {
-      const token: any = get(ctx.request, 'header.token');
-      if (token) {
-        const data = this.jwtService.decodeSync(token);
-        if (data) ctx.user = data;
-      }
-      // 添加管理员身份
-      const adminToken: any = get(ctx.request, 'header.admin-token');
-      if (adminToken) {
-        const data = this.jwtService.decodeSync(adminToken);
-        if (data) ctx.admin = data;
-      }
-      await next();
-    };
-  }
-
-  static getName(): string {
-    return 'checkToken';
-  }
-}

+ 188 - 128
src/service/elasticsearch.ts

@@ -1,86 +1,123 @@
-import { Provide } from '@midwayjs/core';
+import { Config, Init, Inject, Provide } from '@midwayjs/core';
 import { Client, estypes } from '@elastic/elasticsearch';
-import { get, isArray, pickBy } from 'lodash';
-import { ReturnModelType } from '@typegoose/typegoose';
-import { Basic } from '../entity/basic.entity';
-import { InjectEntityModel } from '@midwayjs/typegoose';
-import { FrameworkErrorEnum, ServiceError } from 'free-midway-component';
-import * as dayjs from 'dayjs';
+import { get, isArray, pick, toLower } from 'lodash';
 import { Types } from 'mongoose';
+import { GetModel } from 'free-midway-component';
+import { ElasticSearchError, EsErrorEnum } from '../error/elasticsearch.error';
+import { MidwayI18nService } from '@midwayjs/i18n';
 const ObjectId = Types.ObjectId;
 
 @Provide()
 export class ElasticsearchService {
-  @InjectEntityModel(Basic)
-  basicModel: ReturnModelType<typeof Basic>;
+  @Config('elasticsearch')
+  esConfig: object;
+
+  @Config('mongoose.dataSource')
+  mongooseConfig: any;
+
+  @Inject()
+  i18n: MidwayI18nService;
 
   esClient: Client;
+  @Init()
+  async init() {
+    const esClient = new Client(this.esConfig);
+    this.esClient = esClient;
+  }
+
+  async preparMapping() {
+    console.log('in preparMapping');
+    // 根据mongodb设置,取出各个表中的es类型,形成以表名为索引的mapping
+    for (const db in this.mongooseConfig) {
+      const obj = this.mongooseConfig[db];
+      const entities = obj.entities;
+      for (const e of entities) {
+        this.createIndex(e.name, false);
+      }
+    }
+    console.log('out preparMapping');
+  }
+
+  async createIndex(tableName: string, remake = true) {
+    // 检查是否有该表的索引
+    const index = toLower(tableName);
+    const hasIndex = await this.esClient.indices.exists({ index });
+    if (hasIndex) {
+      if (remake) await this.esClient.indices.delete({ index });
+      else return;
+    }
+    // 没有索引,建立索引,先查表配置,根据配置进行mapping数据类型匹配.生成索引
+    const m = GetModel(tableName);
+    const sch = get(m, 'schema.tree');
+    const esSch = {};
+    for (const key in sch) {
+      const o = sch[key];
+      const type = get(o, 'esType');
+      const esType = this.getType(type);
+      if (esType) esSch[key] = esType;
+    }
+    // 没有一个字段有esType.那就不需要创建索引
+    if (Object.keys(esSch).length <= 0) return;
+    const properties: Record<estypes.PropertyName, estypes.MappingProperty> = esSch;
+    await this.esClient.indices.create({ index, mappings: { properties } });
+    await this.asyncData(tableName);
+  }
+
   /**
-   * 同步数据至es
+   * 初始化同步数据至es; 不负责创建索引
    * 同步数据需要过滤,有些数据类型,如果为不存在或者未 '' e.g.: 字符串日期,被识别成日期.那这个字符串就不能为 '',加入索引会报错
+   * @param {string} index 表名
    */
-  async asyncData() {
+  async asyncData(index) {
     if (!this.esClient) await this.init();
-    const list = await this.basicModel.find({}).lean();
-    let nowData;
-    try {
-      for (const i of list) {
-        const _id = new ObjectId(i._id).toString();
-        const doc = pickBy(i, (val, key) => {
-          if (key === '_id') return false;
-          if (val && val !== '') return true;
-        });
-        const obj = {
-          index: 'basic',
-          id: _id,
-          document: doc,
-        };
-        nowData = obj;
-        await this.esClient.index(obj);
-      }
-    } catch (e) {
-      console.log(e);
+    // es的索引要求都是小写
+    const indexName = toLower(index);
+    const hasIndex = await this.esClient.indices.exists({ index: indexName });
+    if (!hasIndex) return;
+    // 获取model: 因为索引和表名一致,直接用索引获取表
+    const model = GetModel(index);
+    // 获取表设置
+    const sch = get(model, 'schema.tree');
+    // 获取es设置
+    const esSch = {};
+    for (const key in sch) {
+      const o = sch[key];
+      const type = get(o, 'esType', get(o, 'type'));
+      const esType = this.getType(type);
+      if (esType) esSch[key] = esType;
     }
+    // 初始化更新
+    let datas = await model.find({}).lean();
+    datas = datas.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
+    const result = await this.esClient.helpers.bulk({
+      datasource: datas,
+      onDocument(doc) {
+        return {
+          index: { _index: indexName, _id: get(doc, 'id') },
+        };
+      },
+    });
+    console.log(result);
   }
 
   /**
    * 查询es中的数据
+   * 哪里需要,就单独为哪里做查询,这里只是试试好不好使
    * @param query 查询条件
    * @returns
    */
-  async search({ index, skip, limit, ...query }) {
+  async search({ index, skip, limit, ...query }: any) {
     if (!this.esClient) await this.init();
-    const res1 = await this.esClient.indices.exists({ index: 'test1' });
-    const res2 = await this.esClient.indices.exists({ index: 'testtime1' });
-    console.log(res1)
-    console.log(res2)
-    // 需要对查询条件进行处理
-    const matchList = ['name', 'english', 'type', 'subject'];
-    const rangeList = ['lab_acreage'];
-    const timeList = ['build_time'];
     const obj: any = {};
+    if (index) {
+      const hasIndex = await this.esClient.indices.exists({ index });
+      if (!hasIndex) return { data: [], total: 0 };
+      obj.index = index;
+    }
     const oq: any = {};
     for (const key in query) {
-      if (matchList.includes(key)) {
-        if (!oq.match) oq.match = {};
-        oq.match[key] = query[key];
-      } else if (rangeList.includes(key)) {
-        if (!oq.match) oq.range = {};
-        const value = query[key];
-        if (!isArray(value)) {
-          oq.range[key] = { gte: value };
-        } else {
-          oq.range[key] = { gte: value[0], lte: value[1] };
-        }
-      } else if (timeList.includes(key)) {
-        if (!oq.match) oq.range = {};
-        const value = query[key];
-        if (!isArray(value)) {
-          oq.range[`${key}`] = { gte: value };
-        } else {
-          oq.range[`${key}`] = { gte: 'now-1y', lte: 'now' };
-        }
-      }
+      if (!oq.match) oq.match = {};
+      oq.match[key] = query[key];
     }
     if (skip || skip === 0 || skip === '0') obj.from = skip;
     if (limit && limit !== '0') obj.size = limit;
@@ -90,87 +127,102 @@ export class ElasticsearchService {
     const total = this.getSearchTotal(r);
     return { data: result, total };
   }
+
+  async checkIndexAndGetSchmea(index) {
+    if (!this.esClient) await this.init();
+    // es的索引要求都是小写
+    const indexName = toLower(index);
+    const hasIndex = await this.esClient.indices.exists({ index: indexName });
+    if (!hasIndex) return;
+    // 获取model: 因为索引和表名一致,直接用索引获取表
+    const model = GetModel(index);
+    // 获取表设置
+    const sch = get(model, 'schema.tree');
+    // 获取es设置
+    const esSch = {};
+    for (const key in sch) {
+      const o = sch[key];
+      const type = get(o, 'esType', get(o, 'type'));
+      const esType = this.getType(type);
+      if (esType) esSch[key] = esType;
+    }
+    return esSch;
+  }
   /**
-   * 创建数据
-   * @param index 索引
+   * 批量创建/修改
+   * @param index 表名
    * @param data 数据
    */
-  async create({ index, data }) {
+  async createAndUpdateBat(index, data) {
     if (!this.esClient) await this.init();
-    console.log(data);
-    const mapping: Record<estypes.PropertyName, estypes.MappingProperty> = {
-      key: {
-        type: 'text',
+    const esSch = await this.checkIndexAndGetSchmea(index);
+    // 表没有es设置就不需要继续了
+    if (Object.keys(esSch).length <= 0) return;
+    const indexName = toLower(index);
+    const datas = data.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
+    const result = await this.esClient.helpers.bulk({
+      datasource: datas,
+      onDocument(doc) {
+        return [{ update: { _index: indexName, _id: get(doc, 'id') } }, { doc_as_upsert: true }];
       },
-      keyword: {
-        type: 'keyword',
-      },
-      num: {
-        type: 'float',
-      },
-      time: {
-        type: 'date',
-        format: 'yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis',
-      },
-      boo: {
-        type: 'boolean',
-      },
-      sort: {
-        type: 'integer',
+    });
+    return result;
+  }
+  /**
+   * 批量删除
+   * @param index 表名
+   * @param data 数据
+   */
+  async deleteBat(index, data) {
+    if (!this.esClient) await this.init();
+    const esSch = await this.checkIndexAndGetSchmea(index);
+    // 表没有es设置就不需要继续了
+    if (Object.keys(esSch).length <= 0) return;
+    const indexName = toLower(index);
+    const datas = data.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
+    const result = await this.esClient.helpers.bulk({
+      datasource: datas,
+      onDocument(doc) {
+        return { delete: { _index: indexName, _id: get(doc, 'id') } };
       },
-    };
-    for (let k = 1; k <= 3; k++) {
-      await this.esClient.indices.create({
-        index: `test${k}`,
-        mappings: {
-          properties: mapping,
-        },
-      });
-      for (let i = 1; i <= 3; i++) {
-        await this.esClient.index({
-          index: `test${k}`,
-          document: {
-            key: `${k}-菜单${i}`,
-            keyword: `${k}-菜${i}`,
-            time: `2023-06-0${i}`,
-            num: `${i}`,
-            boo: i % 2 === 0,
-            sort: i,
-            mkey: `${k}-单${i}`,
-          },
-        });
-      }
-    }
-
-    // return r;
+    });
+    return result;
   }
+
   /**
    * 更新数据
+   * @param index 索引
    * @param id 数据在es中的id
    * @param data 新数据
    */
-  async update(id: string, data: any) {
+  async update(index: string, id: string, data: any) {
     if (!this.esClient) await this.init();
+    const indexName = toLower(index);
+    const hasIndex = await this.esClient.indices.exists({ index: indexName });
+    if (!hasIndex) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_INDEX_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_INDEX_NOT_FOUND);
     const result = await this.esClient.update({
-      index: 'basic',
+      index,
       id,
-      doc: {
-        doc: data,
-      },
+      doc: data,
     });
+    return result;
   }
   /**
    * 删除数据
    * @param id 数据在es的1id
    */
-  async delete(id: string) {
+  async delete(index: string, id: string) {
     if (!this.esClient) await this.init();
+    const indexName = toLower(index);
+    const hasIndex = await this.esClient.indices.exists({ index: indexName });
+    if (!hasIndex) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_INDEX_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_INDEX_NOT_FOUND);
     try {
-      const r = await this.esClient.delete({ index: 'basic', id: null });
+      const result = await this.esClient.delete({ index: indexName, id });
+      return result;
     } catch (e) {
       const code = e.statusCode;
-      if (code === 404) throw new ServiceError('es未找到数据!', FrameworkErrorEnum.DATA_NOT_FOUND);
-      else if (code === 401) throw new ServiceError('es鉴权发生错误!', FrameworkErrorEnum.DATA_NOT_FOUND);
+      if (code === 404) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_DATA_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_DATA_NOT_FOUND);
+      else throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_ERROR, { group: 'error' }), EsErrorEnum.ES_ERROR);
     }
   }
   /**
@@ -178,14 +230,33 @@ export class ElasticsearchService {
    * @param index 索引
    */
   async deleteIndex(index: string) {
-    await this.esClient.indices.delete({ index });
+    return await this.esClient.indices.delete({ index });
+  }
+
+  getType(type) {
+    const types = {
+      text: { type: 'text' },
+      keyword: { type: 'keyword' },
+      number: { type: 'integer' },
+      long: { type: 'long' },
+      float: { type: 'float' },
+      double: { type: 'double' },
+      date: {
+        type: 'date',
+        format: 'yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis',
+      },
+      boolean: { type: 'boolean' },
+      nested: { type: 'nested' },
+    };
+    return get(types, type);
   }
+
   getSearchResult(result) {
     const res = get(result, 'hits.hits');
     let rData;
     if (isArray(res)) {
       // 整理结果,将_id放到数据中
-      rData = res.map(i => ({ _id: i._id, ...get(i, '_source.doc') }));
+      rData = res.map(i => ({ ...get(i, '_source'), es_index: i._index }));
     } else {
       rData = get(res, '_source');
     }
@@ -195,15 +266,4 @@ export class ElasticsearchService {
     const res = get(result, 'hits.total.value', 0);
     return res;
   }
-
-  async init() {
-    const esClient = new Client({
-      node: 'http://localhost:9200',
-      auth: {
-        username: 'elastic',
-        password: 'NAjqFz_7tS2DkdpU7p*x',
-      },
-    });
-    this.esClient = esClient;
-  }
 }