Jelajahi Sumber

优化es索引管理

zhouhao 3 tahun lalu
induk
melakukan
65e8988639

+ 18 - 12
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/DefaultElasticSearchIndexManager.java

@@ -1,15 +1,16 @@
 package org.jetlinks.community.elastic.search.index;
 
+import lombok.Generated;
 import lombok.Getter;
 import lombok.Setter;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 @Component
 @ConfigurationProperties(prefix = "elasticsearch.index")
@@ -17,32 +18,37 @@ public class DefaultElasticSearchIndexManager implements ElasticSearchIndexManag
 
     @Getter
     @Setter
+    @Generated
     private String defaultStrategy = "direct";
 
     @Getter
     @Setter
-    private Map<String, String> indexUseStrategy = new HashMap<>();
+    @Generated
+    private Map<String, String> indexUseStrategy = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
 
-    private final Map<String, ElasticSearchIndexStrategy> strategies = new ConcurrentHashMap<>();
+    private final Map<String, ElasticSearchIndexStrategy> strategies = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
 
-    private final Map<String, ElasticSearchIndexMetadata> indexMetadataStore = new ConcurrentHashMap<>();
+    private final Map<String, ElasticSearchIndexMetadata> indexMetadataStore = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
 
-    public DefaultElasticSearchIndexManager(List<ElasticSearchIndexStrategy> strategies) {
-        strategies.forEach(this::registerStrategy);
+    public DefaultElasticSearchIndexManager(@Autowired(required = false) List<ElasticSearchIndexStrategy> strategies) {
+        if (strategies != null) {
+            strategies.forEach(this::registerStrategy);
+        }
     }
 
     @Override
     public Mono<Void> putIndex(ElasticSearchIndexMetadata index) {
         return this.getIndexStrategy(index.getIndex())
-            .flatMap(strategy -> strategy.putIndex(index))
-            .doOnSuccess(metadata -> indexMetadataStore.put(index.getIndex(), index));
+                   .flatMap(strategy -> strategy.putIndex(index))
+                   .doOnNext(idx -> indexMetadataStore.put(idx.getIndex(), idx))
+                   .then();
     }
 
     @Override
     public Mono<ElasticSearchIndexMetadata> getIndexMetadata(String index) {
         return Mono.justOrEmpty(indexMetadataStore.get(index))
-            .switchIfEmpty(Mono.defer(() -> doLoadMetaData(index)
-                .doOnNext(metadata -> indexMetadataStore.put(metadata.getIndex(), metadata))));
+                   .switchIfEmpty(Mono.defer(() -> doLoadMetaData(index)
+                       .doOnNext(metadata -> indexMetadataStore.put(metadata.getIndex(), metadata))));
     }
 
     protected Mono<ElasticSearchIndexMetadata> doLoadMetaData(String index) {
@@ -53,7 +59,7 @@ public class DefaultElasticSearchIndexManager implements ElasticSearchIndexManag
     @Override
     public Mono<ElasticSearchIndexStrategy> getIndexStrategy(String index) {
         return Mono.justOrEmpty(strategies.get(indexUseStrategy.getOrDefault(index.toLowerCase(), defaultStrategy)))
-            .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("[" + index + "] 不支持任何索引策略")));
+                   .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("[" + index + "] 不支持任何索引策略")));
     }
 
     @Override

+ 1 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/ElasticSearchIndexStrategy.java

@@ -39,7 +39,7 @@ public interface ElasticSearchIndexStrategy {
      * @param metadata 索引元数据
      * @return 更新结果
      */
-    Mono<Void> putIndex(ElasticSearchIndexMetadata metadata);
+    Mono<ElasticSearchIndexMetadata> putIndex(ElasticSearchIndexMetadata metadata);
 
     Mono<ElasticSearchIndexMetadata> loadIndexMetadata(String index);
 }

+ 26 - 21
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -3,12 +3,17 @@ package org.jetlinks.community.elastic.search.index.strategies;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.Version;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetMappingsRequest;
+import org.elasticsearch.client.indices.PutMappingRequest;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.compress.CompressedXContent;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.SimplePropertyMetadata;
+import org.jetlinks.core.metadata.types.*;
 import org.jetlinks.community.elastic.search.enums.ElasticDateFormat;
 import org.jetlinks.community.elastic.search.enums.ElasticPropertyType;
 import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
