Browse Source

Merge remote-tracking branch 'origin/master'

zhouhao 5 years ago
parent
commit
978fc950cb
31 changed files with 673 additions and 73 deletions
  1. 8 4
      README.md
  2. 2 2
      docker/run-all/docker-compose-embedded.yml
  3. 2 2
      docker/run-all/docker-compose.yml
  4. 2 1
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
  5. 33 1
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java
  6. 6 0
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java
  7. 14 5
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/DefaultMessagingManager.java
  8. 1 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscriptionProvider.java
  9. 40 30
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java
  10. 8 5
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java
  11. 1 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java
  12. 1 1
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java
  13. 2 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java
  14. 10 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java
  15. 4 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java
  16. 37 4
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  17. 7 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java
  18. 14 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugAuthenticationHandler.java
  19. 22 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java
  20. 87 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttClientDebugSubscriptionProvider.java
  21. 148 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.java
  22. 82 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java
  23. 128 0
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java
  24. 2 3
      jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/entity/DataVisualizationEntity.java
  25. 1 1
      jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/enums/DataVisualizationState.java
  26. 2 2
      jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/service/DataVisualizationService.java
  27. 3 3
      jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/web/DataVisualizationController.java
  28. 1 4
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java
  29. 1 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java
  30. 3 1
      jetlinks-standalone/src/main/resources/application.yml
  31. 1 1
      pom.xml

+ 8 - 4
README.md

@@ -1,7 +1,7 @@
 # JetLinks 物联网基础平台
 # JetLinks 物联网基础平台
 
 
 ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jetlinks/jetlinks-community/Auto%20Deploy%20Docker?label=docker)
 ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jetlinks/jetlinks-community/Auto%20Deploy%20Docker?label=docker)
