Quellcode durchsuchen

优化elasticsearch

zhouhao vor 4 Jahren
Ursprung
Commit
ec30bafddb
19 geänderte Dateien mit 408 neuen und 151 gelöschten Zeilen
  1. 8 2
      jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java
  2. 2 2
      jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java
  3. 1 1
      jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java
  4. 6 1
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
  5. 79 0
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java
  6. 49 20
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java
  7. 178 102
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java
  8. 3 0
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java
  9. 1 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java
  10. 1 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java
  11. 1 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java
  12. 1 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java
  13. 1 1
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java
  14. 8 1
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java
  15. 2 1
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java
  16. 26 5
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java
  17. 21 0
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitAggregationColumn.java
  18. 15 0
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitGroup.java
  19. 5 11
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java

+ 8 - 2
jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java

@@ -18,9 +18,15 @@ public class Interval {
     public static String minutes = "m";
     public static String seconds = "s";
 
-    private BigDecimal number;
+    private final BigDecimal number;
 
-    private String expression;
+    private final String expression;
+
+    public boolean isFixed() {
+        return expression.equals(hours) ||
+            expression.equals(minutes) ||
+            expression.equals(seconds);
+    }
 
     @Override
     public String toString() {

+ 2 - 2
jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java

@@ -11,10 +11,10 @@ import java.util.Optional;
 
 public interface ValueObject {
 
-    Map<String, Object> getAll();
+    Map<String, Object> values();
 
     default Optional<Object> get(String name) {
-        return Optional.ofNullable(getAll())
+        return Optional.ofNullable(values())
             .map(map -> map.get(name));
     }
 

+ 1 - 1
jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java

@@ -22,7 +22,7 @@ public class MeasurementParameter implements ValueObject {
     }
 
     @Override
-    public Map<String, Object> getAll() {
+    public Map<String, Object> values() {
         return params;
     }
 }

+ 6 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -95,7 +95,12 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         }
         Map<String, Object> mappingConfig = new HashMap<>();
         PutMappingRequest request = new PutMappingRequest(wrapIndex(metadata.getIndex()));
-        mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
+        request.type("_doc");
+        List<PropertyMetadata> allProperties = new ArrayList<>();
+        allProperties.addAll(metadata.getProperties());
+        allProperties.addAll(ignore.getProperties());
+
+        mappingConfig.put("properties", createElasticProperties(allProperties));
         request.source(mappingConfig);
         return request;
     }

+ 79 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java

@@ -0,0 +1,79 @@
+package org.jetlinks.community.elastic.search.service.reactive;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+
+@AllArgsConstructor
+public enum AggType {
+
+    AVG("平均") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.avg(name).field(filed).missing(0);
+        }
+
+    },
+    MAX("最大") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.max(name).field(filed).missing(0);
+        }
+
+
+    },
+    COUNT("非空值计数") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.count(name).field(filed).missing(0);
+        }
+
+    },
+    MIN("最小") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.min(name).field(filed).missing(0);
+        }
+    },
+    FIRST("第一条数据") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.topHits(name).size(1);
+        }
+    },
+    TOP("第N条数据") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.topHits(name);
+        }
+    },
+    SUM("总数") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.sum(name).field(filed).missing(0);
+        }
+    },
+    STATS("统计汇总") {
+        @Override
+        public AggregationBuilder aggregationBuilder(String name, String filed) {
+            return AggregationBuilders.stats(name).field(filed).missing(0);
+        }
+
+    };
+
+    @Getter
+    private final String text;
+
+    public abstract AggregationBuilder aggregationBuilder(String name, String filed);
+
+    public static AggType of(String name) {
+        for (AggType type : AggType.values()) {
+            if (type.name().equalsIgnoreCase(name)) {
+                return type;
+            }
+        }
+        throw new UnsupportedOperationException("不支持的聚合度量类型:" + name);
+    }
+
+}

+ 49 - 20
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -1,8 +1,9 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ByteArrayEntity;
@@ -11,6 +12,7 @@ import org.apache.http.util.EntityUtils;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -50,16 +52,15 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -110,8 +111,10 @@ import java.util.StringJoiner;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;
 import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType;
 
+@Slf4j
 public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
     private final HostProvider hostProvider;
     private final RequestCreator requestCreator;
@@ -131,6 +134,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
         this.hostProvider = hostProvider;
         this.requestCreator = requestCreator;
