Explorar o código

优化使用脚本作为粘拆包规则时的tcp性能

zhouhao %!s(int64=2) %!d(string=hai) anos
pai
achega
a00016018a
Modificáronse 15 ficheiros con 225 adicións e 88 borrados
  1. 5 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java
  2. 7 4
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DefaultPayloadParserBuilder.java
  3. 13 9
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DirectRecordParser.java
  4. 1 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParser.java
  5. 16 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilder.java
  6. 25 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilderStrategy.java
  7. 0 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserType.java
  8. 26 5
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DelimitedPayloadParserBuilder.java
  9. 4 2
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DirectPayloadParserBuilder.java
  10. 13 3
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilder.java
  11. 44 26
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java
  12. 32 5
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilder.java
  13. 12 11
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.java
  14. 5 2
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java
  15. 22 17
      jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParserTest.java

+ 5 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -8,6 +8,7 @@ import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.springframework.stereotype.Component;
@@ -16,6 +17,7 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.time.Duration;
+import java.util.function.Supplier;
 
 @Component
 @Slf4j
@@ -58,10 +60,12 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         client.setClient(netClient);
         client.setKeepAliveTimeoutMs(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
+        Supplier<PayloadParser>  supplier= payloadParserBuilder.build(properties.getParserType(), properties);
+        supplier.get();
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
             if (result.succeeded()) {
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
-                client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
+                client.setRecordParser(supplier.get());
                 client.setSocket(result.result());
             } else {
                 log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());

+ 7 - 4
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DefaultPayloadParserBuilder.java

@@ -9,14 +9,16 @@ import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Nonnull;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 @Component
 public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPostProcessor {
 
-    private Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
+    private final Map<PayloadParserType, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
 
     public DefaultPayloadParserBuilder(){
         register(new FixLengthPayloadParserBuilder());
@@ -25,9 +27,10 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
         register(new DirectPayloadParserBuilder());
     }
     @Override
-    public PayloadParser build(PayloadParserType type, ValueObject configuration) {
+    public Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration) {
+
         return Optional.ofNullable(strategyMap.get(type))
-                .map(builder -> builder.build(configuration))
+                .map(builder -> builder.buildLazy(configuration))
                 .orElseThrow(() -> new UnsupportedOperationException("unsupported parser:" + type));
     }
 
@@ -36,7 +39,7 @@ public class DefaultPayloadParserBuilder implements PayloadParserBuilder, BeanPo
     }
 
     @Override
-    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+    public Object postProcessAfterInitialization(@Nonnull Object bean,@Nonnull String beanName) throws BeansException {
         if (bean instanceof PayloadParserBuilderStrategy) {
             register(((PayloadParserBuilderStrategy) bean));
         }

+ 13 - 9
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/DirectRecordParser.java

@@ -1,28 +1,32 @@
 package org.jetlinks.community.network.tcp.parser;
 
 import io.vertx.core.buffer.Buffer;
-import reactor.core.publisher.EmitterProcessor;
+import org.jetlinks.core.utils.Reactors;
 import reactor.core.publisher.Flux;
-
-import java.util.function.Function;
-
+import reactor.core.publisher.Sinks;
+
+/**
+ * 不处理直接返回数据包
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public class DirectRecordParser implements PayloadParser {
 
-    EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
-
+    private final Sinks.Many<Buffer> sink = Reactors.createMany();
 
     @Override
     public void handle(Buffer buffer) {
-        processor.onNext(buffer);
+        sink.emitNext(buffer, Reactors.emitFailureHandler());
     }
 
     @Override
     public Flux<Buffer> handlePayload() {
-        return processor.map(Function.identity());
+        return sink.asFlux();
     }
 
     @Override
     public void close() {
-        processor.onComplete();
+        sink.emitComplete(Reactors.emitFailureHandler());
     }
 }

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParser.java

@@ -36,5 +36,5 @@ public interface PayloadParser {
     /**
      * 重置规则
      */
-    default void reset(){}
+   default void reset(){}
 }

+ 16 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilder.java

@@ -2,8 +2,23 @@ package org.jetlinks.community.network.tcp.parser;
 
 import org.jetlinks.community.ValueObject;
 
+import java.util.function.Supplier;
+
+/**
+ * 解析器构造器,用于根据解析器类型和配置信息构造对应的解析器
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public interface PayloadParserBuilder {
 
-    PayloadParser build(PayloadParserType type, ValueObject configuration);
+    /**
+     * 构造解析器
+     *
+     * @param type          解析器类型
+     * @param configuration 配置信息
+     * @return 解析器
+     */
+    Supplier<PayloadParser> build(PayloadParserType type, ValueObject configuration);
 
 }

