zhou-hao 4 vuotta sitten
vanhempi
commit
d2a9902058

+ 14 - 12
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/template/AbstractTemplateManager.java

@@ -17,34 +17,36 @@ public abstract class AbstractTemplateManager implements TemplateManager {
 
     protected void register(TemplateProvider provider) {
         providers.computeIfAbsent(provider.getType().getId(), ignore -> new ConcurrentHashMap<>())
-                .put(provider.getProvider().getId(), provider);
+                 .put(provider.getProvider().getId(), provider);
     }
 
     @Override
     @Nonnull
-    public Mono<? extends Template> createTemplate(@Nonnull NotifyType type,@Nonnull TemplateProperties prop) {
+    public Mono<? extends Template> createTemplate(@Nonnull NotifyType type, @Nonnull TemplateProperties prop) {
         return Mono.justOrEmpty(providers.get(type.getId()))
-                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的通知类型:" + prop.getType())))
-                .flatMap(map -> Mono.justOrEmpty(map.get(prop.getProvider()))
-                        .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的服务商:" + prop.getProvider())))
-                        .flatMap(provider -> provider.createTemplate(prop)));
+                   .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的通知类型:" + prop.getType())))
+                   .flatMap(map -> Mono.justOrEmpty(map.get(prop.getProvider()))
+                                       .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的服务商:" + prop
+                                           .getProvider())))
+                                       .flatMap(provider -> provider.createTemplate(prop)));
     }
 
     @Nonnull
     @Override
     public Mono<? extends Template> getTemplate(@Nonnull NotifyType type, @Nonnull String id) {
         return Mono.justOrEmpty(templates.get(id))
-                .switchIfEmpty(Mono.defer(() ->
-                        getProperties(type, id)
-                                .flatMap(prop -> this.createTemplate(type, prop))
-                                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("通知类型不支持:" + type.getId())))
-                ));
+                   .switchIfEmpty(Mono.defer(() -> this
+                       .getProperties(type, id)
+                       .flatMap(prop -> this.createTemplate(type, prop))
+                       .doOnNext(temp -> templates.put(id, temp))
+                       .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("通知类型不支持:" + type
+                           .getId())))
+                   ));
     }
 
     @Override
     @Nonnull
     public Mono<Void> reload(String templateId) {
-        // TODO: 2019/12/20 集群支持
         return Mono.fromRunnable(() -> templates.remove(templateId));
     }
 }

+ 187 - 153
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -80,8 +80,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     @Override
     public Mono<SaveResult> save(Publisher<DeviceInstanceEntity> entityPublisher) {
         return Flux.from(entityPublisher)
-            .doOnNext(instance -> instance.setState(null))
-            .as(super::save);
+                   .doOnNext(instance -> instance.setState(null))
+                   .as(super::save);
     }
 
 
