Browse Source

docs(数据流【设备网关-->物模型转换->时序入库】): 增加数据流入库的代码注释 优化代码结构

Signed-off-by: Tensai <517634644@qq.com>
Tensai 4 years ago
parent
commit
9c65346a6c
19 changed files with 301 additions and 222 deletions
  1. 1 1
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java
  2. 157 161
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java
  3. 1 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java
  4. 8 3
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java
  5. 3 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java
  6. 5 2
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java
  7. 7 2
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java
  8. 37 2
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java
  9. 4 1
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java
  10. 4 4
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java
  11. 11 5
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java
  12. 13 8
      jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java
  13. 0 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java
  14. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java
  15. 4 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java
  16. 22 19
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java
  17. 8 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java
  18. 12 7
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java
  19. 3 1
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java

+ 1 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java

@@ -15,7 +15,7 @@ import java.util.function.Function;
 /**
  * ES数据库业务操作类
  *
- * @author JetLinks
+ * @author zhouhao
  */
 public interface ElasticSearchService {
 

+ 157 - 161
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java

@@ -57,6 +57,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
+ * 响应式ES数据库操作类
+ *
  * @author zhouhao
  * @since 1.0
  **/
@@ -66,20 +68,23 @@ import java.util.stream.Collectors;
 @ConfigurationProperties(prefix = "elasticsearch")
 public class ReactiveElasticSearchService implements ElasticSearchService {
 
-    private final ReactiveElasticsearchClient restClient;
-
-    private final ElasticSearchIndexManager indexManager;
-
-    private FluxSink<Buffer> sink;
-
     public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
         true, true, false, false
     );
+    //使用对象池处理Buffer,减少GC消耗
+    static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
 
     static {
         DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
     }
 
+    private final ReactiveElasticsearchClient restClient;
+    private final ElasticSearchIndexManager indexManager;
+    private FluxSink<Buffer> sink;
+    @Getter
+    @Setter
+    private BufferConfig buffer = new BufferConfig();
+
     public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
                                         ElasticSearchIndexManager indexManager) {
         this.restClient = restClient;
@@ -136,16 +141,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
         return this.doQuery(index, queryParam)
-                   .flatMap(tp2 -> this
-                       .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
-                       .collectList()
-                       .filter(CollectionUtils::isNotEmpty)
-                       .map(list -> PagerResult.of((int) tp2
-                           .getT2()
-                           .getHits()
-                           .getTotalHits().value, list, queryParam))
-                   )
-                   .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
+            .flatMap(tp2 -> this
+                .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
+                .collectList()
+                .filter(CollectionUtils::isNotEmpty)
+                .map(list -> PagerResult.of((int) tp2
+                    .getT2()
+                    .getHits()
+                    .getTotalHits().value, list, queryParam))
+            )
+            .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
     }
 
     private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
@@ -164,8 +169,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
                 }
                 return mapper
                     .apply(Optional
-                               .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
-                               .convertFromElastic(hitMap));
+                        .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
+                        .convertFromElastic(hitMap));
             });
 
     }
@@ -186,7 +191,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
             });
     }
 
-
     @Override
     public Mono<Long> count(String[] index, QueryParam queryParam) {
         QueryParam param = queryParam.clone();
@@ -225,8 +229,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<Void> commit(String index, Publisher<T> data) {
         return Flux.from(data)
-                   .flatMap(d -> commit(index, d))
-                   .then();
+            .flatMap(d -> commit(index, d))
+            .then();
     }
 
     @Override
@@ -237,10 +241,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<Void> save(String index, Publisher<T> data) {
         return Flux.from(data)
-                   .map(v -> Buffer.of(index, v))
-                   .collectList()
-                   .flatMap(this::doSave)
-                   .then();
+            .map(v -> Buffer.of(index, v))
+            .collectList()
+            .flatMap(this::doSave)
+            .then();
     }
 
     @Override
@@ -253,32 +257,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
         sink.complete();
     }
 
