|
@@ -30,9 +30,11 @@ import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.FluxSink;
|
|
import reactor.core.publisher.FluxSink;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
+import java.time.Duration;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
|
|
|
@@ -108,7 +110,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- disposable.add(tcpServer.handleConnection()
|
|
|
|
|
|
+ disposable.add(tcpServer
|
|
|
|
+ .handleConnection()
|
|
.flatMap(client -> {
|
|
.flatMap(client -> {
|
|
counter.increment();
|
|
counter.increment();
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
@@ -118,6 +121,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
sessionManager.unregister(client.getId());
|
|
sessionManager.unregister(client.getId());
|
|
});
|
|
});
|
|
|
|
+ AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
|
|
|
|
|
return client.subscribe()
|
|
return client.subscribe()
|
|
.filter(r -> started.get())
|
|
.filter(r -> started.get())
|
|
@@ -140,6 +144,11 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setKeepAliveTimeout(Duration timeout) {
|
|
|
|
+ keepaliveTimeout.set(timeout);
|
|
|
|
+ }
|
|
};
|
|
};
|
|
}
|
|
}
|
|
return session;
|
|
return session;
|
|
@@ -156,12 +165,16 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
.flatMap(device -> {
|
|
.flatMap(device -> {
|
|
//设备上线
|
|
//设备上线
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
- sessionManager.register(new TcpDeviceSession(client.getId(), device, client, getTransport()) {
|
|
|
|
|
|
+ TcpDeviceSession session = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
|
|
@Override
|
|
@Override
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
}
|
|
}
|
|
- });
|
|
|
|
|
|
+ };
|
|
|
|
+ if (keepaliveTimeout.get() != null) {
|
|
|
|
+ session.setKeepAliveTimeout(keepaliveTimeout.get());
|
|
|
|
+ }
|
|
|
|
+ sessionManager.register(session);
|
|
return Mono.empty();
|
|
return Mono.empty();
|
|
}
|
|
}
|
|
//设备下线
|
|
//设备下线
|
|
@@ -174,9 +187,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
}
|
|
}
|
|
return clientMessageHandler.handleMessage(device, message);
|
|
return clientMessageHandler.handleMessage(device, message);
|
|
}))
|
|
}))
|
|
- .onErrorContinue((err, o) -> {
|
|
|
|
- log.error(err.getMessage(), err);
|
|
|
|
- });
|
|
|
|
|
|
+ .onErrorContinue((err, o) -> log.error(err.getMessage(), err));
|
|
}).subscribe());
|
|
}).subscribe());
|
|
}
|
|
}
|
|
|
|
|