Bläddra i källkod

增加循环依赖检查

zhou-hao 3 år sedan
förälder
incheckning
182de36567

+ 14 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -30,6 +30,7 @@ import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.core.utils.CyclicDependencyChecker;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
@@ -457,5 +458,18 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .flatMap(mapReply(FunctionInvokeMessageReply::getOutput));
     }
 
+    private final CyclicDependencyChecker<DeviceInstanceEntity, Void> checker = CyclicDependencyChecker
+        .of(DeviceInstanceEntity::getId, DeviceInstanceEntity::getParentId, this::findById);
+
+    public Mono<Void> checkCyclicDependency(DeviceInstanceEntity device) {
+        return checker.check(device);
+    }
+
+    public Mono<Void> checkCyclicDependency(String id, String parentId) {
+        DeviceInstanceEntity instance = new DeviceInstanceEntity();
+        instance.setId(id);
+        instance.setParentId(parentId);
+        return checker.check(instance);
+    }
 
 }

+ 59 - 50
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java

@@ -67,37 +67,37 @@ public class GatewayDeviceController {
     public Mono<PagerResult<GatewayDeviceInfo>> queryGatewayDevice(@Parameter(hidden = true) QueryParamEntity param) {
         return getGatewayProductList()
             .flatMap(productIdList ->
-                param.toNestQuery(query -> query.in(DeviceInstanceEntity::getProductId, productIdList))
-                    .execute(instanceService::queryPager)
-                    .filter(r -> r.getTotal() > 0)
-                    .flatMap(result -> {
-                        Map<String, DeviceInstanceEntity> mapping =
-                            result.getData()
-                                .stream()
-                                .collect(Collectors.toMap(DeviceInstanceEntity::getId, Function.identity()));
-
-                        //查询所有子设备并按父设备ID分组
-                        return instanceService.createQuery()
-                            .where()
-                            .in(DeviceInstanceEntity::getParentId, mapping.keySet())
-                            .fetch()
-                            .groupBy(DeviceInstanceEntity::getParentId,Integer.MAX_VALUE)
-                            .flatMap(group -> {
-                                String parentId = group.key();
-                                return group
-                                    .collectList()
-                                    //将父设备和分组的子设备合并在一起
-                                    .map(children -> GatewayDeviceInfo.of(mapping.get(parentId), children));
-                            })
-                            .collectMap(GatewayDeviceInfo::getId)//收集所有有子设备的网关设备信息
-                            .defaultIfEmpty(Collections.emptyMap())
-                            .flatMapMany(map -> Flux.fromIterable(mapping.values())
-                                .flatMap(ins -> Mono.justOrEmpty(map.get(ins.getId()))
-                                    //处理没有子设备的网关信息
-                                    .switchIfEmpty(Mono.fromSupplier(() -> GatewayDeviceInfo.of(ins, Collections.emptyList())))))
-                            .collectList()
-                            .map(list -> PagerResult.of(result.getTotal(), list, param));
-                    }))
+                         param.toNestQuery(query -> query.in(DeviceInstanceEntity::getProductId, productIdList))
+                              .execute(instanceService::queryPager)
+                              .filter(r -> r.getTotal() > 0)
+                              .flatMap(result -> {
+                                  Map<String, DeviceInstanceEntity> mapping =
+                                      result.getData()
+                                            .stream()
+                                            .collect(Collectors.toMap(DeviceInstanceEntity::getId, Function.identity()));
+
+                                  //查询所有子设备并按父设备ID分组
+                                  return instanceService.createQuery()
+                                                        .where()
+                                                        .in(DeviceInstanceEntity::getParentId, mapping.keySet())
+                                                        .fetch()
+                                                        .groupBy(DeviceInstanceEntity::getParentId, Integer.MAX_VALUE)
+                                                        .flatMap(group -> {
+                                                            String parentId = group.key();
+                                                            return group
+                                                                .collectList()
+                                                                //将父设备和分组的子设备合并在一起
+                                                                .map(children -> GatewayDeviceInfo.of(mapping.get(parentId), children));
+                                                        })
+                                                        .collectMap(GatewayDeviceInfo::getId)//收集所有有子设备的网关设备信息
+                                                        .defaultIfEmpty(Collections.emptyMap())
+                                                        .flatMapMany(map -> Flux.fromIterable(mapping.values())
+                                                                                .flatMap(ins -> Mono.justOrEmpty(map.get(ins.getId()))
+                                                                                                    //处理没有子设备的网关信息
+                                                                                                    .switchIfEmpty(Mono.fromSupplier(() -> GatewayDeviceInfo.of(ins, Collections.emptyList())))))
+                                                        .collectList()
+                                                        .map(list -> PagerResult.of(result.getTotal(), list, param));
+                              }))
             .defaultIfEmpty(PagerResult.empty());
     }
 
