Ver Fonte

增加tcp 模块

zhou-hao há 5 anos atrás
pai
commit
2300c65235
31 ficheiros alterados com 1551 adições e 0 exclusões
  1. 1 0
      jetlinks-components/network-component/pom.xml
  2. 29 0
      jetlinks-components/network-component/tcp-component/pom.xml
  3. 30 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/TcpMessage.java
  4. 33 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/AbstractTcpClient.java
  5. 51 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java
  6. 34 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java
  7. 164 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java
  8. 98 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java
  9. 43 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DefaultPayloadParserBuilder.java
  10. 38 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParser.java
  11. 9 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilder.java
  12. 9 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilderStrategy.java
  13. 30 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserType.java
  14. 24 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DelimitedPayloadParserBuilder.java
  15. 22 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilder.java
  16. 134 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java
  17. 45 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilder.java
  18. 59 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.java
  19. 31 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/AbstractTcpServer.java
  20. 27 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java
  21. 49 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java
  22. 97 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java
  23. 95 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java
  24. 50 0
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProviderTest.java
  25. 56 0
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilderTest.java
  26. 89 0
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParserTest.java
  27. 57 0
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilderTest.java
  28. 70 0
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java
  29. 22 0
      jetlinks-components/network-component/tcp-component/src/test/resources/jetlinks-client.pem
  30. 49 0
      jetlinks-components/network-component/tcp-component/src/test/resources/jetlinks-server.pem
  31. 6 0
      jetlinks-manager/network-manager/pom.xml

+ 1 - 0
jetlinks-components/network-component/pom.xml

@@ -14,6 +14,7 @@
     <modules>
     <modules>
         <module>network-core</module>
         <module>network-core</module>
         <module>mqtt-component</module>
         <module>mqtt-component</module>
+        <module>tcp-component</module>
     </modules>
     </modules>
 
 
     <artifactId>network-component</artifactId>
     <artifactId>network-component</artifactId>

+ 29 - 0
jetlinks-components/network-component/tcp-component/pom.xml

@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>network-component</artifactId>
+        <groupId>org.jetlinks.community</groupId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>tcp-component</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>network-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-core</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>

+ 30 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/TcpMessage.java

@@ -0,0 +1,30 @@
+package org.jetlinks.community.network.tcp;
+
+import io.netty.buffer.ByteBuf;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.MessagePayloadType;
+import org.jetlinks.rule.engine.executor.PayloadType;
+
+import javax.annotation.Nonnull;
+
+/**
+ * @author bsetfeng
+ * @author zhouhao
+ * @since 1.0
+ **/
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class TcpMessage implements EncodedMessage {
+
+    private ByteBuf payload;
+
+    //private MessagePayloadType payloadType;
+
+
+}

+ 33 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/AbstractTcpClient.java

@@ -0,0 +1,33 @@
+package org.jetlinks.community.network.tcp.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+
+import java.nio.charset.StandardCharsets;
+import java.util.function.Function;
+
+@Slf4j
+public abstract class AbstractTcpClient implements TcpClient {
+
+    private EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
+
+    private FluxSink<TcpMessage> sink = processor.sink();
+
+    protected void received(TcpMessage message) {
+        if (processor.getPending() > processor.getBufferSize() / 2) {
+            log.warn("not handler,drop tcp message:{}", message.getPayload().toString(StandardCharsets.UTF_8));
+            return;
+        }
+        sink.next(message);
+    }
+
+    @Override
+    public Flux<TcpMessage> subscribe() {
+        return processor
+            .map(Function.identity());
+    }
+
+}

+ 51 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java

@@ -0,0 +1,51 @@
+package org.jetlinks.community.network.tcp.client;
+
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import org.jetlinks.community.network.Network;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.net.InetSocketAddress;
+
+/**
+ * TCP 客户端
+ *
+ * @author zhouhao
+ * @version 1.0
+ */
+public interface TcpClient extends Network {
+
+    /**
+     * 获取客户端远程地址
+     * @return 客户端远程地址
+     */
+    InetSocketAddress getRemoteAddress();
+
+    /**
+     * 订阅TCP消息,此消息是已经处理过粘拆包的完整消息
+     *
+     * @return TCP消息
+     * @see PayloadParser
+     */
+    Flux<TcpMessage> subscribe();
+
+    /**
+     * 向客户端发送数据
+     * @param message 数据对象
+     * @return 发送结果
+     */
+    Mono<Boolean> send(TcpMessage message);
+
+    /**
+     *
+     * @param listener
+     */
+    void onDisconnect(Runnable listener);
+
+    /**
+     * 连接保活
+     */
+    void keepAlive();
+
+}

+ 34 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java

@@ -0,0 +1,34 @@
+package org.jetlinks.community.network.tcp.client;
+
+import io.vertx.core.net.NetClientOptions;
+import lombok.*;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+
+import java.util.Map;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TcpClientProperties {
+
+    private String id;
+
+    private int port;
+
+    private String host;
+
+    private String certId;
+
+    private boolean ssl;
+
+    private PayloadParserType parserType;
+
+    private Map<String,Object> parserConfiguration;
+
+    private NetClientOptions options;
+
+    private boolean enabled;
+
+}

