Преглед на файлове

优化设备消息发送

zhou-hao преди 4 години
родител
ревизия
107cb8295d

+ 91 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -19,20 +19,25 @@ import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.exception.NotFoundException;
+import org.hswebframework.web.id.IDGenerator;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.device.entity.*;
 import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.*;
-import org.jetlinks.core.metadata.DataType;
-import org.jetlinks.core.metadata.DeviceMetadata;
-import org.jetlinks.core.metadata.EventMetadata;
-import org.jetlinks.core.metadata.Metadata;
+import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
+import org.jetlinks.core.message.property.ReadPropertyMessageReply;
+import org.jetlinks.core.message.property.WritePropertyMessageReply;
+import org.jetlinks.core.metadata.*;
 import org.jetlinks.core.metadata.types.ObjectType;
+import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.core.utils.FluxUtils;
 import org.jetlinks.community.device.entity.excel.DeviceInstanceImportExportEntity;
 import org.jetlinks.community.device.enums.DeviceState;
@@ -415,6 +420,88 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 }));
     }
 
+    private static <R extends DeviceMessageReply, T> Function<R, Mono<T>> mapReply(Function<R, T> function) {
+        return reply -> {
+            if (ErrorCode.REQUEST_HANDLING.name().equals(reply.getCode())) {
+                throw new DeviceOperationException(ErrorCode.REQUEST_HANDLING, reply.getMessage());
+            }
+            if (!reply.isSuccess()) {
+                throw new BusinessException(reply.getMessage(), reply.getCode());
+            }
+            return Mono.justOrEmpty(function.apply(reply));
+        };
+    }
+
+    //获取标准设备属性
+    @SneakyThrows
+    public Mono<DevicePropertiesEntity> readAndConvertProperty(String deviceId,
+                                                               String property) {
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
+            .flatMap(deviceOperator -> deviceOperator.messageSender()
+                .readProperty(property).messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
+                .send()
+                .flatMap(mapReply(ReadPropertyMessageReply::getProperties))
+                .reduceWith(LinkedHashMap::new, (main, map) -> {
+                    main.putAll(map);
+                    return main;
+                })
+                .flatMap(map -> {
+                    Object value = map.get(property);
+                    return deviceOperator.getMetadata()
+                        .map(deviceMetadata -> deviceMetadata.getProperty(property)
+                            .map(PropertyMetadata::getValueType)
+                            .orElse(new StringType()))
+                        .map(dataType -> DevicePropertiesEntity.builder()
+                            .deviceId(deviceId)
+                            .productId(property)
+                            .build()
+                            .withValue(dataType, value));
+                }));
+
+    }
+
+    //设置设备属性
+    @SneakyThrows
+    public Mono<Map<String, Object>> writeProperties(String deviceId,
+                                                     Map<String, Object> properties) {
+
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
+            .map(operator -> operator
+                .messageSender()
+                .writeProperty()
+                .messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
+                .write(properties)
+            )
+            .flatMapMany(WritePropertyMessageSender::send)
+            .flatMap(mapReply(WritePropertyMessageReply::getProperties))
+            .reduceWith(LinkedHashMap::new, (main, map) -> {
+                main.putAll(map);
+                return main;
+            });
+    }
+
+    //设备功能调用
+    @SneakyThrows
+    public Flux<?> invokeFunction(String deviceId,
+                                  String functionId,
+                                  Map<String, Object> properties) {
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
+            .flatMap(operator -> operator
+                .messageSender()
+                .invokeFunction(functionId)
+                .messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
+                .setParameter(properties)
+                .validate()
+            )
+            .flatMapMany(FunctionInvokeMessageSender::send)
+            .flatMap(mapReply(FunctionInvokeMessageReply::getOutput));
+    }
 
 
     @Subscribe("/device/*/*/register")

+ 30 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -531,4 +531,34 @@ public class DeviceInstanceController implements
             .defaultIfEmpty("{\n}");
     }
 
+    //获取设备属性
+    @GetMapping("/{deviceId}/property/{property:.+}")
+    @SneakyThrows
+    @QueryAction
+    public Mono<DevicePropertiesEntity> getStandardProperty(@PathVariable String deviceId, @PathVariable String property) {
+        return service.readAndConvertProperty(deviceId, property);
+
+    }
+
+    //设置设备属性
+    @PutMapping("/{deviceId}/property")
+    @SneakyThrows
+    @QueryAction
+    public Flux<?> writeProperties(@PathVariable String deviceId,
+                                   @RequestBody Mono<Map<String, Object>> properties) {
+        return properties.flatMapMany(props -> service.writeProperties(deviceId, props));
+    }
+
+    //设备功能调用
+    @PostMapping("/{deviceId}/function/{functionId}")
+    @SneakyThrows
+    @QueryAction
+    public Flux<?> invokedFunction(@PathVariable String deviceId,
+                                   @PathVariable String functionId,
+                                   @RequestBody Mono<Map<String, Object>> properties) {
+
+        return properties.flatMapMany(props -> service.invokeFunction(deviceId, functionId, props));
+    }
+
+
 }