Browse Source

优化设备注册

zhou-hao 4 years ago
parent
commit
ccbc50f492

+ 3 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java

@@ -125,7 +125,8 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
     private String parentId;
 
     public DeviceInfo toDeviceInfo() {
-        DeviceInfo info = DeviceInfo.builder()
+        DeviceInfo info = org.jetlinks.core.device.DeviceInfo
+            .builder()
             .id(this.getId())
             .productId(this.getProductId())
             .build()
@@ -133,7 +134,7 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
         info.addConfig("deviceName", name);
         info.addConfig("productName", productName);
         if (!CollectionUtils.isEmpty(configuration)) {
-            configuration.forEach(info::addConfig);
+            info.addConfigs(configuration);
         }
         if (StringUtils.hasText(deriveMetadata)) {
             info.addConfig(DeviceConfigKey.metadata, deriveMetadata);

+ 4 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java

@@ -170,14 +170,16 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
     }
 
     public ProductInfo toProductInfo() {
-        return ProductInfo.builder()
+        return ProductInfo
+            .builder()
             .id(getId())
             .protocol(getMessageProtocol())
             .metadata(getMetadata())
             .build()
             .addConfig(DeviceConfigKey.isGatewayDevice, getDeviceType() == gateway)
             .addConfig("storePolicy", storePolicy)
-            .addConfig("storePolicyConfiguration", storePolicyConfiguration);
+            .addConfig("storePolicyConfiguration", storePolicyConfiguration)
+            .addConfigs(configuration);
     }
 
 }

+ 2 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/events/DeviceProductDeployEvent.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.device.events;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 
 import java.util.Map;
 
@@ -11,7 +12,7 @@ import java.util.Map;
  **/
 @Getter
 @Setter