+ 164 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -0,0 +1,164 @@
+package org.jetlinks.community.network.tcp.client;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetSocket;
+import io.vertx.core.net.SocketAddress;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import reactor.core.publisher.Mono;
+
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+@Slf4j
+public class VertxTcpClient extends AbstractTcpClient {
+
+    public volatile NetClient client;
+
+    public volatile NetSocket socket;
+
+    volatile PayloadParser payloadParser;
+
+    @Getter
+    private String id;
+
+    @Setter
+    private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
+
+    private volatile long lastKeepAliveTime = System.currentTimeMillis();
+
+    private List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
+
+    @Override
+    public void keepAlive() {
+        lastKeepAliveTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public boolean isAlive() {
+        return socket != null && (System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeout);
+    }
+
+    @Override
+    public boolean isAutoReload() {
+        return true;
+    }
+
+    public VertxTcpClient(String id) {
+        this.id = id;
+    }
+
+
+    private void execute(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Exception e) {
+            log.warn("close tcp client error", e);
+        }
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        if (null == socket) {
+            return null;
+        }
+        SocketAddress socketAddress = socket.remoteAddress();
+        return new InetSocketAddress(socketAddress.host(), socketAddress.port());
+    }
+
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.TCP_CLIENT;
+    }
+
+    @Override
+    public void shutdown() {
+        if (null != client) {
+            execute(client::close);
+            client = null;
+        }
+        if (null != socket) {
+            execute(socket::close);
+            socket = null;
+        }
+        if (null != payloadParser) {
+            execute(payloadParser::close);
+            payloadParser = null;
+        }
+        disconnectListener.clear();
+    }
+
+    public void setClient(NetClient client) {
+        if (this.client != null && this.client != client) {
+            this.client.close();
+        }
+        this.client = client;
+    }
+
+    public void setRecordParser(PayloadParser payloadParser) {
+        if (null != this.payloadParser && this.payloadParser != payloadParser) {
+            this.payloadParser.close();
+        }
+        this.payloadParser = payloadParser;
+        this.payloadParser
+            .handlePayload()
+            .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
+            .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
+    }
+
+    public void setSocket(NetSocket socket) {
+        Objects.requireNonNull(payloadParser);
+        if (this.socket != null && this.socket != socket) {
+            this.socket.close();
+        }
+        this.socket = socket;
+        this.socket.closeHandler(v -> {
+            for (Runnable runnable : disconnectListener) {
+                runnable.run();
+            }
+            disconnectListener.clear();
+        });
+        this.socket.handler(buffer -> {
+            keepAlive();
+            payloadParser.handle(buffer);
+            if (this.socket != socket) {
+                log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
+                socket.close();
+            }
+        });
+    }
+
+    @Override
+    public Mono<Boolean> send(TcpMessage message) {
+        return Mono.<Boolean>create((sink) -> {
+            if (socket == null) {
+                sink.error(new SocketException("socket closed"));
+                return;
+            }
+            Buffer buffer = Buffer.buffer(message.getPayload());
+            socket.write(buffer, r -> {
+                keepAlive();
+                if (r.succeeded()) {
+                    sink.success(true);
+                } else {
+                    sink.error(r.cause());
+                }
+            });
+        });
+    }
+
+    @Override
+    public void onDisconnect(Runnable disconnected) {
+        disconnectListener.add(disconnected);
+    }
+}

+ 98 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -0,0 +1,98 @@
+package org.jetlinks.community.network.tcp.client;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetClientOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.network.*;
+import org.jetlinks.community.network.security.CertificateManager;
+import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
+import org.jetlinks.core.Values;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+@Component
+@Slf4j
+public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperties> {
+
+    private final CertificateManager certificateManager;
+
+    private final PayloadParserBuilder payloadParserBuilder;
+
+    private final Vertx vertx;
+
+    public VertxTcpClientProvider(CertificateManager certificateManager, Vertx vertx, PayloadParserBuilder payloadParserBuilder) {
+        this.certificateManager = certificateManager;
+        this.vertx = vertx;
+        this.payloadParserBuilder = payloadParserBuilder;
+    }
+
+    @Nonnull
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.TCP_CLIENT;
+    }
+
+    @Nonnull
+    @Override
+    public VertxTcpClient createNetwork(@Nonnull TcpClientProperties properties) {
+        VertxTcpClient client = new VertxTcpClient(properties.getId());
+
+        initClient(client, properties);
+
+        return client;
+    }
+
+    @Override
+    public void reload(@Nonnull Network network, @Nonnull TcpClientProperties properties) {
+        initClient(((VertxTcpClient) network), properties);
+    }
+
+    public void initClient(VertxTcpClient client, TcpClientProperties properties) {
+        client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
+        NetClient netClient = vertx.createNetClient(properties.getOptions());
+        client.setClient(netClient);
+        netClient.connect(properties.getPort(), properties.getHost(), result -> {
+            if (result.succeeded()) {
+                log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
+                client.setSocket(result.result());
+            } else {
+                log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());
+            }
+        });
+    }
+
+    @Nullable
+    @Override
+    public ConfigMetadata getConfigMetadata() {
+        // TODO: 2019/12/19
+        return null;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<TcpClientProperties> createConfig(@Nonnull NetworkProperties properties) {
+        return Mono.defer(() -> {
+            TcpClientProperties config = FastBeanCopier.copy(properties.getConfigurations(), new TcpClientProperties());
+            config.setId(properties.getId());
+            if (config.getOptions() == null) {
+                config.setOptions(new NetClientOptions());
+            }
+            if (config.isSsl()) {
+                config.getOptions().setSsl(true);
+                return certificateManager.getCertificate(config.getCertId())
+                        .map(VertxKeyCertTrustOptions::new)
+                        .doOnNext(config.getOptions()::setKeyCertOptions)
+                        .doOnNext(config.getOptions()::setTrustOptions)
+                        .thenReturn(config);
+            }
+            return Mono.just(config);
+        });
+    }
+}

+ 43 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DefaultPayloadParserBuilder.java

@@ -0,0 +1,43 @@
+package org.jetlinks.community.network.tcp.parser;
+
+import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
+import org.jetlinks.core.Values;
+import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder;
+import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPostProcessor {
+
+    private Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public DefaultPayloadParserBuilder(){
+        register(new FixLengthPayloadParserBuilder());
+        register(new DelimitedPayloadParserBuilder());
+        register(new ScriptPayloadParserBuilder());
+    }
+    @Override
+    public PayloadParser build(PayloadParserType type, Values configuration) {
+        return Optional.ofNullable(strategyMap.get(type))
+                .map(builder -> builder.build(configuration))
+                .orElseThrow(() -> new UnsupportedOperationException("unsupported parser:" + type));
+    }
+
+    public void register(PayloadParserBuilderStrategy strategy) {
+        strategyMap.put(strategy.getType(), strategy);
+    }
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        if (bean instanceof PayloadParserBuilderStrategy) {
+            register(((PayloadParserBuilderStrategy) bean));
+        }
+        return bean;
+    }
+}