-![Version](https://img.shields.io/badge/Version-1.0--RELEASE-brightgreen)
+![Version](https://img.shields.io/badge/Version-1.1--RELEASE-brightgreen)
 ![QQ群2021514](https://img.shields.io/badge/QQ群-2021514-brightgreen)
 ![QQ群2021514](https://img.shields.io/badge/QQ群-2021514-brightgreen)
 
 
 JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发, 
 JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发, 
@@ -69,6 +69,7 @@ JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
 | 设备管理,设备接入|  ✅ | ✅ |       ✅     |
 | 设备管理,设备接入|  ✅ | ✅ |       ✅     |
 | 多消息协议支持|  ✅ | ✅ |       ✅     |
 | 多消息协议支持|  ✅ | ✅ |       ✅     |
 | 规则引擎-设备告警        |  ✅ |  ✅ |     ✅     |
 | 规则引擎-设备告警        |  ✅ |  ✅ |     ✅     |
+| 规则引擎-数据转发 |  ✅  |  ✅ |     ✅ |
 | 系统监控,数据统计  |  ✅  |  ✅ |  ✅   |
 | 系统监控,数据统计  |  ✅  |  ✅ |  ✅   |
 | 邮件消息通知    |  ✅  |  ✅ |     ✅      |
 | 邮件消息通知    |  ✅  |  ✅ |     ✅      |
 | 微信企业消息    |  ✅  |  ✅ |     ✅      |
 | 微信企业消息    |  ✅  |  ✅ |     ✅      |
@@ -77,13 +78,16 @@ JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
 | TCP(TLS)     |  ✅  |  ✅ |  ✅    |
 | TCP(TLS)     |  ✅  |  ✅ |  ✅    |
 | CoAP(DTLS)    |  ⭕  |  ✅ |     ✅       |
 | CoAP(DTLS)    |  ⭕  |  ✅ |     ✅       |
 | Http,WebSocket(TLS) |  ⭕  |  ✅ |     ✅ |
 | Http,WebSocket(TLS) |  ⭕  |  ✅ |     ✅ |
-| 规则引擎-数据转发 |  ⭕  |  ✅ |     ✅ |
+| 数据转发:MQTT,HTTP,Kafka... |  ⭕  |  ✅ |     ✅ |
 | Geo地理位置支持     | ⭕   |  ✅ |  ✅    |
 | Geo地理位置支持     | ⭕   |  ✅ |  ✅    |
 | 可视化图表配置   |  ⭕  |  ✅ |     ✅    |
 | 可视化图表配置   |  ⭕  |  ✅ |     ✅    |
 | OpenAPI    |  ⭕  |  ✅ |     ✅     |
 | OpenAPI    |  ⭕  |  ✅ |     ✅     |
 | 集群支持    |  ⭕  |  ✅ |     ✅     |
 | 集群支持    |  ⭕  |  ✅ |     ✅     |
-| 线上技术支持 |  ⭕  |  ✅ |   ✅   |
-| 线下技术支持 |  ⭕  |  ⭕ |   ✅   |
+| QQ群技术支持 |  ⭕  |  ✅ |   ✅   |
+| 一对一技术支持 |  ⭕  |  ⭕ |   ✅   |
+| 微服务架构(建设中)   |  ⭕  |  ⭕ |   ✅   |
+| 多租户(建设中)   |  ⭕  |  ⭕ |   ✅   |
+| 统一认证(建设中)   |  ⭕  |  ⭕ |   ✅   |
 | 定制开发   |  ⭕  |  ⭕ |   ✅   |
 | 定制开发   |  ⭕  |  ⭕ |   ✅   |
 | 商业限制   |  无  |  单个项目 |   无   |
 | 商业限制   |  无  |  单个项目 |   无   |
 | 定价   |  免费  | 联系我们  |  联系我们   |
 | 定价   |  免费  | 联系我们  |  联系我们   |

+ 2 - 2
docker/run-all/docker-compose-embedded.yml

@@ -1,7 +1,7 @@
 version: '2'
 version: '2'
 services:
 services:
   ui:
   ui:
-    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.0-RELEASE
+    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.1-RELEASE
     container_name: jetlinks-ce-ui
     container_name: jetlinks-ce-ui
     ports:
     ports:
       - 9000:80
       - 9000:80
@@ -12,7 +12,7 @@ services:
     links:
     links:
       - jetlinks:jetlinks
       - jetlinks:jetlinks
   jetlinks:
   jetlinks:
-    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.1
+    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.2-SNAPSHOT
     container_name: jetlinks-ce
     container_name: jetlinks-ce
     ports:
     ports:
       - 8848:8848 # API端口
       - 8848:8848 # API端口

+ 2 - 2
docker/run-all/docker-compose.yml

@@ -48,7 +48,7 @@ services:
       POSTGRES_DB: jetlinks
       POSTGRES_DB: jetlinks
       TZ: Asia/Shanghai
       TZ: Asia/Shanghai
   ui:
   ui:
-    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.0-RELEASE
+    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.1-RELEASE
     container_name: jetlinks-ce-ui
     container_name: jetlinks-ce-ui
     ports:
     ports:
       - 9000:80
       - 9000:80
@@ -59,7 +59,7 @@ services:
     links:
     links:
       - jetlinks:jetlinks
       - jetlinks:jetlinks
   jetlinks:
   jetlinks:
-    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.1
+    image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.2-SNAPSHOT
     container_name: jetlinks-ce
     container_name: jetlinks-ce
     ports:
     ports:
       - 8848:8848 # API端口
       - 8848:8848 # API端口

+ 2 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -28,7 +28,7 @@ import java.util.stream.Collectors;
 @Slf4j
 @Slf4j
 public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearchIndexStrategy {
 public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearchIndexStrategy {
     @Getter
     @Getter
-    private String id;
+    private final String id;
 
 
     protected ElasticRestClient client;
     protected ElasticRestClient client;
 
 
@@ -151,6 +151,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
             property.put("properties", createElasticProperties(objectType.getProperties()));
             property.put("properties", createElasticProperties(objectType.getProperties()));
         } else {
         } else {
             property.put("type", "keyword");
             property.put("type", "keyword");
+            property.put("ignore_above",512);
         }
         }
         return property;
         return property;
     }
     }

+ 33 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java

@@ -3,9 +3,11 @@ package org.jetlinks.community.elastic.search.service;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -122,6 +124,25 @@ public class DefaultElasticSearchService implements ElasticSearchService {
             .then();
             .then();
     }
     }
 
 
+    @Override
+    public <T> Mono<Void> save(String index, T payload) {
+        return save(index, Mono.just(payload));
+    }
+
+    @Override
+    public <T> Mono<Void> save(String index, Publisher<T> data) {
+        return Flux.from(data)
+            .map(v -> new Buffer(index, v))
+            .collectList()
+            .flatMap(this::doSave)
+            .then();
+    }
+
+    @Override
+    public <T> Mono<Void> save(String index, Collection<T> payload) {
+        return save(index, Flux.fromIterable(payload));
+    }
+
     @PreDestroy
     @PreDestroy
     public void shutdown() {
     public void shutdown() {
         sink.complete();
         sink.complete();
@@ -196,10 +217,21 @@ public class DefaultElasticSearchService implements ElasticSearchService {
                 lst.forEach(request::add);
                 lst.forEach(request::add);
                 return ReactorActionListener.<BulkResponse>mono(listener ->
                 return ReactorActionListener.<BulkResponse>mono(listener ->
                     restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
                     restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
-                );
+                ).doOnNext(this::checkResponse);
             }).thenReturn(buffers.size());
             }).thenReturn(buffers.size());
     }
     }
 
 