@@ -149,7 +149,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
         return flux
             .flatMap(instance -> registry
                 .register(instance.toDeviceInfo())
-                .flatMap(deviceOperator -> deviceOperator.getState()
+                .flatMap(deviceOperator -> deviceOperator
+                    .getState()
                     .flatMap(r -> {
                         if (r.equals(org.jetlinks.core.device.DeviceState.unknown) ||
                             r.equals(org.jetlinks.core.device.DeviceState.noActive)) {
@@ -164,19 +165,21 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 .thenReturn(instance))
             .buffer(50)
             .publishOn(Schedulers.single())
-            .flatMap(all -> Flux.fromIterable(all)
+            .flatMap(all -> Flux
+                .fromIterable(all)
                 .groupBy(DeviceInstanceEntity::getState)
-                .flatMap(group ->
-                    group.map(DeviceInstanceEntity::getId)
-                        .collectList()
-                        .flatMap(list -> createUpdate()
-                            .where()
-                            .set(DeviceInstanceEntity::getState, group.key())
-                            .set(DeviceInstanceEntity::getRegistryTime, new Date())
-                            .in(DeviceInstanceEntity::getId, list)
-                            .execute()
-                            .map(r -> DeviceDeployResult.success(list.size()))
-                            .onErrorResume(err -> Mono.just(DeviceDeployResult.error(err.getMessage()))))));
+                .flatMap(group -> group
+                    .map(DeviceInstanceEntity::getId)
+                    .collectList()
+                    .flatMap(list -> createUpdate()
+                        .where()
+                        .set(DeviceInstanceEntity::getState, group.key())
+                        .set(DeviceInstanceEntity::getRegistryTime, new Date())
+                        .in(DeviceInstanceEntity::getId, list)
+                        .execute()
+                        .map(r -> DeviceDeployResult.success(list.size()))
+                        .onErrorResume(err -> Mono.just(DeviceDeployResult.error(err.getMessage()))))))
+            ;
     }
 
     /**
@@ -190,9 +193,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .flatMap(product -> registry
                 .unregisterDevice(id)
                 .then(createUpdate()
-                    .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                    .where(DeviceInstanceEntity::getId, id)
-                    .execute()));
+                          .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                          .where(DeviceInstanceEntity::getId, id)
+                          .execute()));
     }
 
     /**
@@ -203,12 +206,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
      */
     public Mono<Integer> unregisterDevice(String id) {
         return this.findById(Mono.just(id))
-            .flatMap(device -> registry
-                .unregisterDevice(id)
-                .then(createUpdate()
-                    .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                    .where(DeviceInstanceEntity::getId, id)
-                    .execute()));
+                   .flatMap(device -> registry
+                       .unregisterDevice(id)
+                       .then(createUpdate()
+                                 .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                                 .where(DeviceInstanceEntity::getId, id)
+                                 .execute()));
     }
 
     /**
@@ -219,12 +222,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
      */
     public Mono<Integer> unregisterDevice(Publisher<String> ids) {
         return Flux.from(ids)
-            .flatMap(id -> registry.unregisterDevice(id).thenReturn(id))
-            .collectList()
-            .flatMap(list -> createUpdate()
-                .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                .where().in(DeviceInstanceEntity::getId, list)
-                .execute());
+                   .flatMap(id -> registry.unregisterDevice(id).thenReturn(id))
+                   .collectList()
+                   .flatMap(list -> createUpdate()
+                       .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                       .where().in(DeviceInstanceEntity::getId, list)
+                       .execute());
     }
 
     protected Mono<DeviceDetail> createDeviceDetail(DeviceProductEntity product,
@@ -235,30 +238,35 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
         return registry
             .getDevice(device.getId())
-            .flatMap(operator ->
-                         //检查设备的真实状态,可能出现设备已经离线,但是数据库状态未及时更新的.
-                         operator.checkState()
-                                 .map(DeviceState::of)
-                                 .filter(state -> state != detail.getState())
-                                 .doOnNext(detail::setState)
-                                 .flatMap(state -> createUpdate()
-                                     .set(DeviceInstanceEntity::getState, state)
-                                     .where(DeviceInstanceEntity::getId, device.getId())
-                                     .execute())
-                                 .thenReturn(operator))
+            .flatMap(operator -> operator
+                //检查设备的真实状态,可能出现设备已经离线,但是数据库状态未及时更新的.
+                .checkState()
+                .map(DeviceState::of)
+                //检查失败,则返回原始状态
+                .onErrorReturn(device.getState())
+                //如果状态不一致,则需要更新数据库中的状态
+                .filter(state -> state != detail.getState())
+                .doOnNext(detail::setState)
+                .flatMap(state -> createUpdate()
+                    .set(DeviceInstanceEntity::getState, state)
+                    .where(DeviceInstanceEntity::getId, device.getId())
+                    .execute())
+                .thenReturn(operator))
+            //填充详情信息
             .flatMap(detail::with)
-            .switchIfEmpty(Mono.defer(() -> {
-                //如果设备注册中心里没有设备信息,并且数据库里的状态不是未激活.
-                //可能是因为注册中心信息丢失,修改数据库中的状态信息.
-                if (detail.getState() != DeviceState.notActive) {
-                    return createUpdate()
-                        .set(DeviceInstanceEntity::getState, DeviceState.notActive)
-                        .where(DeviceInstanceEntity::getId, detail.getId())
-                        .execute()
-                        .thenReturn(detail.notActive());
-                }
-                return Mono.just(detail.notActive());
-            }).thenReturn(detail))
+            .switchIfEmpty(
+                Mono.defer(() -> {
+                    //如果设备注册中心里没有设备信息,并且数据库里的状态不是未激活.
+                    //可能是因为注册中心信息丢失,修改数据库中的状态信息.
+                    if (detail.getState() != DeviceState.notActive) {
+                        return createUpdate()
+                            .set(DeviceInstanceEntity::getState, DeviceState.notActive)
+                            .where(DeviceInstanceEntity::getId, detail.getId())
+                            .execute()
+                            .thenReturn(detail.notActive());
+                    }
+                    return Mono.just(detail.notActive());
+                }).thenReturn(detail))
             .onErrorResume(err -> {
                 log.warn("get device detail error", err);
                 return Mono.just(detail);
@@ -282,11 +290,14 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     }
 
     public Mono<DeviceState> getDeviceState(String deviceId) {
-        return registry.getDevice(deviceId)
+        return registry
+            .getDevice(deviceId)
             .flatMap(DeviceOperator::checkState)
             .flatMap(state -> {
                 DeviceState deviceState = DeviceState.of(state);
-                return createUpdate().set(DeviceInstanceEntity::getState, deviceState)
+                return this
+                    .createUpdate()
+                    .set(DeviceInstanceEntity::getState, deviceState)
                     .where(DeviceInstanceEntity::getId, deviceId)
                     .execute()
                     .thenReturn(deviceState);
@@ -312,62 +323,74 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
         //否则缓冲,数量超过500后批量更新
         //无论缓冲区是否超过500条,都每2秒更新一次.
         FluxUtils.bufferRate(eventBus
-                .subscribe(subscription,DeviceMessage.class)
-                .map(DeviceMessage::getDeviceId),
-            800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
-            .publishOn(Schedulers.parallel())
-            .concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size))
-            .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
-            .subscribe((i) -> log.info("同步设备状态成功:{}", i));
+                                 .subscribe(subscription, DeviceMessage.class)
+                                 .map(DeviceMessage::getDeviceId),
+                             800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
+                 .publishOn(Schedulers.parallel())
+                 .concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size))
+                 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
+                 .subscribe((i) -> log.info("同步设备状态成功:{}", i));
     }
 
 
     public Flux<List<DeviceStateInfo>> syncStateBatch(Flux<List<String>> batch, boolean force) {
 
         return batch
-            .concatMap(list -> Flux.fromIterable(list)
+            .concatMap(list -> Flux
+                .fromIterable(list)
                 .publishOn(Schedulers.parallel())
-                .flatMap(id ->
-                    registry.getDevice(id)
-                        .flatMap(operator -> {
-                            Mono<Byte> state = force ? operator.checkState() : operator.getState();
-                            return Mono.zip(
-                                state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
-                                Mono.just(operator.getDeviceId()), //设备id
-                                operator.getConfig(DeviceConfigKey.isGatewayDevice).defaultIfEmpty(false)//是否为网关设备
-                            );
-                        })
-                        //注册中心里不存在设备就认为是未激活.
-                        .defaultIfEmpty(Tuples.of(org.jetlinks.core.device.DeviceState.noActive, id, false)))
+                .flatMap(id -> registry
+                    .getDevice(id)
+                    .flatMap(operator -> {
+                        Mono<Byte> state = force ? operator.checkState() : operator.getState();
+                        return Mono.zip(
+                            state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
+                            Mono.just(operator.getDeviceId()), //设备id
+                            operator
+                                .getConfig(DeviceConfigKey.isGatewayDevice)
+                                .defaultIfEmpty(false)//是否为网关设备
+                        );
+                    })
+                    //注册中心里不存在设备就认为是未激活.
+                    .defaultIfEmpty(Tuples.of(org.jetlinks.core.device.DeviceState.noActive, id, false)))
                 .collect(Collectors.groupingBy(Tuple2::getT1))
                 .flatMapIterable(Map::entrySet)
                 .flatMap(group -> {
-                    List<String> deviceId=group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList());
+                    List<String> deviceId = group
+                        .getValue()
+                        .stream()
+                        .map(Tuple3::getT2)
+                        .collect(Collectors.toList());
                     DeviceState state = DeviceState.of(group.getKey());
-                    return Mono.zip(
-                        //批量修改设备状态
-                        getRepository()
-                            .createUpdate()
-                            .set(DeviceInstanceEntity::getState, state)
-                            .where()
-                            .in(DeviceInstanceEntity::getId,deviceId)
-                            .execute()
-                            .thenReturn(group.getValue().size()),
-                        //修改子设备状态
-                        Flux.fromIterable(group.getValue())
-                            .filter(Tuple3::getT3)
-                            .map(Tuple3::getT2)
-                            .collectList()
-                            .filter(CollectionUtils::isNotEmpty)
-                            .flatMap(parents ->
-                                getRepository()
+                    return Mono
+                        .zip(
+                            //批量修改设备状态
+                            this.getRepository()
+                                .createUpdate()
+                                .set(DeviceInstanceEntity::getState, state)
+                                .where()
+                                .in(DeviceInstanceEntity::getId, deviceId)
+                                .execute()
+                                .thenReturn(group.getValue().size()),
+                            //修改子设备状态
+                            Flux.fromIterable(group.getValue())
+                                .filter(Tuple3::getT3)
+                                .map(Tuple3::getT2)
+                                .collectList()
+                                .filter(CollectionUtils::isNotEmpty)
+                                .flatMap(parents -> this
+                                    .getRepository()
                                     .createUpdate()
                                     .set(DeviceInstanceEntity::getState, state)
                                     .where()
                                     .in(DeviceInstanceEntity::getParentId, parents)
                                     .execute())
-                            .defaultIfEmpty(0))
-                        .thenReturn(deviceId.stream().map(id->DeviceStateInfo.of(id,state)).collect(Collectors.toList()));
+                                .defaultIfEmpty(0)
+                        )
+                        .thenReturn(deviceId
+                                        .stream()
+                                        .map(id -> DeviceStateInfo.of(id, state))
+                                        .collect(Collectors.toList()));
                 }));
     }
 
@@ -390,8 +413,10 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
         return registry
             .getDevice(deviceId)
             .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
-            .flatMap(deviceOperator -> deviceOperator.messageSender()
-                .readProperty(property).messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
+            .flatMap(deviceOperator -> deviceOperator
+                .messageSender()
+                .readProperty(property)
+                .messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
                 .send()
                 .flatMap(mapReply(ReadPropertyMessageReply::getProperties))
                 .reduceWith(LinkedHashMap::new, (main, map) -> {
@@ -400,11 +425,14 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 })
                 .flatMap(map -> {
                     Object value = map.get(property);
-                    return deviceOperator.getMetadata()
-                        .map(deviceMetadata -> deviceMetadata.getProperty(property)
+                    return deviceOperator
+                        .getMetadata()
+                        .map(deviceMetadata -> deviceMetadata
+                            .getProperty(property)
                             .map(PropertyMetadata::getValueType)
                             .orElse(new StringType()))
-                        .map(dataType -> DevicePropertiesEntity.builder()
+                        .map(dataType -> DevicePropertiesEntity
+                            .builder()
                             .deviceId(deviceId)
                             .productId(property)
                             .build()
@@ -469,32 +497,37 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
     private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage message) {
         //自动注册
-        return Mono.zip(
-            Mono.justOrEmpty(message.getDeviceId()),//1. 设备ID
-            Mono.justOrEmpty(message.getHeader("deviceName")).map(String::valueOf),//2. 设备名称
-            Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)), //3. 产品ID
-            Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)) //4. 产品
-                .flatMap(deviceProductService::findById),
-            Mono.justOrEmpty(message.getHeader("configuration").map(Map.class::cast).orElse(new HashMap()))//配置信息
-        ).flatMap(tps -> {
-            DeviceInstanceEntity instance = new DeviceInstanceEntity();
-            instance.setId(tps.getT1());
-            instance.setName(tps.getT2());
-            instance.setProductId(tps.getT3());
-            instance.setProductName(tps.getT4().getName());
-            instance.setConfiguration(tps.getT5());
-            instance.setRegistryTime(message.getTimestamp());
-            instance.setCreateTimeNow();
-            instance.setCreatorId(tps.getT4().getCreatorId());
-            instance.setOrgId(tps.getT4().getOrgId());
-            instance.setState(DeviceState.online);
-            return super
-                .save(Mono.just(instance))
-                .thenReturn(instance)
-                .flatMap(device -> registry
-                    .register(device.toDeviceInfo()
-                                    .addConfig("state", DeviceState.online)));
-        });
+        return Mono
+            .zip(
+                //T1. 设备ID
+                Mono.justOrEmpty(message.getDeviceId()),
+                //T2. 设备名称
+                Mono.justOrEmpty(message.getHeader("deviceName")).map(String::valueOf),
+                //T3. 产品ID
+                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)),
+                //T4. 产品
+                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf))
+                    .flatMap(deviceProductService::findById),
+                //T5. 配置信息
+                Mono.justOrEmpty(message.getHeader("configuration").map(Map.class::cast).orElse(new HashMap()))
+            ).flatMap(tps -> {
+                DeviceInstanceEntity instance = new DeviceInstanceEntity();
+                instance.setId(tps.getT1());
+                instance.setName(tps.getT2());
+                instance.setProductId(tps.getT3());
+                instance.setProductName(tps.getT4().getName());
+                instance.setConfiguration(tps.getT5());
+                instance.setRegistryTime(message.getTimestamp());
+                instance.setCreateTimeNow();
+                instance.setCreatorId(tps.getT4().getCreatorId());
+                instance.setOrgId(tps.getT4().getOrgId());
+                instance.setState(DeviceState.online);
+                return super
+                    .save(Mono.just(instance))
+                    .thenReturn(instance)
+                    .flatMap(device -> registry
+                        .register(device.toDeviceInfo().addConfig("state", DeviceState.online)));
+            });
     }
 
     /**
@@ -515,12 +548,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 .switchIfEmpty(Mono.defer(() -> doAutoRegister(((DeviceRegisterMessage) childMessage))))
                 .flatMap(dev -> dev.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId()).thenReturn(dev))
                 .flatMap(DeviceOperator::getState)
-                .flatMap(state ->
-                    createUpdate()
-                        .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
-                        .set(DeviceInstanceEntity::getState, DeviceState.of(state))
-                        .where(DeviceInstanceEntity::getId, childId)
-                        .execute()
+                .flatMap(state -> this
+                    .createUpdate()
+                    .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
+                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
+                    .where(DeviceInstanceEntity::getId, childId)
+                    .execute()
                 ).then();
         }
         return Mono.empty();
@@ -538,7 +571,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
         Message childMessage = message.getChildDeviceMessage();
         if (childMessage instanceof DeviceUnRegisterMessage) {
 
-            return registry.getDevice(childId)
+            return registry
+                .getDevice(childId)
                 .flatMap(dev -> dev
                     .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
                     .then(dev.checkState()))
@@ -563,29 +597,29 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
         return registry
             .getDevice(deviceId)
             .flatMap(DeviceOperator::getMetadata)
-            .flatMapMany(metadata ->
-                Flux.fromIterable(tags.entrySet())
-                    .map(e -> {
-                        DeviceTagEntity tagEntity =
-                            metadata.getTag(e.getKey())
-                                .map(DeviceTagEntity::of)
-                                .orElseGet(() -> {
-                                    DeviceTagEntity entity = new DeviceTagEntity();
-                                    entity.setKey(e.getKey());
-                                    entity.setType("string");
-                                    entity.setName(e.getKey());
-                                    entity.setCreateTime(new Date());
-                                    entity.setDescription("设备上报");
-                                    return entity;
-                                });
-                        tagEntity.setDeviceId(deviceId);
-                        tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
-                        return tagEntity;
-                    }))
+            .flatMapMany(metadata -> Flux
+                .fromIterable(tags.entrySet())
+                .map(e -> {
+                    DeviceTagEntity tagEntity = metadata
+                        .getTag(e.getKey())
+                        .map(DeviceTagEntity::of)
+                        .orElseGet(() -> {
+                            DeviceTagEntity entity = new DeviceTagEntity();
+                            entity.setKey(e.getKey());
+                            entity.setType("string");
+                            entity.setName(e.getKey());
+                            entity.setCreateTime(new Date());
+                            entity.setDescription("设备上报");
+                            return entity;
+                        });
+                    tagEntity.setValue(String.valueOf(e.getValue()));
+                    tagEntity.setDeviceId(deviceId);
+                    tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
+                    return tagEntity;
+                }))
             .as(tagRepository::save)
             .then();
     }
 
 
-
 }