|
@@ -23,6 +23,7 @@ import reactor.core.publisher.Mono;
|
|
|
|
|
|
import javax.annotation.Nonnull;
|
|
|
import java.io.File;
|
|
|
+import java.time.Duration;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
@Slf4j
|
|
@@ -35,6 +36,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
@Setter
|
|
|
private String filePath;
|
|
|
|
|
|
+ @Getter
|
|
|
+ @Setter
|
|
|
+ private Duration flushInterval = Duration.ofMinutes(10);
|
|
|
+
|
|
|
public PersistenceDeviceSessionManager(RpcManager rpcManager) {
|
|
|
super(rpcManager);
|
|
|
}
|
|
@@ -72,6 +77,24 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
}
|
|
|
repository = initStore(filePath);
|
|
|
|
|
|
+ if (!flushInterval.isZero() && !flushInterval.isNegative()) {
|
|
|
+ disposable.add(
|
|
|
+ Flux
|
|
|
+ .interval(flushInterval)
|
|
|
+ .onBackpressureDrop()
|
|
|
+ .concatMap(ignore -> Flux
|
|
|
+ .fromIterable(localSessions.values())
|
|
|
+ .mapNotNull(ref -> {
|
|
|
+ if (ref.loaded != null && ref.loaded.isWrapFrom(PersistentSession.class)) {
|
|
|
+ return ref.loaded.unwrap(PersistentSession.class);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ })
|
|
|
+ .as(this::tryPersistent), 1)
|
|
|
+ .subscribe()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
disposable.add(
|
|
|
listenEvent(event -> {
|
|
|
//移除持久化的会话
|
|
@@ -95,6 +118,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
.map(ref -> ref.loaded.unwrap(PersistentSession.class))
|
|
|
.as(this::tryPersistent)
|
|
|
.block();
|
|
|
+ repository.store.compactMoveChunks();
|
|
|
repository.store.close();
|
|
|
}
|
|
|
|
|
@@ -133,10 +157,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
|
|
|
.toSession(registry.get())
|
|
|
.doOnNext(session -> {
|
|
|
log.debug("resume session[{}]", session.getDeviceId());
|
|
|
- localSessions.putIfAbsent(session.getDeviceId(), new DeviceSessionRef(
|
|
|
- session.getDeviceId(),
|
|
|
- this,
|
|
|
- session));
|
|
|
+ localSessions.putIfAbsent(session.getDeviceId(),
|
|
|
+ new DeviceSessionRef(session.getDeviceId(),
|
|
|
+ this,
|
|
|
+ session));
|
|
|
})
|
|
|
.onErrorResume((err) -> {
|
|
|
log.debug("resume session[{}] error", entity.getDeviceId(), err);
|