瀏覽代碼

增加获取属性变更记录dashboard

zhouhao 5 年之前
父節點
當前提交
c352f9fc28

+ 29 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java

@@ -15,31 +15,60 @@ public interface ValueObject {
         return get(name, Integer.class);
     }
 
+    default int getInt(String name, int defaultValue) {
+        return getInt(name).orElse(defaultValue);
+    }
+
     default Optional<Long> getLong(String name) {
         return get(name, Long.class);
     }
 
+    default long getLong(String name, long defaultValue) {
+        return getLong(name).orElse(defaultValue);
+    }
+
     default Optional<Duration> getDuration(String name) {
         return getString(name)
             .map(TimeUtils::parse);
     }
 
+    default Duration getDuration(String name, Duration defaultValue) {
+        return getDuration(name)
+            .orElse(defaultValue);
+    }
+
     default Optional<Date> getDate(String name) {
         return get(name, Date.class);
     }
 
+    default Date getDate(String name, Date defaultValue) {
+        return getDate(name).orElse(defaultValue);
+    }
+
     default Optional<Double> getDouble(String name) {
         return get(name, Double.class);
     }
 
+    default double getDouble(String name, double defaultValue) {
+        return getDouble(name).orElse(defaultValue);
+    }
+
     default Optional<String> getString(String name) {
         return get(name, String.class);
     }
 
+    default String getString(String name, String defaultValue) {
+        return getString(name).orElse(defaultValue);
+    }
+
     default Optional<Boolean> getBoolean(String name) {
         return get(name, Boolean.class);
     }
 
+    default boolean getBoolean(String name, boolean defaultValue) {
+        return getBoolean(name).orElse(defaultValue);
+    }
+
     default <T> Optional<T> get(String name, Class<T> type) {
         return get(name)
             .map(obj -> FastBeanCopier.DEFAULT_CONVERT.convert(obj, type, FastBeanCopier.EMPTY_CLASS_ARRAY));

+ 8 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java

@@ -56,10 +56,14 @@ public class DeviceDashboardObject implements DashboardObject {
     @Override
     public Flux<Measurement> getMeasurements() {
         return Flux.concat(
+
             productOperator.getMetadata()
                 .flatMapIterable(DeviceMetadata::getEvents)
                 .map(event -> new DeviceEventMeasurement(messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))),
 
+            productOperator.getMetadata()
+                .map(metadata -> new DevicePropertiesMeasurement(messageGateway, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))),
+
             productOperator.getMetadata()
                 .flatMapIterable(DeviceMetadata::getProperties)
                 .map(event -> new DevicePropertyMeasurement(messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id))))
@@ -68,6 +72,10 @@ public class DeviceDashboardObject implements DashboardObject {
 
     @Override
     public Mono<Measurement> getMeasurement(String id) {
+        if ("properties".equals(id)) {
+            return productOperator.getMetadata()
+                .map(metadata -> new DevicePropertiesMeasurement(messageGateway, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id))));
+        }
         return productOperator.getMetadata()
             .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
             .<Measurement>map(event -> new DeviceEventMeasurement(messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(this.id, event.getId()))))

+ 158 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java