-    @Getter
-    @Setter
-    private BufferConfig buffer = new BufferConfig();
-
-    @Getter
-    @Setter
-    public static class BufferConfig {
-        //最小间隔
-        private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
-        //缓冲最大数量
-        private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
-        //缓冲超时时间
-        private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
-        //背压堆积数量限制.
-        private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
-            .getRuntime()
-            .availableProcessors());
-        //最大缓冲字节
-        private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
-
-        //最大重试次数
-        private int maxRetry = 3;
-        //重试间隔
-        private Duration minBackoff = Duration.ofSeconds(3);
-    }
-
     //@PostConstruct
     public void init() {
         int flushRate = buffer.rate;
@@ -290,10 +268,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
         FluxUtils
             .bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
-                        flushRate,
-                        bufferSize,
-                        bufferTimeout,
-                        (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
+                flushRate,
+                bufferSize,
+                bufferTimeout,
+                (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
             .doOnNext(buf -> bufferedBytes.set(0))
             .onBackpressureBuffer(bufferBackpressure, drop -> {
                 // TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
@@ -321,56 +299,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
             })
             .onErrorResume((err) -> Mono
                 .fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
-                                                           org.hswebframework.utils.StringUtils.throwable2String(err))))
+                    org.hswebframework.utils.StringUtils.throwable2String(err))))
             .subscribe();
     }
 
-    //使用对象池处理Buffer,减少GC消耗
-    static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
-
-    @Getter
-    static class Buffer {
-        String index;
-        String id;
-        String payload;
-        final ObjectPool.Handle<Buffer> handle;
-
-        public Buffer(ObjectPool.Handle<Buffer> handle) {
-            this.handle = handle;
-        }
-
-        public static Buffer of(String index, Object payload) {
-            Buffer buffer;
-            try {
-                buffer = pool.get();
-            } catch (Exception e) {
-                buffer = new Buffer(null);
-            }
-            buffer.index = index;
-            Map<String, Object> data = payload instanceof Map
-                ? ((Map) payload) :
-                FastBeanCopier.copy(payload, HashMap::new);
-            Object id = data.get("id");
-            buffer.id = id == null ? null : String.valueOf(id);
-            buffer.payload = JSON.toJSONString(data);
-            return buffer;
-        }
-
-        void release() {
-            this.index = null;
-            this.id = null;
-            this.payload = null;
-            if (null != handle) {
-                handle.recycle(this);
-            }
-        }
-
-        int numberOfBytes() {
-            return payload == null ? 0 : payload.length() * 2;
-        }
-    }
-
-
     private Mono<String> getIndexForSave(String index) {
         return indexManager
             .getIndexStrategy(index)
@@ -387,48 +319,48 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
     protected Mono<Integer> doSave(Collection<Buffer> buffers) {
         return Flux.fromIterable(buffers)
-                   .groupBy(Buffer::getIndex,Integer.MAX_VALUE)
-                   .flatMap(group -> {
-                       String index = group.key();
-                       return this
-                           .getIndexForSave(index)
-                           .flatMapMany(realIndex -> group
-                               .map(buffer -> {
-                                   try {
-                                       IndexRequest request;
-                                       if (buffer.id != null) {
-                                           request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
-                                       } else {
-                                           request = new IndexRequest(realIndex).type("_doc");
-                                       }
-                                       request.source(buffer.payload, XContentType.JSON);
-                                       return request;
-                                   } finally {
-                                       buffer.release();
-                                   }
-                               }));
-                   })
-                   .collectList()
-                   .filter(CollectionUtils::isNotEmpty)
-                   .flatMap(lst -> {
-                       BulkRequest request = new BulkRequest();
-                       request.timeout(TimeValue.timeValueSeconds(9));
-                       lst.forEach(request::add);
-                       return restClient
-                           .bulk(request)
-                           .as(save -> {
-                               if (buffer.maxRetry > 0) {
-                                   return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
-                               }
-                               return save;
-                           });
-                   })
-                   .doOnNext(response -> {
-                       if (response.hasFailures()) {
-                           System.err.println(response.buildFailureMessage());
-                       }
-                   })
-                   .thenReturn(buffers.size());
+            .groupBy(Buffer::getIndex, Integer.MAX_VALUE)
+            .flatMap(group -> {
+                String index = group.key();
+                return this
+                    .getIndexForSave(index)
+                    .flatMapMany(realIndex -> group
+                        .map(buffer -> {
+                            try {
+                                IndexRequest request;
+                                if (buffer.id != null) {
+                                    request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
+                                } else {
+                                    request = new IndexRequest(realIndex).type("_doc");
+                                }
+                                request.source(buffer.payload, XContentType.JSON);
+                                return request;
+                            } finally {
+                                buffer.release();
+                            }
+                        }));
+            })
+            .collectList()
+            .filter(CollectionUtils::isNotEmpty)
+            .flatMap(lst -> {
+                BulkRequest request = new BulkRequest();
+                request.timeout(TimeValue.timeValueSeconds(9));
+                lst.forEach(request::add);
+                return restClient
+                    .bulk(request)
+                    .as(save -> {
+                        if (buffer.maxRetry > 0) {
+                            return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
+                        }
+                        return save;
+                    });
+            })
+            .doOnNext(response -> {
+                if (response.hasFailures()) {
+                    System.err.println(response.buildFailureMessage());
+                }
+            })
+            .thenReturn(buffers.size());
     }
 
     @SneakyThrows
@@ -444,14 +376,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
     private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
         return Arrays.stream(response.getHits().getHits())
-                     .map(hit -> {
-                         Map<String, Object> hitMap = hit.getSourceAsMap();
-                         if (StringUtils.isEmpty(hitMap.get("id"))) {
-                             hitMap.put("id", hit.getId());
-                         }
-                         return mapper.apply(hitMap);
-                     })
-                     .collect(Collectors.toList());
+            .map(hit -> {
+                Map<String, Object> hitMap = hit.getSourceAsMap();
+                if (StringUtils.isEmpty(hitMap.get("id"))) {
+                    hitMap.put("id", hit.getId());
+                }
+                return mapper.apply(hitMap);
+            })
+            .collect(Collectors.toList());
     }
 
     private Flux<SearchHit> doSearch(SearchRequest request) {
@@ -484,12 +416,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
         SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
         return Flux.fromIterable(indexes)
-                   .flatMap(index -> getIndexForSearch(index.getIndex()))
-                   .collectList()
-                   .map(indexList ->
-                            new SearchRequest(indexList.toArray(new String[0]))
-                                .source(builder)
-                                .indicesOptions(indexOptions));
+            .flatMap(index -> getIndexForSearch(index.getIndex()))
+            .collectList()
+            .map(indexList ->
+                new SearchRequest(indexList.toArray(new String[0]))
+                    .source(builder)
+                    .indicesOptions(indexOptions));
     }
 
     protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
