浏览代码

优化聚合查询

zhou-hao 4 年之前
父节点
当前提交
3e0c6cb2bc

+ 46 - 26
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java

@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ReactiveAggregationService implements AggregationService {
 
+
     private final ReactiveElasticsearchClient restClient;
 
     private final ElasticSearchIndexManager indexManager;
@@ -129,17 +130,20 @@ public class ReactiveAggregationService implements AggregationService {
             groups.add(aggregationQueryParam.getGroupByTime());
         }
         groups.addAll(aggregationQueryParam.getGroupBy());
-        AggregationBuilder aggregationBuilder;
-        AggregationBuilder lastAggBuilder;
+        List<AggregationBuilder> aggs = new ArrayList<>();
+
+        AggregationBuilder aggregationBuilder = null;
+        AggregationBuilder lastAgg = null;
         if (!groups.isEmpty()) {
             Group first = groups.get(0);
-            aggregationBuilder = lastAggBuilder = createBuilder(first, aggregationQueryParam);
+            aggregationBuilder = lastAgg = createBuilder(first, aggregationQueryParam);
             for (int i = 1; i < groups.size(); i++) {
-                aggregationBuilder.subAggregation(lastAggBuilder = createBuilder(groups.get(i), aggregationQueryParam));
+                aggregationBuilder.subAggregation(lastAgg = createBuilder(groups.get(i), aggregationQueryParam));
             }
-        } else {
-            aggregationBuilder = lastAggBuilder = AggregationBuilders.count("count");
+            aggs.add(aggregationBuilder);
         }
+
+        boolean group = aggregationBuilder != null;
         for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) {
             AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name())
                 .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
@@ -160,29 +164,42 @@ public class ReactiveAggregationService implements AggregationService {
                     topHitsBuilder.size(1);
                 }
             }
-            lastAggBuilder.subAggregation(builder);
+            if (group) {
+                lastAgg.subAggregation(builder);
+            } else {
+                aggs.add(builder);
+            }
         }
 
-        AggregationBuilder ageBuilder = aggregationBuilder;
-
         return Flux.fromArray(index)
             .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
             .collectList()
             .flatMap(strategy ->
                 this
                     .createSearchSourceBuilder(queryParam, index[0])
-                    .map(builder ->
-                        new SearchRequest(strategy
-                            .stream()
-                            .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
-                            .toArray(String[]::new))
-                            .indicesOptions(DefaultElasticSearchService.indexOptions)
-                            .source(builder.size(0).aggregation(ageBuilder))
+                    .map(builder -> {
+                            aggs.forEach(builder.size(0)::aggregation);
+                            return new SearchRequest(strategy
+                                .stream()
+                                .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
+                                .toArray(String[]::new))
+                                .indicesOptions(DefaultElasticSearchService.indexOptions)
+                                .source(builder);
+                        }
                     )
             )
             .flatMap(restClient::searchForPage)
             .flatMapMany(this::parseResult)
-            .as(flux -> aggregationQueryParam.getLimit() > 0 ? flux.take(aggregationQueryParam.getLimit()) : flux)
+            .as(flux -> {
+                if (!group) {
+                    return flux
+                        .map(Map::entrySet)
+                        .flatMap(Flux::fromIterable)
+                        .collectMap(Map.Entry::getKey, Map.Entry::getValue)
+                        .flux();
+                }
+                return flux;
+            })
             ;
     }
 