+        info()
+            .subscribe(mainResponse -> {
+                log.debug("connect elasticsearch server : {}", JSON.toJSONString(mainResponse, SerializerFeature.PrettyFormat));
+                version = mainResponse.getVersion();
+            });
     }
 
     public void setHeadersSupplier(Supplier<HttpHeaders> headersSupplier) {
@@ -216,7 +224,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
      * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
      */
     @Override
-    public Indices indices() {
+    public org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices indices() {
         return this;
     }
 
@@ -547,9 +555,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
      */
     @Override
     public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
-
-        return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful()) //
+        return sendRequest(request, requestCreator.indexExists()
+            , RawActionResponse.class, headers) //
+            .map(response -> response.statusCode().is2xxSuccessful())
+            .onErrorReturn(false)
             .next();
     }
 
@@ -585,7 +594,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Mono<Void> openIndex(HttpHeaders headers, OpenIndexRequest request) {
 
-        return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers) //
+        return sendRequest(
+            request,
+            requestCreator
+                .indexOpen()
+                .andThen(r -> {
+                    r.addParameter("include_type_name", "true");
+                    return r;
+                }),
+            AcknowledgedResponse.class, headers) //
             .then();
     }
 
@@ -618,7 +635,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
 
-        return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) //
+        return sendRequest(putMappingRequest
+            , requestCreator.putMapping()
+            , AcknowledgedResponse.class, headers) //
             .then();
     }
 
@@ -638,7 +657,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
      * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
      */
     @Override
-    public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) {
+    public Mono<ClientResponse> execute(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback callback) {
 
         return this.hostProvider.getActive(HostProvider.Verification.LAZY) //
             .flatMap(callback::doWithClient) //
@@ -655,7 +674,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     }
 
     @Override
-    public Mono<Status> status() {
+    public Mono<org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status> status() {
 
         return hostProvider.clusterInfo() //
             .map(it -> new ClientStatus(it.getNodes()));
@@ -795,8 +814,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
             } catch (Exception e) {
 
                 return Mono
-                    .error(new ElasticsearchStatusException(content,
-                        RestStatus.fromCode(response.statusCode().value()),errorParseFailure));
+                    .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
             }
         }
     }
@@ -900,9 +918,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<SearchResponse> searchForPage(SearchRequest request) {
-
+        long startTime = System.currentTimeMillis();
         return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY)
-            .singleOrEmpty();
+            .singleOrEmpty()
+            .doOnNext(res -> {
+                log.trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source());
+            })
+            .doOnError(err -> {
+                log.warn("execute search {} error : {}", request.indices(), request.source(), err);
+            });
     }
 
     @SneakyThrows
@@ -940,9 +964,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     }
 
     Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {
-        final Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
-
-        return request;
+        return new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
     }
 
     @Override
@@ -974,16 +996,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
             .singleOrEmpty();
     }
 
+    private Version version = Version.CURRENT;
+
+    @Override
+    public Version serverVersion() {
+        return version;
+    }
+
     // endregion
 
     // region internal classes
 
     /**
-     * Reactive client {@link Status} implementation.
+     * Reactive client {@link ReactiveElasticsearchClient.Status} implementation.
      *
      * @author Christoph Strobl
      */
-    class ClientStatus implements Status {
+    class ClientStatus implements ReactiveElasticsearchClient.Status {
 
         private final Collection<ElasticsearchHost> connectedHosts;
 

+ 178 - 102
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java

@@ -1,29 +1,41 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
 import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.metrics.TopHits;
+import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.ValueCount;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
 import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.ezorm.core.param.TermType;
-import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket;
-import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure;
-import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse;
-import org.jetlinks.community.elastic.search.aggreation.bucket.Sort;
-import org.jetlinks.community.elastic.search.aggreation.enums.BucketType;
-import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType;
-import org.jetlinks.community.elastic.search.aggreation.enums.OrderType;
-import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
 import org.jetlinks.community.elastic.search.service.AggregationService;
 import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
 import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
-import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.jetlinks.community.timeseries.query.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.ZoneId;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -53,69 +65,186 @@ public class ReactiveAggregationService implements AggregationService {
             .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata));
     }
 
