Prechádzať zdrojové kódy

优化网络组件相关逻辑

zhou-hao 5 rokov pred
rodič
commit
6261b1c818

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/MessagingRequest.java

@@ -19,6 +19,6 @@ public class MessagingRequest {
 
 
     public enum Type{
-        pub,sub,unsub
+        pub,sub,unsub,ping
     }
 }

+ 3 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -61,6 +61,9 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
                 .doOnNext(message -> {
                     try {
                         MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
+                        if (request == null || request.getType() == MessagingRequest.Type.ping) {
+                            return;
+                        }
                         if (StringUtils.isEmpty(request.getId())) {
                             session
                                 .send(Mono.just(session.textMessage(JSON.toJSONString(

+ 125 - 90
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -3,6 +3,7 @@ package org.jetlinks.community.network.mqtt.gateway.device;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
 import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
@@ -10,11 +11,9 @@ import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.MqttAuthenticationRequest;
+import org.jetlinks.core.message.CommonDeviceMessage;
 import org.jetlinks.core.message.Message;
-import org.jetlinks.core.message.codec.DefaultTransport;
-import org.jetlinks.core.message.codec.EncodedMessage;
-import org.jetlinks.core.message.codec.FromDeviceMessageContext;
-import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.*;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.community.gateway.DeviceGateway;
@@ -30,6 +29,7 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
 import javax.annotation.Nonnull;
@@ -41,19 +41,27 @@ import java.util.function.Function;
 class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGateway {
 
     @Getter
-    private String id;
+    private final String id;
 
-    private DeviceRegistry registry;
+    private final DeviceRegistry registry;
 
-    private DeviceSessionManager sessionManager;
+    private final DeviceSessionManager sessionManager;
 
-    private MqttServer mqttServer;
+    private final MqttServer mqttServer;
 
-    private DecodedClientMessageHandler messageHandler;
+    private final DecodedClientMessageHandler messageHandler;
 
-    private DeviceGatewayMonitor gatewayMonitor;
+    private final DeviceGatewayMonitor gatewayMonitor;
 
-    private LongAdder counter = new LongAdder();
+    private final LongAdder counter = new LongAdder();
+
+    private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
+
+    private final FluxSink<Message> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private Disposable disposable;
 
     public MqttServerDeviceGateway(String id,
                                    DeviceRegistry registry,
@@ -68,14 +76,6 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
         this.messageHandler = messageHandler;
     }
 
-    private EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
-
-    private FluxSink<Message> sink = messageProcessor.sink();
-
-    private AtomicBoolean started = new AtomicBoolean();
-
-    private Disposable disposable;
-
     @Override
     public long totalConnection() {
         return counter.sum();
@@ -94,46 +94,60 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                 }
                 return started.get();
             })
-            .flatMap(con -> Mono.justOrEmpty(con.getAuth())
-                //没有认证信息,则拒绝连接.
-                .switchIfEmpty(Mono.fromRunnable(() -> {
-                    con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
-                    gatewayMonitor.rejected();
-                }))
-                .flatMap(auth ->
-                    registry.getDevice(con.getClientId())
-                        .flatMap(device -> device
-                            .authenticate(new MqttAuthenticationRequest(con.getClientId(), auth.getUsername(), auth.getPassword(), getTransport()))
-                            .switchIfEmpty(Mono.fromRunnable(() -> con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)))
-                            .flatMap(resp -> {
-                                String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? device.getDeviceId() : resp.getDeviceId();
-                                //认证返回了新的设备ID,则使用新的设备
-                                if (!deviceId.equals(device.getDeviceId())) {
-                                    return registry
-                                        .getDevice(deviceId)
-                                        .map(operator -> Tuples.of(operator, resp, con));
-                                }
-                                return Mono.just(Tuples.of(device, resp, con));
-                            })
-                        ))
-                //设备注册信息不存在,拒绝连接
-                .switchIfEmpty(Mono.fromRunnable(() -> {
-                    con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
-                    gatewayMonitor.rejected();
-                }))
-                .onErrorResume((err) -> Mono.fromRunnable(() -> {
-                    gatewayMonitor.rejected();
-                    con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-                    log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
-                })))
-            .flatMap(tuple3 -> {
-                counter.increment();
-                DeviceOperator device = tuple3.getT1();
-                AuthenticationResponse resp = tuple3.getT2();
-                MqttConnection con = tuple3.getT3();
+            .flatMap(this::handleConnection)
+            .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
+            .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
+            .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
+            .subscribe(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()));
+
+    }
+
+    //处理连接,并进行认证
+    private Mono<Tuple3<DeviceOperator,AuthenticationResponse,MqttConnection>> handleConnection(MqttConnection connection) {
+        return Mono.justOrEmpty(connection.getAuth())
+            //没有认证信息,则拒绝连接.
+            .switchIfEmpty(Mono.fromRunnable(() -> {
+                connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
+                gatewayMonitor.rejected();
+            }))
+            .flatMap(auth ->
+                registry.getDevice(connection.getClientId())
+                    .flatMap(device -> device
+                        .authenticate(new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport()))
+                        .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)))
+                        .flatMap(resp -> {
+                            String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? device.getDeviceId() : resp.getDeviceId();
+                            //认证返回了新的设备ID,则使用新的设备
+                            if (!deviceId.equals(device.getDeviceId())) {
+                                return registry
+                                    .getDevice(deviceId)
+                                    .map(operator -> Tuples.of(operator, resp, connection));
+                            }
+                            return Mono.just(Tuples.of(device, resp, connection));
+                        })
+                    ))
+            //设备注册信息不存在,拒绝连接
+            .switchIfEmpty(Mono.fromRunnable(() -> {
+                connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+                gatewayMonitor.rejected();
+            }))
+            .onErrorResume((err) -> Mono.fromRunnable(() -> {
+                gatewayMonitor.rejected();
+                connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                log.error("MQTT连接认证[{}]失败", connection.getClientId(), err);
+            }));
+    }
+
+    //处理认证结果
+    private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
+                                                                                           AuthenticationResponse resp,
+                                                                                           MqttConnection connection) {
+        return Mono
+            .fromCallable(() -> {
                 String deviceId = device.getDeviceId();
                 if (resp.isSuccess()) {
-                    DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), con) {
+                    counter.increment();
+                    DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), connection) {
                         @Override
                         public Mono<Boolean> send(EncodedMessage encodedMessage) {
                             return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
@@ -143,53 +157,74 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                     gatewayMonitor.connected();
                     gatewayMonitor.totalConnection(counter.sum());
                     //监听断开连接
-                    con.onClose(conn -> {
+                    connection.onClose(conn -> {
                         counter.decrement();
                         sessionManager.unregister(deviceId);
                         gatewayMonitor.disconnected();
                         gatewayMonitor.totalConnection(counter.sum());
                     });
-                    return Mono.just(Tuples.of(con.accept(), device, session));
+                    return Tuples.of(connection.accept(), device, session);
                 } else {
+                    connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
                     gatewayMonitor.rejected();
                     log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
                 }
-                return Mono.empty();
+                return null;
             })
-            .onErrorContinue((err,obj) -> log.error("处理MQTT连接失败", err))
-            .subscribe(tp -> tp.getT1()
-                .handleMessage()
-                .filter(pb -> started.get())
-                .takeWhile(pub -> disposable != null)
-                .doOnNext(msg -> gatewayMonitor.receivedMessage())
-                .flatMap(publishing -> tp.getT2()
-                    .getProtocol()
-                    .flatMap(protocol -> protocol.getMessageCodec(getTransport()))
-                    .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
-                        @Override
-                        public DeviceSession getSession() {
-                            return tp.getT3();
-                        }
+            .onErrorResume(error -> Mono.fromRunnable(() -> {
+                log.error(error.getMessage(), error);
+                gatewayMonitor.rejected();
+                connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+            }));
+    }
 
-                        @Override
-                        @Nonnull
-                        public EncodedMessage getMessage() {
-                            return publishing.getMessage();
-                        }
-                    }))
-                    .flatMap(msg -> {
-                        if (messageProcessor.hasDownstreams()) {
-                            sink.next(msg);
-                        }
-                        return messageHandler.handleMessage(tp.getT2(), msg);
-                    })
-                    .onErrorResume((err) ->
-                        Mono.fromRunnable(() -> log.error("处理MQTT连接[{}]消息失败:{}", tp.getT2().getDeviceId(), publishing.getMessage(), err))
-                    ))
-                .subscribe());
+    //处理已经建立连接的MQTT连接
+    private void handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
 
