Jelajahi Sumber

优化代码结构

zhou-hao 3 tahun lalu
induk
melakukan
d22ff38384

+ 69 - 59
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java

@@ -14,10 +14,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
-import org.elasticsearch.search.aggregations.metrics.TopHits;
-import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.ValueCount;
+import org.elasticsearch.search.aggregations.metrics.*;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
@@ -146,17 +143,19 @@ public class ReactiveAggregationService implements AggregationService {
         boolean group = aggregationBuilder != null;
         for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) {
             AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name())
-                .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
+                                                .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
             if (builder instanceof TopHitsAggregationBuilder) {
                 TopHitsAggregationBuilder topHitsBuilder = ((TopHitsAggregationBuilder) builder);
                 if (CollectionUtils.isEmpty(queryParam.getSorts())) {
                     topHitsBuilder.sort(aggregationQueryParam.getTimeProperty(), SortOrder.DESC);
                 } else {
-                    topHitsBuilder.sorts(queryParam.getSorts()
-                        .stream()
-                        .map(sort -> SortBuilders.fieldSort(sort.getName())
-                            .order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC))
-                        .collect(Collectors.toList()));
+                    topHitsBuilder.sorts(queryParam
+                                             .getSorts()
+                                             .stream()
+                                             .map(sort -> SortBuilders
+                                                 .fieldSort(sort.getName())
+                                                 .order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC))
+                                             .collect(Collectors.toList()));
                 }
                 if (aggColumn instanceof LimitAggregationColumn) {
                     topHitsBuilder.size(((LimitAggregationColumn) aggColumn).getLimit());
@@ -172,41 +171,42 @@ public class ReactiveAggregationService implements AggregationService {
         }
 
         return Flux.fromArray(index)
-            .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
-            .collectList()
-            .flatMap(strategy ->
-                this
-                    .createSearchSourceBuilder(queryParam, index[0])
-                    .map(builder -> {
-                            aggs.forEach(builder.size(0)::aggregation);
-                            return new SearchRequest(strategy
-                                .stream()
-                                .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
-                                .toArray(String[]::new))
-                                .indicesOptions(DefaultElasticSearchService.indexOptions)
-                                .source(builder);
-                        }
-                    )
-            )
-            .flatMap(restClient::searchForPage)
-            .flatMapMany(this::parseResult)
-            .as(flux -> {
-                if (!group) {
-                    return flux
-                        .map(Map::entrySet)
-                        .flatMap(Flux::fromIterable)
-                        .collectMap(Map.Entry::getKey, Map.Entry::getValue)
-                        .flux();
-                }
-                return flux;
-            })
+                   .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
+                   .collectList()
+                   .flatMap(strategy -> this
+                       .createSearchSourceBuilder(queryParam, index[0])
+                       .map(builder -> {
+                                aggs.forEach(builder.size(0)::aggregation);
+                                return new SearchRequest(strategy
+                                                             .stream()
+                                                             .map(tp2 -> tp2
+                                                                 .getT1()
+                                                                 .getIndexForSearch(tp2.getT2()))
+                                                             .toArray(String[]::new))
+                                    .indicesOptions(ReactiveElasticSearchService.indexOptions)
+                                    .source(builder);
+                            }
+                       )
+                   )
+                   .flatMap(restClient::searchForPage)
+                   .flatMapMany(this::parseResult)
+                   .as(flux -> {
+                       if (!group) {
+                           return flux
+                               .map(Map::entrySet)
+                               .flatMap(Flux::fromIterable)
+                               .collectMap(Map.Entry::getKey, Map.Entry::getValue)
+                               .flux();
+                       }
+                       return flux;
+                   })
             ;
     }
 
     protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse) {
         return Mono.justOrEmpty(searchResponse.getAggregations())
-            .flatMapIterable(Aggregations::asList)
-            .flatMap(agg -> parseAggregation(agg.getName(), agg));
+                   .flatMapIterable(Aggregations::asList)
+                   .flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE);
     }
 
     private Flux<Map<String, Object>> parseAggregation(String name, org.elasticsearch.search.aggregations.Aggregation aggregation) {
@@ -232,7 +232,13 @@ public class ReactiveAggregationService implements AggregationService {
             return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue()));
         }
         if (aggregation instanceof NumericMetricsAggregation.SingleValue) {
-            return Flux.just(Collections.singletonMap(name, getSafeNumber(((NumericMetricsAggregation.SingleValue) aggregation).value())));
+            return Flux.just(Collections.singletonMap(name, getSafeNumber(((NumericMetricsAggregation.SingleValue) aggregation)
+                                                                              .value())));
+        }
+        if (aggregation instanceof ExtendedStats) {
+            ExtendedStats stats = ((ExtendedStats) aggregation);
+            // TODO: 2020/10/29 只处理了标准差差
+            return Flux.just(Collections.singletonMap(name, stats.getStdDeviation()));
         }
 
         return Flux.empty();