+    private AggregationBuilder createBuilder(Group group, AggregationQueryParam param) {
+
+        if (group instanceof TimeGroup) {
+            TimeGroup timeGroup = ((TimeGroup) group);
+            DateHistogramAggregationBuilder builder = AggregationBuilders
+                .dateHistogram(timeGroup.getAlias())
+                .field(timeGroup.getProperty());
+            if (StringUtils.hasText(timeGroup.getFormat())) {
+                String format = timeGroup.getFormat();
+                if (format.startsWith("yyyy")) {
+                    format = "8" + format;
+                }
+                builder.format(format);
+            }
+            builder.order(BucketOrder.key(false));
+            if (timeGroup.getInterval() != null) {
+                if (restClient.serverVersion().after(Version.V_7_2_0)) {
+                    if (timeGroup.getInterval().isFixed()) {
+                        builder.fixedInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
+                    } else {
+                        builder.calendarInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
+                    }
+                } else {
+                    builder.dateHistogramInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
+                }
+            }
+
+            builder.extendedBounds(getExtendedBounds(param));
+//            builder.missing("");
+
+            builder.timeZone(ZoneId.systemDefault());
+            return builder;
+        } else {
+            TermsAggregationBuilder builder = AggregationBuilders
+                .terms(group.getAlias())
+                .field(group.getProperty());
+            if (group instanceof LimitGroup) {
+                builder.size(((LimitGroup) group).getLimit());
+            } else {
+                builder.size(100);
+            }
+//            builder.missing(0);
+            return builder.executionHint("map");
+        }
+    }
+
     @Override
     public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) {
         QueryParam queryParam = prepareQueryParam(aggregationQueryParam);
-        BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam);
+
+        List<Group> groups = new ArrayList<>();
+        // TODO: 2020/9/3
+        if (aggregationQueryParam.getGroupByTime() != null) {
+            groups.add(aggregationQueryParam.getGroupByTime());
+        }
+        groups.addAll(aggregationQueryParam.getGroupBy());
+        AggregationBuilder aggregationBuilder;
+        AggregationBuilder lastAggBuilder;
+        if (!groups.isEmpty()) {
+            Group first = groups.get(0);
+            aggregationBuilder = lastAggBuilder = createBuilder(first, aggregationQueryParam);
+            for (int i = 1; i < groups.size(); i++) {
+                aggregationBuilder.subAggregation(lastAggBuilder = createBuilder(groups.get(i), aggregationQueryParam));
+            }
+        } else {
+            aggregationBuilder = lastAggBuilder = AggregationBuilders.count("count");
+        }
+        for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) {
+            AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name())
+                .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
+            if (builder instanceof TopHitsAggregationBuilder) {
+                TopHitsAggregationBuilder topHitsBuilder = ((TopHitsAggregationBuilder) builder);
+                if (CollectionUtils.isEmpty(queryParam.getSorts())) {
+                    topHitsBuilder.sort(aggregationQueryParam.getTimeProperty(), SortOrder.DESC);
+                } else {
+                    topHitsBuilder.sorts(queryParam.getSorts()
+                        .stream()
+                        .map(sort -> SortBuilders.fieldSort(sort.getName())
+                            .order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC))
+                        .collect(Collectors.toList()));
+                }
+                if (aggColumn instanceof LimitAggregationColumn) {
+                    topHitsBuilder.size(((LimitAggregationColumn) aggColumn).getLimit());
+                }else {
+                    topHitsBuilder.size(1);
+                }
+            }
+            lastAggBuilder.subAggregation(builder);
+        }
+
+        AggregationBuilder ageBuilder = aggregationBuilder;
+
         return Flux.fromArray(index)
             .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
             .collectList()
             .flatMap(strategy ->
-                createSearchSourceBuilder(queryParam, index[0])
+                this
+                    .createSearchSourceBuilder(queryParam, index[0])
                     .map(builder ->
                         new SearchRequest(strategy
                             .stream()
                             .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
                             .toArray(String[]::new))
                             .indicesOptions(DefaultElasticSearchService.indexOptions)
-                            .source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure))
-                            )
+                            .source(builder.size(0).aggregation(ageBuilder))
                     )
             )
             .flatMap(restClient::searchForPage)
-            .filter(response -> response.getAggregations() != null)
-            .map(response -> BucketResponse.builder()
-                .name(structure.getName())
-                .buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
-                .build())
-            .flatMapIterable(BucketsParser::convert)
-            .take(aggregationQueryParam.getLimit())
+            .flatMapMany(this::parseResult)
+            .as(flux -> aggregationQueryParam.getLimit() > 0 ? flux.take(aggregationQueryParam.getLimit()) : flux)
             ;
     }
 