+ 38 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParser.java

@@ -0,0 +1,38 @@
+package org.jetlinks.community.network.tcp.parser;
+
+import io.vertx.core.buffer.Buffer;
+import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder;
+import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder;
+import org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser;
+import reactor.core.publisher.Flux;
+
+/**
+ * 用于处理TCP粘拆包的解析器,通常一个客户端对应一个解析器.
+ *
+ * @author zhouhao
+ * @see PipePayloadParser
+ * @see FixLengthPayloadParserBuilder
+ * @see DelimitedPayloadParserBuilder
+ * @since 1.0
+ */
+public interface PayloadParser {
+
+    /**
+     * 处理一个数据包
+     *
+     * @param buffer 数据包
+     */
+    void handle(Buffer buffer);
+
+    /**
+     * 订阅完整的数据包流,每一个元素为一个完整的数据包
+     *
+     * @return 完整数据包流
+     */
+    Flux<Buffer> handlePayload();
+
+    /**
+     * 关闭以释放相关资源
+     */
+    void close();
+}

+ 9 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilder.java

@@ -0,0 +1,9 @@
+package org.jetlinks.community.network.tcp.parser;
+
+import org.jetlinks.core.Values;
+
+public interface PayloadParserBuilder {
+
+    PayloadParser build(PayloadParserType type, Values configuration);
+
+}

+ 9 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilderStrategy.java

@@ -0,0 +1,9 @@
+package org.jetlinks.community.network.tcp.parser;
+
+import org.jetlinks.core.Values;
+
+public interface PayloadParserBuilderStrategy {
+    PayloadParserType getType();
+
+    PayloadParser build(Values config);
+}

+ 30 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserType.java

@@ -0,0 +1,30 @@
+package org.jetlinks.community.network.tcp.parser;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.hswebframework.web.dict.Dict;
+import org.hswebframework.web.dict.EnumDict;
+import org.jetlinks.community.network.tcp.parser.strateies.PipePayloadParser;
+import org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder;
+
+@Getter
+@AllArgsConstructor
+@Dict("tcp-payload-parser-type")
+public enum PayloadParserType implements EnumDict<String> {
+    FIXED_LENGTH("固定长度"),
+
+    DELIMITED("分隔符"),
+
+    /**
+     * @see ScriptPayloadParserBuilder
+     * @see PipePayloadParser
+     */
+    SCRIPT("自定义脚本")
+    ;
+
+    private String text;
+    @Override
+    public String getValue() {
+        return name();
+    }
+}

+ 24 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DelimitedPayloadParserBuilder.java

@@ -0,0 +1,24 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.parsetools.RecordParser;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import org.jetlinks.core.Value;
+import org.jetlinks.core.Values;
+
+public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
+    @Override
+    public PayloadParserType getType() {
+        return PayloadParserType.DELIMITED;
+    }
+
+    @Override
+    protected RecordParser createParser(Values config) {
+
+        return RecordParser.newDelimited(StringEscapeUtils.unescapeJava(config.getValue("delimited")
+                .map(Value::asString)
+                .orElseThrow(() -> new IllegalArgumentException("delimited can not be null"))));
+    }
+
+
+}

+ 22 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilder.java

@@ -0,0 +1,22 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.parsetools.RecordParser;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import org.jetlinks.core.Value;
+import org.jetlinks.core.Values;
+
+public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
+    @Override
+    public PayloadParserType getType() {
+        return PayloadParserType.FIXED_LENGTH;
+    }
+
+    @Override
+    protected RecordParser createParser(Values config) {
+        return RecordParser.newFixed(config.getValue("size")
+                .map(Value::asInt)
+                .orElseThrow(() -> new IllegalArgumentException("size can not be null")));
+    }
+
+
+}

+ 134 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java

@@ -0,0 +1,134 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.parsetools.RecordParser;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * <pre>
+ * PipePayloadParser parser = new PipePayloadParser();
+ * parser.fixed(4)
+ *       .handler(buffer -> {
+ *            int len = BytesUtils.highBytes2Int(buffer.getBytes());
+ *            parser.fixed(len);
+ *         })
+ *       .handler(buffer -> parser.result(buffer.toString("UTF-8")).complete());
+ * </pre>
+ */
+@Slf4j
+public class PipePayloadParser implements PayloadParser {
+
+    private EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
+
+    private List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
+
+    private List<Buffer> result = new CopyOnWriteArrayList<>();
+
+    private volatile RecordParser recordParser;
+
+    private Consumer<RecordParser> firstInit;
+
+    private AtomicInteger currentPipe = new AtomicInteger();
+
+    public PipePayloadParser result(String buffer) {
+        return result(Buffer.buffer(buffer));
+    }
+
+    public PipePayloadParser result(byte[] buffer) {
+        return result(Buffer.buffer(buffer));
+    }
+
+    public PipePayloadParser handler(Consumer<Buffer> handler) {
+        pipe.add(handler);
+        return this;
+    }
+
+    public PipePayloadParser delimited(String delimited) {
+        if (recordParser == null) {
+            setParser(RecordParser.newDelimited(delimited));
+            firstInit = (parser -> parser.delimitedMode(delimited));
+            return this;
+        }
+        recordParser.delimitedMode(delimited);
+        return this;
+    }
+
+    public PipePayloadParser fixed(int size) {
+        if (recordParser == null) {
+            setParser(RecordParser.newFixed(size));
+            firstInit = (parser -> parser.fixedSizeMode(size));
+            return this;
+        }
+        recordParser.fixedSizeMode(size);
+        return this;
+    }
+
+    private Consumer<Buffer> getNextHandler() {
+        int i = currentPipe.getAndIncrement();
+        if (i < pipe.size()) {
+            return pipe.get(i);
+        }
+        currentPipe.set(0);
+        return pipe.get(0);
+    }
+
+    private void setParser(RecordParser parser) {
+        this.recordParser = parser;
+        this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
+    }
+
+    public PipePayloadParser complete() {
+        currentPipe.set(0);
+        firstInit.accept(recordParser);
+//        if (!processor.hasDownstreams()) {
+//            this.result.clear();
+//            return this;
+//        }
+        if (!this.result.isEmpty()) {
+            Buffer buffer = Buffer.buffer();
+            for (Buffer buf : this.result) {
+                buffer.appendBuffer(buf);
+            }
+            this.result.clear();
+            processor.onNext(buffer);
+        }
+        return this;
+
+    }
+
+    public PipePayloadParser result(Buffer buffer) {
+        this.result.add(buffer);
+        return this;
+    }
+
+    @Override
+    public synchronized void handle(Buffer buffer) {
+        if (recordParser == null) {
+            log.error("record parser not init");
+            return;
+        }
+        recordParser.handle(buffer);
+    }
+
+    @Override
+    public Flux<Buffer> handlePayload() {
+        return processor.map(Function.identity());
+    }
+
+    @Override
+    public void close() {
+        processor.onComplete();
+        currentPipe.set(0);
+        this.result.clear();
+    }
+
+}

