Forráskód Böngészése

优化session manager

zhou-hao 5 éve
szülő
commit
644df9bcb6

+ 27 - 30
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java

@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.device.DeviceState;
 import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.core.server.monitor.GatewayServerMonitor;
 import org.jetlinks.core.server.session.ChildrenDeviceSession;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
  */
 public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
+
     private final Map<String, DeviceSession> repository = new ConcurrentHashMap<>(4096);
 
     private final Map<String, Map<String, ChildrenDeviceSession>> children = new ConcurrentHashMap<>(4096);
@@ -50,23 +52,20 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
     @Setter
     private DeviceRegistry registry;
 
-    private FluxProcessor<DeviceSession, DeviceSession> onDeviceRegister = EmitterProcessor.create(false);
-
-    private FluxProcessor<DeviceSession, DeviceSession> onDeviceUnRegister = EmitterProcessor.create(false);
+    private final FluxProcessor<DeviceSession, DeviceSession> onDeviceRegister = EmitterProcessor.create(false);
 
-    private FluxSink<DeviceSession> unregisterListener = onDeviceUnRegister.sink();
-    private FluxSink<DeviceSession> registerListener = onDeviceRegister.sink();
+    private final FluxProcessor<DeviceSession, DeviceSession> onDeviceUnRegister = EmitterProcessor.create(false);
+    private final EmitterProcessor<DeviceSession> unregisterHandler = EmitterProcessor.create(false);
 
+    private final FluxSink<DeviceSession> unregisterListener = onDeviceUnRegister.sink(FluxSink.OverflowStrategy.BUFFER);
+    private final FluxSink<DeviceSession> registerListener = onDeviceRegister.sink(FluxSink.OverflowStrategy.BUFFER);
+    private final FluxSink<DeviceSession> unregisterSession = unregisterHandler.sink(FluxSink.OverflowStrategy.BUFFER);
 
     private String serverId;
 
-    private Queue<Runnable> scheduleJobQueue = new ArrayDeque<>();
-
-    private EmitterProcessor<DeviceSession> unregisterHandler = EmitterProcessor.create(Integer.MAX_VALUE, false);
-
-    private FluxSink<DeviceSession> unregisterSession = unregisterHandler.sink();
+    private final Queue<Runnable> scheduleJobQueue = new ArrayDeque<>();
 
-    private Map<String, LongAdder> transportCounter = new ConcurrentHashMap<>();
+    private final Map<String, LongAdder> transportCounter = new ConcurrentHashMap<>();
 
     @Getter
     @Setter
@@ -78,7 +77,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
 
     public void shutdown() {
         repository.values()
-            .parallelStream()
+            .stream()
             .map(DeviceSession::getId)
             .forEach(this::unregister);
     }
@@ -107,28 +106,26 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
             .distinct()
             .publishOn(Schedulers.parallel())
             .filterWhen(session -> {
-                if (!session.isAlive()) {
+                if (!session.isAlive() || session.getOperator() == null) {
                     return Mono.just(true);
                 }
-                return session
-                    .getOperator()
-                    .getConnectionServerId()
-                    .defaultIfEmpty("")
-                    .filter(s -> !serverId.equals(s))
+                return Mono.zip(
+                    session.getOperator().getState().defaultIfEmpty(DeviceState.offline),
+                    session.getOperator().getConnectionServerId().defaultIfEmpty("")
+                )
+                    .filter(tp2 -> !tp2.getT1().equals(DeviceState.online) || !tp2.getT2().equals(serverId))
                     .flatMap((ignore) -> {
-                        log.warn("device [{}] state error", session.getDeviceId());
                         //设备设备状态为在线
                         return session
                             .getOperator()
                             .online(serverId, session.getId())
                             .then(Mono.fromRunnable(() -> registerListener.next(session)));
                     })
-                    .flatMap(ignore -> session.getOperator().online(serverId, session.getId()))
                     .thenReturn(false);
             })
             .map(DeviceSession::getId)
             .doOnNext(this::unregister)
-            .collect(Collectors.counting())
+            .count()
             .doOnNext((l) -> {
                 if (log.isInfoEnabled() && l > 0) {
                     log.info("expired sessions:{}", l);
@@ -154,7 +151,6 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
                     log.info("check session complete,current server sessions:{}.use time:{}ms.",
                         transportCounter,
                         System.currentTimeMillis() - startWith.get());
-
                 }
             });
     }
@@ -171,7 +167,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
         executorService.scheduleAtFixedRate(() -> this.checkSession().subscribe(), 10, 30, TimeUnit.SECONDS);
 
         unregisterHandler
-            .subscribeOn(Schedulers.parallel())
+            .publishOn(Schedulers.parallel())
             .flatMap(session -> {
                 //注册中心下线
                 return session.getOperator()
@@ -231,7 +227,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
                 .getDevice(childrenDeviceId)
                 .switchIfEmpty(Mono.fromRunnable(() -> log.warn("children device [{}] not fond in registry", childrenDeviceId)))
                 .flatMap(deviceOperator -> deviceOperator
-                    .online(session.getServerId().orElse(serverId), session.getId())
+                    .online(session.getServerId().orElse(serverId), session.getId(), session.getClientAddress().map(String::valueOf).orElse(null))
                     .then(deviceOperator.setConfig(DeviceConfigKey.parentGatewayId, deviceId))
                     .thenReturn(new ChildrenDeviceSession(childrenDeviceId, session, deviceOperator)))
                 .doOnSuccess(s -> children.computeIfAbsent(deviceId, __ -> new ConcurrentHashMap<>()).put(childrenDeviceId, s));
@@ -269,11 +265,13 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
             repository.put(session.getId(), session);
         }
         if (null != old) {
-            //1. 可能是多个设备使用了相同的id.
-            //2. 可能是同一个设备,注销后立即上线,由于种种原因,先处理了上线后处理了注销逻辑.
-            log.warn("device[{}] session exists,disconnect old session:{}", old.getDeviceId(), old);
-            //加入关闭连接队列
-            scheduleJobQueue.add(old::close);
+            if (!old.equals(session)) {
+                //1. 可能是多个设备使用了相同的id.
+                //2. 可能是同一个设备,注销后立即上线,由于种种原因,先处理了上线后处理了注销逻辑.
+                log.warn("device[{}] session exists,disconnect old session:{}", old.getDeviceId(), session);
+                //加入关闭连接队列
+                scheduleJobQueue.add(old::close);
+            }
         } else {
             //本地计数
             transportCounter
@@ -340,7 +338,6 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
             transportCounter
                 .computeIfAbsent(session.getTransport().getId(), transport -> new LongAdder())
                 .decrement();
-            // TODO: 2019/12/26 monitor
             if (unregisterHandler.getPending() > 0) {
                 log.info("pending unregister session:{}", unregisterHandler.getPending());
             }