|
@@ -1,269 +0,0 @@
|
|
|
-import { Config, Init, Inject, Provide } from '@midwayjs/core';
|
|
|
-import { Client, estypes } from '@elastic/elasticsearch';
|
|
|
-import { get, isArray, pick, toLower } from 'lodash';
|
|
|
-import { Types } from 'mongoose';
|
|
|
-import { GetModel } from 'free-midway-component';
|
|
|
-import { ElasticSearchError, EsErrorEnum } from '../error/elasticsearch.error';
|
|
|
-import { MidwayI18nService } from '@midwayjs/i18n';
|
|
|
-const ObjectId = Types.ObjectId;
|
|
|
-
|
|
|
-@Provide()
|
|
|
-export class ElasticsearchService {
|
|
|
- @Config('elasticsearch')
|
|
|
- esConfig: object;
|
|
|
-
|
|
|
- @Config('mongoose.dataSource')
|
|
|
- mongooseConfig: any;
|
|
|
-
|
|
|
- @Inject()
|
|
|
- i18n: MidwayI18nService;
|
|
|
-
|
|
|
- esClient: Client;
|
|
|
- @Init()
|
|
|
- async init() {
|
|
|
- const esClient = new Client(this.esConfig);
|
|
|
- this.esClient = esClient;
|
|
|
- }
|
|
|
-
|
|
|
- async preparMapping() {
|
|
|
- console.log('in preparMapping');
|
|
|
- // 根据mongodb设置,取出各个表中的es类型,形成以表名为索引的mapping
|
|
|
- for (const db in this.mongooseConfig) {
|
|
|
- const obj = this.mongooseConfig[db];
|
|
|
- const entities = obj.entities;
|
|
|
- for (const e of entities) {
|
|
|
- this.createIndex(e.name, false);
|
|
|
- }
|
|
|
- }
|
|
|
- console.log('out preparMapping');
|
|
|
- }
|
|
|
-
|
|
|
- async createIndex(tableName: string, remake = true) {
|
|
|
- // 检查是否有该表的索引
|
|
|
- const index = toLower(tableName);
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index });
|
|
|
- if (hasIndex) {
|
|
|
- if (remake) await this.esClient.indices.delete({ index });
|
|
|
- else return;
|
|
|
- }
|
|
|
- // 没有索引,建立索引,先查表配置,根据配置进行mapping数据类型匹配.生成索引
|
|
|
- const m = GetModel(tableName);
|
|
|
- const sch = get(m, 'schema.tree');
|
|
|
- const esSch = {};
|
|
|
- for (const key in sch) {
|
|
|
- const o = sch[key];
|
|
|
- const type = get(o, 'esType');
|
|
|
- const esType = this.getType(type);
|
|
|
- if (esType) esSch[key] = esType;
|
|
|
- }
|
|
|
- // 没有一个字段有esType.那就不需要创建索引
|
|
|
- if (Object.keys(esSch).length <= 0) return;
|
|
|
- const properties: Record<estypes.PropertyName, estypes.MappingProperty> = esSch;
|
|
|
- await this.esClient.indices.create({ index, mappings: { properties } });
|
|
|
- await this.asyncData(tableName);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化同步数据至es; 不负责创建索引
|
|
|
- * 同步数据需要过滤,有些数据类型,如果为不存在或者未 '' e.g.: 字符串日期,被识别成日期.那这个字符串就不能为 '',加入索引会报错
|
|
|
- * @param {string} index 表名
|
|
|
- */
|
|
|
- async asyncData(index) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- // es的索引要求都是小写
|
|
|
- const indexName = toLower(index);
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index: indexName });
|
|
|
- if (!hasIndex) return;
|
|
|
- // 获取model: 因为索引和表名一致,直接用索引获取表
|
|
|
- const model = GetModel(index);
|
|
|
- // 获取表设置
|
|
|
- const sch = get(model, 'schema.tree');
|
|
|
- // 获取es设置
|
|
|
- const esSch = {};
|
|
|
- for (const key in sch) {
|
|
|
- const o = sch[key];
|
|
|
- const type = get(o, 'esType', get(o, 'type'));
|
|
|
- const esType = this.getType(type);
|
|
|
- if (esType) esSch[key] = esType;
|
|
|
- }
|
|
|
- // 初始化更新
|
|
|
- let datas = await model.find({}).lean();
|
|
|
- datas = datas.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
|
|
|
- const result = await this.esClient.helpers.bulk({
|
|
|
- datasource: datas,
|
|
|
- onDocument(doc) {
|
|
|
- return {
|
|
|
- index: { _index: indexName, _id: get(doc, 'id') },
|
|
|
- };
|
|
|
- },
|
|
|
- });
|
|
|
- console.log(result);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 查询es中的数据
|
|
|
- * 哪里需要,就单独为哪里做查询,这里只是试试好不好使
|
|
|
- * @param query 查询条件
|
|
|
- * @returns
|
|
|
- */
|
|
|
- async search({ index, skip, limit, ...query }: any) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- const obj: any = {};
|
|
|
- if (index) {
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index });
|
|
|
- if (!hasIndex) return { data: [], total: 0 };
|
|
|
- obj.index = index;
|
|
|
- }
|
|
|
- const oq: any = {};
|
|
|
- for (const key in query) {
|
|
|
- if (!oq.match) oq.match = {};
|
|
|
- oq.match[key] = query[key];
|
|
|
- }
|
|
|
- if (skip || skip === 0 || skip === '0') obj.from = skip;
|
|
|
- if (limit && limit !== '0') obj.size = limit;
|
|
|
- if (Object.keys(oq).length > 0) obj.query = oq;
|
|
|
- const r = await this.esClient.search(obj);
|
|
|
- const result = this.getSearchResult(r);
|
|
|
- const total = this.getSearchTotal(r);
|
|
|
- return { data: result, total };
|
|
|
- }
|
|
|
-
|
|
|
- async checkIndexAndGetSchmea(index) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- // es的索引要求都是小写
|
|
|
- const indexName = toLower(index);
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index: indexName });
|
|
|
- if (!hasIndex) return;
|
|
|
- // 获取model: 因为索引和表名一致,直接用索引获取表
|
|
|
- const model = GetModel(index);
|
|
|
- // 获取表设置
|
|
|
- const sch = get(model, 'schema.tree');
|
|
|
- // 获取es设置
|
|
|
- const esSch = {};
|
|
|
- for (const key in sch) {
|
|
|
- const o = sch[key];
|
|
|
- const type = get(o, 'esType', get(o, 'type'));
|
|
|
- const esType = this.getType(type);
|
|
|
- if (esType) esSch[key] = esType;
|
|
|
- }
|
|
|
- return esSch;
|
|
|
- }
|
|
|
- /**
|
|
|
- * 批量创建/修改
|
|
|
- * @param index 表名
|
|
|
- * @param data 数据
|
|
|
- */
|
|
|
- async createAndUpdateBat(index, data) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- const esSch = await this.checkIndexAndGetSchmea(index);
|
|
|
- // 表没有es设置就不需要继续了
|
|
|
- if (Object.keys(esSch).length <= 0) return;
|
|
|
- const indexName = toLower(index);
|
|
|
- const datas = data.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
|
|
|
- const result = await this.esClient.helpers.bulk({
|
|
|
- datasource: datas,
|
|
|
- onDocument(doc) {
|
|
|
- return [{ update: { _index: indexName, _id: get(doc, 'id') } }, { doc_as_upsert: true }];
|
|
|
- },
|
|
|
- });
|
|
|
- return result;
|
|
|
- }
|
|
|
- /**
|
|
|
- * 批量删除
|
|
|
- * @param index 表名
|
|
|
- * @param data 数据
|
|
|
- */
|
|
|
- async deleteBat(index, data) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- const esSch = await this.checkIndexAndGetSchmea(index);
|
|
|
- // 表没有es设置就不需要继续了
|
|
|
- if (Object.keys(esSch).length <= 0) return;
|
|
|
- const indexName = toLower(index);
|
|
|
- const datas = data.map(i => ({ ...pick(i, Object.keys(esSch)), id: new ObjectId(i._id).toString() }));
|
|
|
- const result = await this.esClient.helpers.bulk({
|
|
|
- datasource: datas,
|
|
|
- onDocument(doc) {
|
|
|
- return { delete: { _index: indexName, _id: get(doc, 'id') } };
|
|
|
- },
|
|
|
- });
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 更新数据
|
|
|
- * @param index 索引
|
|
|
- * @param id 数据在es中的id
|
|
|
- * @param data 新数据
|
|
|
- */
|
|
|
- async update(index: string, id: string, data: any) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- const indexName = toLower(index);
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index: indexName });
|
|
|
- if (!hasIndex) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_INDEX_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_INDEX_NOT_FOUND);
|
|
|
- const result = await this.esClient.update({
|
|
|
- index,
|
|
|
- id,
|
|
|
- doc: data,
|
|
|
- });
|
|
|
- return result;
|
|
|
- }
|
|
|
- /**
|
|
|
- * 删除数据
|
|
|
- * @param id 数据在es的1id
|
|
|
- */
|
|
|
- async delete(index: string, id: string) {
|
|
|
- if (!this.esClient) await this.init();
|
|
|
- const indexName = toLower(index);
|
|
|
- const hasIndex = await this.esClient.indices.exists({ index: indexName });
|
|
|
- if (!hasIndex) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_INDEX_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_INDEX_NOT_FOUND);
|
|
|
- try {
|
|
|
- const result = await this.esClient.delete({ index: indexName, id });
|
|
|
- return result;
|
|
|
- } catch (e) {
|
|
|
- const code = e.statusCode;
|
|
|
- if (code === 404) throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_DATA_NOT_FOUND, { group: 'error' }), EsErrorEnum.ES_DATA_NOT_FOUND);
|
|
|
- else throw new ElasticSearchError(this.i18n.translate(EsErrorEnum.ES_ERROR, { group: 'error' }), EsErrorEnum.ES_ERROR);
|
|
|
- }
|
|
|
- }
|
|
|
- /**
|
|
|
- * 删除索引
|
|
|
- * @param index 索引
|
|
|
- */
|
|
|
- async deleteIndex(index: string) {
|
|
|
- return await this.esClient.indices.delete({ index });
|
|
|
- }
|
|
|
-
|
|
|
- getType(type) {
|
|
|
- const types = {
|
|
|
- text: { type: 'text' },
|
|
|
- keyword: { type: 'keyword' },
|
|
|
- number: { type: 'integer' },
|
|
|
- long: { type: 'long' },
|
|
|
- float: { type: 'float' },
|
|
|
- double: { type: 'double' },
|
|
|
- date: {
|
|
|
- type: 'date',
|
|
|
- format: 'yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis',
|
|
|
- },
|
|
|
- boolean: { type: 'boolean' },
|
|
|
- nested: { type: 'nested' },
|
|
|
- };
|
|
|
- return get(types, type);
|
|
|
- }
|
|
|
-
|
|
|
- getSearchResult(result) {
|
|
|
- const res = get(result, 'hits.hits');
|
|
|
- let rData;
|
|
|
- if (isArray(res)) {
|
|
|
- // 整理结果,将_id放到数据中
|
|
|
- rData = res.map(i => ({ ...get(i, '_source'), es_index: i._index }));
|
|
|
- } else {
|
|
|
- rData = get(res, '_source');
|
|
|
- }
|
|
|
- return rData;
|
|
|
- }
|
|
|
- getSearchTotal(result) {
|
|
|
- const res = get(result, 'hits.total.value', 0);
|
|
|
- return res;
|
|
|
- }
|
|
|
-}
|