@@ -230,15 +247,18 @@ public class ReactiveAggregationService implements AggregationService {
         return Flux
             .fromIterable(aggregation.getBuckets())
             .flatMap(bucket ->
-                Flux.fromIterable(bucket.getAggregations().asList())
-                    .flatMap(agg -> this.parseAggregation(agg.getName(), agg))
-                    .defaultIfEmpty(Collections.emptyMap())
-                    .map(map -> {
-                        Map<String, Object> val = new HashMap<>(map);
-                        val.put(aggregation.getName(), bucket.getKeyAsString());
-                        val.put("_" + aggregation.getName(), bucket.getKey());
-                        return val;
-                    })
+                    Flux.fromIterable(bucket.getAggregations().asList())
+                        .flatMap(agg -> this.parseAggregation(agg.getName(), agg))
+                        .defaultIfEmpty(Collections.emptyMap())
+//                    .map(Map::entrySet)
+//                    .flatMap(Flux::fromIterable)
+//                    .collectMap(Map.Entry::getKey, Map.Entry::getValue)
+                        .map(map -> {
+                            Map<String, Object> val = new HashMap<>(map);
+                            val.put(aggregation.getName(), bucket.getKeyAsString());
+                            val.put("_" + aggregation.getName(), bucket.getKey());
+                            return val;
+                        })
             );
     }
 

+ 33 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProperty.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.device.entity;
 
+import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Getter;
 import lombok.Setter;
@@ -8,6 +9,9 @@ import org.jetlinks.community.timeseries.query.AggregationData;
 import org.jetlinks.core.metadata.Converter;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.types.GeoPoint;
+import org.jetlinks.core.metadata.types.NumberType;
+import org.jetlinks.core.metadata.types.ObjectType;
 
 import java.io.Serializable;
 