+        connection.handleMessage()
+            .filter(pb -> started.get())
+            .takeWhile(pub -> disposable != null)
+            .doOnNext(msg -> gatewayMonitor.receivedMessage())
+            .flatMap(publishing ->
+                this.decodeAndHandleMessage(operator, session, publishing.getMessage())
+                    //ack
+                    .doOnSuccess(s -> publishing.acknowledge())
+            )
+            //合并遗言消息
+            .mergeWith(
+                Mono.justOrEmpty(connection.getWillMessage())
+                    .flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage))
+            )
+            .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
+            .subscribe();
     }
 
+    //解码消息并处理
+    private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
+                                              DeviceSession session,
+                                              MqttMessage message) {
+        return operator
+            .getProtocol()
+            .flatMap(protocol -> protocol.getMessageCodec(getTransport()))
+            .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message)))
+            .flatMap(msg -> {
+                if(msg instanceof CommonDeviceMessage){
+                    CommonDeviceMessage _msg= ((CommonDeviceMessage) msg);
+                    if(StringUtils.isEmpty(_msg.getDeviceId())){
+                        _msg.setDeviceId(operator.getDeviceId());
+                    }
+                }
+                if (messageProcessor.hasDownstreams()) {
+                    sink.next(msg);
+                }
+                //丢给默认的消息处理逻辑
+                return messageHandler.handleMessage(operator, msg);
+            })
+            .then()
+            .doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
+            .onErrorResume((err) -> Mono.empty())//发生错误不中断流
+            ;
+    }
     @Override
     public Transport getTransport() {
         return DefaultTransport.MQTT;

+ 5 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java

@@ -50,4 +50,9 @@ public interface TcpClient extends Network {
      * @param timeout 超时时间
      */
     void setKeepAliveTimeout(Duration timeout);
+
+    /**
+     * 重置
+     */
+    void reset();
 }

+ 78 - 36
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -7,10 +7,14 @@ import io.vertx.core.net.SocketAddress;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.tcp.TcpMessage;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 import java.net.InetSocketAddress;
@@ -19,13 +23,14 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
 
 @Slf4j
-public class VertxTcpClient extends AbstractTcpClient {
+public class VertxTcpClient implements TcpClient {
 
     public volatile NetClient client;
 
-    public volatile NetSocket socket;
+    public NetSocket socket;
 
     volatile PayloadParser payloadParser;
 
@@ -39,6 +44,10 @@ public class VertxTcpClient extends AbstractTcpClient {
 
     private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
 
+    private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
+
+    private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
+
     @Override
     public void keepAlive() {
         lastKeepAliveTime = System.currentTimeMillis();
@@ -49,6 +58,13 @@ public class VertxTcpClient extends AbstractTcpClient {
         keepAliveTimeoutMs = timeout.toMillis();
     }
 
+    @Override
+    public void reset() {
+        if (null != payloadParser) {
+            payloadParser.reset();
+        }
+    }
+
     @Override
     public boolean isAlive() {
         return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs);
@@ -63,6 +79,20 @@ public class VertxTcpClient extends AbstractTcpClient {
         this.id = id;
     }
 
+    protected void received(TcpMessage message) {
+        if (processor.getPending() > processor.getBufferSize() / 2) {
+            log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
+            return;
+        }
+        sink.next(message);
+    }
+
+    @Override
+    public Flux<TcpMessage> subscribe() {
+        return processor
+            .map(Function.identity());
+    }
+
     private void execute(Runnable runnable) {
         try {
             runnable.run();
@@ -88,17 +118,19 @@ public class VertxTcpClient extends AbstractTcpClient {
     @Override
     public void shutdown() {
         log.debug("tcp client [{}] disconnect", getId());
-        if (null != client) {
-            execute(client::close);
-            client = null;
-        }
-        if (null != socket) {
-            execute(socket::close);
-            socket = null;
-        }
-        if (null != payloadParser) {
-            execute(payloadParser::close);
-            payloadParser = null;
+        synchronized (this) {
+            if (null != client) {
+                execute(client::close);
+                client = null;
+            }
+            if (null != socket) {
+                execute(socket::close);
+                this.socket = null;
+            }
+            if (null != payloadParser) {
+                execute(payloadParser::close);
+                payloadParser = null;
+            }
         }
         for (Runnable runnable : disconnectListener) {
             execute(runnable);
@@ -115,33 +147,42 @@ public class VertxTcpClient extends AbstractTcpClient {
     }
 
     public void setRecordParser(PayloadParser payloadParser) {
-        if (null != this.payloadParser && this.payloadParser != payloadParser) {
-            this.payloadParser.close();
+        synchronized (this) {
+            if (null != this.payloadParser && this.payloadParser != payloadParser) {
+                this.payloadParser.close();
+            }
+            this.payloadParser = payloadParser;
+            this.payloadParser
+                .handlePayload()
+                .onErrorContinue((err, res) -> {
+                    log.error(err.getMessage(), err);
+                })
+                .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
         }
-        this.payloadParser = payloadParser;
-        this.payloadParser
-            .handlePayload()
-            .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
-            .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
     }
 
     public void setSocket(NetSocket socket) {
-        Objects.requireNonNull(payloadParser);
-        if (this.socket != null && this.socket != socket) {
-            this.socket.close();
-        }
-        this.socket = socket;
-        this.socket.closeHandler(v -> {
-            shutdown();
-        });
-        this.socket.handler(buffer -> {
-            keepAlive();
-            payloadParser.handle(buffer);
-            if (this.socket != socket) {
-                log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
-                socket.close();
+        synchronized (this) {
+            Objects.requireNonNull(payloadParser);
+            if (this.socket != null && this.socket != socket) {
+                this.socket.close();
             }
-        });
+            this.socket = socket
+                .closeHandler(v -> shutdown())
+                .handler(buffer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("handle tcp client[{}] payload:[{}]",
+                            socket.remoteAddress(),
+                            Hex.encodeHexString(buffer.getBytes()));
+                    }
+                    keepAlive();
+                    payloadParser.handle(buffer);
+                    if (this.socket != socket) {
+                        log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
+                        socket.close();
+                    }
+                });
+        }
     }
 
     @Override
@@ -151,7 +192,8 @@ public class VertxTcpClient extends AbstractTcpClient {
                 sink.error(new SocketException("socket closed"));
                 return;
             }
-            socket.write(Buffer.buffer(message.getPayload()), r -> {
+            Buffer buffer = Buffer.buffer(message.getPayload());
+            socket.write(buffer, r -> {
                 keepAlive();
                 if (r.succeeded()) {
                     sink.success(true);

+ 4 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -223,8 +223,10 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                             log.error("处理TCP[{}]消息失败:\n{}",
                                 clientAddr,
                                 tcpMessage
-                                , err))))
-                    .onErrorResume((err) -> Mono.empty())
+                                , err)))
+                        .onErrorResume((err) -> Mono.fromRunnable(client::reset))
+                    )
+                    .onErrorResume((err) -> Mono.fromRunnable(client::reset))
                     .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
                     .subscribe();
             }));

+ 5 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/PayloadParser.java

@@ -32,4 +32,9 @@ public interface PayloadParser {
      * 关闭以释放相关资源
      */
     void close();
+
+    /**
+     * 重置规则
+     */
+    default void reset(){}
 }

+ 6 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java

@@ -139,6 +139,12 @@ public class PipePayloadParser implements PayloadParser {
         return processor.map(Function.identity());
     }
 
+    @Override
+    public void reset() {
+        this.result.clear();
+        complete();
+    }
+
     @Override
     public void close() {
         processor.onComplete();

+ 18 - 10
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.java

@@ -9,8 +9,10 @@ import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {
     @Override
@@ -20,18 +22,19 @@ public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderS
 
     @Override
     public PayloadParser build(ValueObject config) {
-        return new RecordPayloadParser(createParser(config));
+        return new RecordPayloadParser(() -> createParser(config));
     }
 
-    class RecordPayloadParser implements PayloadParser {
-        RecordParser recordParser;
-        EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
+    static class RecordPayloadParser implements PayloadParser {
+        private final Supplier<RecordParser> recordParserSupplier;
+        private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
+        private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
 
-        public RecordPayloadParser(RecordParser recordParser) {
-            this.recordParser = recordParser;
-            this.recordParser.handler(buffer -> {
-                processor.onNext(buffer);
-            });
+        private RecordParser recordParser;
+
+        public RecordPayloadParser(Supplier<RecordParser> recordParserSupplier) {
+            this.recordParserSupplier = recordParserSupplier;
+            reset();
         }
 
         @Override
@@ -48,7 +51,12 @@ public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderS
         public void close() {
             processor.onComplete();
         }
-    }
 
+        @Override
+        public void reset() {
+            this.recordParser = recordParserSupplier.get();
+            this.recordParser.handler(sink::next);
+        }
+    }
 
 }