Explorar o código

优化线程池逻辑

zhouhao %!s(int64=2) %!d(string=hai) anos
pai
achega
779eddd4c8

+ 11 - 8
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/DefaultHttpServerProvider.java

@@ -107,14 +107,17 @@ public class DefaultHttpServerProvider implements NetworkProvider<HttpServerConf
                 server.setBindAddress(new InetSocketAddress(config.getHost(), config.getPort()));
                 server.setHttpServers(instances);
                 for (HttpServer httpServer : instances) {
-                    httpServer.listen(result -> {
-                        if (result.succeeded()) {
-                            log.debug("startup http server on [{}]", server.getBindAddress());
-                        } else {
-                            server.setLastError(result.cause().getMessage());
-                            log.warn("startup http server on [{}] failed", server.getBindAddress(), result.cause());
-                        }
-                    });
+                    vertx.nettyEventLoopGroup()
+                        .execute(()->{
+                            httpServer.listen(result -> {
+                                if (result.succeeded()) {
+                                    log.debug("startup http server on [{}]", server.getBindAddress());
+                                } else {
+                                    server.setLastError(result.cause().getMessage());
+                                    log.warn("startup http server on [{}] failed", server.getBindAddress(), result.cause());
+                                }
+                            });
+                        });
                 }
                 return server;
             });

+ 5 - 13
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -5,10 +5,14 @@ import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.trace.StatusCode;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.web.logger.ReactiveLogger;
+import org.jetlinks.community.gateway.AbstractDeviceGateway;
+import org.jetlinks.community.gateway.DeviceGatewayHelper;
+import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttPublishing;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
+import org.jetlinks.community.utils.ObjectMappers;
+import org.jetlinks.community.utils.SystemUtils;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.device.*;
 import org.jetlinks.core.device.session.DeviceSessionManager;
@@ -18,20 +22,11 @@ import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.KeepOnlineSession;
 import org.jetlinks.core.trace.FluxTracer;
 import org.jetlinks.core.trace.MonoTracer;
-import org.jetlinks.community.gateway.AbstractDeviceGateway;
-import org.jetlinks.community.gateway.GatewayState;
-import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
-import org.jetlinks.community.gateway.monitor.GatewayMonitors;
-import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
-import org.jetlinks.community.gateway.DeviceGatewayHelper;
-import org.jetlinks.community.utils.ObjectMappers;
-import org.jetlinks.community.utils.SystemUtils;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
@@ -125,7 +120,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
                 }
                 return true;
             })
-//            .publishOn(Schedulers.parallel())
             //处理mqtt连接请求
             .flatMap(connection -> this
                          .handleConnection(connection)
@@ -207,7 +201,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
                 //应答SERVER_UNAVAILABLE
                 connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
             }))
-//            .subscribeOn(Schedulers.parallel())
             ;
     }
 
@@ -299,7 +292,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
                        MqttConnection::close)
             //网关暂停或者已停止时,则不处理消息
             .filter(pb -> isStarted())
-            .publishOn(Schedulers.parallel())
             //解码收到的mqtt报文
             .flatMap(publishing -> this
                 .decodeAndHandleMessage(operator, session, publishing, connection)

+ 13 - 10
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/DefaultVertxMqttServerProvider.java

@@ -60,16 +60,19 @@ public class DefaultVertxMqttServerProvider implements NetworkProvider<VertxMqtt
                 server.setBind(new InetSocketAddress(options.getHost(), options.getPort()));
                 server.setMqttServer(instances);
                 for (MqttServer instance : instances) {
-                    instance.listen(result -> {
-                        if (result.succeeded()) {
-                            log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result
-                                .result()
-                                .actualPort());
-                        } else {
-                            server.setLastError(result.cause().getMessage());
-                            log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
-                        }
-                    });
+                   vertx.nettyEventLoopGroup()
+                       .execute(()->{
+                           instance.listen(result -> {
+                               if (result.succeeded()) {
+                                   log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result
+                                       .result()
+                                       .actualPort());
+                               } else {
+                                   server.setLastError(result.cause().getMessage());
+                                   log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause());
+                               }
+                           });
+                       });
                 }
                 return server;
             });

+ 13 - 7
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -60,20 +60,26 @@ public class VertxMqttServer implements MqttServer {
         }
     }
 
+    private boolean emitNext(Sinks.Many<MqttConnection> sink, VertxMqttConnection connection){
+        if (sink.currentSubscriberCount() <= 0) {
+            return false;
+        }
+        try{
+            sink.emitNext(connection,Reactors.emitFailureHandler());
+            return true;
+        }catch (Throwable ignore){}
+        return false;
+    }
 
     private void handleConnection(VertxMqttConnection connection) {
-        boolean anyHandled = false;
-        if (sink.currentSubscriberCount() > 0) {
-            if (sink.tryEmitNext(connection).isSuccess()) {
-                anyHandled = true;
-            }
-        }
+        boolean anyHandled = emitNext(sink, connection);
+
         for (List<Sinks.Many<MqttConnection>> value : sinks.values()) {
             if (value.size() == 0) {
                 continue;
             }
             Sinks.Many<MqttConnection> sink = value.get(ThreadLocalRandom.current().nextInt(value.size()));
-            if (sink.currentSubscriberCount() > 0 && sink.tryEmitNext(connection).isSuccess()) {
+            if (emitNext(sink, connection)) {
                 anyHandled = true;
             }
         }

+ 11 - 8
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/DefaultTcpServerProvider.java

@@ -79,14 +79,17 @@ public class DefaultTcpServerProvider implements NetworkProvider<TcpServerProper
                     .toMillis()));
                 tcpServer.setBind(new InetSocketAddress(properties.getHost(), properties.getPort()));
                 for (NetServer netServer : instances) {
-                    netServer.listen(properties.createSocketAddress(), result -> {
-                        if (result.succeeded()) {
-                            log.info("tcp server startup on {}", result.result().actualPort());
-                        } else {
-                            tcpServer.setLastError(result.cause().getMessage());
-                            log.error("startup tcp server error", result.cause());
-                        }
-                    });
+                    vertx.nettyEventLoopGroup()
+                        .execute(()->{
+                            netServer.listen(properties.createSocketAddress(), result -> {
+                                if (result.succeeded()) {
+                                    log.info("tcp server startup on {}", result.result().actualPort());
+                                } else {
+                                    tcpServer.setLastError(result.cause().getMessage());
+                                    log.error("startup tcp server error", result.cause());
+                                }
+                            });
+                        });
                 }
                 return tcpServer;
             });