@@ -247,32 +253,33 @@ public class ReactiveAggregationService implements AggregationService {
         return Flux
             .fromIterable(aggregation.getBuckets())
             .flatMap(bucket ->
-                    Flux.fromIterable(bucket.getAggregations().asList())
-                        .flatMap(agg -> this.parseAggregation(agg.getName(), agg))
-                        .defaultIfEmpty(Collections.emptyMap())
+                         Flux.fromIterable(bucket.getAggregations().asList())
+                             .flatMap(agg -> this.parseAggregation(agg.getName(), agg), Integer.MAX_VALUE)
+                             .defaultIfEmpty(Collections.emptyMap())
 //                    .map(Map::entrySet)
 //                    .flatMap(Flux::fromIterable)
 //                    .collectMap(Map.Entry::getKey, Map.Entry::getValue)
-                        .map(map -> {
-                            Map<String, Object> val = new HashMap<>(map);
-                            val.put(aggregation.getName(), bucket.getKeyAsString());
-                            val.put("_" + aggregation.getName(), bucket.getKey());
-                            return val;
-                        })
+                             .map(map -> {
+                                 Map<String, Object> val = new HashMap<>(map);
+                                 val.put(aggregation.getName(), bucket.getKeyAsString());
+                                 val.put("_" + aggregation.getName(), bucket.getKey());
+                                 return val;
+                             }),
+                     Integer.MAX_VALUE
             );
     }
 
     private Flux<Map<String, Object>> parseAggregation(Terms aggregation) {
 
         return Flux.fromIterable(aggregation.getBuckets())
-            .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList())
-                .flatMap(agg -> parseAggregation(agg.getName(), agg)
-                    .map(map -> {
-                        Map<String, Object> val = new HashMap<>(map);
-                        val.put(aggregation.getName(), bucket.getKeyAsString());
-                        return val;
-                    })
-                ));
+                   .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList())
+                                          .flatMap(agg -> parseAggregation(agg.getName(), agg)
+                                              .map(map -> {
+                                                  Map<String, Object> val = new HashMap<>(map);
+                                                  val.put(aggregation.getName(), bucket.getKeyAsString());
+                                                  return val;
+                                              })
+                                          ));
     }
 
     protected static QueryParam prepareQueryParam(AggregationQueryParam param) {
@@ -299,7 +306,9 @@ public class ReactiveAggregationService implements AggregationService {
     }
 
     //聚合查询默认的时间间隔
-    static long thirtyDayMillis = Duration.ofDays(Integer.getInteger("elasticsearch.agg.default-range-day", 90)).toMillis();
+    static long thirtyDayMillis = Duration
+        .ofDays(Integer.getInteger("elasticsearch.agg.default-range-day", 90))
+        .toMillis();
 
     private static long calculateStartWithTime(AggregationQueryParam param) {
         long startWithParam = param.getStartWithTime();
@@ -327,4 +336,5 @@ public class ReactiveAggregationService implements AggregationService {
         return startWithParam;
     }
 
+
 }

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceCategoryController.java

@@ -60,7 +60,7 @@ public class DeviceCategoryController {
 
         } catch (Exception e) {
             statics = new ArrayList<>();
-            log.error(e.getMessage(), e);
+            DeviceCategoryController.log.error(e.getMessage(), e);
         }
     }