Browse Source

增加设备会话统计指标

zhouhao 3 năm trước cách đây
mục cha
commit
824e6241cd

+ 268 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/session/DeviceSessionMeasurementProvider.java

@@ -0,0 +1,268 @@
+package org.jetlinks.community.network.manager.session;
+
+import com.google.common.collect.Maps;
+import org.hswebframework.web.utils.DigestUtils;
+import org.jetlinks.core.device.DeviceConfigKey;
+import org.jetlinks.core.device.session.DeviceSessionManager;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.DefaultConfigMetadata;
+import org.jetlinks.core.metadata.SimplePropertyMetadata;
+import org.jetlinks.core.metadata.types.*;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.community.PropertyConstants;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
+import org.jetlinks.community.gateway.monitor.measurements.GatewayObjectDefinition;
+import org.jetlinks.community.timeseries.TimeSeriesData;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.community.timeseries.TimeSeriesMetadata;
+import org.jetlinks.community.timeseries.TimeSeriesMetric;
+import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.jetlinks.community.utils.TimeUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Map;
+
+import static org.jetlinks.community.timeseries.query.Aggregation.DISTINCT_COUNT;
+
+/**
+ * 设备会话监控统计支持
+ * <p>
+ * 获取设备在线量统计(通过统计设备在线时长监控)
+ *
+ * <pre>{@code
+ * POST /dashboard/_multi
+ * [
+ *     {
+ *         "dashboard": "device",
+ *         "object": "session",
+ *         "measurement": "online",
+ *         "dimension": "agg",
+ *         "group": "sameDay",
+ *         "params": {
+ *              "format":"yyyy-MM-dd",
+ *              "interval":"1d",
+ *              "from":"now-1d",
+ *              "to":"now",
+ *              "type":"bytesSent",
+ *              "limit":10
+ *         }
+ *     }
+ * ]
+ *
+ * [
+ * {
+ * 	"group": "sameDay",
+ * 	"data": {
+ * 		"value": 3003,
+ * 		"timeString": "2022-05-30",
+ * 		"timestamp": 1653840000000
+ *     }
+ * }
+ * ]
+ *
+ *
+ *
+ * }</pre>
+ *
+ * @author zhouhao
+ * @since 2.0
+ */
+@Component
+public class DeviceSessionMeasurementProvider extends StaticMeasurementProvider {
+
+    private final DeviceSessionManager sessionManager;
+
+    private final TimeSeriesMetric metric;
+
+    private final TimeSeriesManager timeSeriesManager;
+
+    //定时记录会话信息等间隔,此配置直接影响设备在线数量可统计的周期,默认为1小时. 值越小,数据量越大.
+    private final Duration interval = TimeUtils.parse(System.getProperty("device.session.report.interval", "1h"));
+
+    private final Disposable.Composite disposable = Disposables.composite();
+
+    public DeviceSessionMeasurementProvider(DeviceSessionManager sessionManager,
+                                            TimeSeriesManager timeSeriesManager) {
+        super(DefaultDashboardDefinition.device, GatewayObjectDefinition.session);
+
+        this.sessionManager = sessionManager;
+        this.metric = TimeSeriesMetric.of("device_session_metric");
+        this.timeSeriesManager = timeSeriesManager;
+
+        addMeasurement(new StaticMeasurement(MeasurementDefinition.of("online", "在线统计"))
+                           .addDimension(new DeviceSessionMeasurementDimension())
+        );
+    }
+
+
+    class DeviceSessionMeasurementDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.agg;
+        }
+
+        @Override
+        public DataType getValueType() {
+            return new ObjectType();
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return new DefaultConfigMetadata();
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return false;
+        }
+
+        private AggregationQueryParam createQuery(MeasurementParameter parameter) {
+            return AggregationQueryParam
+                .of()
+                .agg("deviceId", "count", DISTINCT_COUNT)
+                .groupBy(parameter.getInterval("interval", parameter.getInterval("time", null)),
+                         parameter.getString("format", "yyyy-MM-dd"))
+                .from(parameter.getDate("from", TimeUtils.parseDate("now-1d")))
+                .to(parameter.getDate("to").orElseGet(Date::new))
+                .filter(q -> q
+                    .is("name", "duration")
+                    //产品ID
+                    .is("productId", parameter.getString("productId", null))
+                    //设备ID
+                    .is("deviceId", parameter.getString("deviceId", null))
+                );
+
+        }
+
+        @Override
+        public Publisher<? extends Object> getValue(MeasurementParameter parameter) {
+            AggregationQueryParam queryParam = createQuery(parameter);
+            String format = parameter.getString("format", "yyyy-MM-dd");
+            DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
+
+            return timeSeriesManager
+                .getService(metric)
+                .aggregation(queryParam)
+                .map(data -> SimpleMeasurementValue.of(
+                    data.getLong("count", 0),
+                    data.getString("time", ""),
+                    data.getString("time").map(formatter::parseMillis).orElse(0L)
+                ))
+                .sort()
+                .take(parameter.getInt("limit", 30));
+        }
+    }
+
+    @PostConstruct
+    public void init() {
+        timeSeriesManager
+            .registerMetadata(
+                TimeSeriesMetadata.of(
+                    metric,
+                    SimplePropertyMetadata.of("server", "server", StringType.GLOBAL),
+                    SimplePropertyMetadata.of("duration", "duration", IntType.GLOBAL),
+                    SimplePropertyMetadata.of("count", "count", IntType.GLOBAL),
+                    SimplePropertyMetadata.of("bindings", "bindings", new ArrayType().elementType(StringType.GLOBAL)),
+                    SimplePropertyMetadata.of("connectTime", "connectTime", DateTimeType.GLOBAL)
+                ))
+            .block(Duration.ofSeconds(30));
+
+        Scheduler scheduler = Schedulers.newSingle("device-session-reporter");
+
+        disposable.add(scheduler);
+
+        if (!interval.isZero() && !interval.isNegative()) {
+            disposable.add(
+                Flux
+                    .interval(interval, scheduler)
+                    .flatMap(ignore -> reportDeviceSession())
+                    .subscribe()
+            );
+        }
+
+        disposable.add(
+            sessionManager
+                .listenEvent(event -> reportDeviceSession(event.getSession(), "session"))
+        );
+
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        disposable.dispose();
+    }
+
+    private Mono<Void> reportDeviceSession() {
+        return sessionManager
+            .getSessions()
+            .flatMap(this::reportDeviceSession)
+            .onErrorResume(err -> Mono.empty())
+            .then();
+    }
+
+    protected Mono<Void> reportDeviceSession(DeviceSession session) {
+        return reportDeviceSession(session, "check");
+    }
+
+    private long computeDuration(long timestamp) {
+        return System.currentTimeMillis() - timestamp;
+    }
+
+    protected Mono<Void> reportDeviceSession(DeviceSession session, String type) {
+        if (null == session.getOperator()) {
+            return Mono.empty();
+        }
+        return session
+            .getOperator()
+            .getSelfConfigs(DeviceConfigKey.productId)
+            .map(configs -> {
+                Map<String, Object> data = Maps.newHashMapWithExpectedSize(16);
+                //以连接时间等构造id减少重复数据
+                data.put("id", DigestUtils.md5Hex(
+                    String.join(
+                        "-",
+                        session.getDeviceId(),
+                        String.valueOf(session.connectTime()),
+                        sessionManager.getCurrentServerId(),
+                        //如果是会话注册注销,则更新原始会话记录
+                        "session".equals(type) ? "session" : String.valueOf((int) (System.currentTimeMillis() / this.interval.toMillis()))
+                    )
+                ));
+                data.put("name", "duration");
+                data.put("productId", configs.getValue(DeviceConfigKey.productId).orElse(""));
+                data.put("deviceId", session.getDeviceId());
+                //所在集群节点ID
+                data.put("server", sessionManager.getCurrentServerId());
+                //上线时间
+                data.put("connectTime", session.connectTime());
+                //在线时长
+                data.put("duration", computeDuration(session.connectTime()));
+                data.put("type", type);
+                return TimeSeriesData.of(System.currentTimeMillis(), data);
+            })
+            .as(
+                timeSeriesManager
+                    .getService(metric)
+                    ::commit
+            )
+            .onErrorResume(err -> Mono.empty());
+    }
+
+}