+ 25 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserBuilderStrategy.java

@@ -2,8 +2,32 @@ package org.jetlinks.community.network.tcp.parser;
 
 import org.jetlinks.community.ValueObject;
 
+import java.util.function.Supplier;
+
+/**
+ * 解析器构造器策略,用于实现不同类型的解析器构造逻辑
+ *
+ * @author zhouhao
+ * @since 1.0
+ * @see org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder
+ * @see org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder
+ * @see org.jetlinks.community.network.tcp.parser.strateies.ScriptPayloadParserBuilder
+ */
 public interface PayloadParserBuilderStrategy {
+    /**
+     * @return 解析器类型
+     */
     PayloadParserType getType();
 
-    PayloadParser build(ValueObject config);
+    /**
+     * 构造解析器
+     *
+     * @param config 配置信息
+     * @return 解析器
+     */
+    Supplier<PayloadParser> buildLazy(ValueObject config);
+
+   default PayloadParser build(ValueObject config){
+       return buildLazy(config).get();
+   }
 }

+ 0 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParserType.java


+ 26 - 5
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DelimitedPayloadParserBuilder.java

@@ -1,10 +1,21 @@
 package org.jetlinks.community.network.tcp.parser.strateies;
 
+import io.vertx.core.buffer.Buffer;
 import io.vertx.core.parsetools.RecordParser;
+import lombok.SneakyThrows;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.text.StringEscapeUtils;
 import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 