+ 45 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilder.java

@@ -0,0 +1,45 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import lombok.SneakyThrows;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.hswebframework.expands.script.engine.DynamicScriptEngine;
+import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
+import org.jetlinks.core.Value;
+import org.jetlinks.core.Values;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy {
+    @Override
+    public PayloadParserType getType() {
+        return PayloadParserType.SCRIPT;
+    }
+
+    @Override
+    @SneakyThrows
+    public PayloadParser build(Values config) {
+        String script = config.getValue("script")
+                .map(Value::asString)
+                .orElseThrow(() -> new IllegalArgumentException("script不能为空"));
+        String lang = config.getValue("lang")
+                .map(Value::asString)
+                .orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
+
+        DynamicScriptEngine engine = DynamicScriptEngineFactory.getEngine(lang);
+        if (engine == null) {
+            throw new IllegalArgumentException("不支持的脚本:" + lang);
+        }
+        PipePayloadParser parser = new PipePayloadParser();
+        String id = DigestUtils.md5Hex(script);
+
+        engine.compile(id, script);
+        Map<String, Object> ctx = new HashMap<>();
+        ctx.put("parser", parser);
+        engine.execute(id, ctx).getIfSuccess();
+        return parser;
+    }
+}

+ 59 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.java

@@ -0,0 +1,59 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.parsetools.RecordParser;
+import org.jetlinks.core.Values;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+
+import java.util.function.Function;
+
+public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {
+    @Override
+    public abstract PayloadParserType getType();
+
+    protected abstract RecordParser createParser(Values config);
+
+    @Override
+    public PayloadParser build(Values config) {
+        return new RecordPayloadParser(createParser(config));
+    }
+
+    class RecordPayloadParser implements PayloadParser {
+        RecordParser recordParser;
+        EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
+
+        public RecordPayloadParser(RecordParser recordParser) {
+            this.recordParser = recordParser;
+            this.recordParser.handler(buffer -> {
+               // if (processor.hasDownstreams()) {
+                    processor.onNext(buffer);
+               // }
+            });
+        }
+
+        @Override
+        public void handle(Buffer buffer) {
+            recordParser.handle(buffer);
+        }
+
+        @Override
+        public Flux<Buffer> handlePayload() {
+            return processor.map(Function.identity());
+        }
+
+        @Override
+        public void close() {
+            try {
+                processor.dispose();
+            } catch (Exception ignore) {
+
+            }
+        }
+    }
+
+
+}

+ 31 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/AbstractTcpServer.java

@@ -0,0 +1,31 @@
+package org.jetlinks.community.network.tcp.server;
+
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+
+import java.util.function.Function;
+
+/**
+ * @author bsetfeng
+ * @since 1.0
+ **/
+public abstract class AbstractTcpServer implements TcpServer{
+
+    private EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
+
+    FluxSink<TcpClient> sink=processor.sink();
+
+    protected void received(TcpClient tcpClient) {
+       // if (processor.hasDownstreams()) {
+        sink.next(tcpClient);
+      //  }
+    }
+
+    @Override
+    public Flux<TcpClient> handleConnection() {
+        return processor
+                .map(Function.identity());
+    }
+}

+ 27 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java

@@ -0,0 +1,27 @@
+package org.jetlinks.community.network.tcp.server;
+
+import org.jetlinks.community.network.Network;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import reactor.core.publisher.Flux;
+
+/**
+ * TCP服务
+ *
+ * @author zhouhao
+ * @version 1.0
+ **/
+public interface TcpServer extends Network {
+
+    /**
+     * 订阅客户端连接
+     *
+     * @return 客户端流
+     * @see TcpClient
+     */
+    Flux<TcpClient> handleConnection();
+
+    /**
+     * 关闭服务端
+     */
+    void shutdown();
+}

+ 49 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java

@@ -0,0 +1,49 @@
+package org.jetlinks.community.network.tcp.server;
+
+import io.vertx.core.net.NetServerOptions;
+import io.vertx.core.net.SocketAddress;
+import lombok.*;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+
+/**
+ * @author bsetfeng
+ * @author zhouhao
+ * @since 1.0
+ **/
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TcpServerProperties {
+
+    private String id;
+
+    private NetServerOptions options;
+
+    private PayloadType payloadType;
+
+    private PayloadParserType parserType;
+
+    private Map<String, Object> parserConfiguration;
+
+    private String host;
+
+    private int port;
+
+    private boolean ssl;
+
+    private String certId;
+
+
+    public SocketAddress createSocketAddress() {
+        if (StringUtils.isEmpty(host)) {
+            host = "localhost";
+        }
+        return SocketAddress.inetSocketAddress(port, host);
+    }
+}

