فهرست منبع

优化协议文件下载

zhou-hao 4 سال پیش
والد
کامیت
aa8fc10e0c

+ 30 - 14
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java

@@ -11,6 +11,8 @@ import org.jetlinks.supports.protocol.management.jar.JarProtocolSupportLoader;
 import org.jetlinks.supports.protocol.management.jar.ProtocolClassLoader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.Resource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StreamUtils;
 import org.springframework.web.reactive.function.client.WebClient;
@@ -18,16 +20,18 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import javax.annotation.PreDestroy;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
 import java.net.URL;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.WRITE;
+
 @Component
 @Slf4j
 public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoader {
@@ -86,19 +90,31 @@ public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoad
                     .load(newDef)
                     .subscribeOn(Schedulers.elastic());
             }
-            return webClient.get()
+            return webClient
+                .get()
                 .uri(location)
-                .exchange()
-                .flatMap(clientResponse -> clientResponse.bodyToMono(Resource.class))
-                .flatMap(resource -> Mono.fromCallable(resource::getInputStream))
-                .flatMap(stream -> Mono.fromCallable(() -> {
+                .retrieve()
+                .bodyToFlux(DataBuffer.class)
+                .as(dataStream -> {
+                    Path filePath = file.toPath();
                     log.debug("write protocol file {} to {}", location, file.getAbsolutePath());
-                    try (InputStream input = stream;
-                         OutputStream out = new FileOutputStream(file)) {
-                        StreamUtils.copy(input, out);
+                    try {
+                        @SuppressWarnings("all")
+                        AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(filePath, CREATE,WRITE);
+                        return DataBufferUtils
+                            .write(dataStream, asynchronousFileChannel)
+                            .doOnNext(DataBufferUtils.releaseConsumer())
+                            .doAfterTerminate(() -> {
+                                try {
+                                    asynchronousFileChannel.close();
+                                } catch (IOException ignored) {
+                                }
+                            })
+                            .then(Mono.just(file.getAbsolutePath()));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
                     }
-                    return file.getAbsolutePath();
-                }))
+                })
                 .subscribeOn(Schedulers.elastic())
                 .doOnNext(path -> config.put("location", path))
                 .then(super.load(newDef))