+import java.util.function.Supplier;
+
+/**
+ * 以分隔符读取数据包
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
     @Override
     public PayloadParserType getType() {
@@ -12,12 +23,22 @@ public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {
     }
 
     @Override
-    protected RecordParser createParser(ValueObject config) {
+    @SneakyThrows
+    protected Supplier<RecordParser> createParser(ValueObject config) {
+
+        String delimited = config
+            .getString("delimited")
+            .map(String::trim)
+            .orElseThrow(() -> new IllegalArgumentException("delimited can not be null"));
+
+        if (delimited.startsWith("0x")) {
+
+            byte[] hex = Hex.decodeHex(delimited.substring(2));
+            return () -> RecordParser
+                .newDelimited(Buffer.buffer(hex));
+        }
 
-        return RecordParser.newDelimited(StringEscapeUtils.unescapeJava(
-            config
-                .getString("delimited")
-                .orElseThrow(() -> new IllegalArgumentException("delimited can not be null"))));
+        return () -> RecordParser.newDelimited(StringEscapeUtils.unescapeJava(delimited));
     }
 
 

+ 4 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DirectPayloadParserBuilder.java

@@ -7,6 +7,8 @@ import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 
+import java.util.function.Supplier;
+
 public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy {
 
     @Override
@@ -16,7 +18,7 @@ public class DirectPayloadParserBuilder implements PayloadParserBuilderStrategy
 
     @Override
     @SneakyThrows
-    public PayloadParser build(ValueObject config) {
-        return new DirectRecordParser();
+    public Supplier<PayloadParser> buildLazy(ValueObject config) {
+        return DirectRecordParser::new;
     }
 }

+ 13 - 3
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/FixLengthPayloadParserBuilder.java

@@ -4,6 +4,14 @@ import io.vertx.core.parsetools.RecordParser;
 import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 
+import java.util.function.Supplier;
+
+/**
+ * 固定长度解析器构造器,每次读取固定长度的数据包
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
     @Override
     public PayloadParserType getType() {
@@ -11,9 +19,11 @@ public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {
     }
 
     @Override
-    protected RecordParser createParser(ValueObject config) {
-        return RecordParser.newFixed(config.getInt("size")
-                .orElseThrow(() -> new IllegalArgumentException("size can not be null")));
+    protected Supplier<RecordParser> createParser(ValueObject config) {
+        int size = config.getInt("size")
+                         .orElseThrow(() -> new IllegalArgumentException("size can not be null"));
+
+        return () -> RecordParser.newFixed(size);
     }
 
 

+ 44 - 26
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java

@@ -3,36 +3,48 @@ package org.jetlinks.community.network.tcp.parser.strateies;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.parsetools.RecordParser;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.utils.Reactors;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
-import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Sinks;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
- * <pre>
- * PipePayloadParser parser = new PipePayloadParser();
- * parser.fixed(4)
- *       .handler(buffer -> {
- *            int len = BytesUtils.highBytes2Int(buffer.getBytes());
- *            parser.fixed(len);
+ * <pre>{@code
+ * PipePayloadParser payloadParser =
+ * //先读取4个字节
+ * new PipePayloadParser()
+ *       .fixed(4)
+ *        //第一次读取数据
+ *       .handler((buffer,parser) -> {
+ *            //4字节转为int,表示接下来要读取的包长度
+ *            int len = buffer.getInt(0);
+ *            parser
+ *              .result(buffer) //将已读取的4字节设置到结果中
+ *              .fixed(len);//设置接下来要读取的字节长度
  *         })
- *       .handler(buffer -> parser.result(buffer.toString("UTF-8")).complete());
- * </pre>
+ *         //第二次读取数据
+ *       .handler((buffer,parser) -> parser
+ *                .result(buffer) //设置结果
+ *                .complete() //完成本次读取,输出结果,开始下一次读取
+ *               );
+ * }</pre>
  */
 @Slf4j
 public class PipePayloadParser implements PayloadParser {
 
-    private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
+    private final static AtomicIntegerFieldUpdater<PipePayloadParser> CURRENT_PIPE =
+        AtomicIntegerFieldUpdater.newUpdater(PipePayloadParser.class, "currentPipe");
 
-    private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
+    private final Sinks.Many<Buffer> sink = Reactors.createMany();
 
-    private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
+    private final List<BiConsumer<Buffer, PipePayloadParser>> pipe = new CopyOnWriteArrayList<>();
 
     private final List<Buffer> result = new CopyOnWriteArrayList<>();
 
@@ -42,7 +54,7 @@ public class PipePayloadParser implements PayloadParser {
 
     private Consumer<RecordParser> firstInit;
 
-    private final AtomicInteger currentPipe = new AtomicInteger();
+    private volatile int currentPipe;
 
     public Buffer newBuffer() {
         return Buffer.buffer();
@@ -56,11 +68,17 @@ public class PipePayloadParser implements PayloadParser {
         return result(Buffer.buffer(buffer));
     }
 
-    public PipePayloadParser handler(Consumer<Buffer> handler) {
+//    public PipePayloadParser handler(Consumer<Buffer> handler) {
+//
+//        return handler((payloadParser, buffer) -> handler.accept(buffer));
+//    }
+
+    public PipePayloadParser handler(BiConsumer<Buffer, PipePayloadParser> handler) {
         pipe.add(handler);
         return this;
     }
 
+
     public PipePayloadParser delimited(String delimited) {
         if (recordParser == null) {
             setParser(RecordParser.newDelimited(delimited));
@@ -90,22 +108,22 @@ public class PipePayloadParser implements PayloadParser {
         return this;
     }
 
-    private Consumer<Buffer> getNextHandler() {
-        int i = currentPipe.getAndIncrement();
+    private BiConsumer<Buffer, PipePayloadParser> getNextHandler() {
+        int i = CURRENT_PIPE.getAndIncrement(this);
         if (i < pipe.size()) {
             return pipe.get(i);
         }
-        currentPipe.set(0);
+        CURRENT_PIPE.set(this, 0);
         return pipe.get(0);
     }
 
     private void setParser(RecordParser parser) {
         this.recordParser = parser;
-        this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
+        this.recordParser.handler(buffer -> getNextHandler().accept(buffer, this));
     }
 
     public PipePayloadParser complete() {
-        currentPipe.set(0);
+        CURRENT_PIPE.set(this, 0);
         if (recordParser != null) {
             firstInit.accept(recordParser);
         }
@@ -115,7 +133,7 @@ public class PipePayloadParser implements PayloadParser {
                 buffer.appendBuffer(buf);
             }
             this.result.clear();
-            sink.next(buffer);
+            sink.emitNext(buffer, Reactors.emitFailureHandler());
         }
         return this;
 
@@ -138,13 +156,13 @@ public class PipePayloadParser implements PayloadParser {
         }
         Buffer buf = directMapper.apply(buffer);
         if (null != buf) {
-            sink.next(buf);
+            sink.emitNext(buf, Reactors.emitFailureHandler());
         }
     }
 
     @Override
     public Flux<Buffer> handlePayload() {
-        return processor.map(Function.identity());
+        return sink.asFlux();
     }
 
     @Override
@@ -155,8 +173,8 @@ public class PipePayloadParser implements PayloadParser {
 
     @Override
     public void close() {
-        processor.onComplete();
-        currentPipe.set(0);
+        sink.tryEmitComplete();
+        CURRENT_PIPE.set(this, 0);
         this.result.clear();
     }
 

+ 32 - 5
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilder.java

@@ -1,9 +1,9 @@
 package org.jetlinks.community.network.tcp.parser.strateies;
 
 import lombok.SneakyThrows;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.hswebframework.expands.script.engine.DynamicScriptEngine;
 import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
+import org.hswebframework.web.utils.DigestUtils;
 import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
@@ -11,7 +11,28 @@ import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
+/**
+ * 利用脚本引擎和{@link PipePayloadParser}来描述解析器
+ * <p>
+ * 在脚本中可以使用内置变量: parser,对应类型为:{@link PipePayloadParser},如:
+ *
+ * <pre>{@code
+ * parser.fixed(4)
+ *       .handler(function(buffer,parser){
+ *             var len = buffer.getInt(0);
+ *             parser.fixed(len).result(buffer);
+ *         })
+ *       .handler(function(buffer,parser){
+ *             parser.result(buffer)
+ *                    .complete();
+ *         });
+ * }</pre>
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy {
     @Override
     public PayloadParserType getType() {
@@ -20,21 +41,27 @@ public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy
 
     @Override
     @SneakyThrows
-    public PayloadParser build(ValueObject config) {
+    public Supplier<PayloadParser> buildLazy(ValueObject config) {
         String script = config.getString("script")
-            .orElseThrow(() -> new IllegalArgumentException("script不能为空"));
+                              .orElseThrow(() -> new IllegalArgumentException("script不能为空"));
         String lang = config.getString("lang")
-            .orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
+                            .orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
 
         DynamicScriptEngine engine = DynamicScriptEngineFactory.getEngine(lang);
         if (engine == null) {
             throw new IllegalArgumentException("不支持的脚本:" + lang);
         }
-        PipePayloadParser parser = new PipePayloadParser();
         String id = DigestUtils.md5Hex(script);
         if (!engine.compiled(id)) {
             engine.compile(id, script);
         }
+        doCreateParser(id,engine);
+        return ()-> doCreateParser(id, engine);
+    }
+
+    @SneakyThrows
+    private PipePayloadParser doCreateParser(String id,DynamicScriptEngine engine){
+        PipePayloadParser parser = new PipePayloadParser();
         Map<String, Object> ctx = new HashMap<>();
         ctx.put("parser", parser);
         engine.execute(id, ctx).getIfSuccess();

+ 12 - 11
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.java

@@ -2,32 +2,31 @@ package org.jetlinks.community.network.tcp.parser.strateies;
 
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.parsetools.RecordParser;
+import org.jetlinks.core.utils.Reactors;
 import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
-import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Sinks;
 
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {
     @Override
     public abstract PayloadParserType getType();
 
-    protected abstract RecordParser createParser(ValueObject config);
+    protected abstract Supplier<RecordParser> createParser(ValueObject config);
 
     @Override
-    public PayloadParser build(ValueObject config) {
-        return new RecordPayloadParser(() -> createParser(config));
+    public Supplier<PayloadParser> buildLazy(ValueObject config) {
+        Supplier<RecordParser> parser = createParser(config);
+        return () -> new RecordPayloadParser(parser);
     }
 
     static class RecordPayloadParser implements PayloadParser {
         private final Supplier<RecordParser> recordParserSupplier;
-        private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
-        private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
+        private final Sinks.Many<Buffer> sink = Reactors.createMany();
 
         private RecordParser recordParser;
 
@@ -43,18 +42,20 @@ public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderS
 
         @Override
         public Flux<Buffer> handlePayload() {
-            return processor.map(Function.identity());
+            return sink.asFlux();
         }
 
         @Override
         public void close() {
-            processor.onComplete();
+            sink.emitComplete(Reactors.emitFailureHandler());
         }
 
         @Override
         public void reset() {
             this.recordParser = recordParserSupplier.get();
-            this.recordParser.handler(sink::next);
+            this.recordParser.handler(payload -> {
+                sink.emitNext(payload, Reactors.emitFailureHandler());
+            });
         }
     }
 

+ 5 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -8,6 +8,7 @@ import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.network.*;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
+import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.springframework.stereotype.Component;
@@ -18,6 +19,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * TCP服务提供商
@@ -69,8 +71,9 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
             instances.add(vertx.createNetServer(properties.getOptions()));
         }
         // 根据解析类型配置数据解析器
-        payloadParserBuilder.build(properties.getParserType(), properties);
-        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
+        Supplier<PayloadParser> parser= payloadParserBuilder.build(properties.getParserType(), properties);
+        parser.get();
+        tcpServer.setParserSupplier(parser);
         tcpServer.setServer(instances);
         tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
         // 针对JVM做的多路复用优化

+ 22 - 17
jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParserTest.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.network.tcp.parser.strateies;
 
+import io.netty.buffer.Unpooled;
 import io.vertx.core.buffer.Buffer;
 import org.jetlinks.community.network.utils.BytesUtils;
 import org.junit.jupiter.api.Test;
@@ -18,25 +19,30 @@ class PipePayloadParserTest {
         PipePayloadParser parser = new PipePayloadParser();
 
         parser.fixed(4)
-                .handler(buffer -> {
-                    int len = BytesUtils.lowBytesToInt(buffer.getBytes());
+                .handler((buffer,p) -> {
+                    int len = buffer.getInt(0);
                     parser.fixed(len);
                 })
-                .handler(buffer -> parser.result(buffer).complete());
+                .handler((buffer,p)  -> parser.result(buffer).complete());
 
 
         parser.handlePayload()
                 .doOnSubscribe(sb -> {
                     Mono.delay(Duration.ofMillis(100))
                             .subscribe(r -> {
-                                Buffer buffer = Buffer.buffer(BytesUtils.toLowBytes(5));
-                                buffer.appendString("1234");
-                                parser.handle(buffer);
-                                parser.handle(Buffer.buffer("5"));
 
-                                parser.handle(Buffer.buffer(new byte[]{0, 0}));
-                                parser.handle(Buffer.buffer(new byte[]{0, 6}).appendString("12"));
-                                parser.handle(Buffer.buffer("3456"));
+                                {
+                                    Buffer buffer = Buffer.buffer(Unpooled.buffer().writeInt(5));
+                                    buffer.appendString("1234");
+                                    parser.handle(buffer);
+                                    parser.handle(Buffer.buffer("5"));
+                                }
+                                {
+                                    Buffer buffer = Buffer.buffer(Unpooled.buffer().writeInt(6));
+                                    buffer.appendString("1234");
+                                    parser.handle(buffer);
+                                    parser.handle(Buffer.buffer("56"));
+                                }
                             });
                 })
                 .take(2)
@@ -52,20 +58,19 @@ class PipePayloadParserTest {
         PipePayloadParser parser = new PipePayloadParser();
 
         parser.fixed(4)
-                .handler(buffer -> {
-                    int len = BytesUtils.highBytesToInt(buffer.getBytes());
-                    parser.fixed(len);
+                .handler((buffer,p) -> {
+                    int len = buffer.getInt(0);
+                    p.fixed(len);
                 })
-                .handler(buffer -> {
-                    parser.result(buffer)
-                            .complete();
+                .handler((buffer,p) -> {
+                    p.result(buffer).complete();
                 });
 
         byte[] payload = "hello".getBytes();
 
         Buffer buffer = Buffer.buffer(payload.length + 4);
 
-        buffer.appendBytes(BytesUtils.toHighBytes(payload.length));
+        buffer.appendBuffer(Buffer.buffer(Unpooled.buffer().writeInt(payload.length)));
         buffer.appendBytes(payload);
 
         parser.handlePayload()