zhou-hao 3 tahun lalu
induk
melakukan
16529b93c1

+ 26 - 20
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -20,6 +20,7 @@ import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.SimplePropertyMetadata;
 import org.jetlinks.core.metadata.types.*;
+import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Mono;
 
 import java.util.*;
@@ -51,24 +52,24 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
                                     boolean justUpdateMapping) {
         String index = wrapIndex(metadata.getIndex());
         return this.indexExists(index)
-            .flatMap(exists -> {
-                if (exists) {
-                    return doLoadIndexMetadata(index)
-                        .flatMap(oldMapping -> Mono.justOrEmpty(createPutMappingRequest(metadata, oldMapping)))
-                        .flatMap(request -> client.updateMapping(request))
-                        .then();
-                }
-                if (justUpdateMapping) {
-                    return Mono.empty();
-                }
-                return doCreateIndex(metadata);
-            });
+                   .flatMap(exists -> {
+                       if (exists) {
+                           return doLoadIndexMetadata(index)
+                               .flatMap(oldMapping -> Mono.justOrEmpty(createPutMappingRequest(metadata, oldMapping)))
+                               .flatMap(request -> client.updateMapping(request))
+                               .then();
+                       }
+                       if (justUpdateMapping) {
+                           return Mono.empty();
+                       }
+                       return doCreateIndex(metadata);
+                   });
     }
 
     protected Mono<ElasticSearchIndexMetadata> doLoadIndexMetadata(String _index) {
         String index = wrapIndex(_index);
         return client.getMapping(new GetMappingsRequest().indices(index))
-            .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.mappings().get(index))));
+                     .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.mappings().get(index))));
     }
 
 
@@ -86,7 +87,8 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         Map<String, Object> properties = createElasticProperties(metadata.getProperties());
         Map<String, Object> ignoreProperties = createElasticProperties(ignore.getProperties());
         for (Map.Entry<String, Object> en : ignoreProperties.entrySet()) {
-            log.trace("ignore update index [{}] mapping property:{},{}", wrapIndex(metadata.getIndex()), en.getKey(), en.getValue());
+            log.trace("ignore update index [{}] mapping property:{},{}", wrapIndex(metadata.getIndex()), en.getKey(), en
+                .getValue());
             properties.remove(en.getKey());
         }
         if (properties.isEmpty()) {
@@ -109,8 +111,10 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         if (metadata == null) {
             return new HashMap<>();
         }
-        return metadata.stream()
-            .collect(Collectors.toMap(PropertyMetadata::getId, prop -> this.createElasticProperty(prop.getValueType())));
+        return metadata
+            .stream()
+            .collect(Collectors.toMap(PropertyMetadata::getId,
+                                      prop -> this.createElasticProperty(prop.getValueType()), (a, v) -> a));
     }
 
     protected Map<String, Object> createElasticProperty(DataType type) {
@@ -143,7 +147,9 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         } else if (type instanceof ObjectType) {
             property.put("type", "nested");
             ObjectType objectType = ((ObjectType) type);
-            property.put("properties", createElasticProperties(objectType.getProperties()));
+            if (!CollectionUtils.isEmpty(objectType.getProperties())) {
+                property.put("properties", createElasticProperties(objectType.getProperties()));
+            }
         } else {
             property.put("type", "keyword");
             property.put("ignore_above", 512);
@@ -219,7 +225,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
             }
             metadata.setValueType(dataType);
         } else {
-            metadata.setValueType(new StringType());
+            metadata.setValueType(StringType.GLOBAL);
         }
         return metadata;
     }
@@ -229,13 +235,13 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         {
             Map<String, Object> config = new HashMap<>();
             config.put("match_mapping_type", "string");
-            config.put("mapping", createElasticProperty(new StringType()));
+            config.put("mapping", createElasticProperty(StringType.GLOBAL));
             maps.add(Collections.singletonMap("string_fields", config));
         }
         {
             Map<String, Object> config = new HashMap<>();
             config.put("match_mapping_type", "date");
-            config.put("mapping", createElasticProperty(new DateTimeType()));
+            config.put("mapping", createElasticProperty(DateTimeType.GLOBAL));
             maps.add(Collections.singletonMap("date_fields", config));
         }
 

+ 8 - 8
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -117,6 +117,7 @@ import java.util.function.Supplier;
 import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType;
 
 @Slf4j
+@Generated
 public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
     private final HostProvider<?> hostProvider;
     private final RequestCreator requestCreator;
@@ -292,9 +293,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
 
-        return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-                                                                                                  .map(SearchResponse::getHits) //
-                                                                                                  .flatMap(Flux::fromIterable);
+        return sendRequest(searchRequest, this::buildSearchRequest, SearchResponse.class, headers) //
+                                                                                                   .map(SearchResponse::getHits) //
+                                                                                                   .flatMap(Flux::fromIterable);
     }
 
     /*
@@ -310,9 +311,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         searchRequest.source().size(0);
         searchRequest.source().trackTotalHits(false);
 
-        return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-                                                                                                  .map(SearchResponse::getAggregations) //
-                                                                                                  .flatMap(Flux::fromIterable);
+        return sendRequest(searchRequest, this::buildSearchRequest, SearchResponse.class, headers) //
+                                                                                                   .map(SearchResponse::getAggregations) //
+                                                                                                   .flatMap(Flux::fromIterable);
     }
 
     /*
@@ -1006,7 +1007,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<SearchResponse> searchForPage(SearchRequest request) {
-        long startTime = System.currentTimeMillis();
         // if (version.after(Version.V_7_0_0)) {
         request.source().trackTotalHits(true);
         // }
@@ -1014,7 +1014,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
             .sendRequest(request, this::buildSearchRequest, SearchResponse.class, HttpHeaders.EMPTY)
             .singleOrEmpty()
             .doOnSuccess(res -> log
-                .trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source()))
+                .trace("execute search {} {} : {}", request.indices(),res.getTook(), request.source()))
             .doOnError(err -> log.warn("execute search {} error : {}", request.indices(), request.source(), err));
     }