Przeglądaj źródła

Merge remote-tracking branch 'origin/2.0' into 2.0

zhouhao 2 lat temu
rodzic
commit
a8b0cf5df2

+ 6 - 2
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/parser/DefaultLinkTypeParser.java

@@ -4,6 +4,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.hswebframework.ezorm.core.param.Term;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
 
 import java.util.List;
 import java.util.function.Consumer;
@@ -15,10 +16,13 @@ import java.util.function.Consumer;
 @Component
 public class DefaultLinkTypeParser implements LinkTypeParser {
 
-    private TermTypeParser parser = new DefaultTermTypeParser();
+    private final TermTypeParser parser = new DefaultTermTypeParser();
 
     @Override
     public BoolQueryBuilder process(Term term, Consumer<Term> consumer, BoolQueryBuilder queryBuilders) {
+        if (term.getValue() == null && CollectionUtils.isEmpty(term.getTerms())) {
+            return queryBuilders;
+        }
         if (term.getType() == Term.Type.or) {
             handleOr(queryBuilders, term, consumer);
         } else {
@@ -52,4 +56,4 @@ public class DefaultLinkTypeParser implements LinkTypeParser {
     }
 
 
-}
+}

+ 31 - 16
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/utils/ElasticSearchConverter.java

@@ -6,9 +6,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.core.metadata.Converter;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.PropertyMetadata;
-import org.jetlinks.core.metadata.types.DateTimeType;
-import org.jetlinks.core.metadata.types.GeoPoint;
-import org.jetlinks.core.metadata.types.GeoType;
+import org.jetlinks.core.metadata.types.*;
 
 import java.util.Date;
 import java.util.HashMap;
@@ -22,7 +20,9 @@ public class ElasticSearchConverter {
         return QueryParamTranslator.convertSearchSourceBuilder(queryParam, metadata);
     }
 
-    public static Map<String, Object> convertDataToElastic(Map<String, Object> data, List<PropertyMetadata> properties) {
+    public static Map<String, Object> convertDataToElastic(Map<String, Object> data,
+                                                           List<PropertyMetadata> properties) {
+        Map<String, Object> newValue = new HashMap<>(data);
         for (PropertyMetadata property : properties) {
             DataType type = property.getValueType();
             Object val = data.get(property.getId());
@@ -32,37 +32,52 @@ public class ElasticSearchConverter {
             //处理地理位置类型
             if (type instanceof GeoType) {
                 GeoPoint point = ((GeoType) type).convert(val);
+
                 Map<String, Object> geoData = new HashMap<>();
                 geoData.put("lat", point.getLat());
                 geoData.put("lon", point.getLon());
-                data.put(property.getId(), geoData);
+
+                newValue.put(property.getId(), geoData);
+            } else if (type instanceof GeoShapeType) {
+                GeoShape shape = ((GeoShapeType) type).convert(val);
+                if (shape == null) {
+                    throw new UnsupportedOperationException("不支持的GeoShape格式:" + val);
+                }
+                Map<String, Object> geoData = new HashMap<>();
+                geoData.put("type", shape.getType().name());
+                geoData.put("coordinates", shape.getCoordinates());
+                newValue.put(property.getId(), geoData);
             } else if (type instanceof DateTimeType) {
                 Date date = ((DateTimeType) type).convert(val);
-                data.put(property.getId(), date.getTime());
+                newValue.put(property.getId(), date.getTime());
             } else if (type instanceof Converter) {
-                data.put(property.getId(), ((Converter<?>) type).convert(val));
+                newValue.put(property.getId(), ((Converter<?>) type).convert(val));
             }
         }
-        return data;
+        return newValue;
     }
 
-    public static Map<String, Object> convertDataFromElastic(Map<String, Object> data, List<PropertyMetadata> properties) {
+    public static Map<String, Object> convertDataFromElastic(Map<String, Object> data,
+                                                             List<PropertyMetadata> properties) {
+        Map<String, Object> newData = new HashMap<>(data);
         for (PropertyMetadata property : properties) {
             DataType type = property.getValueType();
-            Object val = data.get(property.getId());
+            Object val = newData.get(property.getId());
             if (val == null) {
                 continue;
             }
             //处理地理位置类型
             if (type instanceof GeoType) {
-                data.put(property.getId(), ((GeoType) type).convertToMap(val));
-            } else if (type instanceof DateTimeType) {
+                newData.put(property.getId(), ((GeoType) type).convertToMap(val));
+            }else if (type instanceof GeoShapeType) {
+                newData.put(property.getId(),GeoShape.of(val).toMap());
+            }  else if (type instanceof DateTimeType) {
                 Date date = ((DateTimeType) type).convert(val);
-                data.put(property.getId(), date);
+                newData.put(property.getId(), date);
             } else if (type instanceof Converter) {
-                data.put(property.getId(), ((Converter<?>) type).convert(val));
+                newData.put(property.getId(), ((Converter<?>) type).convert(val));
             }
         }
-        return data;
+        return newData;
     }
-}
+}

+ 15 - 0
jetlinks-manager/device-manager/pom.xml

@@ -13,6 +13,21 @@
     <artifactId>device-manager</artifactId>
 
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <configuration>
+                    <nonFilteredFileExtensions>
+                        <nonFilteredFileExtension>zip</nonFilteredFileExtension>
+                        <nonFilteredFileExtension>jar</nonFilteredFileExtension>
+                    </nonFilteredFileExtensions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
     <dependencies>
 
         <dependency>

+ 22 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java

@@ -140,6 +140,23 @@ public class DeviceDetail {
     private List<Feature> features = new ArrayList<>();
 
 
+    @Schema(description = "设备接入方式ID")
+    private String accessId;
+
+    @Schema(description = "设备接入方式")
+    private String accessProvider;
+
+    @Schema(description = "设备接入方式名称")
+    private String accessName;
+
+    @Schema(description = "产品所属品类ID")
+    private String classifiedId;
+
+    @Schema(description = "产品所属品类名称")
+    private String classifiedName;
+
+
+
     public DeviceDetail notActive() {
 
         state = DeviceState.notActive;
@@ -245,6 +262,11 @@ public class DeviceDetail {
         setProductName(productEntity.getName());
         setDeviceType(productEntity.getDeviceType());
         setProtocolName(productEntity.getProtocolName());
+        setAccessProvider(productEntity.getAccessProvider());
+        setAccessId(productEntity.getAccessId());
+        setAccessName(productEntity.getAccessName());
+        setClassifiedId(productEntity.getClassifiedId());
+        setClassifiedName(productEntity.getClassifiedName());
         return this;
     }
 

+ 59 - 5
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/ThingsBridgingDeviceDataService.java

@@ -4,6 +4,8 @@ import lombok.AllArgsConstructor;
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.things.data.AggregationRequest;
+import org.jetlinks.community.things.data.operations.SaveOperations;
 import org.jetlinks.core.device.DeviceThingType;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.metadata.DeviceMetadata;
@@ -60,16 +62,16 @@ public class ThingsBridgingDeviceDataService implements DeviceDataService {
     @Nonnull
     @Override
     public Flux<DeviceProperty> queryEachOneProperties(@Nonnull String deviceId, @Nonnull QueryParamEntity query, @Nonnull String... properties) {
-        return repository
-            .opsForThing(thingType, deviceId)
-            .flatMapMany(opt -> opt.forQuery().queryEachProperty(query, properties))
-            .map(DeviceProperty::of);
+        return queryEachProperties(deviceId, query.clone().doPaging(0, 1), properties);
     }
 
     @Nonnull
     @Override
     public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId, @Nonnull QueryParamEntity query, @Nonnull String... properties) {
-        return queryEachOneProperties(deviceId, query.clone().doPaging(0, 1), properties);
+        return repository
+            .opsForThing(thingType, deviceId)
+            .flatMapMany(opt -> opt.forQuery().queryEachProperty(query, properties))
+            .map(DeviceProperty::of);
     }
 
     @Nonnull
@@ -81,6 +83,22 @@ public class ThingsBridgingDeviceDataService implements DeviceDataService {
             .map(DeviceProperty::of);
     }
 
+    @Nonnull
+    public Flux<DeviceProperty> queryPropertyByProductId(@Nonnull String productId, @Nonnull QueryParamEntity query, @Nonnull String... property) {
+        return repository
+            .opsForTemplate(thingType, productId)
+            .flatMapMany(opt -> opt.forQuery().queryProperty(query, property))
+            .map(DeviceProperty::of);
+    }
+
+    @Nonnull
+    public Flux<DeviceProperty> queryTopProperty(@Nonnull String deviceId,
+                                                 @Nonnull AggregationRequest request,
+                                                 int numberOfTop,
+                                                 @Nonnull String... properties) {
+        return Flux.error(new UnsupportedOperationException("unsupported"));
+    }
+
     @Override
     public Flux<AggregationData> aggregationPropertiesByProduct(@Nonnull String productId,
                                                                 @Nonnull AggregationRequest request,
@@ -137,6 +155,18 @@ public class ThingsBridgingDeviceDataService implements DeviceDataService {
         return newResult;
     }
 
+    @Nonnull
+    public Mono<PagerResult<DeviceProperty>> queryPropertyPageByProductId(@Nonnull String productId, @Nonnull String property, @Nonnull QueryParamEntity query) {
+        return queryPropertyPageByProductId(property, query, property);
+    }
+
+    @Nonnull
+    public Mono<PagerResult<DeviceProperty>> queryPropertyPageByProductId(@Nonnull String productId, @Nonnull QueryParamEntity query, @Nonnull String... property) {
+        return repository
+            .opsForTemplate(thingType, productId)
+            .flatMap(opt -> opt.forQuery().queryPropertyPage(query, property))
+            .map(page -> convertPage(page,DeviceProperty::of));
+    }
 
     @Override
     public Mono<PagerResult<DeviceOperationLogEntity>> queryDeviceMessageLog(@Nonnull String deviceId, @Nonnull QueryParamEntity query) {
@@ -146,6 +176,19 @@ public class ThingsBridgingDeviceDataService implements DeviceDataService {
             .map(page -> convertPage(page,DeviceOperationLogEntity::of));
     }
 
+    public Flux<DeviceOperationLogEntity> queryDeviceMessageLogNoPaging(@Nonnull String deviceId, @Nonnull QueryParamEntity query) {
+        return repository
+            .opsForThing(thingType, deviceId)
+            .flatMapMany(opt -> opt.forQuery().queryMessageLog(query))
+            .map(DeviceOperationLogEntity::of);
+    }
+
+    public Flux<DeviceOperationLogEntity> queryDeviceMessageLogNoPagingByProduct(@Nonnull String productId, @Nonnull QueryParamEntity query) {
+        return repository
+            .opsForTemplate(thingType, productId)
+            .flatMapMany(opt -> opt.forQuery().queryMessageLog(query))
+            .map(DeviceOperationLogEntity::of);
+    }
 
     @Nonnull
     @Override
@@ -165,4 +208,15 @@ public class ThingsBridgingDeviceDataService implements DeviceDataService {
             .map(page -> convertPage(page,DeviceEvent::new));
     }
 
+    @Nonnull
+    public Mono<PagerResult<DeviceEvent>> queryEventPageByProductId(@Nonnull String productId,
+                                                                    @Nonnull String event,
+                                                                    @Nonnull QueryParamEntity query,
+                                                                    boolean format) {
+        return repository
+            .opsForTemplate(thingType, productId)
+            .flatMap(opt -> opt.forQuery().queryEventPage(event, query, format))
+            .map(page ->convertPage(page,DeviceEvent::new));
+    }
+
 }

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/ProtocolSupportController.java

@@ -251,7 +251,7 @@ public class ProtocolSupportController
     public Mono<Void> saveDefaultProtocol() {
 
         String defaultProtocolName = "JetLinks官方协议";
-        String fileNeme = "jetlinks-official-protocol-3.0-SNAPSHOT.zip";
+        String fileNeme = "jetlinks-official-protocol-3.0-SNAPSHOT.jar";
         return fileManager
             .saveFile(fileNeme,
                       DataBufferUtils.read(new ClassPathResource(fileNeme),

BIN
jetlinks-manager/device-manager/src/main/resources/jetlinks-official-protocol-3.0-SNAPSHOT.jar


BIN
jetlinks-manager/device-manager/src/main/resources/jetlinks-official-protocol-3.0-SNAPSHOT.zip


+ 5 - 1
jetlinks-manager/network-manager/pom.xml

@@ -86,7 +86,11 @@
             <scope>compile</scope>
         </dependency>
 
-
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-semconv</artifactId>
+            <version>1.12.0-alpha</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 279 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DeviceDebugSubscriptionProvider.java

@@ -0,0 +1,279 @@
+package org.jetlinks.community.network.manager.debug;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.trace.DeviceTracer;
+import org.jetlinks.core.trace.EventBusSpanExporter;
+import org.jetlinks.core.trace.ProtocolTracer;
+import org.jetlinks.core.trace.TraceHolder;
+import org.jetlinks.core.trace.data.SpanDataInfo;
+import org.jetlinks.core.utils.TopicUtils;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+@Component
+@AllArgsConstructor
+@Slf4j
+public class DeviceDebugSubscriptionProvider implements SubscriptionProvider {
+    private final EventBus eventBus;
+
+    private final DeviceRegistry registry;
+
+    @Override
+    public String id() {
+        return "device-debug";
+    }
+
+    @Override
+    public String name() {
+        return "设备诊断";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{"/debug/device/*/trace"};
+    }
+
+    @Override
+    public Flux<?> subscribe(SubscribeRequest request) {
+        String deviceId = TopicUtils
+            .getPathVariables("/debug/device/{deviceId}/trace", request.getTopic())
+            .get("deviceId");
+        return startDebug(deviceId);
+    }
+
+    /**
+     * @param deviceId deviceId
+     * @see DeviceTracer
+     * @see EventBusSpanExporter
+     */
+    Flux<TraceData> startDebug(String deviceId) {
+        if (TraceHolder.isDisabled()) {
+            return Flux
+                .just(TraceData
+                          .of(TraceDataType.log,
+                              true,
+                              "0",
+                              "error",
+                              "链路追踪功能已禁用,请联系管理员.",
+                              System.currentTimeMillis(),
+                              System.currentTimeMillis()));
+        }
+        //订阅设备跟踪信息
+        return Flux
+            .merge(this
+                       .getTraceData(DeviceTracer.SpanName.operation(deviceId, "*"))
+                       .flatMap(this::convertDeviceTrace),
+                   registry
+                       .getDevice(deviceId)
+                       .flatMap(device -> device
+                           .getProtocol()
+                           .map(pro -> ProtocolTracer.SpanName.operation(pro.getId(), "*")))
+                       .flatMapMany(this::getTraceData)
+                       .flatMap(this::convertProtocolTrace)
+            );
+    }
+
+    private Mono<TraceData> convertProtocolTrace(SpanDataInfo traceData) {
+        String errorInfo = traceData
+            .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
+            .flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE))
+            .orElse(null);
+        String operation = traceData.getName().substring(traceData.getName().lastIndexOf("/") + 1);
+        //协议跟踪只展示错误信息,因为协议无法定位到具体的设备,如果现实全部跟踪信息可能会有很多信息
+        if (StringUtils.hasText(errorInfo)) {
+            return Mono.just(TraceData
+                                 .of(TraceDataType.log,
+                                     true,
+                                     traceData.getTraceId(),
+                                     operation,
+                                     getDeviceTraceDetail(traceData),
+                                     traceData.getStartWithNanos() / 1000 / 1000,
+                                     traceData.getStartWithNanos() / 1000 / 1000
+                                 ));
+        }
+        return Mono.empty();
+
+    }
+
+    private boolean hasError(SpanDataInfo data) {
+        return data
+            .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
+            .isPresent();
+    }
+
+    private Object getDeviceTraceDetail(SpanDataInfo data) {
+
+        String message = data
+            .getAttribute(DeviceTracer.SpanKey.message)
+            .orElse(null);
+
+        String response = data
+            .getAttribute(DeviceTracer.SpanKey.response)
+            .orElse(null);
+
+        if (StringUtils.hasText(message)) {
+            if (StringUtils.hasText(response)) {
+                return String.join("\n\n", response);
+            }
+            return message;
+        }
+
+        if (StringUtils.hasText(response)) {
+            return response;
+        }
+
+        String errorInfo = data
+            .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
+            .flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE))
+            .orElse(null);
+
+        if (StringUtils.hasText(errorInfo)) {
+            return errorInfo;
+        }
+
+        return JSON.toJSONString(data.getAttributes(), SerializerFeature.PrettyFormat);
+
+    }
+
+    private Mono<TraceData> convertDeviceTrace(SpanDataInfo traceData) {
+        String name = traceData.getName();
+        String operation = name.substring(name.lastIndexOf("/") + 1);
+        return Mono.just(TraceData
+                             .of(TraceDataType.data,
+                                 hasError(traceData),
+                                 traceData.getTraceId(),
+                                 operation,
+                                 getDeviceTraceDetail(traceData),
+                                 traceData.getStartWithNanos() / 1000 / 1000,
+                                 traceData.getStartWithNanos() / 1000 / 1000
+                             ));
+    }
+
+    private Flux<SpanDataInfo> getTraceData(String name) {
+        //启用跟踪
+        Disposable disposable = enableSpan(name);
+
+        return eventBus
+            .subscribe(Subscription
+                           .builder()
+                           .subscriberId("device_debug_tracer")
+                           .topics("/trace/*" + name)
+                           .broker()
+                           .local()
+                           .build(),
+                       SpanDataInfo.class)
+            //完成时关闭跟踪
+            .doFinally(s -> disposable.dispose());
+    }
+
+    private Disposable enableSpan(String name) {
+        Disposable.Composite disposable = Disposables.composite();
+
+        String id = IDGenerator.UUID.generate();
+
+        eventBus
+            .publish("/_sys/_trace/opt", new TraceOpt(id, name, true))
+            .subscribe();
+
+        disposable.add(() -> eventBus
+            .publish("/_sys/_trace/opt", new TraceOpt(id, name, false))
+            .subscribe());
+
+        return disposable;
+    }
+
+    @Subscribe(value = "/_sys/_trace/opt", features = {Subscription.Feature.broker, Subscription.Feature.local})
+    public Mono<Void> handleTraceEnable(TraceOpt opt) {
+        if (opt.enable) {
+            log.debug("enable trace {} id:{}", opt.span, opt.id);
+            TraceHolder.enable(opt.span, opt.id);
+        } else {
+            log.debug("remove trace {} id:{}", opt.span, opt.id);
+            TraceHolder.removeEnabled(opt.span, opt.id);
+        }
+        return Mono.empty();
+    }
+
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class TraceOpt {
+        private String id;
+        private String span;
+        private boolean enable;
+    }
+
+    public enum TraceDataType {
+        /**
+         * 和设备有关联的数据
+         */
+        data,
+        /**
+         * 和设备没有关联的日志信息
+         */
+        log
+    }
+
+    @Setter
+    @Getter
+    @AllArgsConstructor(staticName = "of")
+    @NoArgsConstructor
+    @ToString
+    public static class TraceData implements Serializable {
+
+        static Set<String> downstreamOperation = new HashSet<>(
+            Arrays.asList(
+                "downstream", "encode", "request"
+            )
+        );
+        private static final long serialVersionUID = 1L;
+        // 跟踪数据类型
+        private TraceDataType type;
+        //是否有错误信息
+        private boolean error;
+        // 跟踪ID
+        private String traceId;
+        /**
+         * @see DeviceTracer.SpanName
+         * 操作. encode,decode
+         */
+        private String operation;
+        // 数据内容
+        private Object detail;
+        //开始时间 毫秒
+        private long startTime;
+        //结束时间 毫秒
+        private long endTime;
+
+        //是否上行操作
+        public boolean isUpstream() {
+            return !isDownstream();
+        }
+
+        //是否下行操作
+        public boolean isDownstream() {
+            return operation != null && downstreamOperation.contains(operation);
+        }
+    }
+}