-    static class BucketsParser {
-
-        private final List<Map<String, Object>> result = new ArrayList<>();
+    protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse) {
+        return Mono.justOrEmpty(searchResponse.getAggregations())
+            .flatMapIterable(Aggregations::asList)
+            .flatMap(agg -> parseAggregation(agg.getName(), agg));
+    }
 
-        public static List<Map<String, Object>> convert(BucketResponse response) {
-            return new BucketsParser(response).result;
+    private Flux<Map<String, Object>> parseAggregation(String name, org.elasticsearch.search.aggregations.Aggregation aggregation) {
+        if (aggregation instanceof Terms) {
+            return parseAggregation(((Terms) aggregation));
         }
-
-        public BucketsParser(BucketResponse response) {
-            this(response.getBuckets());
+        if (aggregation instanceof TopHits) {
+            TopHits topHits = ((TopHits) aggregation);
+            return Flux
+                .fromArray(topHits.getHits().getHits())
+                .map(hit -> {
+                    Map<String, Object> val = hit.getSourceAsMap();
+                    if (!val.containsKey("id")) {
+                        val.put("id", hit.getId());
+                    }
+                    return val;
+                });
         }
-
-        public BucketsParser(List<Bucket> buckets) {
-            buckets.forEach(bucket -> parser(bucket, new HashMap<>()));
+        if (aggregation instanceof Histogram) {
+            return parseAggregation(((Histogram) aggregation));
         }
-
-        public void parser(Bucket bucket, Map<String, Object> fMap) {
-            addBucketProperty(bucket, fMap);
-            if (bucket.getBuckets() != null && !bucket.getBuckets().isEmpty()) {
-                bucket.getBuckets().forEach(b -> {
-                    Map<String, Object> map = new HashMap<>(fMap);
-                    addBucketProperty(b, map);
-                    parser(b, map);
-                });
-            } else {
-                result.add(fMap);
-            }
+        if (aggregation instanceof ValueCount) {
+            return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue()));
         }
-
-        private void addBucketProperty(Bucket bucket, Map<String, Object> fMap) {
-            fMap.put(bucket.getName(), bucket.getKey());
-            fMap.putAll(bucket.toMap());
+        if (aggregation instanceof NumericMetricsAggregation.SingleValue) {
+            return Flux.just(Collections.singletonMap(name, getSafeNumber(((NumericMetricsAggregation.SingleValue) aggregation).value())));
         }
+
+        return Flux.empty();
+    }
+
+    private double getSafeNumber(double number) {
+        return (Double.isNaN(number) || Double.isInfinite(number)) ? 0D : number;
+    }
+
+    private Flux<Map<String, Object>> parseAggregation(Histogram aggregation) {
+
+        return Flux
+            .fromIterable(aggregation.getBuckets())
+            .flatMap(bucket ->
+                Flux.fromIterable(bucket.getAggregations().asList())
+                    .flatMap(agg -> this.parseAggregation(agg.getName(), agg))
+                    .defaultIfEmpty(Collections.emptyMap())
+                    .map(map -> {
+                        Map<String, Object> val = new HashMap<>(map);
+                        val.put(aggregation.getName(), bucket.getKeyAsString());
+                        val.put("_" + aggregation.getName(), bucket.getKey());
+                        return val;
+                    })
+            );
+    }
+
+    private Flux<Map<String, Object>> parseAggregation(Terms aggregation) {
+
+        return Flux.fromIterable(aggregation.getBuckets())
+            .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList())
+                .flatMap(agg -> parseAggregation(agg.getName(), agg)
+                    .map(map -> {
+                        Map<String, Object> val = new HashMap<>(map);
+                        val.put(aggregation.getName(), bucket.getKeyAsString());
+                        return val;
+                    })
+                ));
     }
 
     protected static QueryParam prepareQueryParam(AggregationQueryParam param) {
@@ -128,54 +257,14 @@ public class ReactiveAggregationService implements AggregationService {
         return queryParam;
     }
 
-    protected BucketAggregationsStructure createAggParameter(AggregationQueryParam param) {
-        List<BucketAggregationsStructure> structures = new ArrayList<>();
-        if (param.getGroupByTime() != null) {
-            structures.add(convertAggGroupTimeStructure(param));
-        }
-        if (param.getGroupBy() != null && !param.getGroupBy().isEmpty()) {
-            structures.addAll(getTermTypeStructures(param));
-        }
-        for (int i = 0, size = structures.size(); i < size; i++) {
-            if (i < size - 1) {
-                structures.get(i).setSubBucketAggregation(Collections.singletonList(structures.get(i + 1)));
-            }
-            if (i == size - 1) {
-                structures.get(i)
-                    .setSubMetricsAggregation(param
-                        .getAggColumns()
-                        .stream()
-                        .map(agg -> {
-                            MetricsAggregationStructure metricsAggregationStructure = new MetricsAggregationStructure();
-                            metricsAggregationStructure.setField(agg.getProperty());
-                            metricsAggregationStructure.setName(agg.getAlias());
-                            metricsAggregationStructure.setType(MetricsType.of(agg.getAggregation().name()));
-                            return metricsAggregationStructure;
-                        }).collect(Collectors.toList()));
-            }
-        }
-        return structures.get(0);
-    }
-
-    protected BucketAggregationsStructure convertAggGroupTimeStructure(AggregationQueryParam param) {
-        BucketAggregationsStructure structure = new BucketAggregationsStructure();
-        structure.setInterval(param.getGroupByTime().getInterval().toString());
-        structure.setType(BucketType.DATE_HISTOGRAM);
-        structure.setFormat(param.getGroupByTime().getFormat());
-        structure.setName(param.getGroupByTime().getAlias());
-        structure.setField(param.getGroupByTime().getProperty());
-        structure.setSort(Sort.desc(OrderType.KEY));
-        structure.setExtendedBounds(getExtendedBounds(param));
-        return structure;
-    }
-
     protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) {
         return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime());
     }
 
     private static long calculateStartWithTime(AggregationQueryParam param) {
         long startWithParam = param.getStartWithTime();
-//        if (param.getGroupByTime() != null && param.getGroupByTime().getInterval() != null) {
+
+//        if (param.getGroupByTime() != nullcalculateStartWithTime(param) && param.getGroupByTime().getInterval() != null) {
 //            long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit();
 //            long tempStartWithParam = param.getEndWithTime() - timeInterval;
 //            startWithParam = Math.max(tempStartWithParam, startWithParam);
@@ -183,17 +272,4 @@ public class ReactiveAggregationService implements AggregationService {
         return startWithParam;
     }
 
-    protected List<BucketAggregationsStructure> getTermTypeStructures(AggregationQueryParam param) {
-        return param.getGroupBy()
-            .stream()
-            .map(group -> {
-                BucketAggregationsStructure structure = new BucketAggregationsStructure();
-                structure.setType(BucketType.TERMS);
-                structure.setSize(param.getLimit());
-                structure.setField(group.getProperty());
-                structure.setName(group.getAlias());
-                return structure;
-            }).collect(Collectors.toList());
-    }
-
 }

