|
@@ -6,12 +6,12 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.h2.mvstore.MVMap;
|
|
|
import org.h2.mvstore.MVStore;
|
|
|
import org.h2.mvstore.MVStoreException;
|
|
|
+import org.jetlinks.community.configure.cluster.Cluster;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
|
import org.jetlinks.core.device.session.DeviceSessionEvent;
|
|
|
import org.jetlinks.core.rpc.RpcManager;
|
|
|
import org.jetlinks.core.server.session.DeviceSession;
|
|
|
import org.jetlinks.core.server.session.PersistentSession;
|
|
|
-import org.jetlinks.community.configure.cluster.Cluster;
|
|
|
import org.jetlinks.supports.device.session.ClusterDeviceSessionManager;
|
|
|
import org.springframework.beans.BeansException;
|
|
|
import org.springframework.boot.CommandLineRunner;
|
|
@@ -23,16 +23,8 @@ import reactor.core.publisher.Mono;
|
|
|
|
|
|
import javax.annotation.Nonnull;
|
|
|
import java.io.File;
|
|
|
-import java.util.function.Function;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
-/**
|
|
|
- * 支持会话持久化的设备会话管理器,将{@link PersistentSession}持久化到本地磁盘,在服务器重启后
|
|
|
- * 将会对会话进行恢复.
|
|
|
- *
|
|
|
- * @author zhouhao
|
|
|
- * @since 1.20
|
|
|
- */
|
|
|
@Slf4j
|
|
|
public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware {
|
|
|
private Supplier<DeviceRegistry> registry;
|
|
@@ -98,12 +90,11 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
public void shutdown() {
|
|
|
super.shutdown();
|
|
|
Flux.fromIterable(localSessions.values())
|
|
|
- .flatMap(Function.identity())
|
|
|
- .filter(session -> session.isWrapFrom(PersistentSession.class))
|
|
|
- .map(session -> session.unwrap(PersistentSession.class))
|
|
|
+ .filter(ref -> ref.loaded != null)
|
|
|
+ .filter(ref -> ref.loaded.isWrapFrom(PersistentSession.class))
|
|
|
+ .map(ref -> ref.loaded.unwrap(PersistentSession.class))
|
|
|
.as(this::tryPersistent)
|
|
|
.block();
|
|
|
- repository.store.compactMoveChunks();
|
|
|
repository.store.close();
|
|
|
}
|
|
|
|
|
@@ -142,7 +133,9 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
.toSession(registry.get())
|
|
|
.doOnNext(session -> {
|
|
|
log.debug("resume session[{}]", session.getDeviceId());
|
|
|
- localSessions.putIfAbsent(session.getDeviceId(), Mono.just(session));
|
|
|
+ localSessions.putIfAbsent(session.getDeviceId(), new DeviceSessionRef(session.getDeviceId(),
|
|
|
+ this,
|
|
|
+ Mono.just(session)));
|
|
|
})
|
|
|
.onErrorResume((err) -> {
|
|
|
log.debug("resume session[{}] error", entity.getDeviceId(), err);
|