소스 검색

优化监控

zhouhao 5 년 전
부모
커밋
abdebab063

+ 28 - 1
jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementValue.java

@@ -1,6 +1,10 @@
 package org.jetlinks.community.dashboard;
 
-public interface MeasurementValue {
+import org.hswebframework.ezorm.rdb.operator.dml.query.SortOrder;
+
+import java.util.Comparator;
+
+public interface MeasurementValue  extends Comparable<MeasurementValue> {
 
     Object getValue();
 
@@ -8,4 +12,27 @@ public interface MeasurementValue {
 
     long getTimestamp();
 
+    //默认排序,时间大的在前.
+    @Override
+    default int compareTo(MeasurementValue o) {
+        return Long.compare(o.getTimestamp(), getTimestamp());
+    }
+
+    static Comparator<MeasurementValue> sort(SortOrder.Order order) {
+        return order == SortOrder.Order.asc ? sort() : sortDesc();
+    }
+
+    Comparator<MeasurementValue> asc = Comparator.comparing(MeasurementValue::getTimestamp);
+
+    Comparator<MeasurementValue> desc = asc.reversed();
+
+    //返回排序对比器,时间小的在前
+    static Comparator<MeasurementValue> sort() {
+        return asc;
+    }
+
+    static Comparator<MeasurementValue> sortDesc() {
+        return desc;
+    }
+
 }

+ 112 - 109
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurement.java

@@ -31,125 +31,128 @@ class DeviceGatewayMeasurement extends StaticMeasurement {
 
         private String property;
 
-        public DeviceGatewayMeasurement(MeasurementDefinition definition,
-                                        String property,
-                                        Aggregation defaultAgg,
-                                        TimeSeriesManager timeSeriesManager) {
-            super(definition);
-            this.timeSeriesManager = timeSeriesManager;
-            this.defaultAgg = defaultAgg;
-            this.type = definition.getId();
-            this.property = property;
-            addDimension(new AggDeviceStateDimension());
-            addDimension(new HistoryDimension());
+    public DeviceGatewayMeasurement(MeasurementDefinition definition,
+                                    String property,
+                                    Aggregation defaultAgg,
+                                    TimeSeriesManager timeSeriesManager) {
+        super(definition);
+        this.timeSeriesManager = timeSeriesManager;
+        this.defaultAgg = defaultAgg;
+        this.type = definition.getId();
+        this.property = property;
+        addDimension(new AggDeviceStateDimension());
+        addDimension(new HistoryDimension());
+    }
+
+    static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
+        .add("gatewayId", "网关", "", new StringType())
+        .add("time", "周期", "例如: 1h,10m,30s", new StringType())
+        .add("format", "时间格式", "如: MM-dd:HH", new StringType())
+        .add("limit", "最大数据量", "", new IntType())
+        .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
+        .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
+
+
+    class HistoryDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.history;
         }
 
-        static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
-            .add("gatewayId", "网关", "", new StringType())
-            .add("time", "周期", "例如: 1h,10m,30s", new StringType())
-            .add("format", "时间格式", "如: MM-dd:HH", new StringType())
-            .add("limit", "最大数据量", "", new IntType())
-            .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
-            .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
-
+        @Override
+        public DataType getValueType() {
+            return new IntType();
+        }
 
-        class HistoryDimension implements MeasurementDimension {
+        @Override
+        public ConfigMetadata getParams() {
+            return historyConfigMetadata;
+        }
 
-            @Override
-            public DimensionDefinition getDefinition() {
-                return CommonDimensionDefinition.history;
-            }
+        @Override
+        public boolean isRealTime() {
+            return false;
+        }
 
-            @Override
-            public DataType getValueType() {
-                return new IntType();
-            }
+        @Override
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
+            return QueryParamEntity.newQuery()
+                .where("target", type)
+                .is("name", parameter.getString("gatewayId").orElse(null))
+                .doPaging(0, parameter.getInt("limit").orElse(1))
+                .between("timestamp",
+                    parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())),
+                    parameter.getDate("to").orElseGet(Date::new)
+                )
+                .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::query)
+                .map(data -> SimpleMeasurementValue.of(
+                    data.getInt(property).orElse(0),
+                    data.getTimestamp()))
+                .sort(MeasurementValue.sort());
+        }
+    }
+
+
+    static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
+        .add("gatewayId", "网关", "", new StringType())
+        .add("time", "周期", "例如: 1h,10m,30s", new StringType())
+        .add("format", "时间格式", "如: MM-dd:HH", new StringType())
+        .add("agg", "聚合方式", "", new EnumType()
+            .addElement(EnumType.Element.of("SUM", "总和"))
+            .addElement(EnumType.Element.of("MAX", "最大值"))
+            .addElement(EnumType.Element.of("MIN", "最小值"))
+            .addElement(EnumType.Element.of("AVG", "平局值"))
+        )
+        .add("limit", "最大数据量", "", new IntType())
+        .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
+        .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
+
+
+    //聚合数据
+    class AggDeviceStateDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.agg;
+        }
 
