Pārlūkot izejas kodu

增加发送指令到设备API

zhou-hao 4 gadi atpakaļ
vecāks
revīzija
18b25a05c1

+ 112 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -19,7 +19,9 @@ import org.hswebframework.web.authorization.annotation.*;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.exception.BusinessException;
+import org.hswebframework.web.exception.NotFoundException;
 import org.hswebframework.web.exception.ValidationException;
+import org.hswebframework.web.id.IDGenerator;
 import org.jetlinks.community.device.entity.*;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.device.response.DeviceDeployResult;
@@ -37,17 +39,20 @@ import org.jetlinks.community.io.utils.FileUtils;
 import org.jetlinks.community.timeseries.query.AggregationData;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.Values;
-import org.jetlinks.core.device.DeviceConfigKey;
-import org.jetlinks.core.device.DeviceOperator;
-import org.jetlinks.core.device.DeviceProductOperator;
-import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.device.*;
 import org.jetlinks.core.device.manager.DeviceBindHolder;
 import org.jetlinks.core.device.manager.DeviceBindProvider;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.RepayableDeviceMessage;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.ConfigPropertyMetadata;
 import org.jetlinks.core.metadata.DeviceMetadata;
 import org.springframework.core.io.buffer.DataBufferFactory;
 import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.data.util.Lazy;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.http.server.reactive.ServerHttpResponse;
@@ -64,6 +69,7 @@ import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -630,4 +636,106 @@ public class DeviceInstanceController implements
             .map(AggregationData::values);
     }
 
+    //发送设备指令
+    @PostMapping("/{deviceId:.+}/message")
+    @SneakyThrows
+    @QueryAction
+    @Operation(summary = "发送指令到设备")
+    @SuppressWarnings("all")
+    public Flux<?> sendMessage(@PathVariable @Parameter(description = "设备ID") String deviceId,
+                               @RequestBody Mono<Map<String, Object>> properties) {
+        return properties
+            .flatMapMany(props -> {
+                return Mono
+                    .zip(
+                        registry
+                            .getDevice(deviceId)
+                            .map(DeviceOperator::messageSender)
+                            .switchIfEmpty(Mono.error(() -> new NotFoundException("设备不存在或未激活"))),
+                        Mono.<Message>justOrEmpty(MessageType.convertMessage(props))
+                            .cast(DeviceMessage.class)
+                            .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的消息格式")))
+                    ).flatMapMany(tp2 -> {
+                        DeviceMessageSender sender = tp2.getT1();
+                        DeviceMessage message = tp2.getT2();
+
+                        Map<String, String> copy = new HashMap<>();
+                        copy.put("deviceId", deviceId);
+                        if (!StringUtils.hasText(message.getMessageId())) {
+                            copy.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
+                        }
+                        FastBeanCopier.copy(copy, message);
+                        return sender
+                            .send(message)
+                            .onErrorResume(DeviceOperationException.class, error -> {
+                                if (message instanceof RepayableDeviceMessage) {
+                                    return Mono.just(
+                                        ((RepayableDeviceMessage) message).newReply().error(error)
+                                    );
+                                }
+                                return Mono.error(error);
+                            });
+                    });
+            });
+    }
+
+    //发送设备指令
+    @PostMapping("/messages")
+    @SneakyThrows
+    @QueryAction
+    @Operation(summary = "批量发送指令到设备")
+    @SuppressWarnings("all")
+    public Flux<?> sendMessage(@RequestParam(required = false)
+                               @Parameter(description = "按查询条件发送指令") String where,
+                               @RequestBody Flux<Map<String, Object>> messages) {
+
+        Lazy<Flux<DeviceOperator>> operators = Lazy.of(() -> {
+            if (StringUtils.isEmpty(where)) {
+                throw new ValidationException("where", "[where]参数不能为空");
+            }
+            QueryParamEntity entity = new QueryParamEntity();
+            entity.setWhere(where);
+            entity.includes("id");
+            return service.query(entity)
+                          .flatMap(device -> registry.getDevice(device.getId()))
+                          .cache();
+        });
+        return messages
+            .flatMap(message -> {
+                DeviceMessage msg = MessageType
+                    .convertMessage(message)
+                    .filter(DeviceMessage.class::isInstance)
+                    .map(DeviceMessage.class::cast)
+                    .orElseThrow(() -> new UnsupportedOperationException("不支持的消息格式:" + message));
+
+                String deviceId = msg.getDeviceId();
+                Flux<DeviceOperator> devices = StringUtils.isEmpty(deviceId)
+                    ? operators.get()
+                    : registry.getDevice(deviceId).flux();
+
+                return devices
+                    .flatMap(device -> {
+                        Map<String, Object> copy = new HashMap<>(message);
+                        copy.put("deviceId", device.getDeviceId());
+                        copy.putIfAbsent("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
+                        //复制为新的消息,防止冲突
+                        DeviceMessage copiedMessage = MessageType
+                            .convertMessage(copy)
+                            .map(DeviceMessage.class::cast)
+                            .orElseThrow(() -> new UnsupportedOperationException("不支持的消息格式"));
+                        return device
+                            .messageSender()
+                            .send(copiedMessage)
+                            .onErrorResume(Exception.class, error -> {
+                                if (copiedMessage instanceof RepayableDeviceMessage) {
+                                    return Mono.just(
+                                        ((RepayableDeviceMessage) copiedMessage).newReply().error(error)
+                                    );
+                                }
+                                return Mono.error(error);
+                            });
+                    });
+            });
+    }
+
 }