+ 97 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -0,0 +1,97 @@
+package org.jetlinks.community.network.tcp.server;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetServer;
+import io.vertx.core.net.NetServerOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.network.*;
+import org.jetlinks.community.network.security.CertificateManager;
+import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
+import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder;
+import org.jetlinks.core.Values;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+@Component
+@Slf4j
+public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
+
+    private final CertificateManager certificateManager;
+
+    private final Vertx vertx;
+
+    private final PayloadParserBuilder payloadParserBuilder;
+
+    public TcpServerProvider(CertificateManager certificateManager, Vertx vertx, PayloadParserBuilder payloadParserBuilder) {
+        this.certificateManager = certificateManager;
+        this.vertx = vertx;
+        this.payloadParserBuilder = payloadParserBuilder;
+    }
+
+    @Nonnull
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.TCP_SERVER;
+    }
+
+    @Nonnull
+    @Override
+    public VertxTcpServer createNetwork(@Nonnull TcpServerProperties properties) {
+
+        VertxTcpServer tcpServer = new VertxTcpServer(properties.getId());
+        initTcpServer(tcpServer, properties);
+        return tcpServer;
+    }
+
+    private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
+        NetServer netServer = vertx.createNetServer(properties.getOptions());
+        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
+        tcpServer.setServer(netServer);
+        netServer.listen(properties.createSocketAddress(), result -> {
+            if (result.succeeded()) {
+                log.info("tcp server startup on {}", result.result().actualPort());
+            } else {
+                log.error("startup tcp server error", result.cause());
+            }
+        });
+    }
+
+    @Override
+    public void reload(@Nonnull Network network, @Nonnull TcpServerProperties properties) {
+        VertxTcpServer tcpServer = ((VertxTcpServer) network);
+        tcpServer.shutdown();
+        initTcpServer(tcpServer, properties);
+    }
+
+    @Nullable
+    @Override
+    public ConfigMetadata getConfigMetadata() {
+        return null;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<TcpServerProperties> createConfig(@Nonnull NetworkProperties properties) {
+        return Mono.defer(() -> {
+            TcpServerProperties config = FastBeanCopier.copy(properties.getConfigurations(), new TcpServerProperties());
+            config.setId(properties.getId());
+            if (config.getOptions() == null) {
+                config.setOptions(new NetServerOptions());
+            }
+            if (config.isSsl()) {
+                config.getOptions().setSsl(true);
+                return certificateManager.getCertificate(config.getCertId())
+                        .map(VertxKeyCertTrustOptions::new)
+                        .doOnNext(config.getOptions()::setKeyCertOptions)
+                        .doOnNext(config.getOptions()::setTrustOptions)
+                        .thenReturn(config);
+            }
+            return Mono.just(config);
+        });
+    }
+}

+ 95 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -0,0 +1,95 @@
+package org.jetlinks.community.network.tcp.server;
+
+import io.vertx.core.net.NetServer;
+import io.vertx.core.net.NetSocket;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.tcp.client.VertxTcpClient;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+
+
+import java.util.function.Supplier;
+
+/**
+ * @author bsetfeng
+ * @since 1.0
+ **/
+@Slf4j
+public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
+
+    @Getter
+    volatile NetServer server;
+
+    private Supplier<PayloadParser> parserSupplier;
+
+    @Getter
+    private String id;
+
+    public VertxTcpServer(String id) {
+        this.id = id;
+    }
+
+    private void execute(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Exception e) {
+            log.warn("close tcp server error", e);
+        }
+    }
+
+    public void setParserSupplier(Supplier<PayloadParser> parserSupplier) {
+        this.parserSupplier = parserSupplier;
+    }
+
+    public void setServer(NetServer server) {
+        if (this.server != null && this.server != server) {
+            this.server.close();
+        }
+        this.server = server;
+        this.server.connectHandler(this::acceptTcpConnection);
+    }
+
+    protected void acceptTcpConnection(NetSocket socket) {
+        VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
+        try {
+            socket.exceptionHandler(err -> {
+                log.error("tcp server client [{}] error", socket.remoteAddress(), err);
+            }).closeHandler((nil) -> {
+                log.info("tcp server client [{}] closed", socket.remoteAddress());
+                client.shutdown();
+            });
+            client.setRecordParser(parserSupplier.get());
+            client.setSocket(socket);
+            received(client);
+            log.debug("accept tcp client [{}] connection", socket.remoteAddress());
+        } catch (Exception e) {
+            log.error("create tcp server client error", e);
+            client.shutdown();
+        }
+    }
+
+    @Override
+    public NetworkType getType() {
+        return DefaultNetworkType.TCP_SERVER;
+    }
+
+    @Override
+    public void shutdown() {
+        if (null != server) {
+            execute(server::close);
+            server = null;
+        }
+    }
+
+    @Override
+    public boolean isAlive() {
+        return server != null;
+    }
+
+    @Override
+    public boolean isAutoReload() {
+        return false;
+    }
+}

+ 50 - 0
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProviderTest.java

@@ -0,0 +1,50 @@
+package org.jetlinks.community.network.tcp.client;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetClientOptions;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.parser.DefaultPayloadParserBuilder;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+class VertxTcpClientProviderTest {
+
+
+    @Test
+    void test() {
+        Vertx vertx = Vertx.vertx();
+
+        vertx.createNetServer()
+                .connectHandler(socket -> {
+                    socket.write("tes");
+                    socket.write("ttest");
+                })
+                .listen(12311);
+
+        VertxTcpClientProvider provider = new VertxTcpClientProvider(id -> Mono.empty(), vertx, new DefaultPayloadParserBuilder());
+
+        TcpClientProperties properties = new TcpClientProperties();
+        properties.setHost("127.0.0.1");
+        properties.setPort(12311);
+        properties.setParserType(PayloadParserType.FIXED_LENGTH);
+        properties.setParserConfiguration(Collections.singletonMap("size", 4));
+        properties.setOptions(new NetClientOptions());
+
+
+        provider.createNetwork(properties)
+                .subscribe()
+                .map(TcpMessage::getPayload)
+                .map(buf -> buf.toString(StandardCharsets.UTF_8))
+                .take(2)
+                .as(StepVerifier::create)
+                .expectNext("test", "test")
+                .verifyComplete();
+
+    }
+
+}

+ 56 - 0
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilderTest.java

