Bladeren bron

优化协议加载

zhou-hao 4 jaren geleden
bovenliggende
commit
577994bc6b

+ 77 - 3
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/LazyInitManagementProtocolSupports.java

@@ -1,11 +1,85 @@
 package org.jetlinks.community.standalone.configuration;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.supports.protocol.StaticProtocolSupports;
 import org.jetlinks.supports.protocol.management.ManagementProtocolSupports;
+import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
+import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
+import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
 import org.springframework.boot.CommandLineRunner;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+@Slf4j
+@Getter
+@Setter
+public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner {
+
+    private ProtocolSupportManager manager;
+
+    private ProtocolSupportLoader loader;
+
+    private ClusterManager clusterManager;
+
+    @Setter(AccessLevel.PRIVATE)
+    private Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<>();
+
+    private Duration loadTimeOut = Duration.ofSeconds(30);
+
+    public void init() {
+
+        clusterManager.<ProtocolSupportDefinition>getTopic("_protocol_changed")
+            .subscribe()
+            .subscribe(protocol -> this.init(protocol).subscribe());
+
+        try {
+            manager.loadAll()
+                .filter(de -> de.getState() == 1)
+                .flatMap(this::init)
+                .blockLast(loadTimeOut);
+        } catch (Exception e) {
+            log.error("load protocol error", e);
+        }
+
+    }
+
+    public Mono<Void> init(ProtocolSupportDefinition definition) {
+        if (definition.getState() != 1) {
+            String protocol = configProtocolIdMapping.get(definition.getId());
+            if (protocol != null) {
+                log.debug("uninstall protocol:{}", definition);
+                unRegister(protocol);
+                return Mono.empty();
+            }
+        }
+        String operation = definition.getState() != 1 ? "uninstall" : "install";
+        Consumer<ProtocolSupport> consumer = definition.getState() != 1 ? this::unRegister : this::register;
+
+        log.debug("{} protocol:{}", operation, definition);
+
+        return loader
+            .load(definition)
+            .doOnNext(e -> {
+                log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e);
+                configProtocolIdMapping.put(definition.getId(), e.getId());
+                consumer.accept(e);
+            })
+            .onErrorContinue((e, obj) -> log.error("{} protocol[{}] error: {}", operation, definition.getId(), e))
+            .then();
+
+    }
 
-public class LazyInitManagementProtocolSupports extends ManagementProtocolSupports implements CommandLineRunner {
     @Override
-    public void run(String... args) throws Exception {
+    public void run(String... args) {
         init();
     }
-}
+}

+ 19 - 9
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.standalone.configuration.protocol;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.spi.ServiceContext;
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeoutException;
 @Slf4j
 public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoader {
 
+
     final WebClient webClient;
 
     final File tempPath;
@@ -39,7 +41,7 @@ public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoad
 
     public AutoDownloadJarProtocolSupportLoader(WebClient.Builder builder) {
         this.webClient = builder.build();
-        tempPath = new File(".temp");
+        tempPath = new File("./data/protocols");
         tempPath.mkdir();
     }
 
@@ -58,30 +60,38 @@ public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoad
     @Override
     protected void closeLoader(ProtocolClassLoader loader) {
         super.closeLoader(loader);
-        for (URL url : loader.getUrls()) {
-            if (new File(url.getFile()).delete()) {
-                log.debug("delete old protocol:{}", url);
-            }
-        }
+//        for (URL url : loader.getUrls()) {
+//            if (new File(url.getFile()).delete()) {
+//                log.debug("delete old protocol:{}", url);
+//            }
+//        }
     }
 
     @Override
     public Mono<? extends ProtocolSupport> load(ProtocolSupportDefinition definition) {
 
-        ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition,new ProtocolSupportDefinition());
+        ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition, new ProtocolSupportDefinition());
 
-        Map<String, Object> config =newDef.getConfiguration();
+        Map<String, Object> config = newDef.getConfiguration();
         String location = Optional.ofNullable(config.get("location"))
             .map(String::valueOf).orElseThrow(() -> new IllegalArgumentException("location"));
 
         if (location.startsWith("http")) {
+            String urlMd5 = DigestUtils.md5Hex(location);
+            //地址没变则直接加载本地文件
+            File file = new File(tempPath, (newDef.getId() + "_" + urlMd5) + ".jar");
+            if (file.exists()) {
+                config.put("location", file.getAbsolutePath());
+                return super
+                    .load(newDef)
+                    .subscribeOn(Schedulers.elastic());
+            }
             return webClient.get()
                 .uri(location)
                 .exchange()
                 .flatMap(clientResponse -> clientResponse.bodyToMono(Resource.class))
                 .flatMap(resource -> Mono.fromCallable(resource::getInputStream))
                 .flatMap(stream -> Mono.fromCallable(() -> {
-                    File file = new File(tempPath, (newDef.getId() + "_" + System.currentTimeMillis()) + ".jar");
                     log.debug("write protocol file {} to {}", location, file.getAbsolutePath());
                     try (InputStream input = stream;
                          OutputStream out = new FileOutputStream(file)) {