浏览代码

优化协议处理

ayan 2 年之前
父节点
当前提交
7d7a6f2d93

+ 2 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/reference/DataReferenceManager.java

@@ -24,6 +24,8 @@ public interface DataReferenceManager {
     String TYPE_NETWORK = "network";
     //数据类型:关系配置
     String TYPE_RELATION = "relation";
+    //数据类型:消息协议
+    String TYPE_PROTOCOL = "protocol";
 
     /**
      * 判断指定数据类型的数据是否已经被其他地方所引用

+ 29 - 22
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalProtocolSupportService.java

@@ -1,48 +1,55 @@
 package org.jetlinks.community.device.service;
 
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
-import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.exception.NotFoundException;
 import org.jetlinks.community.device.entity.ProtocolSupportEntity;
-import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
+import org.jetlinks.community.reference.DataReferenceManager;
 import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
+import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @Service
+@Slf4j
 public class LocalProtocolSupportService extends GenericReactiveCrudService<ProtocolSupportEntity, String> {
 
     @Autowired
     private ProtocolSupportManager supportManager;
 
     @Autowired
-    private ProtocolSupportLoader loader;
+    private DataReferenceManager referenceManager;
+
+    @Override
+    public Mono<Integer> deleteById(Publisher<String> idPublisher) {
+        return Flux.from(idPublisher)
+                   .flatMap(id -> supportManager.remove(id).thenReturn(id))
+                   .as(super::deleteById);
+    }
 
     public Mono<Boolean> deploy(String id) {
         return findById(Mono.just(id))
-                .switchIfEmpty(Mono.error(NotFoundException::new))
-                .map(ProtocolSupportEntity::toDeployDefinition)
-                .flatMap(def->loader.load(def).thenReturn(def))
-                .onErrorMap(err->new BusinessException("无法加载协议:"+err.getMessage(),err))
-                .flatMap(supportManager::save)
-                .flatMap(r -> createUpdate()
-                        .set(ProtocolSupportEntity::getState, 1)
-                        .where(ProtocolSupportEntity::getId, id)
-                        .execute())
-                .map(i -> i > 0);
+            .switchIfEmpty(Mono.error(NotFoundException::new))
+            .flatMap(r -> createUpdate()
+                .set(ProtocolSupportEntity::getState, 1)
+                .where(ProtocolSupportEntity::getId, id)
+                .execute())
+            .map(i -> i > 0);
     }
 
     public Mono<Boolean> unDeploy(String id) {
-        return findById(Mono.just(id))
-                .switchIfEmpty(Mono.error(NotFoundException::new))
-                .map(ProtocolSupportEntity::toUnDeployDefinition)
-                .flatMap(supportManager::save)
-                .flatMap(r -> createUpdate()
-                        .set(ProtocolSupportEntity::getState, 0)
-                        .where(ProtocolSupportEntity::getId, id)
-                        .execute())
-                .map(i -> i > 0);
+        // 消息协议被使用时,不能禁用
+        return referenceManager
+            .assertNotReferenced(DataReferenceManager.TYPE_PROTOCOL, id)
+            .then(findById(Mono.just(id)))
+            .switchIfEmpty(Mono.error(NotFoundException::new))
+            .flatMap(r -> createUpdate()
+                .set(ProtocolSupportEntity::getState, 0)
+                .where(ProtocolSupportEntity::getId, id)
+                .execute())
+            .map(i -> i > 0);
     }
 
 }

+ 73 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/ProtocolSupportHandler.java

@@ -0,0 +1,73 @@
+package org.jetlinks.community.device.service;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.crud.events.EntityBeforeDeleteEvent;
+import org.hswebframework.web.crud.events.EntityCreatedEvent;
+import org.hswebframework.web.crud.events.EntityModifyEvent;
+import org.hswebframework.web.crud.events.EntitySavedEvent;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.device.entity.ProtocolSupportEntity;
+import org.jetlinks.community.reference.DataReferenceManager;
+import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
+import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collection;
+
+/**
+ * 协议事件处理类.
+ *
+ * @author zhangji 2022/4/1
+ */
+@Component
+@AllArgsConstructor
+public class ProtocolSupportHandler {
+    private final DataReferenceManager referenceManager;
+    private       ProtocolSupportLoader  loader;
+    private       ProtocolSupportManager supportManager;
+
+    //禁止删除已有网关使用的协议
+    @EventListener
+    public void handleProtocolDelete(EntityBeforeDeleteEvent<ProtocolSupportEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(protocol -> referenceManager
+                    .assertNotReferenced(DataReferenceManager.TYPE_PROTOCOL, protocol.getId()))
+        );
+    }
+
+    @EventListener
+    public void handleCreated(EntityCreatedEvent<ProtocolSupportEntity> event) {
+        event.async(reloadProtocol(event.getEntity()));
+    }
+
+    @EventListener
+    public void handleSaved(EntitySavedEvent<ProtocolSupportEntity> event) {
+        event.async(reloadProtocol(event.getEntity()));
+    }
+
+    @EventListener
+    public void handleModify(EntityModifyEvent<ProtocolSupportEntity> event) {
+        event.async(reloadProtocol(event.getAfter()));
+    }
+
+    // 重新加载协议
+    private Mono<Void> reloadProtocol(Collection<ProtocolSupportEntity> protocol) {
+        return Flux
+            .fromIterable(protocol)
+            .filter(entity -> entity.getState() != null)
+            .map(entity -> entity.getState() == 1 ? entity.toDeployDefinition() : entity.toUnDeployDefinition())
+            .flatMap(def -> loader
+                //加载一下检验是否正确,然后就卸载
+                .load(def)
+                .doOnNext(ProtocolSupport::dispose)
+                .thenReturn(def))
+            .onErrorMap(err -> new BusinessException("error.unable_to_load_protocol", 500, err.getMessage()))
+            .flatMap(supportManager::save)
+            .then();
+    }
+}