@@ -506,9 +438,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
         SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
         return Flux.fromIterable(indexes)
-                   .flatMap(index -> getIndexForSearch(index.getIndex()))
-                   .collectList()
-                   .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
+            .flatMap(index -> getIndexForSearch(index.getIndex()))
+            .collectList()
+            .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
     }
 
     private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
@@ -518,4 +450,68 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
             .filter(CollectionUtils::isNotEmpty)
             .flatMap(list -> createCountRequest(queryParam, list));
     }
+
+    @Getter
+    @Setter
+    public static class BufferConfig {
+        //最小间隔
+        private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
+        //缓冲最大数量
+        private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
+        //缓冲超时时间
+        private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
+        //背压堆积数量限制.
+        private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
+            .getRuntime()
+            .availableProcessors());
+        //最大缓冲字节
+        private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
+
+        //最大重试次数
+        private int maxRetry = 3;
+        //重试间隔
+        private Duration minBackoff = Duration.ofSeconds(3);
+    }
+
+    @Getter
+    static class Buffer {
+        final ObjectPool.Handle<Buffer> handle;
+        String index;
+        String id;
+        String payload;
+
+        public Buffer(ObjectPool.Handle<Buffer> handle) {
+            this.handle = handle;
+        }
+
+        public static Buffer of(String index, Object payload) {
+            Buffer buffer;
+            try {
+                buffer = pool.get();
+            } catch (Exception e) {
+                buffer = new Buffer(null);
+            }
+            buffer.index = index;
+            Map<String, Object> data = payload instanceof Map
+                ? ((Map) payload) :
+                FastBeanCopier.copy(payload, HashMap::new);
+            Object id = data.get("id");
+            buffer.id = id == null ? null : String.valueOf(id);
+            buffer.payload = JSON.toJSONString(data);
+            return buffer;
+        }
+
+        void release() {
+            this.index = null;
+            this.id = null;
+            this.payload = null;
+            if (null != handle) {
+                handle.recycle(this);
+            }
+        }
+
+        int numberOfBytes() {
+            return payload == null ? 0 : payload.length() * 2;
+        }
+    }
 }

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java