@@ -16,10 +21,6 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy;
 import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
-import org.jetlinks.core.metadata.DataType;
-import org.jetlinks.core.metadata.PropertyMetadata;
-import org.jetlinks.core.metadata.SimplePropertyMetadata;
-import org.jetlinks.core.metadata.types.*;
 import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Mono;
 
@@ -41,7 +42,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
     }
 
     protected Mono<Boolean> indexExists(String index) {
-        return client.existsIndex(req -> req.indices(wrapIndex(index)));
+        return client.existsIndex(new GetIndexRequest(wrapIndex(index)));
     }
 
     protected Mono<Void> doCreateIndex(ElasticSearchIndexMetadata metadata) {
@@ -58,7 +59,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
                        if (exists) {
                            return doLoadIndexMetadata(index)
                                .flatMap(oldMapping -> Mono.justOrEmpty(createPutMappingRequest(metadata, oldMapping)))
-                               .flatMap(request -> client.updateMapping(request))
+                               .flatMap(request -> client.putMapping(request))
                                .then();
                        }
                        if (justUpdateMapping) {
@@ -81,7 +82,11 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         Map<String, Object> mappingConfig = new HashMap<>();
         mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
         mappingConfig.put("dynamic_templates", createDynamicTemplates());
-        request.mapping("_doc", mappingConfig);
+        if (client.serverVersion().after(Version.V_7_0_0)) {
+            request.mapping(mappingConfig);
+        } else {
+            request.mapping(Collections.singletonMap("_doc", mappingConfig));
+        }
         return request;
     }
 
@@ -99,7 +104,6 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         }
         Map<String, Object> mappingConfig = new HashMap<>();
         PutMappingRequest request = new PutMappingRequest(wrapIndex(metadata.getIndex()));
-        request.type("_doc");
         List<PropertyMetadata> allProperties = new ArrayList<>();
         allProperties.addAll(metadata.getProperties());
         allProperties.addAll(ignore.getProperties());
@@ -159,26 +163,27 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         return property;
     }
 
-    protected ElasticSearchIndexMetadata convertMetadata(String index, ImmutableOpenMap<String, ?> metaData) {
+    protected ElasticSearchIndexMetadata convertMetadata(String index, MappingMetadata metadata) {
         MappingMetadata mappingMetadata;
         Object properties = null;
+        Map<String, Object> metaData = metadata.getSourceAsMap();
+
         if (metaData.containsKey("properties")) {
             Object res = metaData.get("properties");
-            if (res instanceof MappingMetadata) {
+            if (res instanceof Map) {
+                properties = res;
+            } else if (res instanceof MappingMetadata) {
                 mappingMetadata = ((MappingMetadata) res);
+                properties = mappingMetadata.sourceAsMap();
             } else if (res instanceof CompressedXContent) {
                 mappingMetadata = new MappingMetadata(((CompressedXContent) res));
+                properties = mappingMetadata.sourceAsMap();
             } else {
                 throw new UnsupportedOperationException("unsupported index metadata" + metaData);
             }
-            properties = mappingMetadata.sourceAsMap();
+
         } else {
-            Object res;
-            if (metaData.size() == 1) {
-                res = metaData.values().iterator().next().value;
-            } else {
-                res = metaData.get("_doc");
-            }
+            Object res = metaData.get("_doc");
             if (res instanceof MappingMetadata) {
                 mappingMetadata = ((MappingMetadata) res);
             } else if (res instanceof CompressedXContent) {

+ 7 - 5
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java

@@ -3,14 +3,14 @@ package org.jetlinks.community.elastic.search.index.strategies;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
-import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
-@Component
 public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
 
+    public static String ID = "direct";
+
     public DirectElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
-        super("direct", client, properties);
+        super(ID, client, properties);
     }
 
     @Override
@@ -24,8 +24,10 @@ public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndex
     }
 
     @Override
-    public Mono<Void> putIndex(ElasticSearchIndexMetadata metadata) {
-        return doPutIndex(metadata, false);
+    public Mono<ElasticSearchIndexMetadata> putIndex(ElasticSearchIndexMetadata metadata) {
+        ElasticSearchIndexMetadata index = metadata.newIndexName(wrapIndex(metadata.getIndex()));
+        return doPutIndex(index, false)
+            .thenReturn(index);
     }
 
     @Override

+ 19 - 8
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java

@@ -1,8 +1,10 @@
 package org.jetlinks.community.elastic.search.index.strategies;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.alias.Alias;
-import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
+import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
@@ -40,11 +42,13 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
     }
 
     @Override
-    public Mono<Void> putIndex(ElasticSearchIndexMetadata metadata) {
+    public Mono<ElasticSearchIndexMetadata> putIndex(ElasticSearchIndexMetadata metadata) {
+
         return client
-            .updateTemplate(createIndexTemplateRequest(metadata))
+            .putTemplate(createIndexTemplateRequest(metadata))
             //修改当前索引
-            .then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true));
+            .then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true))
+            .thenReturn(metadata.newIndexName(wrapIndex(metadata.getIndex())));
     }
 
     protected PutIndexTemplateRequest createIndexTemplateRequest(ElasticSearchIndexMetadata metadata) {
@@ -55,7 +59,11 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
         Map<String, Object> mappingConfig = new HashMap<>();
         mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
         mappingConfig.put("dynamic_templates", createDynamicTemplates());
-        request.mapping("_doc",mappingConfig);
+        if (client.serverVersion().after(Version.V_7_0_0)) {
+            request.mapping(mappingConfig);
+        } else {
+            request.mapping(Collections.singletonMap("_doc", mappingConfig));
+        }
         request.patterns(getIndexPatterns(index));
         return request;
     }