+ 3 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
@@ -26,4 +27,6 @@ public interface ReactiveElasticsearchClient extends
 
     Mono<AcknowledgedResponse> updateTemplate(PutIndexTemplateRequest request);
 
+    Version serverVersion();
+
 }

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java

@@ -23,7 +23,7 @@ public class SubscribeRequest implements ValueObject {
     private Authentication authentication;
 
     @Override
-    public Map<String, Object> getAll() {
+    public Map<String, Object> values() {
         return parameter;
     }
 

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java

@@ -20,7 +20,7 @@ public class DeviceGatewayProperties  implements ValueObject {
     private Map<String,Object> configuration=new HashMap<>();
 
     @Override
-    public Map<String, Object> getAll() {
+    public Map<String, Object> values() {
         return configuration;
     }
 }

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java

@@ -35,7 +35,7 @@ public class TcpClientProperties implements ValueObject {
     private boolean enabled;
 
     @Override
-    public Map<String, Object> getAll() {
+    public Map<String, Object> values() {
         return parserConfiguration;
     }
 }

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java

@@ -53,7 +53,7 @@ public class TcpServerProperties implements ValueObject {
     }
 
     @Override
-    public Map<String, Object> getAll() {
+    public Map<String, Object> values() {
         return parserConfiguration;
     }
 }

+ 1 - 1
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java

@@ -14,7 +14,7 @@ public interface TimeSeriesData extends ValueObject {
     Map<String, Object> getData();
 
     @Override
-    default Map<String, Object> getAll() {
+    default Map<String, Object> values() {
         return getData();
     }
 

+ 8 - 1
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java

@@ -2,6 +2,13 @@ package org.jetlinks.community.timeseries.query;
 
 public enum Aggregation {
 
-    MIN, MAX, AVG, SUM, COUNT, NONE;
+    MIN,
+    MAX,
+    AVG,
+    SUM,
+    COUNT,
+    FIRST,
+    TOP,
+    NONE;
 
 }

+ 2 - 1
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.timeseries.query;
 
+
 import org.jetlinks.community.ValueObject;
 
 import java.util.Map;
@@ -15,7 +16,7 @@ public interface AggregationData extends ValueObject {
     }
 
     @Override
-    default Map<String, Object> getAll() {
+    default Map<String, Object> values() {
         return asMap();
     }
 

+ 26 - 5
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java

@@ -4,23 +4,27 @@ import lombok.Getter;
 import lombok.Setter;
 import org.hswebframework.ezorm.core.dsl.Query;
 import org.hswebframework.ezorm.core.param.QueryParam;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.jetlinks.community.Interval;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+/**
+ * 聚合查询参数
+ */
 @Getter
 @Setter
 public class AggregationQueryParam {
 
-    //合列
+    //合列
     private List<AggregationColumn> aggColumns = new ArrayList<>();
 
     //按时间分组
+    @Deprecated
     private TimeGroup groupByTime;
 
     //按字段分组
@@ -36,12 +40,16 @@ public class AggregationQueryParam {
     private String timeProperty = "timestamp";
 
     //条件过滤
-    private QueryParam queryParam = new QueryParam();
+    private QueryParamEntity queryParam = new QueryParamEntity();
 
     public static AggregationQueryParam of() {
         return new AggregationQueryParam();
     }
 
+    public <T> T as(Function<AggregationQueryParam, T> mapper) {
+        return mapper.apply(this);
+    }
+
     public AggregationQueryParam from(long time) {
         this.startWithTime = time;
         return this;
@@ -66,11 +74,15 @@ public class AggregationQueryParam {
         return this;
     }
 
-    public AggregationQueryParam agg(String property, String alias, Aggregation agg) {
-        aggColumns.add(new AggregationColumn(property, alias, agg));
+    public AggregationQueryParam agg(AggregationColumn agg) {
+        aggColumns.add(agg);
         return this;
     }
 
+    public AggregationQueryParam agg(String property, String alias, Aggregation agg) {
+        return this.agg(new AggregationColumn(property, alias, agg));
+    }
+
 
     public AggregationQueryParam agg(String property, Aggregation agg) {
         return agg(property, property, agg);
@@ -138,6 +150,10 @@ public class AggregationQueryParam {
         return groupBy(new Group(property, alias));
     }
 
+    public AggregationQueryParam groupBy(String property) {
+        return groupBy(new Group(property, property));
+    }
+
     public <T> T execute(Function<AggregationQueryParam, T> executor) {
         return executor.apply(this);
     }
@@ -147,6 +163,11 @@ public class AggregationQueryParam {
         return this;
     }
 
+    public AggregationQueryParam filter(QueryParamEntity queryParam) {
+        this.queryParam = queryParam;
+        return this;
+    }
+
     public AggregationQueryParam limit(int limit) {
         this.limit = limit;
         return this;

+ 21 - 0
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitAggregationColumn.java

@@ -0,0 +1,21 @@
+package org.jetlinks.community.timeseries.query;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class LimitAggregationColumn extends AggregationColumn {
+
+    private int limit;
+
+    public LimitAggregationColumn(String property,
+                                  String alias,
+                                  Aggregation aggregation,
+                                  int limit) {
+        super(property, alias, aggregation);
+        this.limit = limit;
+    }
+}

+ 15 - 0
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitGroup.java

@@ -0,0 +1,15 @@
+package org.jetlinks.community.timeseries.query;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class LimitGroup extends Group {
+    private int limit;
+
+    public LimitGroup(String property, String alias, int limit) {
+        super(property, alias);
+        this.limit = limit;
+    }
+}

+ 5 - 11
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java

@@ -1,34 +1,28 @@
 package org.jetlinks.community.timeseries.query;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.jetlinks.community.Interval;
 
-import java.time.Duration;
-
 @Getter
 @Setter
-@AllArgsConstructor
 @NoArgsConstructor
-public class TimeGroup {
-
-    private String property = "timestamp";
+public class TimeGroup extends Group{
 
-    //时间分组间隔,如: 1d , 30s
+    /**
+     * 时间分组间隔,如: 1d , 30s
+     */
     private Interval interval;
 
-    private String alias;
-
     /**
      * 时序时间返回格式  如 YYYY-MM-dd
      */
     private String format;
 
     public TimeGroup(Interval interval, String alias, String format) {
+        super("timestamp",alias);
         this.interval = interval;
-        this.alias = alias;
         this.format = format;
     }
 }