+    @SneakyThrows
+    protected void checkResponse(BulkResponse response) {
+        if (response.hasFailures()) {
+            for (BulkItemResponse item : response.getItems()) {
+                if (item.isFailed()) {
+                    throw item.getFailure().getCause();
+                }
+            }
+        }
+    }
+
     private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {
     private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {
         long total = response.getHits().getTotalHits();
         long total = response.getHits().getTotalHits();
         return PagerResult.of((int) total, translate(mapper, response), param);
         return PagerResult.of((int) total, translate(mapper, response), param);

+ 6 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java

@@ -26,6 +26,12 @@ public interface ElasticSearchService {
 
 
     <T> Mono<Void> commit(String index, Publisher<T> data);
     <T> Mono<Void> commit(String index, Publisher<T> data);
 
 
+    <T> Mono<Void> save(String index,  T payload);
+
+    <T> Mono<Void> save(String index,  Collection<T> payload);
+
+    <T> Mono<Void> save(String index, Publisher<T> data);
+
     default <T> Flux<T> query(String index, QueryParam queryParam, Class<T> type) {
     default <T> Flux<T> query(String index, QueryParam queryParam, Class<T> type) {
         return query(index, queryParam, map -> FastBeanCopier.copy(map, type));
         return query(index, queryParam, map -> FastBeanCopier.copy(map, type));
     }
     }

+ 14 - 5
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/DefaultMessagingManager.java

@@ -20,13 +20,22 @@ public class DefaultMessagingManager implements MessagingManager, BeanPostProces
     @Override
     @Override
     public Flux<Message> subscribe(SubscribeRequest request) {
     public Flux<Message> subscribe(SubscribeRequest request) {
 
 
-        for (Map.Entry<String, SubscriptionProvider> entry : subProvider.entrySet()) {
-            if (matcher.match(entry.getKey(), request.getTopic())) {
-                return entry.getValue().subscribe(request);
+        return Flux.defer(() -> {
+            for (Map.Entry<String, SubscriptionProvider> entry : subProvider.entrySet()) {
+                if (matcher.match(entry.getKey(), request.getTopic())) {
+                    return entry.getValue()
+                        .subscribe(request)
+                        .map(v -> {
+                            if (v instanceof Message) {
+                                return ((Message) v);
+                            }
+                            return Message.success(request.getId(), request.getTopic(), v);
+                        });
+                }
             }
             }
-        }
 
 
-        return Flux.empty();
+            return Flux.error(new UnsupportedOperationException("不支持的topic"));
+        });
     }
     }
 
 
     public void register(SubscriptionProvider provider) {
     public void register(SubscriptionProvider provider) {

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscriptionProvider.java

@@ -10,6 +10,6 @@ public interface SubscriptionProvider {
 
 
     String[] getTopicPattern();
     String[] getTopicPattern();
 
 
-    Flux<Message> subscribe(SubscribeRequest request);
+    Flux<?> subscribe(SubscribeRequest request);
 
 
 }
 }

+ 40 - 30
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.gateway.external.socket;
 
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
 import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
 import org.hswebframework.web.authorization.token.UserToken;
 import org.hswebframework.web.authorization.token.UserToken;
 import org.hswebframework.web.authorization.token.UserTokenManager;
 import org.hswebframework.web.authorization.token.UserTokenManager;
@@ -23,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 
 @AllArgsConstructor
 @AllArgsConstructor
+@Slf4j
 public class WebSocketMessagingHandler implements WebSocketHandler {
 public class WebSocketMessagingHandler implements WebSocketHandler {
 
 
     private final MessagingManager messagingManager;
     private final MessagingManager messagingManager;
@@ -57,41 +59,49 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
             .flatMap(auth -> session
             .flatMap(auth -> session
                 .receive()
                 .receive()
                 .doOnNext(message -> {
                 .doOnNext(message -> {
-                    MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
-                    if (StringUtils.isEmpty(request.getId())) {
-                        session
-                            .send(Mono.just(session.textMessage(JSON.toJSONString(
-                                Message.error(request.getType().name(), null, "id不能为空")
-                            )))).subscribe();
-                    }
-                    if (request.getType() == MessagingRequest.Type.sub) {
-                        //重复订阅
-                        if (subs.containsKey(request.getId())) {
-                            return;
+                    try {
+                        MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
+                        if (StringUtils.isEmpty(request.getId())) {
+                            session
+                                .send(Mono.just(session.textMessage(JSON.toJSONString(
+                                    Message.error(request.getType().name(), null, "id不能为空")
+                                )))).subscribe();
                         }
                         }
-                        subs.put(request.getId(), messagingManager
-                            .subscribe(SubscribeRequest.of(request, auth))
-                            .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
-                            .map(msg -> session.textMessage(JSON.toJSONString(msg)))
-                            .doOnComplete(() -> {
-                                subs.remove(request.getId());
-                                Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
-                                    .as(session::send)
-                                    .subscribe();
-                            })
-                            .flatMap(msg -> session.send(Mono.just(msg)))
-                            .subscribe()
-                        );
+                        if (request.getType() == MessagingRequest.Type.sub) {
+                            //重复订阅
+                            if (subs.containsKey(request.getId())) {
+                                return;
+                            }
+                            subs.put(request.getId(), messagingManager
+                                .subscribe(SubscribeRequest.of(request, auth))
+                                .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
+                                .map(msg -> session.textMessage(JSON.toJSONString(msg)))
+                                .doOnComplete(() -> {
+                                    subs.remove(request.getId());
+                                    Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
+                                        .as(session::send)
+                                        .subscribe();
+                                })
+                                .flatMap(msg -> session.send(Mono.just(msg)))
+                                .subscribe()
+                            );
 
 
-                    } else if (request.getType() == MessagingRequest.Type.unsub) {
-                        Optional.ofNullable(subs.remove(request.getId()))
-                            .ifPresent(Disposable::dispose);
-                    } else {
+                        } else if (request.getType() == MessagingRequest.Type.unsub) {
+                            Optional.ofNullable(subs.remove(request.getId()))
+                                .ifPresent(Disposable::dispose);
+                        } else {
+                            session.send(Mono.just(session.textMessage(JSON.toJSONString(
+                                Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
+                            )))).subscribe();
+                        }
+                    } catch (Exception e) {
+                        log.warn(e.getMessage(),e);
                         session.send(Mono.just(session.textMessage(JSON.toJSONString(
                         session.send(Mono.just(session.textMessage(JSON.toJSONString(
-                            Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
+                            Message.error("illegal_argument", null, "消息格式错误")
                         )))).subscribe();
                         )))).subscribe();
                     }
                     }
-                }).then())
+                })
+                .then())
             .doFinally(r -> {
             .doFinally(r -> {
                 subs.values().forEach(Disposable::dispose);
                 subs.values().forEach(Disposable::dispose);
                 subs.clear();
                 subs.clear();

+ 8 - 5
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -198,14 +198,17 @@ class VertxMqttConnection implements MqttConnection {
         keepAliveTimeoutMs = duration.toMillis();
         keepAliveTimeoutMs = duration.toMillis();
     }
     }
 
 
+    private volatile InetSocketAddress clientAddress;
+
     @Override
     @Override
     public InetSocketAddress getClientAddress() {
     public InetSocketAddress getClientAddress() {
-
-        SocketAddress address = endpoint.remoteAddress();
-        if (address != null) {
-            return new InetSocketAddress(address.host(), address.port());
+        if (clientAddress == null) {
+            SocketAddress address = endpoint.remoteAddress();
+            if (address != null) {
+                clientAddress = new InetSocketAddress(address.host(), address.port());
+            }
         }
         }
-        return null;
+        return clientAddress;
     }
     }
 
 
     @Override
     @Override

+ 1 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -110,6 +110,7 @@ public class VertxTcpClient extends AbstractTcpClient {
         if (this.client != null && this.client != client) {
         if (this.client != null && this.client != client) {
             this.client.close();
             this.client.close();
         }
         }
+        keepAlive();
         this.client = client;
         this.client = client;
     }
     }
 
 

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -56,13 +56,13 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
     }
     }
 
 
     public void initClient(VertxTcpClient client, TcpClientProperties properties) {
     public void initClient(VertxTcpClient client, TcpClientProperties properties) {
-        client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         client.setClient(netClient);
         client.setClient(netClient);
         client.setKeepAliveTimeoutMs(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         client.setKeepAliveTimeoutMs(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
             if (result.succeeded()) {
             if (result.succeeded()) {
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
+                client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
                 client.setSocket(result.result());
                 client.setSocket(result.result());
             } else {
             } else {
                 log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());
                 log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());

+ 2 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java

@@ -3,6 +3,7 @@ package org.jetlinks.community.device.entity;
 
 
 import lombok.Getter;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.Setter;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.crud.generator.Generators;
 import org.hswebframework.web.crud.generator.Generators;
@@ -59,6 +60,6 @@ public class DeviceTagEntity extends GenericEntity<String> {
     }
     }
 
 
     public static String createTagId(String deviceId,String key){
     public static String createTagId(String deviceId,String key){
-        return deviceId.concat(":").concat(key);
+        return DigestUtils.md5Hex(deviceId.concat(":").concat(key));
     }
     }
 }
 }

+ 10 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.device.measurements.status;
 package org.jetlinks.community.device.measurements.status;
 
 
 import org.jetlinks.community.Interval;
 import org.jetlinks.community.Interval;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DataType;
@@ -25,6 +26,8 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 
 class DeviceStatusChangeMeasurement extends StaticMeasurement {
 class DeviceStatusChangeMeasurement extends StaticMeasurement {
 
 
@@ -145,10 +148,16 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
                             new Subscription("/device/*/" + deviceId + "/online"),
                             new Subscription("/device/*/" + deviceId + "/online"),
                             new Subscription("/device/*/" + deviceId + "/offline")), true)
                             new Subscription("/device/*/" + deviceId + "/offline")), true)
                         .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
                         .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
-                        .map(msg -> SimpleMeasurementValue.of(msg.getMessageType().name().toLowerCase(), msg.getTimestamp()))
+                        .map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp()))
                         ;
                         ;
                 });
                 });
