ソースを参照

使用parallel线程池处理数据

zhouhao 2 年 前
コミット
74b2837435

+ 2 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -21,6 +21,7 @@ import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -117,6 +118,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
                             );
                     })
                     .then()
+                    .subscribeOn(Schedulers.parallel())
                     .onErrorResume((err) -> {
                         log.error("处理MQTT消息失败:{}", mqttMessage, err);
                         return Mono.empty();