@@ -64,7 +72,10 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
     @Override
     public Mono<ElasticSearchIndexMetadata> loadIndexMetadata(String index) {
         return client.getTemplate(new GetIndexTemplatesRequest(getTemplate(index)))
-            .filter(resp -> resp.getIndexTemplates().size() > 0)
-            .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.getIndexTemplates().get(0).mappings())));
+                     .filter(resp -> CollectionUtils.isNotEmpty(resp.getIndexTemplates()))
+                     .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp
+                         .getIndexTemplates()
+                         .get(0)
+                         .mappings())));
     }
 }

+ 9 - 20
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -2,7 +2,6 @@ package org.jetlinks.community.elastic.search.service.reactive;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import lombok.Generated;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -119,6 +118,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Locale;
 import java.util.Map;
@@ -130,7 +130,8 @@ import static org.springframework.data.elasticsearch.client.util.RequestConverte
 
 @Slf4j
 @Generated
-public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient, ReactiveElasticsearchClient.Cluster {
+public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient,
+    org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster {
     private final HostProvider<?> hostProvider;
     private final RequestCreator requestCreator;
     private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
@@ -149,11 +150,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
 
         this.hostProvider = hostProvider;
         this.requestCreator = requestCreator;
-        info()
-            .subscribe(mainResponse -> {
-                log.debug("connect elasticsearch server : {}", JSON.toJSONString(mainResponse, SerializerFeature.PrettyFormat));
-                version = mainResponse.getVersion();
-            });
+        version = info().block(Duration.ofSeconds(10)).getVersion();
     }
 
     public void setHeadersSupplier(Supplier<HttpHeaders> headersSupplier) {
@@ -705,16 +702,6 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
                                                                                                           .then();
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest)
-     */
-    @Override
-    public Mono<Boolean> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
-
-        return putMapping(headers, putMappingRequest);
-    }
-
     @Override
     public Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
         return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers)
@@ -724,6 +711,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
 
     @Override
     public Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
+
         return sendRequest(putMappingRequest, requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers)
             .map(AcknowledgedResponse::isAcknowledged)
             .next();
@@ -841,8 +829,9 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
             .flatMapMany(spec -> {
                 return spec.exchangeToFlux(response -> {
                     return Flux.from(
-                        this.readResponseBody(logId, request, response, responseType, decoder)
-                    );
+                                   this.readResponseBody(logId, request, response, responseType, decoder)
+                               )
+                               .cast(responseType);
                 });
             });
     }
@@ -1178,7 +1167,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
     @Override
     public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
         return sendRequest(getMappingsRequest, requestCreator.getMapping(),
-                           GetMappingsResponse.class, headers).next();
+                           org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse.class, headers).next();
     }
 
     @Override