@@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * <p>
  * TCP   UDP   MQTT  CoAP
  *
- * @author Jetlinks
+ * @author zhouhao
  */
 @Component
 public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor {

+ 8 - 3
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java

@@ -8,11 +8,16 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * @author Tensai
+ * 设备网关属性外观类
+ * <p>
+ * 转换设备网关属性数据
+ * </p>
+ *
+ * @author zhouhao
  */
 @Getter
 @Setter
-public class DeviceGatewayProperties  implements ValueObject {
+public class DeviceGatewayProperties implements ValueObject {
 
     private String id;
 
@@ -20,7 +25,7 @@ public class DeviceGatewayProperties  implements ValueObject {
 
     private String networkId;
 
-    private Map<String,Object> configuration=new HashMap<>();
+    private Map<String, Object> configuration = new HashMap<>();
 
     @Override
     public Map<String, Object> values() {

+ 3 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java

@@ -3,7 +3,9 @@ package org.jetlinks.community.gateway.supports;
 import reactor.core.publisher.Mono;
 
 /**
- * @author Jetlinks
+ * 设备网关属性管理器
+ *
+ * @author zhouhao
  */
 public interface DeviceGatewayPropertiesManager {
 

+ 5 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java

@@ -5,9 +5,12 @@ import org.jetlinks.community.network.NetworkType;
 import reactor.core.publisher.Mono;
 
 /**
- * 设备网关服务提供者
+ * 设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关.
+ * 实现统一管理网关配置,动态创建设备网关.
  *
- * @author Jetlinks
+ * @author zhouhao
+ * @see DeviceGateway
+ * @since 1.0
  */
 public interface DeviceGatewayProvider {
 

+ 7 - 2
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java

@@ -19,6 +19,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * 默认网络管理器
+ *
+ * @author zhouhao
+ */
 @Component
 @Slf4j
 public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor {
@@ -26,9 +31,9 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
     private final NetworkConfigManager configManager;
 
 
-    private Map<String, Map<String, Network>> store = new ConcurrentHashMap<>();
+    private final Map<String, Map<String, Network>> store = new ConcurrentHashMap<>();
 
-    private Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>();
+    private final Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>();
 
     public DefaultNetworkManager(NetworkConfigManager configManager) {
         this.configManager = configManager;

+ 37 - 2
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java

@@ -4,13 +4,48 @@ import reactor.core.publisher.Mono;
 
 import java.util.List;
 
+/**
+ * 网络服务管理器
+ * <p>
+ * 管理所有的网络组件
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 public interface NetworkManager {
 
+    /**
+     * 根据ID获取网络组件,否则根据type和id创建网络组件并返回
+     *
+     * @param type 网络类型
+     * @param id   网络组件id
+     * @param <T>  NetWork子类泛型
+     * @return 网络组件
+     */
     <T extends Network> Mono<T> getNetwork(NetworkType type, String id);
 
+    /**
+     * 获取所有的网络组件支持提供商
+     *
+     * @return 网络组件支持提供商
+     */
     List<NetworkProvider<?>> getProviders();
 
-   Mono<Void> reload(NetworkType type, String id);
+    /**
+     * 重新加载网络组件
+     *
+     * @param type 网络类型
+     * @param id   网络组件ID
+     * @return void
+     */
+    Mono<Void> reload(NetworkType type, String id);
 
-   Mono<Void> shutdown(NetworkType type, String id);
+    /**
+     * 停止网络组件
+     *
+     * @param type 网络类型
+     * @param id   网络组件ID
+     * @return void
+     */
+    Mono<Void> shutdown(NetworkType type, String id);
 }

+ 4 - 1
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -23,8 +23,11 @@ import java.util.function.Supplier;
 
 /**
  * 设备网关处理工具
+ * <p>
+ * 封装常用的设备消息处理操作
+ * </p>
  *
- * @author Jetlinks
+ * @author zhouhao
  */
 @AllArgsConstructor
 public class DeviceGatewayHelper {

+ 4 - 4
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
 @Slf4j(topic = "system.tcp.gateway")
-public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
+class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
 
     @Getter
     private final String id;
@@ -226,7 +226,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi
         /**
          * 接收消息
          *
-         * @return null
+         * @return void
          */
         Mono<Void> accept() {
             return getProtocol()
@@ -251,7 +251,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi
          * 处理TCP消息 ==>> 设备消息
          *
          * @param message tcp消息
-         * @return null
+         * @return void
          */
         Mono<Void> handleTcpMessage(TcpMessage message) {
             return getProtocol()
@@ -272,7 +272,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi
          * 处理设备消息
          *
          * @param message 设备消息
-         * @return null
+         * @return void
          */
         Mono<Void> handleDeviceMessage(DeviceMessage message) {
             if (processor.hasDownstreams()) {

+ 11 - 5
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java

@@ -1,8 +1,5 @@
 package org.jetlinks.community.network.tcp.device;
 
-import org.jetlinks.core.ProtocolSupports;
-import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.community.gateway.DeviceGateway;
 import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
 import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@@ -10,11 +7,20 @@ import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.tcp.server.TcpServer;
+import org.jetlinks.core.ProtocolSupports;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 import reactor.core.publisher.Mono;
 
+/**
+ * TCP服务设备网关提供商
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
 @Component
 public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
 
@@ -63,9 +69,9 @@ public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
             .map(mqttServer -> {
                 String protocol = (String) properties.getConfiguration().get("protocol");
 
-                Assert.hasText(protocol,"protocol can not be empty");
+                Assert.hasText(protocol, "protocol can not be empty");
 
-               return new TcpServerDeviceGateway(properties.getId(),
+                return new TcpServerDeviceGateway(properties.getId(),
                     protocol,
                     protocolSupports,
                     registry,

+ 13 - 8
jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java

@@ -7,8 +7,21 @@ import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
 
+/**
+ * 时序数据封装类
+ *
+ * @author zhouhao
+ */
 public interface TimeSeriesData extends ValueObject {
 
+    static TimeSeriesData of(Date date, Map<String, Object> data) {
+        return of(date == null ? System.currentTimeMillis() : date.getTime(), data);
+    }
+
+    static TimeSeriesData of(long timestamp, Map<String, Object> data) {
+        return new SimpleTimeSeriesData(timestamp, data);
+    }
+
     long getTimestamp();
 
     Map<String, Object> getData();
@@ -23,14 +36,6 @@ public interface TimeSeriesData extends ValueObject {
         return Optional.ofNullable(getData().get(name));
     }
 
-    static TimeSeriesData of(Date date, Map<String, Object> data) {
-        return of(date == null ? System.currentTimeMillis() : date.getTime(), data);
-    }
-
-    static TimeSeriesData of(long timestamp, Map<String, Object> data) {
-        return new SimpleTimeSeriesData(timestamp, data);
-    }
-
     @Override
     default <T> T as(Class<T> type) {
         return FastBeanCopier.copy(getData(), type);

+ 0 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -123,7 +123,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived");
     }
 
-    //设备注册中心
     private final DeviceRegistry registry;
     private final EventBus eventBus;
     private final MessageHandler messageHandler;

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java

@@ -23,7 +23,7 @@ public class TimeSeriesMessageWriterConnector {
      * 订阅设备消息 入库
      *
      * @param message 设备消息
-     * @return null
+     * @return void
      */
     @Subscribe(topics = "/device/**", id = "device-message-ts-writer")
     public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {

+ 4 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java

@@ -25,8 +25,10 @@ import java.util.function.Function;
 
 /**
  * 默认设备数据服务
+ * <p>
+ * 管理设备存储策略、提供数据查询和入库操作
  *
- * @author JetLinks
+ * @author zhouhao
  */
 @Component
 public class DefaultDeviceDataService implements DeviceDataService {
@@ -116,7 +118,7 @@ public class DefaultDeviceDataService implements DeviceDataService {
                                                     @Nonnull String... properties) {
         return this
             .getDeviceStrategy(deviceId)
-            .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties));
+            .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query, properties));
     }
 
     @Nonnull

+ 22 - 19
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java

@@ -32,6 +32,11 @@ import java.util.stream.Stream;
 
 import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
 
+/**
+ * 时序数据列存储策略
+ *
+ * @author zhouhao
+ */
 @Component
 public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
 
@@ -67,10 +72,10 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
     public Mono<Void> registerMetadata(@Nonnull String productId, @Nonnull DeviceMetadata metadata) {
         return Flux
             .concat(Flux
-                        .fromIterable(metadata.getEvents())
-                        .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
-                    timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
-                    timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
+                    .fromIterable(metadata.getEvents())
+                    .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
+                timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
+                timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
             .then();
     }
 
@@ -79,8 +84,6 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
                                                          String deviceId,
                                                          Map<String, PropertyMetadata> property,
                                                          QueryParamEntity param) {
-
-
         //查询多个属性,分组聚合获取第一条数据
         return param
             .toQuery()
@@ -121,20 +124,20 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
             .getDevice(deviceId)
             .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
             .flatMap(tp2 -> {
-                         PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
+                    PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
 
-                         return param
-                             .toQuery()
-                             .includes(property)
-                             .where("deviceId", deviceId)
-                             .execute(query -> timeSeriesManager
-                                 .getService(devicePropertyMetric(tp2.getT1().getId()))
-                                 .queryPager(query,
-                                             data -> DeviceProperty
-                                                 .of(data, data.get(property).orElse(0), prop)
-                                                 .property(property)
-                                 ));
-                     }
+                    return param
+                        .toQuery()
+                        .includes(property)
+                        .where("deviceId", deviceId)
+                        .execute(query -> timeSeriesManager
+                            .getService(devicePropertyMetric(tp2.getT1().getId()))
+                            .queryPager(query,
+                                data -> DeviceProperty
+                                    .of(data, data.get(property).orElse(0), prop)
+                                    .property(property)
+                            ));
+                }
             );
     }
 

+ 8 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java

@@ -10,6 +10,14 @@ import reactor.core.publisher.Mono;
 
 import java.util.function.Function;
 
+/**
+ * 抽象时序数据存储策略
+ * <p>
+ * 提供时序数据通用的查询存储逻辑
+ * </p>
+ *
+ * @author zhouhao
+ */
 public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDataStoragePolicy {
 
 

+ 12 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java

@@ -28,6 +28,11 @@ import java.util.stream.Stream;
 
 import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
 
+/**
+ * 设备时序数据行存储策略
+ *
+ * @author zhouhao
+ */
 @Component
 public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
 
@@ -184,11 +189,11 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
                 return timeSeriesManager
                     .getService(devicePropertyMetric(tp2.getT1().getId()))
                     .aggregation(AggregationQueryParam
-                                     .of()
-                                     .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
-                                     .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
-                                     .filter(query)
-                                     .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
+                        .of()
+                        .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
+                        .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
+                        .filter(query)
+                        .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
                     ).map(data -> DeviceProperty
                         .of(data, data.getString("property").map(propertiesMap::get).orElse(null))
                         .deviceId(deviceId));
@@ -236,13 +241,13 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
             //执行查询
             .execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation)
             //按时间分组,然后将返回的结果合并起来
-            .groupBy(agg -> agg.getString("time", ""),Integer.MAX_VALUE)
+            .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
             .flatMap(group ->
                 {
                     String time = group.key();
                     return group
                         //按属性分组
-                        .groupBy(agg -> agg.getString("property", ""),Integer.MAX_VALUE)
+                        .groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
                         .flatMap(propsGroup -> {
                             String property = propsGroup.key();
                             return propsGroup

+ 3 - 1
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java

@@ -8,7 +8,9 @@ import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
 /**
- * @author jetlinks
+ * 设备网关配置服务
+ *
+ * @author zhouhao
  */
 @Service
 public class DeviceGatewayConfigService implements DeviceGatewayPropertiesManager {