-            @Override
-            public ConfigMetadata getParams() {
-                return historyConfigMetadata;
-            }
+        @Override
+        public DataType getValueType() {
+            return new IntType();
+        }
 
-            @Override
-            public boolean isRealTime() {
-                return false;
-            }
+        @Override
+        public ConfigMetadata getParams() {
+            return aggConfigMetadata;
+        }
 
-            @Override
-            public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
-                return QueryParamEntity.newQuery()
-                    .where("target", type)
-                    .is("name", parameter.getString("gatewayId").orElse(null))
-                    .doPaging(0, parameter.getInt("limit").orElse(1))
-                    .between("timestamp",
-                        parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())),
-                        parameter.getDate("to").orElseGet(Date::new)
-                    )
-                    .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::query)
-                    .map(data -> SimpleMeasurementValue.of(
-                        data.getInt(property).orElse(0),
-                        data.getTimestamp()));
-            }
+        @Override
+        public boolean isRealTime() {
+            return false;
         }
 
+        @Override
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
 
-        static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
-            .add("gatewayId", "网关", "", new StringType())
-            .add("time", "周期", "例如: 1h,10m,30s", new StringType())
-            .add("format", "时间格式", "如: MM-dd:HH", new StringType())
-            .add("agg", "聚合方式", "", new EnumType()
-                .addElement(EnumType.Element.of("SUM", "总和"))
-                .addElement(EnumType.Element.of("MAX", "最大值"))
-                .addElement(EnumType.Element.of("MIN", "最小值"))
-                .addElement(EnumType.Element.of("AVG", "平局值"))
-            )
-            .add("limit", "最大数据量", "", new IntType())
-            .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
-            .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
-
-
-        //聚合数据
-        class AggDeviceStateDimension implements MeasurementDimension {
-
-            @Override
-            public DimensionDefinition getDefinition() {
-                return CommonDimensionDefinition.agg;
-            }
-
-            @Override
-            public DataType getValueType() {
-                return new IntType();
-            }
-
-            @Override
-            public ConfigMetadata getParams() {
-                return aggConfigMetadata;
-            }
-
-            @Override
-            public boolean isRealTime() {
-                return false;
-            }
-
-            @Override
-            public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
-
-                return AggregationQueryParam.of()
-                    .agg(property, parameter.get("agg", Aggregation.class).orElse(defaultAgg))
-                    .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
-                        "time",
-                        parameter.getString("format").orElse("MM-dd:HH"))
-                    .filter(query -> query
-                        .where("target", type)
-                        .is("name", parameter.getString("gatewayId").orElse(null)))
-                    .limit(parameter.getInt("limit").orElse(1))
-                    .from(parameter.getDate("from").orElseGet(()->Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
-                    .to(parameter.getDate("to").orElse(new Date()))
-                    .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation)
-                    .map(data -> SimpleMeasurementValue.of(
-                        data.getInt(property).orElse(0),
-                        data.getString("time").orElse(""),
-                        System.currentTimeMillis()));
-            }
+            return AggregationQueryParam.of()
+                .agg(property, parameter.get("agg", Aggregation.class).orElse(defaultAgg))
+                .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
+                    "time",
+                    parameter.getString("format").orElse("MM-dd:HH"))
+                .filter(query -> query
+                    .where("target", type)
+                    .is("name", parameter.getString("gatewayId").orElse(null)))
+                .limit(parameter.getInt("limit").orElse(1))
+                .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
+                .to(parameter.getDate("to").orElse(new Date()))
+                .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation)
+                .index((index, data) -> SimpleMeasurementValue.of(
+                    data.getInt(property).orElse(0),
+                    data.getString("time").orElse(""),
+                    index))
+                .sort();
         }
