浏览代码

优化设备状态同步

zhou-hao 5 年之前
父节点
当前提交
00ddbb68e8

+ 41 - 38
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel;
 import com.alibaba.excel.ExcelWriter;
 import com.alibaba.excel.write.metadata.WriteSheet;
 import com.alibaba.fastjson.JSON;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.hswebframework.ezorm.core.dsl.Query;
@@ -101,6 +102,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .doOnNext(instance -> instance.setState(null))
             .as(super::save);
     }
+
     /**
      * 获取设备所有信息
      *
@@ -113,7 +115,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .zipWhen(instance -> deviceProductService.findById(instance.getProductId()), DeviceInfo::of) //产品型号信息
             .switchIfEmpty(Mono.error(NotFoundException::new))
             .zipWhen(deviceInfo -> getDeviceRunRealInfo(id), DeviceAllInfoResponse::of) //设备运行状态
-            .zipWhen(info -> getDeviceLatestProperties( id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
+            .zipWhen(info -> getDeviceLatestProperties(id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
             .zipWhen(info -> {
                     DeviceMetadata deviceMetadata = new JetLinksDeviceMetadata(JSON.parseObject(info.getDeviceInfo().getDeriveMetadata()));
                     return getEventCounts(deviceMetadata.getEvents(), id, info.getDeviceInfo().getProductId()); //事件数量统计
@@ -170,8 +172,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                     .id(instance.getId())
                     .productId(instance.getProductId())
                     .build()
-                    .addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId())
-                )
+                    .addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId()))
                     //设置其他配置信息
                     .flatMap(deviceOperator -> deviceOperator.getState()
                         .flatMap(r -> {
@@ -205,6 +206,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
     /**
      * 取消发布(取消激活),取消后,设备无法再连接到服务. 注册中心也无法再获取到该设备信息.
+     *
      * @param id 设备ID
      * @return 取消结果
      */
@@ -330,7 +332,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .subscribe(Subscription.asList("/device/*/online", "/device/*/offline"), "device-state-synchronizer", false)
             .flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
                 .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
-            .flatMap(list -> syncStateBatch(Flux.just(list), false).count())
+            .publishOn(Schedulers.parallel())
+            .concatMap(list -> syncStateBatch(Flux.just(list), false).reduce(Math::addExact))
             .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
             .subscribe((i) -> log.info("同步设备状态成功:{}", i));
     }
@@ -343,47 +346,46 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     }
 
     public Flux<Integer> syncStateBatch(Flux<List<String>> batch, boolean force) {
+
         return batch
-            .flatMap(list -> Flux.fromIterable(list)
-                .flatMap(registry::getDevice)
+            .concatMap(list -> Flux.fromIterable(list)
                 .publishOn(Schedulers.parallel())
+                .flatMap(registry::getDevice)
                 .flatMap(operation -> {
                     Mono<Byte> state = force ? operation.checkState() : operation.getState();
                     return Mono.zip(
-                        state,//状态
+                        state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
                         Mono.just(operation.getDeviceId()), //设备id
-                        operation.getConfig(DeviceConfigKey.isGatewayDevice)//是否为网关设备
+                        operation.getConfig(DeviceConfigKey.isGatewayDevice).defaultIfEmpty(false)//是否为网关设备
                     );
                 })
-                .groupBy(Tuple2::getT1, Function.identity())
+                .collect(Collectors.groupingBy(Tuple2::getT1))
+                .flatMapIterable(Map::entrySet)
+                .publishOn(Schedulers.single())
                 .flatMap(group -> {
-                    @SuppressWarnings("all")
-                    DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key());
-                    return group
-                        .collectList()
-                        .flatMap(idList -> Mono.zip(
-                            //修改设备状态
-                            getRepository()
-                                .createUpdate()
-                                .set(DeviceInstanceEntity::getState, state)
-                                .where()
-                                .in(DeviceInstanceEntity::getId, idList.stream().map(Tuple3::getT2).collect(Collectors.toList()))
-                                .execute(),
-                            //修改子设备状态
-                            Flux.fromIterable(idList)
-                                .filter(Tuple3::getT3)
-                                .map(Tuple3::getT2)
-                                .collectList()
-                                .filter(CollectionUtils::isNotEmpty)
-                                .flatMap(parents ->
-                                    getRepository()
-                                        .createUpdate()
-                                        .set(DeviceInstanceEntity::getState, state)
-                                        .where()
-                                        .in(DeviceInstanceEntity::getParentId, parents)
-                                        .execute()
-                                ).defaultIfEmpty(0)
-                            , Math::addExact));
+                    DeviceState state = DeviceState.of(group.getKey());
+                    return Mono.zip(
+                        //修改设备状态
+                        getRepository()
+                            .createUpdate()
+                            .set(DeviceInstanceEntity::getState, state)
+                            .where()
+                            .in(DeviceInstanceEntity::getId, group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList()))
+                            .execute()
+                            .defaultIfEmpty(0),
+                        Flux.fromIterable(group.getValue())
+                            .filter(Tuple3::getT3)
+                            .map(Tuple3::getT2)
+                            .collectList()
+                            .filter(CollectionUtils::isNotEmpty)
+                            .flatMap(parents ->
+                                getRepository()
+                                    .createUpdate()
+                                    .set(DeviceInstanceEntity::getState, state)
+                                    .where()
+                                    .in(DeviceInstanceEntity::getParentId, parents)
+                                    .execute()
+                            ).defaultIfEmpty(0), Math::addExact);
                 }));
     }
 
@@ -431,7 +433,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
     private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
 
-    public Mono<Void> doExport(ServerHttpResponse response, QueryParam queryParam, String fileName) throws IOException {
+    @SneakyThrows
+    public Mono<Void> doExport(ServerHttpResponse response, QueryParam queryParam, String fileName) {
         response.getHeaders()
             .set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=".concat(
                 URLEncoder.encode(fileName, StandardCharsets.UTF_8.displayName())));
@@ -458,7 +461,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 WriteSheet writeSheet = EasyExcel.writerSheet().build();
                 sink.onCancel(query(queryParam)
                     .map(entity -> {
-                        return  FastBeanCopier.copy(entity, new DeviceInstanceImportExportEntity());
+                        return FastBeanCopier.copy(entity, new DeviceInstanceImportExportEntity());
                     })
                     .buffer(100)
                     .doOnNext(list -> excelWriter.write(list, writeSheet))

+ 8 - 8
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -31,6 +31,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import java.io.IOException;
 import java.util.Map;
@@ -132,8 +133,10 @@ public class DeviceInstanceController implements
         return service
             .query(query.includes("id"))
             .map(DeviceInstanceEntity::getId)
-            .buffer(50)
-            .as(flux -> service.syncStateBatch(flux, true));
+            .buffer(200)
+            .publishOn(Schedulers.single())
+            .concatMap(flux -> service.syncStateBatch(Flux.just(flux), true))
+            .defaultIfEmpty(0);
     }
 
     //已废弃
@@ -198,11 +201,8 @@ public class DeviceInstanceController implements
     }
 
     @PostMapping("/export")
-    public Mono<Void> export(ServerHttpResponse response, QueryParam parameter) throws IOException {
-        return Authentication
-            .currentReactive()
-            .flatMap(auth -> {
-                return Mono.fromCallable(() -> service.doExport(response, parameter, "设备实例.xlsx"));
-            }).then();
+    @QueryAction
+    public Mono<Void> export(ServerHttpResponse response, QueryParam parameter){
+        return service.doExport(response, parameter, "设备实例.xlsx");
     }
 }