+        }
 
 
+        Map<String, Object> createStateValue(DeviceMessage message) {
+            Map<String, Object> val = new HashMap<>();
+            val.put("type", message.getMessageType().name().toLowerCase());
+            val.put("deviceId", message.getDeviceId());
+            return val;
         }
         }
     }
     }
 }
 }

+ 4 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java

@@ -72,6 +72,9 @@ public class DeviceDetail {
     //设备配置信息
     //设备配置信息
     private Map<String, Object> configuration = new HashMap<>();
     private Map<String, Object> configuration = new HashMap<>();
 
 
+    //设备单独的配置信息
+    private boolean aloneConfiguration;
+
     //标签
     //标签
     private List<DeviceTagEntity> tags = new ArrayList<>();
     private List<DeviceTagEntity> tags = new ArrayList<>();
 
 
@@ -151,6 +154,7 @@ public class DeviceDetail {
 
 
         if (!CollectionUtils.isEmpty(device.getConfiguration())) {
         if (!CollectionUtils.isEmpty(device.getConfiguration())) {
             setConfiguration(device.getConfiguration());
             setConfiguration(device.getConfiguration());
+            setAloneConfiguration(true);
         }
         }
         if (StringUtils.hasText(device.getDeriveMetadata())) {
         if (StringUtils.hasText(device.getDeriveMetadata())) {
             setMetadata(device.getDeriveMetadata());
             setMetadata(device.getDeriveMetadata());

+ 37 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSON;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.hswebframework.ezorm.core.dsl.Query;
 import org.hswebframework.ezorm.core.dsl.Query;
 import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.ezorm.core.param.TermType;
 import org.hswebframework.ezorm.core.param.TermType;
@@ -64,10 +65,7 @@ import java.io.OutputStream;
 import java.net.URLEncoder;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Duration;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Function;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
@@ -101,6 +99,41 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .as(super::save);
             .as(super::save);
     }
     }
 
 
+
+    /**
+     * 重置设备配置
+     *
+     * @param deviceId 设备ID
+     * @return 重置后的配置
+     * @since 1.2
+     */
+    public Mono<Map<String, Object>> resetConfiguration(String deviceId) {
+        return findById(deviceId)
+            .flatMap(device ->
+                Mono.defer(() -> {
+                    if (!MapUtils.isEmpty(device.getConfiguration())) {
+                        //重置注册中心里的配置
+                        return registry.getDevice(deviceId)
+                            .flatMap(opts -> opts.removeConfigs(device.getConfiguration().keySet()))
+                            .then();
+                    }
+                    return Mono.empty();
+                }).then(
+                    //更新数据库
+                    createUpdate()
+                        .set(DeviceInstanceEntity::getConfiguration, new HashMap<>())
+                        .where(DeviceInstanceEntity::getId, deviceId)
+                        .execute()
+                ).then(
+                    //获取产品信息的配置
+                    deviceProductService
+                        .findById(device.getProductId())
+                        .flatMap(product -> Mono.justOrEmpty(product.getConfiguration()))
+                ))
+            .defaultIfEmpty(Collections.emptyMap())
+            ;
+    }
+
     /**
     /**
      * 发布设备到设备注册中心
      * 发布设备到设备注册中心
      *
      *

+ 7 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -99,6 +99,13 @@ public class DeviceInstanceController implements
         return service.getDeviceDetail(id);
         return service.getDeviceDetail(id);
     }
     }
 
 
+    //重置配置信息
+    @PutMapping("/{deviceId:.+}/configuration/_reset")
+    @SaveAction
+    public Mono<Map<String,Object>> resetConfiguration(@PathVariable String deviceId){
+        return service.resetConfiguration(deviceId);
+    }
+
     //获取设备运行状态
     //获取设备运行状态
     @GetMapping("/{id:.+}/state")
     @GetMapping("/{id:.+}/state")
     @QueryAction
     @QueryAction

+ 14 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugAuthenticationHandler.java

@@ -0,0 +1,14 @@
+package org.jetlinks.community.network.manager.debug;
+
+import org.hswebframework.web.authorization.exception.AccessDenyException;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+
+public class DebugAuthenticationHandler {
+
+    public static void handle(SubscribeRequest request){
+        if (!request.getAuthentication().hasPermission("network-config", "save")) {
+          throw new AccessDenyException();
+        }
+    }
+
+}

+ 22 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java

@@ -0,0 +1,22 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.ByteBufUtil;
+import org.springframework.util.StringUtils;
+
+public class DebugUtils {
+
+    static byte[] stringToBytes(String text){
+        byte[] payload;
+        if (StringUtils.isEmpty(text)) {
+            payload = new byte[0];
+        } else {
+            if (text.startsWith("0x")) {
+                payload = ByteBufUtil.decodeHexDump(text, 2, text.length()-2);
+            } else {
+                payload = text.getBytes();
+            }
+        }
+        return payload;
+    }
+
+}

+ 87 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttClientDebugSubscriptionProvider.java

@@ -0,0 +1,87 @@
+package org.jetlinks.community.network.manager.debug;
+
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.manager.web.request.MqttMessageRequest;
+import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Component
+public class MqttClientDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public MqttClientDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-client-mqtt-debug";
+    }
+
+    @Override
+    public String name() {
+        return "MQTT客户端调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/mqtt/client/*/_subscribe/*",
+            "/network/mqtt/client/*/_publish/*"
+        };
+    }
+
+    @Override
+    public Flux<Object> subscribe(SubscribeRequest request) {
+        DebugAuthenticationHandler.handle(request);
+        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/client/{id}/{pubsub}/{type}", request.getTopic());
+
+        String clientId = vars.get("id");
+        String pubsub = vars.get("pubsub");
+        PayloadType type = PayloadType.valueOf(vars.get("type").toUpperCase());
+
+        return networkManager
+            .<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, clientId)
+            .flatMapMany(mqtt ->
+                "_subscribe".equals(pubsub)
+                    ? mqttClientSubscribe(mqtt, type, request)
+                    : mqttClientPublish(mqtt, type, request))
+            ;
+    }
+
+    public Flux<Object> mqttClientSubscribe(MqttClient client,
+                                            PayloadType type,
+                                            SubscribeRequest request) {
+        String topics = request.getString("topics", "/#");
+
+        return client
+            .subscribe(Arrays.asList(topics.split("[\n]")))
+            .map(mqttMessage -> Message.success(request.getId(), request.getTopic(), MqttMessageResponse.of(mqttMessage, type)));
+
+    }
+
+    public Flux<String> mqttClientPublish(MqttClient client,
+                                          PayloadType type,
+                                          SubscribeRequest request) {
+        MqttMessageRequest messageRequest = FastBeanCopier.copy(request.getAll(), new MqttMessageRequest());
+
+        return client
+            .publish(MqttMessageRequest.of(messageRequest, type))
+            .thenReturn("推送成功")
+            .flux();
+
+    }
+}

+ 148 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.java

@@ -0,0 +1,148 @@
+package org.jetlinks.community.network.manager.debug;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
+import org.jetlinks.community.network.mqtt.server.*;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+@Slf4j
+public class MqttServerDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public MqttServerDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-server-mqtt-debug";
+    }
+
+    @Override
+    public String name() {
+        return "MQTT服务调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/mqtt/server/*/_subscribe/*"
+        };
+    }
+
+    @Override
+    public Flux<MqttClientMessage> subscribe(SubscribeRequest request) {
+        DebugAuthenticationHandler.handle(request);
+
+        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/server/{id}/_subscribe/{type}", request.getTopic());
+
+        String clientId = vars.get("id");
+        PayloadType type = PayloadType.valueOf(vars.get("type").toUpperCase());
+
+        return Flux.create(sink ->
+            sink.onDispose(networkManager
+                .<MqttServer>getNetwork(DefaultNetworkType.MQTT_SERVER, clientId)
+                .flatMap(mqtt ->
+                    mqtt
+                        .handleConnection()
+                        .doOnNext(conn -> {
+                            sink.next(MqttClientMessage.of(conn.accept()));
+                            conn.onClose(disconnect -> sink.next(MqttClientMessage.ofDisconnect(disconnect)));
+                        })
+                        .flatMap(conn -> Flux.merge(
+                            conn.handleSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)),
+                            conn.handleUnSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)),
+                            conn.handleMessage().map(sub -> MqttClientMessage.of(conn, sub, type)))
+                        )
+                        .doOnNext(sink::next)
+                        .then()
+                )
+                .doOnError(sink::error)
+                .doOnSubscribe(sub -> log.debug("start mqtt server[{}] debug", clientId))
+                .doOnCancel(() -> log.debug("stop mqtt server[{}] debug", clientId))
+                .subscribe()
+            ));
+    }
+
+
+    @AllArgsConstructor(staticName = "of")
+    @Getter
+    @Setter
+    public static class MqttClientMessage {
+        private String type;
+
+        private String typeText;
+
+        private Object data;
+
+        public static MqttClientMessage of(MqttConnection connection) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            connection.getAuth().ifPresent(auth -> {
+                data.put("username", auth.getUsername());
+                data.put("password", auth.getPassword());
+            });
+            return MqttClientMessage.of("connection", "连接", data);
+        }
+
+        public static MqttClientMessage ofDisconnect(MqttConnection connection) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            connection.getAuth().ifPresent(auth -> {
+                data.put("username", auth.getUsername());
+                data.put("password", auth.getPassword());
+            });
+            return MqttClientMessage.of("disconnection", "断开连接", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttSubscription subscription) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("topics", subscription
+                .getMessage()
+                .topicSubscriptions()
+                .stream()
+                .map(subs -> "QoS:" + subs.qualityOfService().value() + " Topic:" + subs.topicName())
+            );
+            return MqttClientMessage.of("subscription", "订阅", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttUnSubscription subscription) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("topics", subscription
+                .getMessage()
+                .topics()
+            );
+            return MqttClientMessage.of("unsubscription", "取消订阅", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttPublishing subscription, PayloadType type) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("message", MqttMessageResponse.of(subscription.getMessage(), type));
+            return MqttClientMessage.of("publish", "推送消息", data);
+        }
+
+    }
+}

+ 82 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java

@@ -0,0 +1,82 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.Unpooled;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+
+@Component
+public class TcpClientDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public TcpClientDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-tcp-client-debug";
+    }
+
+    @Override
+    public String name() {
+        return "TCP客户端调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/tcp/client/*/_send",
+            "/network/tcp/client/*/_subscribe"
+        };
+    }
+
+    @Override
+    public Flux<String> subscribe(SubscribeRequest request) {
+        String id = request.getTopic().split("[/]")[4];
+        if (request.getTopic().endsWith("_send")) {
+            return send(id, request);
+        } else {
+            return subscribe(id, request);
+        }
+    }
+
+    public Flux<String> send(String id, SubscribeRequest request) {
+        String message = request.getString("request")
+            .orElseThrow(() -> new IllegalArgumentException("参数[request]不能为空"));
+
+        byte[] payload=DebugUtils.stringToBytes(message);
+
+        return networkManager
+            .<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, id)
+            .flatMap(client -> client.send(new TcpMessage(Unpooled.wrappedBuffer(payload))))
+            .thenReturn("推送成功")
+            .flux();
+    }
+
+    @SuppressWarnings("all")
+    public Flux<String> subscribe(String id, SubscribeRequest request) {
+        String message = request.getString("response").filter(StringUtils::hasText).orElse(null);
+
+        byte[] payload =DebugUtils.stringToBytes(message);
+
+        return networkManager
+            .<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, id)
+            .flatMapMany(client -> client
+                .subscribe()
+                .flatMap(msg -> client
+                    .send(new TcpMessage(Unpooled.wrappedBuffer(payload)))
+                    .thenReturn(msg))
+                .map(TcpMessage::toString)
+            );
+    }
+
+
+}