-public class DeviceProductDeployEvent {
+public class DeviceProductDeployEvent extends DefaultAsyncEvent {
 
     private String id;
 

+ 57 - 17
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java

@@ -1,7 +1,9 @@
 package org.jetlinks.community.device.response;
 
+import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.collections4.MapUtils;
 import org.jetlinks.community.device.enums.DeviceType;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.community.device.entity.DeviceInstanceEntity;
@@ -22,66 +24,91 @@ import java.util.stream.Stream;
 public class DeviceDetail {
 
     //设备ID
+    @Schema(description = "设备ID")
     private String id;
 
     //设备名称
+    @Schema(description = "设备名称")
     private String name;
 
     //设备图片
+    @Schema(description = "图片地址")
     private String photoUrl;
 
     //消息协议标识
+    @Schema(description = "消息协议ID")
     private String protocol;
 
     //协议名称
+    @Schema(description = "消息协议名称")
     private String protocolName;
 
     //通信协议
+    @Schema(description = "通信协议")
     private String transport;
 
     //所属机构ID
+    @Schema(description = "机构ID")
     private String orgId;
 
     //所属机构名称
+    @Schema(description = "机构名称")
     private String orgName;
 
-    //型号ID
+    //产品ID
+    @Schema(description = "产品ID")
     private String productId;
 
     //型号名称
+    @Schema(description = "产品名称")
     private String productName;
 
-    //设备状态
-    private DeviceState state;
-
     //设备类型
+    @Schema(description = "设备类型")
     private DeviceType deviceType;
 
+    //设备状态
+    @Schema(description = "设备状态")
+    private DeviceState state;
+
     //客户端地址 /id:port
+    @Schema(description = "ip地址")
     private String address;
 
     //上线时间
+    @Schema(description = "上线时间")
     private long onlineTime;
 
     //离线时间
+    @Schema(description = "离线时间")
     private long offlineTime;
 
     //创建时间
+    @Schema(description = "创建时间")
     private long createTime;
 
-    //注册时间
+    //激活时间
+    @Schema(description = "激活时间")
     private long registerTime;
 
     //设备元数据
+    @Schema(description = "物模型")
     private String metadata;
 
     //设备配置信息
+    @Schema(description = "配置信息")
     private Map<String, Object> configuration = new HashMap<>();
 
     //设备单独的配置信息
+    @Schema(description = "是否为单独的配置,false表示部分配置信息继承自产品.")
     private boolean aloneConfiguration;
 
+    //父设备ID
+    @Schema(description = "父设备ID")
+    private String parentId;
+
     //标签
+    @Schema(description = "标签信息")
     private List<DeviceTagEntity> tags = new ArrayList<>();
 
     public DeviceDetail notActive() {
@@ -101,15 +128,17 @@ public class DeviceDetail {
             setOfflineTime(tp.getT3());
             setAddress(tp.getT1());
             with(tp.getT4()
-                .getTags()
-                .stream()
-                .map(DeviceTagEntity::of)
-                .collect(Collectors.toList()));
+                   .getTags()
+                   .stream()
+                   .map(DeviceTagEntity::of)
+                   .collect(Collectors.toList()));
         }).thenReturn(this);
     }
 
     public synchronized DeviceDetail with(List<DeviceTagEntity> tags) {
-
+        if (CollectionUtils.isEmpty(tags)) {
+            return this;
+        }
         Map<String, DeviceTagEntity> map = Stream
             .concat(tags.stream(), this.tags.stream())
             .collect(
@@ -130,11 +159,14 @@ public class DeviceDetail {
     }
 
     public DeviceDetail with(DeviceProductEntity productEntity) {
+        if (productEntity == null) {
+            return this;
+        }
         if (StringUtils.isEmpty(metadata)) {
             setMetadata(productEntity.getMetadata());
         }
         if (CollectionUtils.isEmpty(configuration) && !CollectionUtils.isEmpty(productEntity.getConfiguration())) {
-            setConfiguration(productEntity.getConfiguration());
+            configuration.putAll(productEntity.getConfiguration());
         }
         setProtocol(productEntity.getMessageProtocol());
         setTransport(productEntity.getTransportProtocol());
@@ -152,16 +184,24 @@ public class DeviceDetail {
         setName(device.getName());
         setState(device.getState());
         setOrgId(device.getOrgId());
-
+        setParentId(device.getParentId());
         Optional.ofNullable(device.getRegistryTime())
-            .ifPresent(this::setRegisterTime);
+                .ifPresent(this::setRegisterTime);
 
         Optional.ofNullable(device.getCreateTime())
-            .ifPresent(this::setCreateTime);
+                .ifPresent(this::setCreateTime);
 
-        if (!CollectionUtils.isEmpty(device.getConfiguration())) {
-            setConfiguration(device.getConfiguration());
-            setAloneConfiguration(true);
+        if (MapUtils.isNotEmpty(device.getConfiguration())) {
+            boolean hasConfig = device
+                .getConfiguration()
+                .keySet()
+                .stream()
+                .map(configuration::get)
+                .anyMatch(Objects::nonNull);
+            if (hasConfig) {
+                setAloneConfiguration(true);
+            }
+            configuration.putAll(device.getConfiguration());
         }
         if (StringUtils.hasText(device.getDeriveMetadata())) {
             setMetadata(device.getDeriveMetadata());

+ 80 - 58
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -93,28 +93,35 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
      * @since 1.2
      */
     public Mono<Map<String, Object>> resetConfiguration(String deviceId) {
-        return findById(deviceId)
-            .flatMap(device ->
-                Mono.defer(() -> {
-                    if (!MapUtils.isEmpty(device.getConfiguration())) {
-                        //重置注册中心里的配置
-                        return registry.getDevice(deviceId)
-                            .flatMap(opts -> opts.removeConfigs(device.getConfiguration().keySet()))
-                            .then();
-                    }
-                    return Mono.empty();
-                }).then(
-                    //更新数据库
-                    createUpdate()
-                        .set(DeviceInstanceEntity::getConfiguration, new HashMap<>())
-                        .where(DeviceInstanceEntity::getId, deviceId)
-                        .execute()
-                ).then(
-                    //获取产品信息的配置
-                    deviceProductService
-                        .findById(device.getProductId())
-                        .flatMap(product -> Mono.justOrEmpty(product.getConfiguration()))
-                ))
+        return this
+            .findById(deviceId)
+            .zipWhen(device -> deviceProductService.findById(device.getProductId()))
+            .flatMap(tp2 -> {
+                DeviceProductEntity product = tp2.getT2();
+                DeviceInstanceEntity device = tp2.getT1();
+                return Mono
+                    .defer(() -> {
+                        if (MapUtils.isNotEmpty(product.getConfiguration())) {
+                            if (MapUtils.isNotEmpty(device.getConfiguration())) {
+                                product.getConfiguration()
+                                       .keySet()
+                                       .forEach(device.getConfiguration()::remove);
+                            }
+                            //重置注册中心里的配置
+                            return registry.getDevice(deviceId)
+                                           .flatMap(opts -> opts.removeConfigs(product.getConfiguration().keySet()))
+                                           .then();
+                        }
+                        return Mono.empty();
+                    }).then(
+                        //更新数据库
+                        createUpdate()
+                            .set(device::getConfiguration)
+                            .where(device::getId)
+                            .execute()
+                    )
+                    .thenReturn(device.getConfiguration());
+            })
             .defaultIfEmpty(Collections.emptyMap())
             ;
     }
@@ -220,43 +227,58 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 .execute());
     }
 
+    protected Mono<DeviceDetail> createDeviceDetail(DeviceProductEntity product,
+                                                    DeviceInstanceEntity device,
+                                                    List<DeviceTagEntity> tags) {
+
+        DeviceDetail detail = new DeviceDetail().with(product).with(device).with(tags);
+
+        return registry
+            .getDevice(device.getId())
+            .flatMap(operator ->
+                         //检查设备的真实状态,可能出现设备已经离线,但是数据库状态未及时更新的.
+                         operator.checkState()
+                                 .map(DeviceState::of)
+                                 .filter(state -> state != detail.getState())
+                                 .doOnNext(detail::setState)
+                                 .flatMap(state -> createUpdate()
+                                     .set(DeviceInstanceEntity::getState, state)
+                                     .where(DeviceInstanceEntity::getId, device.getId())
+                                     .execute())
+                                 .thenReturn(operator))
+            .flatMap(detail::with)
+            .switchIfEmpty(Mono.defer(() -> {
+                //如果设备注册中心里没有设备信息,并且数据库里的状态不是未激活.
+                //可能是因为注册中心信息丢失,修改数据库中的状态信息.
+                if (detail.getState() != DeviceState.notActive) {
+                    return createUpdate()
+                        .set(DeviceInstanceEntity::getState, DeviceState.notActive)
+                        .where(DeviceInstanceEntity::getId, detail.getId())
+                        .execute()
+                        .thenReturn(detail.notActive());
+                }
+                return Mono.just(detail.notActive());
+            }).thenReturn(detail))
+            .onErrorResume(err -> {
+                log.warn("get device detail error", err);
+                return Mono.just(detail);
+            })
+            ;
+
+    }
+
     public Mono<DeviceDetail> getDeviceDetail(String deviceId) {
-        return this.findById(deviceId)
-            .zipWhen(
-                //合并设备和型号信息
-                (device) -> deviceProductService.findById(device.getProductId()),
-                (device, product) -> new DeviceDetail().with(device).with(product)
-            ).flatMap(detail -> registry
-                .getDevice(deviceId)
-                .flatMap(
-                    operator -> operator.checkState() //检查设备的真实状态,设备已经离线,但是数据库状态未及时更新的.
-                        .map(DeviceState::of)
-                        .filter(state -> state != detail.getState())
-                        .doOnNext(detail::setState)
-                        .flatMap(state -> createUpdate()
-                            .set(DeviceInstanceEntity::getState, state)
-                            .where(DeviceInstanceEntity::getId, deviceId)
-                            .execute())
-                        .thenReturn(operator))
-                .flatMap(detail::with)
-                .switchIfEmpty(Mono.defer(() -> {
-                    if (detail.getState() != DeviceState.notActive) {
-                        return createUpdate()
-                            .set(DeviceInstanceEntity::getState, DeviceState.notActive)
-                            .where(DeviceInstanceEntity::getId, deviceId)
-                            .execute()
-                            .thenReturn(detail.notActive());
-                    }
-                    return Mono.just(detail.notActive());
-                })))
-            //设备标签信息
-            .flatMap(detail -> tagRepository
-                .createQuery()
-                .where(DeviceTagEntity::getDeviceId, deviceId)
-                .fetch()
-                .collectList()
-                .map(detail::with)
-                .defaultIfEmpty(detail));
+        return this
+            .findById(deviceId)
+            .zipWhen(device -> deviceProductService.findById(device.getProductId()))//合并型号
+            .zipWith(tagRepository
+                         .createQuery()
+                         .where(DeviceTagEntity::getDeviceId, deviceId)
+                         .fetch()
+                         .collectList()
+                         .defaultIfEmpty(Collections.emptyList()) //合并标签
+                , (left, right) -> Tuples.of(left.getT2(), left.getT1(), right))
+            .flatMap(tp3 -> createDeviceDetail(tp3.getT1(), tp3.getT2(), tp3.getT3()));
     }
 
     public Mono<DeviceState> getDeviceState(String deviceId) {

+ 12 - 17
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java

@@ -33,25 +33,20 @@ public class LocalDeviceProductService extends GenericReactiveCrudService<Device
     @Autowired
     private ReactiveRepository<DeviceInstanceEntity, String> instanceRepository;
 
-
     public Mono<Integer> deploy(String id) {
         return findById(Mono.just(id))
-            .flatMap(product -> registry.register(
-                ProductInfo.builder()
-                    .id(id)
-                    .protocol(product.getMessageProtocol())
-                    .metadata(product.getMetadata())
-                    .build()
-                    .addConfig(DeviceConfigKey.isGatewayDevice, product.getDeviceType() == gateway))
-                .flatMap(deviceProductOperator -> deviceProductOperator.setConfigs(product.getConfiguration()))
-                .flatMap(re -> createUpdate()
-                    .set(DeviceProductEntity::getState, DeviceProductState.registered.getValue())
-                    .where(DeviceProductEntity::getId, id)
-                    .execute())
-                .doOnNext(i -> {
-                    log.debug("设备型号:{}发布成功", product.getName());
-                    eventPublisher.publishEvent(FastBeanCopier.copy(product, new DeviceProductDeployEvent()));
-                })
+            .flatMap(product -> registry
+                .register(product.toProductInfo())
+                .then(
+                    createUpdate()
+                        .set(DeviceProductEntity::getState, DeviceProductState.registered.getValue())
+                        .where(DeviceProductEntity::getId, id)
+                        .execute()
+                )
+                .flatMap(i -> FastBeanCopier
+                    .copy(product, new DeviceProductDeployEvent())
+                    .publish(eventPublisher)
+                    .thenReturn(i))
             );
     }