@@ -0,0 +1,158 @@
+package org.jetlinks.community.device.measurements;
+
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.jetlinks.core.message.property.ReadPropertyMessageReply;
+import org.jetlinks.core.message.property.ReportPropertyMessage;
+import org.jetlinks.core.message.property.WritePropertyMessageReply;
+import org.jetlinks.core.metadata.*;
+import org.jetlinks.core.metadata.types.ObjectType;
+import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.device.message.DeviceMessageUtils;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.timeseries.TimeSeriesService;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+class DevicePropertiesMeasurement extends StaticMeasurement {
+
+    private MessageGateway messageGateway;
+
+    private TimeSeriesService timeSeriesService;
+
+    private DeviceMetadata metadata;
+
+    public DevicePropertiesMeasurement(MessageGateway messageGateway, DeviceMetadata deviceMetadata, TimeSeriesService timeSeriesService) {
+        super(MeasurementDefinition.of("properties", "属性记录"));
+        this.messageGateway = messageGateway;
+        this.timeSeriesService = timeSeriesService;
+        this.metadata = deviceMetadata;
+        addDimension(new RealTimeDevicePropertyDimension());
+    }
+
+    Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
+        return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getProperties())
+            .flatMap(propertyMetadata -> QueryParamEntity.newQuery()
+                .doPaging(0, history)
+                .where("deviceId", deviceId)
+                .and("property", propertyMetadata.getId())
+                .execute(timeSeriesService::query)
+                .map(data -> SimpleMeasurementValue.of(createValue(propertyMetadata.getId(), data.get("value").orElse(null)), data.getTimestamp()))
+                .sort(MeasurementValue.sort()));
+    }
+
+    Map<String, Object> createValue(String property, Object value) {
+        return metadata.getProperty(property)
+            .map(meta -> {
+                Map<String, Object> values = new HashMap<>();
+                DataType type = meta.getValueType();
+                Object val = type instanceof Converter ? ((Converter<?>) type).convert(value) : value;
+                values.put("formatValue", type.format(val));
+                values.put("value", val);
+                values.put("property", property);
+
+                return values;
+            })
+            .orElseGet(() -> {
+                Map<String, Object> values = new HashMap<>();
+                values.put("formatValue", value);
+                values.put("value", value);
+                values.put("property", property);
+                return values;
+            });
+    }
+
+    Flux<MeasurementValue> fromRealTime(String deviceId) {
+        return messageGateway
+            .subscribe(Stream.of(
+                "/device/" + deviceId + "/message/property/report"
+                , "/device/" + deviceId + "/message/property/*/reply")
+                .map(Subscription::new)
+                .collect(Collectors.toList()), true)
+            .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
+            .flatMap(msg -> {
+                if (msg instanceof ReportPropertyMessage) {
+                    return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
+                }
+                if (msg instanceof ReadPropertyMessageReply) {
+                    return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties());
+                }
+                if (msg instanceof WritePropertyMessageReply) {
+                    return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties());
+                }
+                return Mono.empty();
+            })
+            .flatMap(map -> Flux.fromIterable(map.entrySet()))
+            .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()));
+    }
+
+    static ConfigMetadata configMetadata = new DefaultConfigMetadata()
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
+
+    /**
+     * 实时设备事件
+     */
+    private class RealTimeDevicePropertyDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.realTime;
+        }
+
+        @Override
+        public DataType getValueType() {
+            SimplePropertyMetadata property = new SimplePropertyMetadata();
+            property.setId("property");
+            property.setName("属性");
+            property.setValueType(new StringType());
+
+            SimplePropertyMetadata value = new SimplePropertyMetadata();
+            value.setId("value");
+            value.setName("值");
+            value.setValueType(new StringType());
+
+            SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
+            value.setId("formatValue");
+            value.setName("格式化值");
+            value.setValueType(new StringType());
+
+            return new ObjectType()
+                .addPropertyMetadata(property)
+                .addPropertyMetadata(value)
+                .addPropertyMetadata(formatValue);
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return configMetadata;
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return true;
+        }
+
+        @Override
+        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
+                .flatMapMany(deviceId -> {
+                    int history = parameter.getInt("history").orElse(0);
+                    //合并历史数据和实时数据
+                    return Flux.concat(
+                        //查询历史数据
+                        fromHistory(deviceId, history)
+                        ,
+                        //从消息网关订阅实时事件消息
+                        fromRealTime(deviceId)
+                    );
+                });
+        }
+    }
+}

+ 15 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java

@@ -12,6 +12,7 @@ import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.timeseries.TimeSeriesService;
 import org.jetlinks.core.metadata.types.IntType;
+import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -97,7 +98,20 @@ class DevicePropertyMeasurement extends StaticMeasurement {
 
         @Override
         public DataType getValueType() {
-            return metadata.getValueType();
+
+            SimplePropertyMetadata value = new SimplePropertyMetadata();
+            value.setId("value");
+            value.setName("值");
+            value.setValueType(metadata.getValueType());
+
+            SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
+            value.setId("formatValue");
+            value.setName("格式化值");
+            value.setValueType(new StringType());
+
+            return new ObjectType()
+                .addPropertyMetadata(value)
+                .addPropertyMetadata(formatValue);
         }
 
         @Override

+ 6 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -112,17 +112,16 @@ class DeviceMessageMeasurement extends StaticMeasurement {
 
             return AggregationQueryParam.of()
                 .sum("count")
-                .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
-                    "time",
-                    parameter.getString("format").orElse("MM月dd日 HH时"))
+                .groupBy(parameter.getDuration("time", Duration.ofHours(1)),
+                    parameter.getString("format", "MM月dd日 HH时"))
                 .filter(query ->
                     query.where("name", "message-count")
-                        .is("productId", parameter.getString("productId").orElse(null))
-                        .is("msgType", parameter.getString("msgType").orElse(null))
+                        .is("productId", parameter.getString("productId", null))
+                        .is("msgType", parameter.getString("msgType", null))
                 )
-                .limit(parameter.getInt("limit").orElse(1))
+                .limit(parameter.getInt("limit", 1))
                 .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
-                .to(parameter.getDate("to").orElse(new Date()))
+                .to(parameter.getDate("to").orElseGet(Date::new))
                 .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
                 .index((index, data) -> SimpleMeasurementValue.of(
                     data.getInt("count").orElse(0),