ソースを参照

优化线程池逻辑

zhouhao 2 年 前
コミット
4c18ef264f

+ 2 - 18
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -1,13 +1,9 @@
 package org.jetlinks.community.network.mqtt.gateway.device;
 package org.jetlinks.community.network.mqtt.gateway.device;
 
 
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.AbstractDeviceGateway;
 import org.jetlinks.community.gateway.AbstractDeviceGateway;
-import org.jetlinks.community.gateway.DeviceGateway;
-import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
-import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.NetworkType;
@@ -19,32 +15,23 @@ import org.jetlinks.community.utils.SystemUtils;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.device.*;
 import org.jetlinks.core.device.*;
 import org.jetlinks.core.device.session.DeviceSessionManager;
 import org.jetlinks.core.device.session.DeviceSessionManager;
-import org.jetlinks.core.message.CommonDeviceMessage;
-import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.KeepOnlineSession;
 import org.jetlinks.core.server.session.KeepOnlineSession;
-import org.jetlinks.core.server.session.ReplaceableDeviceSession;
 import org.jetlinks.core.trace.DeviceTracer;
 import org.jetlinks.core.trace.DeviceTracer;
 import org.jetlinks.core.trace.FluxTracer;
 import org.jetlinks.core.trace.FluxTracer;
-import org.jetlinks.core.trace.MonoTracer;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.util.StringUtils;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.Disposable;
-import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
 
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
@@ -114,7 +101,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
                 }
                 }
                 return isStarted();
                 return isStarted();
             })
             })
-            .publishOn(Schedulers.parallel())
             //处理mqtt连接请求
             //处理mqtt连接请求
             .flatMap(this::handleConnection)
             .flatMap(this::handleConnection)
             //处理认证结果
             //处理认证结果
@@ -169,8 +155,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
                 monitor.rejected();
                 monitor.rejected();
                 //应答SERVER_UNAVAILABLE
                 //应答SERVER_UNAVAILABLE
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-            }))
-            .subscribeOn(Schedulers.parallel());
+            }));
     }
     }
 
 
     //处理认证结果
     //处理认证结果
@@ -306,8 +291,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
                                 .toJSONString())))
                                 .toJSONString())))
             //发生错误不中断流
             //发生错误不中断流
             .onErrorResume((err) -> Mono.empty())
             .onErrorResume((err) -> Mono.empty())
-            .then()
-            .subscribeOn(Schedulers.parallel());
+            .then();
     }
     }
 
 
     private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,
     private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,

+ 11 - 7
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServerProvider.java

@@ -53,13 +53,17 @@ public class VertxMqttServerProvider implements NetworkProvider<VertxMqttServerP
         }
         }
         server.setMqttServer(instances);
         server.setMqttServer(instances);
         for (MqttServer instance : instances) {
         for (MqttServer instance : instances) {
-            instance.listen(result -> {
-                if (result.succeeded()) {
-                    log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
-                } else {
-                    log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
-                }
-            });
+            vertx
+                .nettyEventLoopGroup()
+                .execute(()->{
+                    instance.listen(result -> {
+                        if (result.succeeded()) {
+                            log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result.result().actualPort());
+                        } else {
+                            log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
+                        }
+                    });
+                });
         }
         }
     }
     }
 
 

+ 10 - 7
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -80,13 +80,16 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
         // 多个server listen同一个端口,每个client连接的时候vertx会分配
         // 多个server listen同一个端口,每个client连接的时候vertx会分配
         // 一个connection只能在一个server中处理
         // 一个connection只能在一个server中处理
         for (NetServer netServer : instances) {
         for (NetServer netServer : instances) {
-            netServer.listen(properties.createSocketAddress(), result -> {
-                if (result.succeeded()) {
-                    log.info("tcp server startup on {}", result.result().actualPort());
-                } else {
-                    log.error("startup tcp server error", result.cause());
-                }
-            });
+         vertx.nettyEventLoopGroup()
+             .execute(()->{
+                 netServer.listen(properties.createSocketAddress(), result -> {
+                     if (result.succeeded()) {
+                         log.info("tcp server startup on {}", result.result().actualPort());
+                     } else {
+                         log.error("startup tcp server error", result.cause());
+                     }
+                 });
+             });
         }
         }
     }
     }