@@ -0,0 +1,56 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.buffer.Buffer;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import org.jetlinks.core.Values;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+class FixLengthPayloadParserBuilderTest {
+
+
+    @Test
+    void testFixLength() {
+        FixLengthPayloadParserBuilder builder = new FixLengthPayloadParserBuilder();
+        PayloadParser parser = builder.build(Values.of(Collections.singletonMap("size", 5)));
+        List<String>  arr = new ArrayList<>();
+
+        parser.handlePayload()
+                .map(buffer -> buffer.toString(StandardCharsets.UTF_8))
+                .subscribe(arr::add);
+
+        parser.handle(Buffer.buffer("123"));
+        parser.handle(Buffer.buffer("4567"));
+        parser.handle(Buffer.buffer("890"));
+
+        Assert.assertArrayEquals(arr.toArray(),new Object[]{
+                "12345","67890"
+        });
+
+    }
+
+    @Test
+    void testDelimited() {
+        DelimitedPayloadParserBuilder builder = new DelimitedPayloadParserBuilder();
+        PayloadParser parser = builder.build(Values.of(Collections.singletonMap("delimited", "@@")));
+        List<String>  arr = new ArrayList<>();
+
+        parser.handlePayload()
+                .map(buffer -> buffer.toString(StandardCharsets.UTF_8))
+                .subscribe(arr::add);
+
+        parser.handle(Buffer.buffer("123"));
+        parser.handle(Buffer.buffer("45@@67"));
+        parser.handle(Buffer.buffer("890@@111"));
+
+        Assert.assertArrayEquals(arr.toArray(),new Object[]{
+                "12345","67890"
+        });
+
+    }
+}

+ 89 - 0
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParserTest.java

@@ -0,0 +1,89 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.buffer.Buffer;
+import org.jetlinks.community.network.utils.BytesUtils;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+
+class PipePayloadParserTest {
+
+
+    @Test
+    void testSplicingUnpack() {
+        PipePayloadParser parser = new PipePayloadParser();
+
+        parser.fixed(4)
+                .handler(buffer -> {
+                    int len = BytesUtils.lowBytesToInt(buffer.getBytes());
+                    parser.fixed(len);
+                })
+                .handler(buffer -> parser.result(buffer).complete());
+
+
+        parser.handlePayload()
+                .doOnSubscribe(sb -> {
+                    Mono.delay(Duration.ofMillis(100))
+                            .subscribe(r -> {
+                                Buffer buffer = Buffer.buffer(BytesUtils.toLowBytes(5));
+                                buffer.appendString("1234");
+                                parser.handle(buffer);
+                                parser.handle(Buffer.buffer("5"));
+
+                                parser.handle(Buffer.buffer(new byte[]{0, 0}));
+                                parser.handle(Buffer.buffer(new byte[]{0, 6}).appendString("12"));
+                                parser.handle(Buffer.buffer("3456"));
+                            });
+                })
+                .take(2)
+                .map(bf -> bf.toString(StandardCharsets.UTF_8))
+                .as(StepVerifier::create)
+                .expectNext("12345", "123456")
+                .verifyComplete();
+    }
+
+
+    @Test
+    void test() {
+        PipePayloadParser parser = new PipePayloadParser();
+
+        parser.fixed(4)
+                .handler(buffer -> {
+                    int len = BytesUtils.highBytesToInt(buffer.getBytes());
+                    parser.fixed(len);
+                })
+                .handler(buffer -> {
+                    parser.result(buffer)
+                            .complete();
+                });
+
+        byte[] payload = "hello".getBytes();
+
+        Buffer buffer = Buffer.buffer(payload.length + 4);
+
+        buffer.appendBytes(BytesUtils.toHighBytes(payload.length));
+        buffer.appendBytes(payload);
+
+        parser.handlePayload()
+                .doOnSubscribe(sb -> {
+                    Flux.range(0, 100)
+                            .delayElements(Duration.ofMillis(10))
+                            .subscribe(i -> {
+                                parser.handle(buffer);
+                            });
+                })
+                .take(2)
+                .map(bf -> bf.toString(StandardCharsets.UTF_8))
+                .as(StepVerifier::create)
+                .expectNext("hello", "hello")
+                .verifyComplete();
+
+
+    }
+
+
+}

+ 57 - 0
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilderTest.java

@@ -0,0 +1,57 @@
+package org.jetlinks.community.network.tcp.parser.strateies;
+
+import io.vertx.core.buffer.Buffer;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import org.jetlinks.community.network.utils.BytesUtils;
+import org.jetlinks.core.Values;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+class ScriptPayloadParserBuilderTest {
+
+    @Test
+    void testSplicingUnpack() {
+        ScriptPayloadParserBuilder builder = new ScriptPayloadParserBuilder();
+        Map<String, Object> config = new HashMap<>();
+        config.put("script", "\n" +
+                "var BytesUtils = org.jetlinks.pro.network.utils.BytesUtils;\n" +
+                "parser.fixed(4)\n" +
+                "       .handler(function(buffer){\n" +
+                "            var len = BytesUtils.highBytesToInt(buffer.getBytes());\n" +
+                "            parser.fixed(len);\n" +
+                "        })\n" +
+                "       .handler(function(buffer){\n" +
+                "            parser.result(buffer.toString(\"UTF-8\"))\n" +
+                "                   .complete();\n" +
+                "        });");
+        config.put("lang", "javascript");
+        PayloadParser parser = builder.build(Values.of(config));
+
+        parser.handlePayload()
+                .doOnSubscribe(sb -> {
+                    Mono.delay(Duration.ofMillis(100))
+                            .subscribe(r -> {
+                                Buffer buffer = Buffer.buffer(BytesUtils.toHighBytes(5));
+                                buffer.appendString("1234");
+                                parser.handle(buffer);
+                                parser.handle(Buffer.buffer("5"));
+
+                                parser.handle(Buffer.buffer(new byte[]{5, 0}));
+                                parser.handle(Buffer.buffer(new byte[]{0, 0}).appendString("12"));
+                                parser.handle(Buffer.buffer("345"));
+                            });
+                })
+                .take(2)
+                .map(bf -> bf.toString(StandardCharsets.UTF_8))
+                .as(StepVerifier::create)
+                .expectNext("12345", "12345")
+                .verifyComplete();
+    }
+
+}

+ 70 - 0
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java

