Explorar o código

Merge branch 'master' into 1.20

# Conflicts:
#	jetlinks-components/io-component/pom.xml
#	jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileManagerConfiguration.java
#	jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileProperties.java
#	jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/web/FileManagerController.java
#	jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/DelimitedPayloadParserBuilder.java
#	jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java
#	pom.xml
zhouhao %!s(int64=3) %!d(string=hai) anos
pai
achega
ede32aa7fc

+ 31 - 1
jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/DefaultImportExportService.java

@@ -1,7 +1,13 @@
 package org.jetlinks.community.io.excel;
 
+import org.hswebframework.reactor.excel.converter.RowWrapper;
+import org.hswebframework.utils.StringUtils;
 import org.jetlinks.community.io.excel.easyexcel.ExcelReadDataListener;
+import org.jetlinks.community.io.file.FileManager;
+import org.jetlinks.community.io.utils.FileUtils;
 import org.springframework.core.io.Resource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 import org.springframework.web.reactive.function.client.WebClient;
@@ -11,6 +17,8 @@ import reactor.core.publisher.Mono;
 import java.io.FileInputStream;
 import java.io.InputStream;
 
+import static org.hswebframework.reactor.excel.ReactorExcel.read;
+
 /**
  * @author bsetfeng
  * @since 1.0
@@ -20,8 +28,12 @@ public class DefaultImportExportService implements ImportExportService {
 
     private WebClient client;
 
-    public DefaultImportExportService(WebClient.Builder builder) {
+    private final FileManager fileManager;
+
+    public DefaultImportExportService(WebClient.Builder builder,
+                                      FileManager fileManager) {
         client = builder.build();
+        this.fileManager = fileManager;
     }
 
     public <T> Flux<RowResult<T>> doImport(Class<T> clazz, String fileUrl) {
@@ -34,6 +46,24 @@ public class DefaultImportExportService implements ImportExportService {
         return ExcelReadDataListener.of(stream, clazz);
     }
 
+
+    @Override
+    public <T> Flux<T> readData(String fileUrl, String fileId, RowWrapper<T> wrapper) {
+        if (!StringUtils.isNullOrEmpty(fileUrl)) {
+            return getInputStream(fileUrl)
+                .flatMapMany(inputStream -> read(inputStream, FileUtils.getExtension(fileUrl), wrapper));
+        } else {
+
+            return Mono
+                .zip(fileManager
+                         .read(fileId)
+                         .as(DataBufferUtils::join)
+                         .map(DataBuffer::asInputStream),
+                     fileManager.getFile(fileId))
+                .flatMapMany(t2 -> read(t2.getT1(), t2.getT2().getExtension(), wrapper));
+        }
+    }
+
     public Mono<InputStream> getInputStream(String fileUrl) {
 
         return Mono.defer(() -> {

+ 5 - 0
jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportExportService.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.io.excel;
 
 
+import org.hswebframework.reactor.excel.converter.RowWrapper;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -19,4 +20,8 @@ public interface ImportExportService {
 
     Mono<InputStream> getInputStream(String fileUrl);
 
+    <T> Flux<T> readData(String fileUrl, String fileId, RowWrapper<T> wrapper);
+
+
+
 }

+ 213 - 0
jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java

@@ -0,0 +1,213 @@
+package org.jetlinks.community.io.file;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import lombok.AllArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
+import org.hswebframework.web.exception.BusinessException;
+import org.hswebframework.web.id.IDGenerator;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
+import org.springframework.http.ContentDisposition;
+import org.springframework.http.HttpRange;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.MultipartBodyBuilder;
+import org.springframework.http.codec.multipart.FilePart;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+public class DefaultFileManager implements FileManager {
+
+    private final FileProperties properties;
+
+    private final DataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
+
+    private final ReactiveRepository<FileEntity, String> repository;
+
+
+    private final WebClient client;
+
+    public DefaultFileManager(WebClient.Builder builder,
+                              FileProperties properties,
+                              ReactiveRepository<FileEntity, String> repository) {
+        new File(properties.getStorageBasePath()).mkdirs();
+        this.properties = properties;
+        this.client = builder
+            .clone()
+            .filter(this.properties.createWebClientRute())
+            .build();
+        this.repository = repository;
+    }
+
+    @Override
+    public Mono<FileInfo> saveFile(FilePart filePart) {
+        return saveFile(filePart.filename(), filePart.content());
+    }
+
+    private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) {
+        dataBuffer = DataBufferUtils.retain(dataBuffer);
+        digest.update(dataBuffer.asByteBuffer());
+        DataBufferUtils.release(dataBuffer);
+        return dataBuffer;
+    }
+
+    public Mono<FileInfo> saveFileToCluster(String name, Flux<DataBuffer> stream) {
+        String serverId = properties.selectServerNode();
+        MultipartBodyBuilder builder = new MultipartBodyBuilder();
+        builder.asyncPart("file", stream, DataBuffer.class)
+               .headers(header -> header
+                   .setContentDisposition(ContentDisposition
+                                              .builder("form-data")
+                                              .name("file")
+                                              .filename(name)
+                                              .build()))
+               .contentType(MediaType.APPLICATION_OCTET_STREAM);
+        return client
+            .post()
+            .uri("http://" + serverId + "/file/" +serverId)
+            .attribute(FileProperties.serverNodeIdAttr, serverId)
+            .contentType(MediaType.MULTIPART_FORM_DATA)
+            .body(BodyInserters.fromMultipartData(builder.build()))
+            .retrieve()
+            .bodyToMono(FileInfo.class);
+    }
+
+    public Mono<FileInfo> doSaveFile(String name, Flux<DataBuffer> stream) {
+        LocalDate now = LocalDate.now();
+        FileInfo fileInfo = new FileInfo();
+        fileInfo.setId(IDGenerator.MD5.generate());
+        fileInfo.withFileName(name);
+
+        String storagePath = now.format(DateTimeFormatter.BASIC_ISO_DATE)
+            + "/" + fileInfo.getId() + "." + fileInfo.getExtension();
+
+        MessageDigest md5 = DigestUtils.getMd5Digest();
+        MessageDigest sha256 = DigestUtils.getSha256Digest();
+        String storageBasePath = properties.getStorageBasePath();
+        String serverNodeId = properties.getServerNodeId();
+        Path path = Paths.get(storageBasePath, storagePath);
+        path.toFile().getParentFile().mkdirs();
+        return stream
+            .map(buffer -> updateDigest(md5, updateDigest(sha256, buffer)))
+            .as(buf -> DataBufferUtils
+                .write(buf, path,
+                       StandardOpenOption.WRITE,
+                       StandardOpenOption.CREATE_NEW,
+                       StandardOpenOption.TRUNCATE_EXISTING))
+            .then(Mono.defer(() -> {
+                File savedFile = Paths.get(storageBasePath, storagePath).toFile();
+                if (!savedFile.exists()) {
+                    return Mono.error(new BusinessException("error.file_storage_failed"));
+                }
+                fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest()));
+                fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest()));
+                fileInfo.setLength(savedFile.length());
+                fileInfo.setCreateTime(System.currentTimeMillis());
+                FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId);
+                return repository
+                    .insert(entity)
+                    .then(Mono.fromSupplier(entity::toInfo));
+            }));
+    }
+
+    @Override
+    public Mono<FileInfo> saveFile(String name, Flux<DataBuffer> stream) {
+        if (properties.getClusterRute().isEmpty()
+            || properties.getClusterRute().containsKey(properties.getServerNodeId())) {
+            return doSaveFile(name, stream);
+        }
+        //配置里集群,但是并不支持本节点,则保存到其他节点
+        return saveFileToCluster(name, stream);
+    }
+
+    @Override
+    public Mono<FileInfo> getFile(String id) {
+        return repository
+            .findById(id)
+            .map(FileEntity::toInfo);
+    }
+
+    private Flux<DataBuffer> readFile(String filePath, long position) {
+        return DataBufferUtils
+            .read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)),
+                  position,
+                  bufferFactory,
+                  properties.getReadBufferSize());
+    }
+
+    private Flux<DataBuffer> readFile(FileEntity file, long position) {
+        if (Objects.equals(file.getServerNodeId(), properties.getServerNodeId())) {
+            return readFile(file.getStoragePath(), position);
+        }
+        return readFromAnotherServer(file, position);
+    }
+
+    protected Flux<DataBuffer> readFromAnotherServer(FileEntity file, long position) {
+        return client
+            .get()
+            .uri("http://" + file.getServerNodeId() + "/file/{serverNodeId}/{fileId}", file.getServerNodeId(), file.getId())
+            .attribute(FileProperties.serverNodeIdAttr, file.getServerNodeId())
+            .headers(header -> header.setRange(Collections.singletonList(HttpRange.createByteRange(position))))
+            .retrieve()
+            .bodyToFlux(DataBuffer.class);
+    }
+
+    @Override
+    public Flux<DataBuffer> read(String id) {
+        return read(id, 0);
+    }
+
+    @Override
+    public Flux<DataBuffer> read(String id, long position) {
+        return repository
+            .findById(id)
+            .flatMapMany(file -> readFile(file, position));
+    }
+
+    @Override
+    public Flux<DataBuffer> read(String id, Function<ReaderContext, Mono<Void>> beforeRead) {
+        return repository
+            .findById(id)
+            .flatMapMany(file -> {
+                DefaultReaderContext context = new DefaultReaderContext(file.toInfo(), 0);
+                return beforeRead
+                    .apply(context)
+                    .thenMany(Flux.defer(() -> readFile(file, context.position)));
+            });
+    }
+
+    @AllArgsConstructor
+    private static class DefaultReaderContext implements ReaderContext {
+        private final FileInfo info;
+        private long position;
+
+        @Override
+        public FileInfo info() {
+            return info;
+        }
+
+        @Override
+        public void position(long position) {
+            this.position = position;
+        }
+    }
+
+}

+ 6 - 0
jetlinks-components/network-component/tcp-component/pom.xml

@@ -30,6 +30,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-text</artifactId>
+            <version>1.9</version>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 7 - 0
jetlinks-components/notify-component/notify-email/pom.xml

@@ -44,6 +44,13 @@
             <version>1.14.3</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.jetlinks.community</groupId>
+            <artifactId>io-component</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+
     </dependencies>
 
 </project>

+ 21 - 6
jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java

@@ -11,12 +11,15 @@ import org.hswebframework.web.utils.ExpressionUtils;
 import org.hswebframework.web.utils.TemplateParser;
 import org.hswebframework.web.validator.ValidatorUtils;
 import org.jetlinks.core.Values;
+import org.jetlinks.community.io.file.FileManager;
 import org.jetlinks.community.notify.*;
 import org.jetlinks.community.notify.email.EmailProvider;
 import org.jetlinks.community.notify.template.TemplateManager;
 import org.jsoup.Jsoup;
 import org.jsoup.nodes.Document;
 import org.jsoup.nodes.Element;
+import org.springframework.core.io.*;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.core.io.ByteArrayResource;
 import org.springframework.core.io.FileSystemResource;
 import org.springframework.core.io.InputStreamSource;
@@ -67,17 +70,23 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
     @Setter
     private boolean enableFileSystemAttachment = Boolean.getBoolean("email.attach.local-file.enabled");
 
+    private final FileManager fileManager;
+
     public static Scheduler scheduler = Schedulers.elastic();
 
-    public DefaultEmailNotifier(NotifierProperties properties, TemplateManager templateManager) {
+    public DefaultEmailNotifier(NotifierProperties properties,
+                                TemplateManager templateManager,
+                                FileManager fileManager) {
         this(properties.getId(),
              new JSONObject(properties.getConfiguration()).toJavaObject(DefaultEmailProperties.class),
-             templateManager);
+             templateManager,
+             fileManager);
     }
 
     public DefaultEmailNotifier(String id,
                                 DefaultEmailProperties properties,
-                                TemplateManager templateManager) {
+                                TemplateManager templateManager,
+                                FileManager fileManager) {
         super(templateManager);
         ValidatorUtils.tryValidate(properties);
         JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
@@ -89,6 +98,7 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
         this.notifierId = id;
         this.sender = properties.getSender();
         this.javaMailSender = mailSender;
+        this.fileManager = fileManager;
     }
 
     @Nonnull
@@ -159,7 +169,7 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
     }
 
 
-    protected Mono<InputStreamSource> convertResource(String resource) {
+    protected Mono<? extends InputStreamSource> convertResource(String resource) {
         if (resource.startsWith("http")) {
             return WebClient
                 .create()
@@ -172,12 +182,17 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
             return Mono.just(
                 new ByteArrayResource(Base64.decodeBase64(base64))
             );
-        } else if (enableFileSystemAttachment) {
+        } else if (enableFileSystemAttachment && resource.contains("/")) {
             return Mono.just(
                 new FileSystemResource(resource)
             );
         } else {
-            throw new UnsupportedOperationException("不支持的文件地址:" + resource);
+            return fileManager
+                .read(resource)
+                .as(DataBufferUtils::join)
+                .map(dataBuffer -> new ByteArrayResource(dataBuffer.asByteBuffer().array()))
+                .onErrorResume(e-> Mono.error(()-> new UnsupportedOperationException("不支持的文件地址:" + resource)))
+                .switchIfEmpty(Mono.error(()-> new UnsupportedOperationException("不支持的文件地址:" + resource)));
         }
     }
 

+ 7 - 2
jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifierProvider.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.notify.email.embedded;
 
 import com.alibaba.fastjson.JSON;
+import org.jetlinks.community.io.file.FileManager;
 import org.jetlinks.community.notify.*;
 import org.jetlinks.community.notify.email.EmailProvider;
 import org.jetlinks.community.notify.template.TemplateManager;
@@ -22,8 +23,12 @@ public class DefaultEmailNotifierProvider implements NotifierProvider, TemplateP
 
     private final TemplateManager templateManager;
 
-    public DefaultEmailNotifierProvider(TemplateManager templateManager) {
+    private final FileManager fileManager;
+
+    public DefaultEmailNotifierProvider(TemplateManager templateManager,
+                                        FileManager fileManager) {
         this.templateManager = templateManager;
+        this.fileManager = fileManager;
     }
 
     @Nonnull
@@ -111,7 +116,7 @@ public class DefaultEmailNotifierProvider implements NotifierProvider, TemplateP
     @Nonnull
     @Override
     public Mono<DefaultEmailNotifier> createNotifier(@Nonnull NotifierProperties properties) {
-        return Mono.fromSupplier(() -> new DefaultEmailNotifier(properties, templateManager));
+        return Mono.fromSupplier(() -> new DefaultEmailNotifier(properties, templateManager, fileManager));
     }
 
     @Override

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -446,7 +446,8 @@ public class DeviceInstanceController implements
     @SaveAction
     @Operation(summary = "导入设备数据")
     public Flux<ImportDeviceInstanceResult> doBatchImportByProduct(@PathVariable @Parameter(description = "产品ID") String productId,
-                                                                   @RequestParam @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl) {
+                                                                   @RequestParam(required = false) @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl,
+                                                                   @RequestParam(required = false) @Parameter(description = "文件Id") String fileId) {
         return Authentication
             .currentReactive()
             .flatMapMany(auth -> {
@@ -461,8 +462,7 @@ public class DeviceInstanceController implements
                     .getDeviceProductDetail(productId)
                     .map(tp4 -> Tuples.of(new DeviceWrapper(tp4.getT3().getTags(), tp4.getT4()), tp4.getT1()))
                     .flatMapMany(wrapper -> importExportService
-                        .getInputStream(fileUrl)
-                        .flatMapMany(inputStream -> read(inputStream, FileUtils.getExtension(fileUrl), wrapper.getT1()))
+                        .readData(fileUrl, fileId, wrapper.getT1())
                         .doOnNext(info -> info.setProductName(wrapper.getT2().getName()))
                     )
                     .map(info -> {

+ 23 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java

@@ -137,7 +137,14 @@ public class GatewayDeviceController {
                     .execute()
                     .then(registry
                               .getDevice(deviceId)
-                              .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)))
+                              .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId))
+                    ).then(registry.getDevice(gatewayId)
+                        .flatMap(gwOperator -> gwOperator.getProtocol()
+                            .map(protocolSupport -> protocolSupport.onChildBind(gwOperator,
+                                Flux.from(registry.getDevice(deviceId)))
+                            )
+                        )
+                    )
             )
             .then(getGatewayInfo(gatewayId));
     }
@@ -168,9 +175,14 @@ public class GatewayDeviceController {
                               .getDevice(id)
                               .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)))
                           .then()
-                ))
-            .then(getGatewayInfo(gatewayId));
-
+                ).then(registry.getDevice(gatewayId)
+                    .flatMap(gwOperator -> gwOperator.getProtocol()
+                        .map(protocolSupport -> protocolSupport.onChildBind(gwOperator,
+                            Flux.fromIterable(deviceIdList).flatMap(id -> registry.getDevice(id)))
+                        )
+                    )
+                )
+            ).then(getGatewayInfo(gatewayId));
     }
 
     @PostMapping("/{gatewayId}/unbind/{deviceId}")
@@ -188,6 +200,13 @@ public class GatewayDeviceController {
             .flatMap(i -> registry
                 .getDevice(deviceId)
                 .flatMap(operator -> operator.removeConfig(DeviceConfigKey.parentGatewayId.getKey())))
+            .then(registry.getDevice(gatewayId)
+                .flatMap(gwOperator -> gwOperator.getProtocol()
+                    .map(protocolSupport -> protocolSupport.onChildUnbind(gwOperator,
+                        Flux.from(registry.getDevice(deviceId)))
+                    )
+                )
+            )
             .then(getGatewayInfo(gatewayId));
     }