+ 128 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java

@@ -0,0 +1,128 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.Unpooled;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.jetlinks.community.network.tcp.server.TcpServer;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class TcpServerDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public TcpServerDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-tcp-server-debug";
+    }
+
+    @Override
+    public String name() {
+        return "TCP服务调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/tcp/server/*/_subscribe"
+        };
+    }
+
+    @Override
+    public Flux<TcpClientMessage> subscribe(SubscribeRequest request) {
+        String id = request.getTopic().split("[/]")[4];
+
+        return subscribe(id, request);
+    }
+
+    @SuppressWarnings("all")
+    public Flux<TcpClientMessage> subscribe(String id, SubscribeRequest request) {
+
+        String message = request.getString("response").filter(StringUtils::hasText).orElse(null);
+
+        byte[] payload = DebugUtils.stringToBytes(message);
+
+        return Flux.create(sink ->
+            sink.onDispose(networkManager
+                .<TcpServer>getNetwork(DefaultNetworkType.TCP_SERVER, id)
+                .flatMap(server ->
+                    server
+                        .handleConnection()
+                        .doOnNext(client -> sink.next(TcpClientMessage.of(client)))
+                        .flatMap(client -> {
+                            client.onDisconnect(() -> {
+                                sink.next(TcpClientMessage.ofDisconnect(client));
+                            });
+                            return client
+                                .subscribe()
+                                .map(msg -> TcpClientMessage.of(client, msg))
+                                .doOnNext(sink::next)
+                                .flatMap(msg -> {
+                                    if (payload.length > 0) {
+                                        return client.send(new TcpMessage(Unpooled.wrappedBuffer(payload)));
+                                    }
+                                    return Mono.empty();
+                                })
+                                .then();
+                        })
+                        .then()
+                )
+                .doOnError(sink::error)
+                .subscriberContext(sink.currentContext())
+                .subscribe()
+            ));
+    }
+
+
+    @AllArgsConstructor(staticName = "of")
+    @Getter
+    @Setter
+    public static class TcpClientMessage {
+        private String type;
+
+        private String typeText;
+
+        private Object data;
+
+        public static TcpClientMessage of(TcpClient client) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", client.getRemoteAddress());
+
+            return TcpClientMessage.of("connection", "连接", data);
+        }
+
+        public static TcpClientMessage ofDisconnect(TcpClient client) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", client.getRemoteAddress());
+
+            return TcpClientMessage.of("disconnection", "断开连接", data);
+        }
+
+        public static TcpClientMessage of(TcpClient connection, TcpMessage message) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", connection.getRemoteAddress().toString());
+            data.put("message", message.toString());
+
+            return TcpClientMessage.of("publish", "订阅", data);
+        }
+
+
+    }
+}