@@ -23,9 +27,21 @@ public class DeviceProperty implements Serializable {
     @Schema(description = "属性ID")
     private String property;
 
+    @Schema(description = "属性名")
+    private String propertyName;
+
     @Schema(description = "类型")
     private String type;
 
+    @Hidden
+    private Object numberValue;
+
+    @Hidden
+    private Object objectValue;
+
+    @Hidden
+    private GeoPoint geoValue;
+
     @Schema(description = "属性值")
     private Object value;
 
@@ -38,6 +54,9 @@ public class DeviceProperty implements Serializable {
     @Schema(description = "数据时间")
     private long timestamp;
 
+    @Schema(description = "格式化后的时间,在聚合查询时此字段有值")
+    private String formatTime;
+
     public DeviceProperty deviceId(String deviceId) {
         this.deviceId = deviceId;
         return this;
@@ -48,8 +67,15 @@ public class DeviceProperty implements Serializable {
         return this;
     }
 
+    public DeviceProperty formatTime(String formatTime) {
+        this.formatTime = formatTime;
+        return this;
+    }
+
     public DeviceProperty withProperty(PropertyMetadata metadata) {
+
         if (metadata != null) {
+            setPropertyName(metadata.getName());
             DataType type = metadata.getValueType();
             Object value = this.getValue();
             try {
@@ -57,6 +83,13 @@ public class DeviceProperty implements Serializable {
                     value = ((Converter<?>) type).convert(value);
                     this.setValue(value);
                 }
+                if (type instanceof NumberType) {
+                    setNumberValue(value);
+                }
+                if (type instanceof ObjectType) {
+                    setObjectValue(value);
+                }
+
                 this.setFormatValue(type.format(value));
             } catch (Exception ignore) {
 

+ 22 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.device.service.data;
 
 import com.alibaba.fastjson.JSON;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.MapUtils;
 import org.hswebframework.ezorm.core.param.TermType;
 import org.hswebframework.web.api.crud.entity.PagerResult;
@@ -11,6 +12,7 @@ import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.DeviceMessageReply;
+import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.event.EventMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.ReportPropertyMessage;
@@ -116,7 +118,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
 
     protected String createDataId(DeviceMessage message) {
         long ts = message.getTimestamp();
-        return String.join("_", message.getDeviceId(), String.valueOf(createUniqueNanoTime(ts)));
+        return DigestUtils.md5Hex(String.join("_", message.getDeviceId(), String.valueOf(createUniqueNanoTime(ts))));
     }
 
     protected Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String productId,
@@ -317,14 +319,20 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                 .getMetadata()
                 .map(metadata -> {
                     int size = properties.size();
-
+                    String id;
+                    //强制使用时间戳作为数据ID
+                    if (message.getHeader(Headers.useTimestampAsId).orElse(false)) {
+                        id = String.join("_", message.getDeviceId(), String.valueOf(message.getTimestamp()));
+                    } else {
+                        id = createDataId(message);
+                    }
                     Map<String, Object> newData = new HashMap<>(size < 5 ? 16 : (int) ((size + 5) / 0.75D) + 1);
                     properties.forEach((k, v) -> newData.put(k, convertPropertyValue(v, metadata.getPropertyOrNull(k))));
                     newData.put("deviceId", message.getDeviceId());
                     newData.put("productId", productId);
                     newData.put("timestamp", message.getTimestamp());
                     newData.put("createTime", System.currentTimeMillis());
-                    newData.put("id", createDataId(message));
+                    newData.put("id", DigestUtils.md5Hex(id));
                     return Tuples.of(getPropertyTimeSeriesMetric(productId), TimeSeriesData.of(message.getTimestamp(), newData));
                 }));
     }
@@ -344,13 +352,20 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
                     .fromIterable(properties.entrySet())
                     .index()
                     .map(entry -> {
-                        long ts = message.getTimestamp() + entry.getT1();
-                        String id = String.join("_", message.getDeviceId(), String.valueOf(createUniqueNanoTime(ts)));
+                        String id;
+                        long ts = message.getTimestamp();
+                        String property = entry.getT2().getKey();
+                        //强制使用时间戳作为数据ID
+                        if (message.getHeader(Headers.useTimestampAsId).orElse(false)) {
+                            id = String.join("_", message.getDeviceId(), property, String.valueOf(message.getTimestamp()));
+                        } else {
+                            id = String.join("_", message.getDeviceId(), property, String.valueOf(createUniqueNanoTime(ts)));
+                        }
                         DevicePropertiesEntity entity = DevicePropertiesEntity.builder()
-                            .id(id)
+                            .id(DigestUtils.md5Hex(id))
                             .deviceId(device.getDeviceId())
                             .timestamp(ts)
-                            .property(entry.getT2().getKey())
+                            .property(property)
                             .productId(productId)
                             .createTime(System.currentTimeMillis())
                             .build()

+ 21 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.device.service.data;
 
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.jetlinks.community.timeseries.query.*;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
@@ -13,10 +14,8 @@ import org.jetlinks.community.device.entity.DeviceProperty;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
 import org.jetlinks.community.timeseries.TimeSeriesData;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
-import org.jetlinks.community.timeseries.query.AggregationData;
-import org.jetlinks.community.timeseries.query.AggregationQueryParam;
-import org.jetlinks.community.timeseries.query.Group;
-import org.jetlinks.community.timeseries.query.TimeGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -29,6 +28,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
+import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetricId;
 
 @Component
 public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
@@ -190,6 +190,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
     public Flux<AggregationData> aggregationPropertiesByProduct(@Nonnull String productId,
                                                                 @Nonnull DeviceDataService.AggregationRequest request,
                                                                 @Nonnull DeviceDataService.DevicePropertyAggregation... properties) {
+        org.joda.time.format.DateTimeFormatter formatter = DateTimeFormat.forPattern(request.getFormat());
 
         return AggregationQueryParam.of()
             .as(param -> {
@@ -198,13 +199,26 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
                 }
                 return param;
             })
-            .groupBy((Group) new TimeGroup(request.interval, "time", request.format))
-            .limit(request.limit)
+            .as(param -> {
+                if (request.interval == null) {
+                    return param;
+                }
+                return param.groupBy((Group) new TimeGroup(request.interval, "time", request.format));
+            })
+            .limit(request.limit * properties.length)
             .from(request.from)
             .to(request.to)
             .filter(request.filter)
             .execute(timeSeriesManager.getService(getPropertyTimeSeriesMetric(productId))::aggregation)
-            .doOnNext(agg -> agg.values().remove("_time"))
+            .groupBy(agg -> agg.getString("time", ""))
+            .flatMap(group -> group
+                .map(AggregationData::asMap)
+                .reduce((a, b) -> {
+                    a.putAll(b);
+                    return a;
+                })
+                .map(AggregationData::of))
+            .sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime.parse(agg.getString("time", ""), formatter).toDate()).reversed())
             ;
     }