Sfoglia il codice sorgente

调整状态同步逻辑

zhouhao 2 anni fa
parent
commit
80185b911a

+ 31 - 47
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -9,6 +9,7 @@ import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.ezorm.rdb.operator.dml.Terms;
 import org.hswebframework.web.crud.events.EntityDeletedEvent;
+import org.hswebframework.web.crud.events.EntityEventHelper;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.id.IDGenerator;
@@ -318,14 +319,17 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 .flatMap(id -> registry
                     .getDevice(id)
                     .flatMap(operator -> {
-                        Mono<Byte> state = force ? operator.checkState() : operator.getState();
-                        return Mono.zip(
-                            state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
-                            Mono.just(operator.getDeviceId()), //设备id
-                            operator
-                                .getConfig(DeviceConfigKey.isGatewayDevice)
-                                .defaultIfEmpty(false)//是否为网关设备
-                        );
+                        Mono<Byte> state = force
+                            ? operator
+                            .checkState()
+                            .onErrorResume(err -> operator.getState())
+                            : operator.getState();
+                        return Mono
+                            .zip(
+                                state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
+                                Mono.just(operator.getDeviceId()), //设备id
+                                operator.getConfig(DeviceConfigKey.isGatewayDevice).defaultIfEmpty(false)//是否为网关设备
+                            );
                     })
                     //注册中心里不存在设备就认为是未激活.
                     .defaultIfEmpty(Tuples.of(org.jetlinks.core.device.DeviceState.noActive, id, false)))
@@ -338,45 +342,25 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                         .map(Tuple3::getT2)
                         .collect(Collectors.toList());
                     DeviceState state = DeviceState.of(group.getKey());
-                    return Flux
-                        .concat(
-                            //批量修改设备状态
-                            getRepository()
-                                .createUpdate()
-                                .set(DeviceInstanceEntity::getState, state)
-                                .where()
-                                .in(DeviceInstanceEntity::getId, deviceIdList)
-                                .execute()
-                                .thenReturn(group.getValue().size()),
-                            //修改子设备状态
-                            Flux.fromIterable(group.getValue())
-                                .filter(Tuple3::getT3)
-                                .map(Tuple3::getT2)
-                                .collectList()
-                                .filter(CollectionUtils::isNotEmpty)
-                                .flatMap(parents -> this
-                                    .getRepository()
-                                    .createUpdate()
-                                    .set(DeviceInstanceEntity::getState, state)
-                                    .where()
-                                    .in(DeviceInstanceEntity::getParentId, parents)
-                                    //不修改未激活的状态
-                                    .not(DeviceInstanceEntity::getState, DeviceState.notActive)
-                                    .nest()
-                                    /* */.accept(DeviceInstanceEntity::getFeatures, Terms.Enums.notInAny, DeviceFeature.selfManageState)
-                                    /* */.or()
-                                    /* */.isNull(DeviceInstanceEntity::getFeatures)
-                                    .end()
-                                    .execute())
-                                .defaultIfEmpty(0)
-                        )
-                        .then(Mono.just(
-                            deviceIdList
-                                .stream()
-                                .map(id -> DeviceStateInfo.of(id, state))
-                                .collect(Collectors.toList())
-                        ));
-                }));
+                    return
+                        //批量修改设备状态
+                        getRepository()
+                            .createUpdate()
+                            .set(DeviceInstanceEntity::getState, state)
+                            .where()
+                            .in(DeviceInstanceEntity::getId, deviceIdList)
+                            .when(state != DeviceState.notActive, where -> where.not(DeviceInstanceEntity::getState, DeviceState.notActive))
+                            .execute()
+                            .thenReturn(group.getValue().size())
+                            .then(Mono.just(
+                                deviceIdList
+                                    .stream()
+                                    .map(id -> DeviceStateInfo.of(id, state))
+                                    .collect(Collectors.toList())
+                            ));
+                }))
+            //更新状态不触发事件
+            .as(EntityEventHelper::setDoNotFireEvent);
     }
 
     private static <R extends DeviceMessageReply, T> Function<R, Mono<T>> mapReply(Function<R, T> function) {