+ 2 - 3
jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/entity/DataVisualizationEntity.java

@@ -1,4 +1,4 @@
-package org.jetlinks.pro.visualization.entity;
+package org.jetlinks.community.visualization.entity;
 
 
 import lombok.Data;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.EqualsAndHashCode;
@@ -6,8 +6,7 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
-import org.jetlinks.pro.visualization.enums.DataVisualizationState;
-import org.springframework.stereotype.Component;
+import org.jetlinks.community.visualization.enums.DataVisualizationState;
 
 
 import javax.persistence.Column;
 import javax.persistence.Column;
 import javax.persistence.Index;
 import javax.persistence.Index;

+ 1 - 1
jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/enums/DataVisualizationState.java

@@ -1,4 +1,4 @@
-package org.jetlinks.pro.visualization.enums;
+package org.jetlinks.community.visualization.enums;
 
 
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Getter;

+ 2 - 2
jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/service/DataVisualizationService.java

@@ -1,8 +1,8 @@
-package org.jetlinks.pro.visualization.service;
+package org.jetlinks.community.visualization.service;
 
 
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
-import org.jetlinks.pro.visualization.entity.DataVisualizationEntity;
+import org.jetlinks.community.visualization.entity.DataVisualizationEntity;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;

+ 3 - 3
jetlinks-manager/visualization-manager/src/main/java/org/jetlinks/pro/visualization/web/DataVisualizationController.java