@@ -108,11 +108,11 @@ public class GatewayDeviceController {
         return Mono.zip(
             instanceService.findById(id),
             instanceService.createQuery()
-                .where()
-                .is(DeviceInstanceEntity::getParentId, id)
-                .fetch()
-                .collectList()
-                .defaultIfEmpty(Collections.emptyList()),
+                           .where()
+                           .is(DeviceInstanceEntity::getParentId, id)
+                           .fetch()
+                           .collectList()
+                           .defaultIfEmpty(Collections.emptyList()),
             GatewayDeviceInfo::of);
     }
 
@@ -123,13 +123,17 @@ public class GatewayDeviceController {
     public Mono<GatewayDeviceInfo> bindDevice(@PathVariable @Parameter(description = "网关设备ID") String gatewayId,
                                               @PathVariable @Parameter(description = "子设备ID") String deviceId) {
         return instanceService
-            .createUpdate()
-            .set(DeviceInstanceEntity::getParentId, gatewayId)
-            .where(DeviceInstanceEntity::getId, deviceId)
-            .execute()
-            .then(registry
-                .getDevice(deviceId)
-                .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)))
+            .checkCyclicDependency(deviceId, gatewayId)
+            .then(
+                instanceService
+                    .createUpdate()
+                    .set(DeviceInstanceEntity::getParentId, gatewayId)
+                    .where(DeviceInstanceEntity::getId, deviceId)
+                    .execute()
+                    .then(registry
+                              .getDevice(deviceId)
+                              .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)))
+            )
             .then(getGatewayInfo(gatewayId));
     }
 
@@ -141,6 +145,11 @@ public class GatewayDeviceController {
 
 
         return deviceId
+            .flatMapIterable(Function.identity())
+            .flatMap(childId -> instanceService
+                .checkCyclicDependency(childId, gatewayId)
+                .thenReturn(childId))
+            .collectList()
             .filter(CollectionUtils::isNotEmpty)
             .flatMap(deviceIdList -> instanceService
                 .createUpdate()
@@ -149,13 +158,13 @@ public class GatewayDeviceController {
                 .in(DeviceInstanceEntity::getId, deviceIdList)
                 .execute()
                 .then(Flux
-                    .fromIterable(deviceIdList)
-                    .flatMap(id -> registry
-                        .getDevice(id)
-                        .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId))).then()
-                )
-            ).then(getGatewayInfo(gatewayId));
-
+                          .fromIterable(deviceIdList)
+                          .flatMap(id -> registry
+                              .getDevice(id)
+                              .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)))
+                          .then()
+                ))
+            .then(getGatewayInfo(gatewayId));
 
     }
 

+ 1 - 1
pom.xml

@@ -22,7 +22,7 @@
         <hsweb.framework.version>4.0.11</hsweb.framework.version>
         <easyorm.version>4.0.11</easyorm.version>
         <hsweb.expands.version>3.0.2</hsweb.expands.version>
-        <jetlinks.version>1.1.7</jetlinks.version>
+        <jetlinks.version>1.1.8-SNAPSHOT</jetlinks.version>
         <r2dbc.version>Arabba-SR10</r2dbc.version>
         <vertx.version>3.8.5</vertx.version>
         <netty.version>4.1.51.Final</netty.version>