@@ -0,0 +1,70 @@
+package org.jetlinks.community.network.tcp.server;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetServerOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.jetlinks.community.network.tcp.parser.DefaultPayloadParserBuilder;
+import org.jetlinks.community.network.tcp.parser.PayloadParserType;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+@Slf4j
+class TcpServerProviderTest {
+
+
+    static TcpServer tcpServer;
+
+    @BeforeAll
+    static void init() {
+        TcpServerProperties properties = TcpServerProperties.builder()
+                .id("test")
+                .options(new NetServerOptions().setPort(8080))
+                .parserType(PayloadParserType.FIXED_LENGTH)
+                .parserConfiguration(Collections.singletonMap("size", 5))
+                .build();
+
+        TcpServerProvider provider = new TcpServerProvider((id) -> Mono.empty(), Vertx.vertx(), new DefaultPayloadParserBuilder());
+
+        tcpServer = provider.createNetwork(properties);
+    }
+
+
+    @Test
+    void test() {
+
+        Vertx.vertx().createNetClient()
+                .connect(8080, "localhost", handle -> {
+                    if (handle.succeeded()) {
+                        //模拟粘包,同时发送2个包
+                        handle.result().write("hellohello", r -> {
+                            if (r.succeeded()) {
+                                log.info("tcp客户端消息发送成功");
+                            } else {
+                                log.error("tcp客户端消息发送错误", r.cause());
+                            }
+                        });
+                    } else {
+                        log.error("创建tcp客户端错误", handle.cause());
+                    }
+                });
+
+
+        tcpServer.handleConnection()
+                .flatMap(TcpClient::subscribe)
+                .map(TcpMessage::getPayload)
+                .map(payload -> payload.toString(StandardCharsets.UTF_8))
+                .take(2)
+                .as(StepVerifier::create)
+                .expectNext("hello", "hello")//收到2个完整的包
+                .verifyComplete();
+    }
+
+
+}

+ 22 - 0
jetlinks-components/network-component/tcp-component/src/test/resources/jetlinks-client.pem

@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDqzCCApOgAwIBAgIINaFTJP8mRsEwDQYJKoZIhvcNAQELBQAwgYkxCzAJBgNV
+BAYTAmNuMQ4wDAYDVQQIEwVjaGluYTESMBAGA1UEBxMJY2hvbmdxaW5nMREwDwYD
+VQQKEwhqZXRsaW5rczERMA8GA1UECxMIamV0bGlua3MxETAPBgNVBAMTCGpldGxp
+bmtzMR0wGwYJKoZIhvcNAQkBFg5hZG1pbkBoc3dlYi5tZTAeFw0xOTExMjEwMjQz
+MDBaFw0yMDExMjEwMjQzMDBaMIGJMQswCQYDVQQGEwJjbjEOMAwGA1UECBMFY2hp
+bmExEjAQBgNVBAcTCWNob25ncWluZzERMA8GA1UEChMIamV0bGlua3MxETAPBgNV
+BAsTCGpldGxpbmtzMREwDwYDVQQDEwhqZXRsaW5rczEdMBsGCSqGSIb3DQEJARYO
+YWRtaW5AaHN3ZWIubWUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDQ
+XOG64Q885+7Zn90GytG5S3jbuIP0sEvh4xD4ieKUxggGvldKlfJAdwqJ8moZpiDx
+gRtedpZ1eI+xORSjILiIWIv1G9D3i2oS+ghl7kH1dmuOMshdRVEG/ppbeW3czQfC
+aNqI0F2Merz352zpLEnwKvArf9phCW8gnLIzbQ3JH/2Pm5wE+9+uI0IgfXk7aSWx
+1xEo7Ni+nuWWNXxQ8d2eUv+XxSkKS23G/HDaEI9T/xs3sImyErOpxrKSbfqanGaM
+pLd9ubAyN1Wzeta+p/75KjBDAT+xkxbW9Zp2u7avSVq9EnpCbytzB2PYLZbBEsu4
+4YjPlvlHbcSWGDeyZHTXAgMBAAGjFTATMBEGCWCGSAGG+EIBAQQEAwIA9zANBgkq
+hkiG9w0BAQsFAAOCAQEAMQVAhcTmfogoM7uMYAUOn9My7axdnwqm8iRdw4yGrCb5
+mn7rtZeopDEzqfNwDd1WhWyvdw2bJrdlrbMACnNV67bQaFOxLnwOBVSbFxPbkMtq
+Bate59n51lM3Ij+Q8fNuLNAOgNPYLe7HE0uXYXZrWtfvQ9CjsSX8p7EwxcBd8883
+9R3jRq+CgvvDCBuHhapf0ltLANuCOD4DGI2Om+ZgW98F+U/i/MUFQku5/7Lt04BZ
+x9RmJwjhxJl24Pah9K2FkRV5GM4PseqUAH9QAw2InlmZK8q0kw4dMbI7c3qnGGqv
+xCjtNWs0fA3j4z1gH1ykgxRk7UORagG7gYxFAWMdNA==
+-----END CERTIFICATE-----

+ 49 - 0
jetlinks-components/network-component/tcp-component/src/test/resources/jetlinks-server.pem