+ 30 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/ProtocolSupportController.java

@@ -13,6 +13,7 @@ import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.authorization.annotation.SaveAction;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
+import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.device.entity.ProtocolSupportEntity;
 import org.jetlinks.community.device.service.LocalProtocolSupportService;
 import org.jetlinks.community.device.web.protocol.ProtocolDetail;
@@ -20,6 +21,7 @@ import org.jetlinks.community.device.web.protocol.ProtocolInfo;
 import org.jetlinks.community.device.web.protocol.TransportInfo;
 import org.jetlinks.community.device.web.request.ProtocolDecodeRequest;
 import org.jetlinks.community.device.web.request.ProtocolEncodeRequest;
+import org.jetlinks.community.protocol.TransportDetail;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.message.codec.Transport;
@@ -203,4 +205,32 @@ public class ProtocolSupportController
             .sort(Comparator.comparingLong(Tuple2::getT1))
             .map(Tuple2::getT2);
     }
+
+    @GetMapping("/{id}/transport/{transport}")
+    @Authorize(merge = false)
+    @Operation(summary = "获取消息协议对应的传输协议信息")
+    public Mono<TransportDetail> getTransportDetail(@PathVariable @Parameter(description = "协议ID") String id,
+                                                    @PathVariable @Parameter(description = "传输协议") String transport) {
+        return protocolSupports
+            .getProtocol(id)
+            .onErrorMap(e -> new BusinessException("error.unable_to_load_protocol_by_access_id", 404, id))
+            .flatMapMany(protocol -> protocol
+                .getSupportedTransport()
+                .filter(trans -> trans.isSame(transport))
+                .distinct()
+                .flatMap(_transport -> TransportDetail.of(protocol, _transport)))
+            .singleOrEmpty();
+    }
+
+
+    @PostMapping("/{id}/detail")
+    @QueryAction
+    @Operation(summary = "获取协议详情")
+    public Mono<ProtocolDetail> protocolDetail(@PathVariable String id) {
+        return protocolSupports
+            .getProtocol(id)
+            .onErrorMap(e -> new BusinessException("error.unable_to_load_protocol_by_access_id", 404, id))
+            .flatMap(ProtocolDetail::of);
+    }
+
 }