+    }
+
 }

+ 55 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DevicePropertiesEntity.java

@@ -1,12 +1,25 @@
 package org.jetlinks.community.device.entity;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.types.DateTimeType;
+import org.jetlinks.core.metadata.types.NumberType;
+import org.jetlinks.core.metadata.types.ObjectType;
 
 import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
 
 @Getter
 @Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class DevicePropertiesEntity {
 
     public String id;
@@ -32,4 +45,44 @@ public class DevicePropertiesEntity {
     private String orgId;
 
     private String productId;
+
+    private Date timeValue;
+
+
+    public Map<String, Object> toMap() {
+        return FastBeanCopier.copy(this, HashMap::new);
+    }
+
+    public DevicePropertiesEntity withValue(PropertyMetadata metadata, Object value) {
+        if (metadata == null) {
+            return this;
+        }
+        setProperty(metadata.getId());
+        setPropertyName(metadata.getName());
+        return withValue(metadata.getValueType(), value);
+
+    }
+
+    public DevicePropertiesEntity withValue(DataType type, Object value) {
+        if (value == null) {
+            return this;
+        }
+        if (type instanceof NumberType) {
+            NumberType<?> numberType = (NumberType<?>) type;
+            setNumberValue(new BigDecimal(numberType.convertNumber(value).toString()));
+        } else if (type instanceof DateTimeType) {
+            DateTimeType dateTimeType = (DateTimeType) type;
+            setTimeValue(dateTimeType.convert(value));
+        } else if (type instanceof ObjectType) {
+            ObjectType ObjectType = (ObjectType) type;
+            setObjectValue(ObjectType.convert(value));
+        } else {
+            setStringValue(String.valueOf(value));
+        }
+        ofNullable(type.format(value))
+            .map(String::valueOf)
+            .ifPresent(this::setFormatValue);
+
+        return this;
+    }
 }

+ 3 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java

@@ -39,16 +39,15 @@ class DeviceEventMeasurement extends StaticMeasurement {
         .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
         .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
 
-
-    Flux<MeasurementValue> fromHistory(String deviceId, int history) {
+    Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
         return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
             .doPaging(0, history)
             .where("deviceId", deviceId)
             .execute(eventTsService::query)
-            .map(data -> SimpleMeasurementValue.of(data.getData(), data.getTimestamp()));
+            .map(data -> SimpleMeasurementValue.of(data.getData(), data.getTimestamp()))
+            .sort(MeasurementValue.sort());
     }
 
-
     Flux<MeasurementValue> fromRealTime(String deviceId) {
         return messageGateway
             .subscribe(Collections.singletonList(new Subscription("/device/" + deviceId + "/message/event/" + eventMetadata.getId())), true)

+ 4 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java

@@ -47,13 +47,14 @@ class DevicePropertyMeasurement extends StaticMeasurement {
         return values;
     }
 
-    Flux<MeasurementValue> fromHistory(String deviceId, int history) {
+    Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
         return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
             .doPaging(0, history)
             .where("deviceId", deviceId)
             .and("property", metadata.getId())
             .execute(timeSeriesService::query)
-            .map(data -> SimpleMeasurementValue.of(createValue(data.get("value").orElse(null)), data.getTimestamp()));
+            .map(data -> SimpleMeasurementValue.of(createValue(data.get("value").orElse(null)), data.getTimestamp()))
+            .sort(MeasurementValue.sort());
     }
 
     Flux<MeasurementValue> fromRealTime(String deviceId) {
@@ -79,6 +80,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
             .filter(msg -> msg.containsKey(metadata.getId()))
             .map(msg -> SimpleMeasurementValue.of(createValue(msg.get(metadata.getId())), System.currentTimeMillis()));
     }
+
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
         .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
         .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));

+ 6 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -108,26 +108,27 @@ class DeviceMessageMeasurement extends StaticMeasurement {
         }
 
         @Override
-        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
 
             return AggregationQueryParam.of()
                 .sum("count")
                 .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
                     "time",
-                    parameter.getString("format").orElse("MM-dd:HH"))
+                    parameter.getString("format").orElse("MM月dd日 HH时"))
                 .filter(query ->
                     query.where("name", "message-count")
                         .is("productId", parameter.getString("productId").orElse(null))
                         .is("msgType", parameter.getString("msgType").orElse(null))
                 )
                 .limit(parameter.getInt("limit").orElse(1))