@@ -1,10 +1,10 @@
-package org.jetlinks.pro.visualization.web;
+package org.jetlinks.community.visualization.web;
 
 
 import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.QueryAction;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
-import org.jetlinks.pro.visualization.entity.DataVisualizationEntity;
-import org.jetlinks.pro.visualization.service.DataVisualizationService;
+import org.jetlinks.community.visualization.entity.DataVisualizationEntity;
+import org.jetlinks.community.visualization.service.DataVisualizationService;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMapping;

+ 1 - 4
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/JetLinksApplication.java

@@ -13,7 +13,6 @@ import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.Profile;
 import org.springframework.context.annotation.Profile;
 import org.springframework.context.event.EventListener;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
-import reactor.tools.agent.ReactorDebugAgent;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 
 
@@ -25,12 +24,10 @@ import javax.annotation.PostConstruct;
 @EnableEasyormRepository("org.jetlinks.community.**.entity")
 @EnableEasyormRepository("org.jetlinks.community.**.entity")
 @EnableAopAuthorize
 @EnableAopAuthorize
 @EnableAccessLogger
 @EnableAccessLogger
+@Slf4j
 public class JetLinksApplication {
 public class JetLinksApplication {
 
 
     public static void main(String[] args) {
     public static void main(String[] args) {
-        if (!Boolean.getBoolean("reactor.debug.agent.disabled")) {
-            ReactorDebugAgent.init();
-        }
         SpringApplication.run(JetLinksApplication.class, args);
         SpringApplication.run(JetLinksApplication.class, args);
     }
     }
 
 

+ 1 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -224,6 +224,7 @@ public class JetLinksConfiguration {
     }
     }
 
 
     @Bean(initMethod = "init")
     @Bean(initMethod = "init")
+    @ConditionalOnProperty(prefix = "jetlinks.protocol.spi", name = "enabled", havingValue = "true")
     public ServiceLoaderProtocolSupports serviceLoaderProtocolSupports(ServiceContext serviceContext) {
     public ServiceLoaderProtocolSupports serviceLoaderProtocolSupports(ServiceContext serviceContext) {
         ServiceLoaderProtocolSupports supports = new ServiceLoaderProtocolSupports();
         ServiceLoaderProtocolSupports supports = new ServiceLoaderProtocolSupports();
         supports.setServiceContext(serviceContext);
         supports.setServiceContext(serviceContext);

+ 3 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -91,7 +91,9 @@ jetlinks:
     system:
     system:
       context:
       context:
         server: ${spring.application.name}
         server: ${spring.application.name}
-
+  protocol:
+    spi:
+      enabled: true # 为true时开启自动加载通过依赖引入的协议包
 logging:
 logging:
   level:
   level:
     org.jetlinks: debug
     org.jetlinks: debug

+ 1 - 1
pom.xml

@@ -21,7 +21,7 @@
         <project.build.jdk>${java.version}</project.build.jdk>
         <project.build.jdk>${java.version}</project.build.jdk>
         <hsweb.framework.version>4.0.1</hsweb.framework.version>
         <hsweb.framework.version>4.0.1</hsweb.framework.version>
         <hsweb.expands.version>3.0.2</hsweb.expands.version>
         <hsweb.expands.version>3.0.2</hsweb.expands.version>
-        <jetlinks.version>1.0.2</jetlinks.version>
+        <jetlinks.version>1.0.3-SNAPSHOT</jetlinks.version>
         <r2dbc.version>Arabba-RELEASE</r2dbc.version>
         <r2dbc.version>Arabba-RELEASE</r2dbc.version>
         <vertx.version>3.8.5</vertx.version>
         <vertx.version>3.8.5</vertx.version>
         <netty.version>4.1.46.Final</netty.version>
         <netty.version>4.1.46.Final</netty.version>