|
@@ -34,10 +34,10 @@ public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategy<TcpCl
|
|
return data -> clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
|
|
return data -> clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
|
|
.flatMapMany(client -> RuleDataCodecs
|
|
.flatMapMany(client -> RuleDataCodecs
|
|
.getCodec(TcpMessage.class)
|
|
.getCodec(TcpMessage.class)
|
|
- .map(codec -> codec.decode(data, config.getSendPayloadType())
|
|
|
|
|
|
+ .map(codec -> codec.decode(data, config.getPayloadType())
|
|
.cast(TcpMessage.class)
|
|
.cast(TcpMessage.class)
|
|
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().warn("can not decode rule data to tcp message:{}", data))))
|
|
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().warn("can not decode rule data to tcp message:{}", data))))
|
|
- .orElseGet(() -> Flux.just(new TcpMessage(config.getSendPayloadType().write(data.getData()))))
|
|
|
|
|
|
+ .orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData()))))
|
|
.flatMap(client::send)
|
|
.flatMap(client::send)
|
|
.all(r-> r))
|
|
.all(r-> r))
|
|
;
|
|
;
|
|
@@ -50,9 +50,9 @@ public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategy<TcpCl
|
|
context.onStop( clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
|
|
context.onStop( clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
|
|
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("tcp client {} not found", config.getClientId())))
|
|
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("tcp client {} not found", config.getClientId())))
|
|
.flatMapMany(TcpClient::subscribe)
|
|
.flatMapMany(TcpClient::subscribe)
|
|
- .doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getSubPayloadType().read(msg.getPayload())))
|
|
|
|
|
|
+ .doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload())))
|
|
.map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
|
|
.map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
|
|
- .map(codec -> codec.encode(r, config.getSubPayloadType()))
|
|
|
|
|
|
+ .map(codec -> codec.encode(r, config.getPayloadType()))
|
|
.orElse(r.getPayload()))
|
|
.orElse(r.getPayload()))
|
|
.onErrorContinue((err, obj) -> {
|
|
.onErrorContinue((err, obj) -> {
|
|
context.logger().error("consume tcp message error", err);
|
|
context.logger().error("consume tcp message error", err);
|