-                .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
+                .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
                 .to(parameter.getDate("to").orElse(new Date()))
                 .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
-                .map(data -> SimpleMeasurementValue.of(
+                .index((index, data) -> SimpleMeasurementValue.of(
                     data.getInt("count").orElse(0),
                     data.getString("time").orElse(""),
-                    System.currentTimeMillis()));
+                    index))
+                .sort();
         }
     }
 

+ 5 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java

@@ -86,13 +86,13 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
         }
 
         @Override
-        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
 
             return AggregationQueryParam.of()
                 .sum("count")
                 .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
                     "time",
-                    parameter.getString("format").orElse("MM-dd:HH"))
+                    parameter.getString("format").orElse("MM月dd日 HH时"))
                 .filter(query ->
                     query.where("name", parameter.getString("type").orElse("online"))
                         .is("productId", parameter.getString("productId").orElse(null))
@@ -101,10 +101,11 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
                 .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
                 .to(parameter.getDate("to").orElse(new Date()))
                 .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
-                .map(data -> SimpleMeasurementValue.of(
+                .index((index, data) -> SimpleMeasurementValue.of(
                     data.getInt("count").orElse(0),
                     data.getString("time").orElse(""),
-                    System.currentTimeMillis()));
+                    index))
+                .sort();
         }
     }
 

+ 7 - 6
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java

@@ -74,7 +74,7 @@ class DeviceStatusRecordMeasurement
         }
 
         @Override
-        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
             return AggregationQueryParam.of()
                 .max("value")
                 .filter(query ->
@@ -84,13 +84,14 @@ class DeviceStatusRecordMeasurement
                 .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant())))
                 .to(parameter.getDate("to").orElse(new Date()))
                 .groupBy(parameter.getDuration("time").orElse(Duration.ofDays(1)),
-                    parameter.getString("format").orElse("yyyy-MM-dd"))
+                    parameter.getString("format").orElse("yyyy年MM月dd日"))
                 .limit(parameter.getInt("limit").orElse(10))
                 .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
-                .map(data ->
-                    SimpleMeasurementValue.of(data.getInt("value").orElse(0),
-                        data.getString("time").orElse("-"),
-                        System.currentTimeMillis()));
+                .index((index, data) -> SimpleMeasurementValue.of(
+                    data.getInt("value").orElse(0),
+                    data.getString("time").orElse("-"),
+                    index))
+                .sort();
         }
     }
 

+ 6 - 13
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java

@@ -84,25 +84,18 @@ public class DeviceMessageController {
             .flatMapMany(deviceOperator -> deviceOperator.messageSender()
                 .readProperty(property).messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
                 .send()
-                .map(ReadPropertyMessageReply::getProperties)
+                .map(mapReply(ReadPropertyMessageReply::getProperties))
                 .flatMap(map -> {
                     Object value = map.get(property);
                     return deviceOperator.getMetadata()
                         .map(deviceMetadata -> deviceMetadata.getProperty(property)
                             .map(PropertyMetadata::getValueType)
                             .orElse(new StringType()))
-                        .map(dataType -> {
-                            DevicePropertiesEntity entity = new DevicePropertiesEntity();
-                            if (value != null) {
-                                entity.setDeviceId(deviceId);
-                                entity.setProperty(property);
-                                entity.setValue(value.toString());
-                                entity.setStringValue(value.toString());
-                                entity.setFormatValue(dataType.format(value).toString());
-
-                            }
-                            return entity;
-                        });
+                        .map(dataType -> DevicePropertiesEntity.builder()
+                            .deviceId(deviceId)
+                            .productId(property)
+                            .build()
+                            .withValue(dataType, value));
                 })))
             ;