@@ -0,0 +1,49 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEA0FzhuuEPPOfu2Z/dBsrRuUt427iD9LBL4eMQ+InilMYIBr5X
+SpXyQHcKifJqGaYg8YEbXnaWdXiPsTkUoyC4iFiL9RvQ94tqEvoIZe5B9XZrjjLI
+XUVRBv6aW3lt3M0HwmjaiNBdjHq89+ds6SxJ8CrwK3/aYQlvIJyyM20NyR/9j5uc
+BPvfriNCIH15O2klsdcRKOzYvp7lljV8UPHdnlL/l8UpCkttxvxw2hCPU/8bN7CJ
+shKzqcaykm36mpxmjKS3fbmwMjdVs3rWvqf++SowQwE/sZMW1vWadru2r0lavRJ6
+Qm8rcwdj2C2WwRLLuOGIz5b5R23Elhg3smR01wIDAQABAoIBAHGlB1VnZWlB00uN
+Xv9SZhsbZ/rnY8l00p6Mu3fjtNqSPFR5QqEisbOpee0NdAMzUiqG0YevRJcjv7PY
+v0HOGff727I/jrVFrABXsXbvAbjA8x7m4dPFd9FJ0qTr0RkqFv5ZNECE7VEurJzF
+8+glyNoegUoofNKKU7tE3AkihO9Z73pBCQnfheT/jzfRh/4bdvQiesmw0FcCvSBL
+AJ94QmmwwZQTp9FrC24N4hiuJ62vgZs0TudmRYD7vvOHlp8xB9r0m4uysij4hPSb
+PZ25KdbRlFNyH6NUtrqcoNIzG1qk36ZQBsH96Y6dsCjDNeka2eeIvTCpHH9evG/M
+9lAW54ECgYEA/MU/VAqdZX2PNTC7ZnP9BSF2u9Gt3j4KnHe0NF18xTJR/DL5U87m
+VuH1H3l4ydMHQtdlXjiuMokWO78hZyAPJ04RRi/xKAalpsV1PbNDM0Yi8FWPPe4o
+/+ZOTXqPLcfZWlnPySCdYHDb/xCvJEmhe0fgXkhBM1JipfGJMxzS0hcCgYEA0wZj
+KV5zpj0O441/2zhTVRz+9ibMi+G4zYjB1NIvMIPCMeASQY3oTXeX6hP7GLQ7NRhs
+2BHJxDB444THrgz8OC0BL+KpdSwWRzYKOQh6/k4WGjLiIc8vmwfcy+w5nZFX6C/B
+P5ERFd4OyKIzF7PKPiPhmUTzJXxM37q27EFO60ECgYAY1qkqcQCWgIgaCffgUXWD
+k44/VpNnS2FvjrO+kZE5L4Cu77NgcOxhjUKxHRapfnswLkpfv+IeRSpSJK37nELg
+8eLwqrVf4YdVpAGpVnw8BaBZTY8N3uFYhCEdq/V14jWRk1G/ydytJWn43oLXZgTg
+QGfj2+XMTUQ0Wk/JHgP5cQKBgQCsgT4ypTmZHbS8JAHu7P6CR2/NPS5c8yqlKjR5
+i1B0MmFfcDK8UAskuK0A91B+g2cbIlpnzzLHJHDOXM724zqTmYas5Hduh5m2oOB+
+ewZAOQksZYgh50KdUzIVqYf33Mal0dQB6M60t5ASRzXJHHopjh9vscOqUxBsnC/I
+tiKTgQKBgApQYy4M1UuOju8G/MYP6BSh4BQuRX0k0jYXjq1GgIyF5iMFvlV0Qn2d
+iBj7HPPGyfgvYZAYhItydQ1H/kYuUaOPfR0IY7jkzECsOkTjMpDQE/ZHv/6v/QI5
+BLGms+DY9zkK3aH8U4Cub6Jt0guBLAkt7VrWTBTHh8FEeWWmLSSu
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDqzCCApOgAwIBAgIINaFTJP8mRsEwDQYJKoZIhvcNAQELBQAwgYkxCzAJBgNV
+BAYTAmNuMQ4wDAYDVQQIEwVjaGluYTESMBAGA1UEBxMJY2hvbmdxaW5nMREwDwYD
+VQQKEwhqZXRsaW5rczERMA8GA1UECxMIamV0bGlua3MxETAPBgNVBAMTCGpldGxp
+bmtzMR0wGwYJKoZIhvcNAQkBFg5hZG1pbkBoc3dlYi5tZTAeFw0xOTExMjEwMjQz
+MDBaFw0yMDExMjEwMjQzMDBaMIGJMQswCQYDVQQGEwJjbjEOMAwGA1UECBMFY2hp
+bmExEjAQBgNVBAcTCWNob25ncWluZzERMA8GA1UEChMIamV0bGlua3MxETAPBgNV
+BAsTCGpldGxpbmtzMREwDwYDVQQDEwhqZXRsaW5rczEdMBsGCSqGSIb3DQEJARYO
+YWRtaW5AaHN3ZWIubWUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDQ
+XOG64Q885+7Zn90GytG5S3jbuIP0sEvh4xD4ieKUxggGvldKlfJAdwqJ8moZpiDx
+gRtedpZ1eI+xORSjILiIWIv1G9D3i2oS+ghl7kH1dmuOMshdRVEG/ppbeW3czQfC
+aNqI0F2Merz352zpLEnwKvArf9phCW8gnLIzbQ3JH/2Pm5wE+9+uI0IgfXk7aSWx
+1xEo7Ni+nuWWNXxQ8d2eUv+XxSkKS23G/HDaEI9T/xs3sImyErOpxrKSbfqanGaM
+pLd9ubAyN1Wzeta+p/75KjBDAT+xkxbW9Zp2u7avSVq9EnpCbytzB2PYLZbBEsu4
+4YjPlvlHbcSWGDeyZHTXAgMBAAGjFTATMBEGCWCGSAGG+EIBAQQEAwIA9zANBgkq
+hkiG9w0BAQsFAAOCAQEAMQVAhcTmfogoM7uMYAUOn9My7axdnwqm8iRdw4yGrCb5
+mn7rtZeopDEzqfNwDd1WhWyvdw2bJrdlrbMACnNV67bQaFOxLnwOBVSbFxPbkMtq
+Bate59n51lM3Ij+Q8fNuLNAOgNPYLe7HE0uXYXZrWtfvQ9CjsSX8p7EwxcBd8883
+9R3jRq+CgvvDCBuHhapf0ltLANuCOD4DGI2Om+ZgW98F+U/i/MUFQku5/7Lt04BZ
+x9RmJwjhxJl24Pah9K2FkRV5GM4PseqUAH9QAw2InlmZK8q0kw4dMbI7c3qnGGqv
+xCjtNWs0fA3j4z1gH1ykgxRk7UORagG7gYxFAWMdNA==
+-----END CERTIFICATE-----

+ 6 - 0
jetlinks-manager/network-manager/pom.xml

@@ -68,6 +68,12 @@
             <version>${project.version}</version>
             <version>${project.version}</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tcp-component</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
 
 
     </dependencies>
     </dependencies>