|
@@ -7,11 +7,16 @@ import lombok.Setter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.jetlinks.community.network.DefaultNetworkType;
|
|
|
import org.jetlinks.community.network.NetworkType;
|
|
|
+import org.jetlinks.community.network.tcp.client.TcpClient;
|
|
|
import org.jetlinks.community.network.tcp.client.VertxTcpClient;
|
|
|
import org.jetlinks.community.network.tcp.parser.PayloadParser;
|
|
|
+import reactor.core.publisher.EmitterProcessor;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.FluxSink;
|
|
|
|
|
|
import java.time.Duration;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
@@ -19,7 +24,7 @@ import java.util.function.Supplier;
|
|
|
* @since 1.0
|
|
|
**/
|
|
|
@Slf4j
|
|
|
-public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
|
|
|
+public class VertxTcpServer implements TcpServer {
|
|
|
|
|
|
Collection<NetServer> tcpServers;
|
|
|
|
|
@@ -29,12 +34,22 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
|
|
|
private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
|
|
|
|
|
|
@Getter
|
|
|
- private String id;
|
|
|
+ private final String id;
|
|
|
+
|
|
|
+ private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
|
|
|
+
|
|
|
+ private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
|
|
|
|
|
public VertxTcpServer(String id) {
|
|
|
this.id = id;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Flux<TcpClient> handleConnection() {
|
|
|
+ return processor
|
|
|
+ .map(Function.identity());
|
|
|
+ }
|
|
|
+
|
|
|
private void execute(Runnable runnable) {
|
|
|
try {
|
|
|
runnable.run();
|
|
@@ -61,6 +76,11 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
|
|
|
|
|
|
|
|
|
protected void acceptTcpConnection(NetSocket socket) {
|
|
|
+ if (!processor.hasDownstreams()) {
|
|
|
+ log.warn("not handler for tcp client[{}]", socket.remoteAddress());
|
|
|
+ socket.close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
|
|
|
client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
|
|
try {
|
|
@@ -72,7 +92,7 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
|
|
|
});
|
|
|
client.setRecordParser(parserSupplier.get());
|
|
|
client.setSocket(socket);
|
|
|
- received(client);
|
|
|
+ sink.next(client);
|
|
|
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
|
|
|
} catch (Exception e) {
|
|
|
log.error("create tcp server client error", e);
|