Browse Source

优化状态同步

zhouhao 2 years ago
parent
commit
1bc1ee0fdd

+ 81 - 16
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java

@@ -1,15 +1,21 @@
 package org.jetlinks.community.device.service;
 
 import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.MapUtils;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.jetlinks.community.PropertyConstants;
+import org.jetlinks.community.buffer.PersistenceBuffer;
+import org.jetlinks.community.configure.cluster.Cluster;
 import org.jetlinks.community.device.entity.DeviceInstanceEntity;
 import org.jetlinks.community.device.entity.DeviceTagEntity;
 import org.jetlinks.community.device.enums.DeviceFeature;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -18,17 +24,26 @@ import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.utils.FluxUtils;
+import org.jetlinks.core.utils.Reactors;
 import org.jetlinks.reactor.ql.utils.CastUtils;
 import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
+import org.springframework.dao.QueryTimeoutException;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
 import reactor.core.publisher.BufferOverflowStrategy;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.time.Duration;
 import java.util.Date;
 import java.util.HashMap;
@@ -51,6 +66,8 @@ public class DeviceMessageBusinessHandler {
 
     private final EventBus eventBus;
 
+    private final Disposable.Composite disposable = Disposables.composite();
+
     /**
      * 自动注册设备信息
      * <p>
@@ -285,6 +302,40 @@ public class DeviceMessageBusinessHandler {
     }
 
 
+
+    @AllArgsConstructor
+    @NoArgsConstructor
+    @Getter
+    @Setter
+    public static class StateBuf implements Externalizable {
+        //有效期一小时
+        static long expires = Duration.ofHours(1).toMillis();
+
+        private String id;
+        private long time;
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeUTF(id);
+            out.writeLong(time);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException {
+            id = in.readUTF();
+            time = in.readLong();
+        }
+
+        public boolean isEffective() {
+            return System.currentTimeMillis() - time < expires;
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        disposable.dispose();
+    }
+
     @PostConstruct
     public void init() {
 
@@ -295,22 +346,36 @@ public class DeviceMessageBusinessHandler {
             .justLocal()//只订阅本地
             .build();
 
-        //订阅设备上下线消息,同步数据库中的设备状态,
-        //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
-        //如果2条消息间隔大于0.8秒则不缓冲直接更新
-        //否则缓冲,数量超过500后批量更新
-        //无论缓冲区是否超过500条,都每2秒更新一次.
-        FluxUtils.bufferRate(eventBus
-                                 .subscribe(subscription, DeviceMessage.class)
-                                 .map(DeviceMessage::getDeviceId),
-                             800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
-                 .onBackpressureBuffer(64,
-                                       list -> log.warn("无法处理更多设备状态同步!"),
-                                       BufferOverflowStrategy.DROP_OLDEST)
-                 .publishOn(Schedulers.boundedElastic(), 64)
-                 .concatMap(list -> deviceService.syncStateBatch(Flux.just(list), false).map(List::size))
-                 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
-                 .subscribe((i) -> log.info("同步设备状态成功:{}", i));
+        //缓冲同步设备上线信息,在突发大量上下线的情况,减少数据库的压力
+        PersistenceBuffer<StateBuf> buffer =
+            new PersistenceBuffer<>(
+                "./data/device-state-buffer",
+                "device-state.queue",
+                StateBuf::new,
+                flux -> deviceService
+                    .syncStateBatch(flux
+                                        .filter(StateBuf::isEffective)
+                                        .map(StateBuf::getId)
+                                        .distinct()
+                                        .collectList()
+                                        .flux(), false)
+                    .then(Reactors.ALWAYS_FALSE))
+                .name("device-state-synchronizer")
+                .parallelism(1)
+                .bufferTimeout(Duration.ofSeconds(1))
+                .retryWhenError(e -> ErrorUtils
+                    .hasException(e,
+                                  IOException.class,
+                                  QueryTimeoutException.class))
+                .bufferSize(1000);
+
+        buffer.start();
+
+        disposable.add(eventBus
+                           .subscribe(subscription, DeviceMessage.class)
+                           .subscribe(msg -> buffer.write(new StateBuf(msg.getDeviceId(), msg.getTimestamp()))));
+
+        disposable.add(buffer);
 
     }