浏览代码

Merge remote-tracking branch 'origin/master'

zhouhao 5 年之前
父节点
当前提交
e425e5290a
共有 20 个文件被更改,包括 241 次插入129 次删除
  1. 3 2
      docker/run-all/docker-compose.yml
  2. 2 1
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java
  3. 26 0
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/ElasticSearchIndexProperties.java
  4. 22 0
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
  5. 3 2
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java
  6. 12 3
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java
  7. 3 2
      jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java
  8. 9 6
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java
  9. 1 1
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java
  10. 28 18
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java
  11. 3 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java
  12. 19 13
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java
  13. 18 11
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java
  14. 3 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java
  15. 19 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  16. 58 54
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java
  17. 1 1
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java
  18. 9 6
      jetlinks-standalone/src/main/resources/application.yml
  19. 1 3
      jetlinks-standalone/src/main/resources/logback-spring.xml
  20. 1 1
      simulator/start.sh

+ 3 - 2
docker/run-all/docker-compose.yml

@@ -41,6 +41,8 @@ services:
     container_name: jetlinks-ce-postgres
     container_name: jetlinks-ce-postgres
     volumes:
     volumes:
       - "postgres-volume:/var/lib/postgresql/data"
       - "postgres-volume:/var/lib/postgresql/data"
+    ports:
+      - "5432:5432"
     environment:
     environment:
       POSTGRES_PASSWORD: jetlinks
       POSTGRES_PASSWORD: jetlinks
       POSTGRES_DB: jetlinks
       POSTGRES_DB: jetlinks
@@ -68,14 +70,13 @@ services:
     volumes:
     volumes:
       - "jetlinks-volume:/static/upload"  # 持久化上传的文件
       - "jetlinks-volume:/static/upload"  # 持久化上传的文件
     environment:
     environment:
-      #      - "JAVA_OPTS=-Xms4g -Xmx10g -XX:+UseG1GC"
+     # - "JAVA_OPTS=-Xms4g -Xmx18g -XX:+UseG1GC"
       - "hsweb.file.upload.static-location=http://127.0.0.1:8848/upload"  #上传的静态文件访问根地址,为ui的地址.
       - "hsweb.file.upload.static-location=http://127.0.0.1:8848/upload"  #上传的静态文件访问根地址,为ui的地址.
       - "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址
       - "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址
       - "spring.r2dbc.username=postgres"
       - "spring.r2dbc.username=postgres"
       - "spring.r2dbc.password=jetlinks"
       - "spring.r2dbc.password=jetlinks"
       - "elasticsearch.client.host=elasticsearch"
       - "elasticsearch.client.host=elasticsearch"
       - "elasticsearch.client.post=9200"
       - "elasticsearch.client.post=9200"
-      - "device.message.writer.elastic.enabled=false"
       - "spring.redis.host=redis"
       - "spring.redis.host=redis"
       - "spring.redis.port=6379"
       - "spring.redis.port=6379"
       - "logging.level.io.r2dbc=warn"
       - "logging.level.io.r2dbc=warn"

+ 2 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java

@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
@@ -15,7 +16,7 @@ import org.springframework.context.annotation.Configuration;
  **/
  **/
 @Configuration
 @Configuration
 @Slf4j
 @Slf4j
-@EnableConfigurationProperties(ElasticSearchProperties.class)
+@EnableConfigurationProperties({ElasticSearchProperties.class, ElasticSearchIndexProperties.class})
 public class ElasticSearchConfiguration {
 public class ElasticSearchConfiguration {
 
 
     @Autowired
     @Autowired

+ 26 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/ElasticSearchIndexProperties.java

@@ -0,0 +1,26 @@
+package org.jetlinks.community.elastic.search.index;
+
+import lombok.*;
+import org.elasticsearch.common.settings.Settings;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ConfigurationProperties(prefix = "elasticsearch.index.settings")
+public class ElasticSearchIndexProperties {
+
+    private int numberOfShards = 1;
+
+    private int numberOfReplicas = 0;
+
+    public Settings toSettings() {
+
+        return Settings.builder()
+            .put("number_of_shards", Math.min(1, numberOfShards))
+            .put("number_of_replicas", numberOfReplicas)
+            .build();
+    }
+}

+ 22 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -7,6 +7,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.indices.*;
 import org.elasticsearch.client.indices.*;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.SimplePropertyMetadata;
 import org.jetlinks.core.metadata.SimplePropertyMetadata;
@@ -31,6 +32,8 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
 
 
     protected ElasticRestClient client;
     protected ElasticRestClient client;
 
 
+    protected ElasticSearchIndexProperties properties;
+
     protected String wrapIndex(String index) {
     protected String wrapIndex(String index) {
         return index.toLowerCase();
         return index.toLowerCase();
     }
     }
@@ -185,4 +188,23 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
         }
         }
         return metadata;
         return metadata;
     }
     }
+
+    protected List<?> createDynamicTemplates() {
+        List<Map<String, Object>> maps = new ArrayList<>();
+        {
+            Map<String, Object> config = new HashMap<>();
+            config.put("match_mapping_type", "string");
+            config.put("mapping", createElasticProperty(new StringType()));
+            maps.add(Collections.singletonMap("string_fields", config));
+        }
+        {
+            Map<String, Object> config = new HashMap<>();
+            config.put("match_mapping_type", "date");
+            config.put("mapping", createElasticProperty(new DateTimeType()));
+            maps.add(Collections.singletonMap("date_fields", config));
+        }
+
+        return maps;
+    }
+
 }
 }

+ 3 - 2
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java

@@ -2,14 +2,15 @@ package org.jetlinks.community.elastic.search.index.strategies;
 
 
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 @Component
 @Component
 public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
 public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
 
 
-    public DirectElasticSearchIndexStrategy(ElasticRestClient client) {
-        super("direct", client);
+    public DirectElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) {
+        super("direct", client,properties);
     }
     }
 
 
     @Override
     @Override

+ 12 - 3
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java

@@ -8,16 +8,19 @@ import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
 import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.elasticsearch.client.indices.PutIndexTemplateRequest;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
 import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 import java.util.Collections;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 public abstract class TemplateElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
 public abstract class TemplateElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
 
 
-    public TemplateElasticSearchIndexStrategy(String id, ElasticRestClient client) {
-        super(id, client);
+    public TemplateElasticSearchIndexStrategy(String id, ElasticRestClient client, ElasticSearchIndexProperties properties) {
+        super(id, client,properties);
     }
     }
 
 
     protected String getTemplate(String index) {
     protected String getTemplate(String index) {
@@ -50,15 +53,21 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
             .then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true));
             .then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true));
     }
     }
 
 
+
     protected PutIndexTemplateRequest createIndexTemplateRequest(ElasticSearchIndexMetadata metadata) {
     protected PutIndexTemplateRequest createIndexTemplateRequest(ElasticSearchIndexMetadata metadata) {
         String index = wrapIndex(metadata.getIndex());
         String index = wrapIndex(metadata.getIndex());
         PutIndexTemplateRequest request = new PutIndexTemplateRequest(getTemplate(index));
         PutIndexTemplateRequest request = new PutIndexTemplateRequest(getTemplate(index));
         request.alias(new Alias(getAlias(index)));
         request.alias(new Alias(getAlias(index)));
-        request.mapping(Collections.singletonMap("properties", createElasticProperties(metadata.getProperties())));
+        request.settings(properties.toSettings());
+        Map<String, Object> mappingConfig = new HashMap<>();
+        mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
+        mappingConfig.put("dynamic_templates", createDynamicTemplates());
+        request.mapping(mappingConfig);
         request.patterns(getIndexPatterns(index));
         request.patterns(getIndexPatterns(index));
         return request;
         return request;
     }
     }
 
 
+
     @Override
     @Override
     public Mono<ElasticSearchIndexMetadata> loadIndexMetadata(String index) {
     public Mono<ElasticSearchIndexMetadata> loadIndexMetadata(String index) {
 
 

+ 3 - 2
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.elastic.search.index.strategies;
 
 
 import org.hswebframework.utils.time.DateFormatter;
 import org.hswebframework.utils.time.DateFormatter;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
 import java.util.Date;
 import java.util.Date;
@@ -17,8 +18,8 @@ public class TimeByMonthElasticSearchIndexStrategy extends TemplateElasticSearch
 
 
     private final String format = "yyyy-MM";
     private final String format = "yyyy-MM";
 
 
-    public TimeByMonthElasticSearchIndexStrategy(ElasticRestClient client) {
-        super("time-by-month", client);
+    public TimeByMonthElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) {
+        super("time-by-month", client,properties);
     }
     }
 
 
     @Override
     @Override

+ 9 - 6
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -32,6 +32,7 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
 
 
+import javax.annotation.Nonnull;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 import java.util.function.Function;
@@ -120,11 +121,11 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                     con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                     con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                     gatewayMonitor.rejected();
                     gatewayMonitor.rejected();
                 }))
                 }))
-                .onErrorContinue((err, res) -> {
+                .onErrorResume((err) -> Mono.fromRunnable(() -> {
                     gatewayMonitor.rejected();
                     gatewayMonitor.rejected();
                     con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                     con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                     log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
                     log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
-                }))
+                })))
             .flatMap(tuple3 -> {
             .flatMap(tuple3 -> {
                 counter.increment();
                 counter.increment();
                 DeviceOperator device = tuple3.getT1();
                 DeviceOperator device = tuple3.getT1();
@@ -155,7 +156,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                 }
                 }
                 return Mono.empty();
                 return Mono.empty();
             })
             })
-            .onErrorContinue((err, res) -> log.error("处理MQTT连接失败", err))
+            .onErrorResume((err) -> Mono.fromRunnable(() -> log.error("处理MQTT连接失败", err)))
             .subscribe(tp -> tp.getT1()
             .subscribe(tp -> tp.getT1()
                 .handleMessage()
                 .handleMessage()
                 .filter(pb -> started.get())
                 .filter(pb -> started.get())
@@ -171,6 +172,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                         }
                         }
 
 
                         @Override
                         @Override
+                        @Nonnull
                         public EncodedMessage getMessage() {
                         public EncodedMessage getMessage() {
                             return publishing.getMessage();
                             return publishing.getMessage();
                         }
                         }
@@ -181,9 +183,10 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
                         }
                         }
                         return messageHandler.handleMessage(tp.getT2(), msg);
                         return messageHandler.handleMessage(tp.getT2(), msg);
                     })
                     })
-                    .onErrorContinue((err, res) -> log.error("处理MQTT连接[{}]消息失败:{}", tp.getT2().getDeviceId(), publishing.getMessage(), err)))
-                .subscribe()
-            );
+                    .onErrorResume((err) ->
+                        Mono.fromRunnable(() -> log.error("处理MQTT连接[{}]消息失败:{}", tp.getT2().getDeviceId(), publishing.getMessage(), err))
+                    ))
+                .subscribe());
 
 
     }
     }
 
 

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -18,7 +18,7 @@ public class VertxMqttServer implements MqttServer {
 
 
     private EmitterProcessor<MqttConnection> connectionProcessor = EmitterProcessor.create(false);
     private EmitterProcessor<MqttConnection> connectionProcessor = EmitterProcessor.create(false);
 
 
-    FluxSink<MqttConnection> sink = connectionProcessor.sink();
+    FluxSink<MqttConnection> sink = connectionProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
 
 
     private Collection<io.vertx.mqtt.MqttServer> mqttServer;
     private Collection<io.vertx.mqtt.MqttServer> mqttServer;
 
 

+ 28 - 18
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -30,6 +30,7 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
+import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.time.Duration;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -113,6 +114,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
         disposable.add(tcpServer
         disposable.add(tcpServer
             .handleConnection()
             .handleConnection()
             .flatMap(client -> {
             .flatMap(client -> {
+                InetSocketAddress clientAddr = client.getRemoteAddress();
                 counter.increment();
                 counter.increment();
                 gatewayMonitor.totalConnection(counter.intValue());
                 gatewayMonitor.totalConnection(counter.intValue());
                 client.onDisconnect(() -> {
                 client.onDisconnect(() -> {
@@ -121,7 +123,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                     gatewayMonitor.totalConnection(counter.sum());
                     gatewayMonitor.totalConnection(counter.sum());
                 });
                 });
                 AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
                 AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
-                DeviceSession session = sessionManager.getSession(client.getId());
+                AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
                 return client.subscribe()
                 return client.subscribe()
                     .filter(r -> started.get())
                     .filter(r -> started.get())
                     .doOnNext(r -> gatewayMonitor.receivedMessage())
                     .doOnNext(r -> gatewayMonitor.receivedMessage())
@@ -137,7 +139,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                             @Override
                             @Override
                             public DeviceSession getSession() {
                             public DeviceSession getSession() {
                                 //session还未注册
                                 //session还未注册
-                                if (session == null) {
+                                if (sessionRef.get() == null) {
                                     return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
                                     return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
                                         @Override
                                         @Override
                                         public Mono<Boolean> send(EncodedMessage encodedMessage) {
                                         public Mono<Boolean> send(EncodedMessage encodedMessage) {
@@ -150,7 +152,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                         }
                                         }
                                     };
                                     };
                                 }
                                 }
-                                return session;
+                                return sessionRef.get();
                             }
                             }
 
 
                             @Override
                             @Override
@@ -160,7 +162,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                         }))
                         }))
                         .switchIfEmpty(Mono.fromRunnable(() ->
                         .switchIfEmpty(Mono.fromRunnable(() ->
                             log.warn("无法识别的TCP客户端[{}]消息:[{}]",
                             log.warn("无法识别的TCP客户端[{}]消息:[{}]",
-                                client.getRemoteAddress(),
+                                clientAddr,
                                 ByteBufUtil.hexDump(tcpMessage.getPayload())
                                 ByteBufUtil.hexDump(tcpMessage.getPayload())
                             )))
                             )))
                         .cast(DeviceMessage.class)
                         .cast(DeviceMessage.class)
@@ -169,18 +171,18 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                             .switchIfEmpty(Mono.fromRunnable(() -> {
                             .switchIfEmpty(Mono.fromRunnable(() -> {
                                 log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
                                 log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
                                     message.getDeviceId(),
                                     message.getDeviceId(),
-                                    client.getRemoteAddress(),
+                                    clientAddr,
                                     ByteBufUtil.hexDump(tcpMessage.getPayload()),
                                     ByteBufUtil.hexDump(tcpMessage.getPayload()),
                                     message
                                     message
                                 );
                                 );
                             }))
                             }))
                             .flatMap(device -> {
                             .flatMap(device -> {
+                                DeviceSession fSession = sessionRef.get() == null ?
+                                    sessionManager.getSession(device.getDeviceId()) :
+                                    sessionRef.get();
+
                                 //处理设备上线消息
                                 //处理设备上线消息
                                 if (message instanceof DeviceOnlineMessage) {
                                 if (message instanceof DeviceOnlineMessage) {
-                                    DeviceSession fSession = session == null ?
-                                        sessionManager.getSession(device.getDeviceId()) :
-                                        session;
-
                                     if (fSession == null) {
                                     if (fSession == null) {
                                         fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
                                         fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
                                             @Override
                                             @Override
@@ -192,34 +194,42 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                         if (message.getHeader(Headers.keepOnline).orElse(false)) {
                                         if (message.getHeader(Headers.keepOnline).orElse(false)) {
                                             fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
                                             fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
                                         } else {
                                         } else {
-                                            client.onDisconnect(() -> sessionManager.unregister(client.getId()));
+                                            client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
                                         }
                                         }
+                                        sessionRef.set(fSession);
                                         sessionManager.register(fSession);
                                         sessionManager.register(fSession);
                                     }
                                     }
+                                    fSession.keepAlive();
                                     if (keepaliveTimeout.get() != null) {
                                     if (keepaliveTimeout.get() != null) {
                                         fSession.setKeepAliveTimeout(keepaliveTimeout.get());
                                         fSession.setKeepAliveTimeout(keepaliveTimeout.get());
                                     }
                                     }
                                     return Mono.empty();
                                     return Mono.empty();
                                 }
                                 }
+                                if (fSession != null) {
+                                    fSession.keepAlive();
+                                }
                                 //设备下线
                                 //设备下线
                                 if (message instanceof DeviceOfflineMessage) {
                                 if (message instanceof DeviceOfflineMessage) {
                                     sessionManager.unregister(device.getDeviceId());
                                     sessionManager.unregister(device.getDeviceId());
                                     return Mono.empty();
                                     return Mono.empty();
                                 }
                                 }
-                                message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(client.getRemoteAddress()));
+                                message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(clientAddr));
 
 
                                 if (processor.hasDownstreams()) {
                                 if (processor.hasDownstreams()) {
                                     sink.next(message);
                                     sink.next(message);
                                 }
                                 }
                                 return clientMessageHandler.handleMessage(device, message);
                                 return clientMessageHandler.handleMessage(device, message);
                             }))
                             }))
-                        .onErrorContinue((err, o) ->
-                            log.error("处理TCP[{}]消息[{}]失败",
-                                client.getRemoteAddress(),
-                                ByteBufUtil.hexDump(tcpMessage.getPayload())
-                                , err))
-                    );
-            }).subscribe());
+                        .onErrorResume((err) -> {
+                                log.error("处理TCP[{}]消息[{}]失败",
+                                    clientAddr,
+                                    ByteBufUtil.hexDump(tcpMessage.getPayload())
+                                    , err);
+                                return Mono.empty();
+                            }
+                        ));
+            })
+            .subscribe());
     }
     }
 
 
     @Override
     @Override

+ 3 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java

@@ -40,6 +40,9 @@ public class TcpServerProperties implements ValueObject {
 
 
     private boolean ssl;
     private boolean ssl;
 
 
+    //服务实例数量(线程数)
+    private int instance = Runtime.getRuntime().availableProcessors();
+
     private String certId;
     private String certId;
 
 
     public SocketAddress createSocketAddress() {
     public SocketAddress createSocketAddress() {

+ 19 - 13
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -17,6 +17,8 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
 import java.time.Duration;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 
 
 @Component
 @Component
 @Slf4j
 @Slf4j
@@ -51,20 +53,24 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
     }
     }
 
 
     private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
     private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
-        NetServer netServer = vertx.createNetServer(properties.getOptions());
-
+        int instance = Math.max(2, properties.getInstance());
+        List<NetServer> instances = new ArrayList<>(instance);
+        for (int i = 0; i < instance; i++) {
+            instances.add(vertx.createNetServer(properties.getOptions()));
+        }
         payloadParserBuilder.build(properties.getParserType(), properties);
         payloadParserBuilder.build(properties.getParserType(), properties);
-
-        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(),properties));
-        tcpServer.setServer(netServer);
-        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
-        netServer.listen(properties.createSocketAddress(), result -> {
-            if (result.succeeded()) {
-                log.info("tcp server startup on {}", result.result().actualPort());
-            } else {
-                log.error("startup tcp server error", result.cause());
-            }
-        });
+        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
+        tcpServer.setServer(instances);
+        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
+        for (NetServer netServer : instances) {
+            netServer.listen(properties.createSocketAddress(), result -> {
+                if (result.succeeded()) {
+                    log.info("tcp server startup on {}", result.result().actualPort());
+                } else {
+                    log.error("startup tcp server error", result.cause());
+                }
+            });
+        }
     }
     }
 
 
     @Override
     @Override

+ 18 - 11
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -11,6 +11,7 @@ import org.jetlinks.community.network.tcp.client.VertxTcpClient;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 
 
 import java.time.Duration;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
 /**
 /**
@@ -20,8 +21,7 @@ import java.util.function.Supplier;
 @Slf4j
 @Slf4j
 public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
 
-    @Getter
-    volatile NetServer server;
+    Collection<NetServer> tcpServers;
 
 
     private Supplier<PayloadParser> parserSupplier;
     private Supplier<PayloadParser> parserSupplier;
 
 
@@ -47,14 +47,19 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
         this.parserSupplier = parserSupplier;
         this.parserSupplier = parserSupplier;
     }
     }
 
 
-    public void setServer(NetServer server) {
-        if (this.server != null && this.server != server) {
-            this.server.close();
+    public void setServer(Collection<NetServer> mqttServer) {
+        if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
+            shutdown();
+        }
+        this.tcpServers = mqttServer;
+
+        for (NetServer tcpServer : this.tcpServers) {
+            tcpServer.connectHandler(this::acceptTcpConnection);
         }
         }
-        this.server = server;
-        this.server.connectHandler(this::acceptTcpConnection);
+
     }
     }
 
 
+
     protected void acceptTcpConnection(NetSocket socket) {
     protected void acceptTcpConnection(NetSocket socket) {
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
         client.setKeepAliveTimeoutMs(keepAliveTimeout);
         client.setKeepAliveTimeoutMs(keepAliveTimeout);
@@ -82,15 +87,17 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
 
     @Override
     @Override
     public void shutdown() {
     public void shutdown() {
-        if (null != server) {
-            execute(server::close);
-            server = null;
+        if (null != tcpServers) {
+            for (NetServer tcpServer : tcpServers) {
+                execute(tcpServer::close);
+            }
+            tcpServers = null;
         }
         }
     }
     }
 
 
     @Override
     @Override
     public boolean isAlive() {
     public boolean isAlive() {
-        return server != null;
+        return tcpServers != null;
     }
     }
 
 
     @Override
     @Override

+ 3 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/DeviceDetail.java

@@ -135,12 +135,13 @@ public class DeviceDetail {
         setId(device.getId());
         setId(device.getId());
         setName(device.getName());
         setName(device.getName());
         setState(device.getState());
         setState(device.getState());
+        setOrgId(device.getOrgId());
 
 
         Optional.ofNullable(device.getRegistryTime())
         Optional.ofNullable(device.getRegistryTime())
             .ifPresent(this::setRegisterTime);
             .ifPresent(this::setRegisterTime);
 
 
-        setCreateTime(device.getCreateTime());
-        setOrgId(device.getOrgId());
+        Optional.ofNullable(device.getCreateTime())
+            .ifPresent(this::setCreateTime);
 
 
         if (!CollectionUtils.isEmpty(device.getConfiguration())) {
         if (!CollectionUtils.isEmpty(device.getConfiguration())) {
             setConfiguration(device.getConfiguration());
             setConfiguration(device.getConfiguration());

+ 19 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -216,9 +216,25 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
 
     public Mono<DeviceDetail> getDeviceDetail(String deviceId) {
     public Mono<DeviceDetail> getDeviceDetail(String deviceId) {
         return this.findById(deviceId)
         return this.findById(deviceId)
-            .zipWhen(device -> deviceProductService.findById(device.getProductId()),
-                (device, product) -> new DeviceDetail().with(device).with(product))
-            .flatMap(detail -> registry.getDevice(deviceId).flatMap(detail::with).defaultIfEmpty(detail))
+            .zipWhen(
+                //合并设备和型号信息
+                (device) -> deviceProductService.findById(device.getProductId()),
+                (device, product) -> new DeviceDetail().with(device).with(product)
+            ).flatMap(detail -> registry
+                .getDevice(deviceId)
+                .flatMap(
+                    operator -> operator.checkState() //检查设备的真实状态,设备已经离线,但是数据库状态未及时更新的.
+                        .map(DeviceState::of)
+                        .filter(state -> state != detail.getState())
+                        .doOnNext(detail::setState)
+                        .flatMap(state -> createUpdate()
+                            .set(DeviceInstanceEntity::getState, state)
+                            .where(DeviceInstanceEntity::getId, deviceId)
+                            .execute())
+                        .thenReturn(operator))
+                .flatMap(detail::with)
+                .defaultIfEmpty(detail))
+            //设备标签信息
             .flatMap(detail -> tagRepository
             .flatMap(detail -> tagRepository
                 .createQuery()
                 .createQuery()
                 .where(DeviceTagEntity::getDeviceId, deviceId)
                 .where(DeviceTagEntity::getDeviceId, deviceId)

+ 58 - 54
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -12,6 +12,7 @@ import org.hswebframework.ezorm.rdb.exception.DuplicateKeyException;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.reactor.excel.ReactorExcel;
 import org.hswebframework.reactor.excel.ReactorExcel;
+import org.hswebframework.reactor.excel.utils.StreamUtils;
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.Authentication;
@@ -282,7 +283,7 @@ public class DeviceInstanceController implements
                                                @RequestBody Flux<DeviceTagEntity> tags) {
                                                @RequestBody Flux<DeviceTagEntity> tags) {
         return tags
         return tags
             .doOnNext(tag -> {
             .doOnNext(tag -> {
-                tag.setId(DeviceTagEntity.createTagId(deviceId,tag.getKey()));
+                tag.setId(DeviceTagEntity.createTagId(deviceId, tag.getKey()));
                 tag.setDeviceId(deviceId);
                 tag.setDeviceId(deviceId);
                 tag.tryValidate();
                 tag.tryValidate();
             })
             })
@@ -290,13 +291,6 @@ public class DeviceInstanceController implements
             .thenMany(getDeviceTags(deviceId));
             .thenMany(getDeviceTags(deviceId));
     }
     }
 
 
-    //已废弃
-    @GetMapping("/operation/log")
-    @QueryAction
-    @Deprecated
-    public Mono<PagerResult<DeviceOperationLogEntity>> queryOperationLog(QueryParamEntity queryParam) {
-        return timeSeriesManager.getService(TimeSeriesMetric.of("device_operation")).queryPager(queryParam, data -> data.as(DeviceOperationLogEntity.class));
-    }
 
 
     @GetMapping(value = "/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @GetMapping(value = "/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @ApiOperation("批量导入数据")
     @ApiOperation("批量导入数据")
@@ -346,12 +340,12 @@ public class DeviceInstanceController implements
 
 
     DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
     DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
 
 
-    //按产品型号导入数据
+
+    //按型号导入数据
     @GetMapping(value = "/{productId}/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @GetMapping(value = "/{productId}/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @SaveAction
     @SaveAction
     public Flux<ImportDeviceInstanceResult> doBatchImportByProduct(@PathVariable String productId,
     public Flux<ImportDeviceInstanceResult> doBatchImportByProduct(@PathVariable String productId,
                                                                    @RequestParam String fileUrl) {
                                                                    @RequestParam String fileUrl) {
-
         return registry.getProduct(productId)
         return registry.getProduct(productId)
             .flatMap(DeviceProductOperator::getMetadata)
             .flatMap(DeviceProductOperator::getMetadata)
             .map(metadata -> new DeviceWrapper(metadata.getTags()))
             .map(metadata -> new DeviceWrapper(metadata.getTags()))
@@ -371,19 +365,16 @@ public class DeviceInstanceController implements
             .publishOn(Schedulers.single())
             .publishOn(Schedulers.single())
             .concatMap(buffer ->
             .concatMap(buffer ->
                 Mono.zip(
                 Mono.zip(
-                    //保存设备
                     service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
                     service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
-                    //保存设备标签
                     tagRepository
                     tagRepository
                         .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
                         .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
                         .defaultIfEmpty(SaveResult.of(0, 0))
                         .defaultIfEmpty(SaveResult.of(0, 0))
                 ))
                 ))
             .map(res -> ImportDeviceInstanceResult.success(res.getT1()))
             .map(res -> ImportDeviceInstanceResult.success(res.getT1()))
-            //保存返回错误消息而不是抛出异常,抛异常SSE无法获取到错误消息.
             .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
             .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
     }
     }
 
 
-    //获取导模版
+    //获取导模版
     @GetMapping("/{productId}/template.{format}")
     @GetMapping("/{productId}/template.{format}")
     @QueryAction
     @QueryAction
     public Mono<Void> downloadExportTemplate(ServerHttpResponse response,
     public Mono<Void> downloadExportTemplate(ServerHttpResponse response,
@@ -392,27 +383,23 @@ public class DeviceInstanceController implements
                                              @PathVariable String productId) throws IOException {
                                              @PathVariable String productId) throws IOException {
         response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
         response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
             "attachment; filename=".concat(URLEncoder.encode("设备导入模版." + format, StandardCharsets.UTF_8.displayName())));
             "attachment; filename=".concat(URLEncoder.encode("设备导入模版." + format, StandardCharsets.UTF_8.displayName())));
-        return Authentication
-            .currentReactive()
-            .flatMap(auth -> {
-                parameter.setPaging(false);
-                parameter.toNestQuery(q -> q.is(DeviceInstanceEntity::getProductId, productId));
-                return registry.getProduct(productId)
-                    .flatMap(DeviceProductOperator::getMetadata)
-                    .map(meta -> DeviceExcelInfo.getTemplateHeaderMapping(meta.getTags()))
-                    .defaultIfEmpty(DeviceExcelInfo.getTemplateHeaderMapping(Collections.emptyList()))
-                    .flatMapMany(headers ->
-                        ReactorExcel.<DeviceExcelInfo>writer(format)
-                            .headers(headers)
-                            .converter(DeviceExcelInfo::toMap)
-                            .writeBuffer(Flux.empty()))
-                    .doOnError(err -> log.error(err.getMessage(), err))
-                    .map(bufferFactory::wrap)
-                    .as(response::writeWith);
-            });
+        parameter.setPaging(false);
+        parameter.toNestQuery(q -> q.is(DeviceInstanceEntity::getProductId, productId));
+        return registry.getProduct(productId)
+            .flatMap(DeviceProductOperator::getMetadata)
+            .map(meta -> DeviceExcelInfo.getTemplateHeaderMapping(meta.getTags()))
+            .defaultIfEmpty(DeviceExcelInfo.getTemplateHeaderMapping(Collections.emptyList()))
+            .flatMapMany(headers ->
+                ReactorExcel.<DeviceExcelInfo>writer(format)
+                    .headers(headers)
+                    .converter(DeviceExcelInfo::toMap)
+                    .writeBuffer(Flux.empty()))
+            .doOnError(err -> log.error(err.getMessage(), err))
+            .map(bufferFactory::wrap)
+            .as(response::writeWith);
     }
     }
 
 
-    //按照型号导出数据
+    //按照型号导出数据.
     @GetMapping("/{productId}/export.{format}")
     @GetMapping("/{productId}/export.{format}")
     @QueryAction
     @QueryAction
     public Mono<Void> export(ServerHttpResponse response,
     public Mono<Void> export(ServerHttpResponse response,
@@ -454,31 +441,48 @@ public class DeviceInstanceController implements
             .as(response::writeWith);
             .as(response::writeWith);
     }
     }
 
 
+
+    //直接导出数据,不支持导出标签.
+    @GetMapping("/export.{format}")
+    @QueryAction
+    public Mono<Void> export(ServerHttpResponse response,
+                             QueryParamEntity parameter,
+                             @PathVariable String format) throws IOException {
+        response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
+            "attachment; filename=".concat(URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8.displayName())));
+        return ReactorExcel.<DeviceExcelInfo>writer(format)
+            .headers(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList()))
+            .converter(DeviceExcelInfo::toMap)
+            .writeBuffer(
+                service.query(parameter)
+                    .map(entity -> FastBeanCopier.copy(entity, new DeviceExcelInfo()))
+                , 512 * 1024)//缓冲512k
+            .doOnError(err -> log.error(err.getMessage(), err))
+            .map(bufferFactory::wrap)
+            .as(response::writeWith);
+    }
+
     @PostMapping("/export")
     @PostMapping("/export")
     @QueryAction
     @QueryAction
     @SneakyThrows
     @SneakyThrows
-    public Mono<Void> export(ServerHttpResponse response, QueryParam parameter) {
+    public Mono<Void> export(ServerHttpResponse response, QueryParamEntity parameter) {
         response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
         response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
             "attachment; filename=".concat(URLEncoder.encode("设备实例.xlsx", StandardCharsets.UTF_8.displayName())));
             "attachment; filename=".concat(URLEncoder.encode("设备实例.xlsx", StandardCharsets.UTF_8.displayName())));
-        return Authentication
-            .currentReactive()
-            .flatMap(auth -> {
-                parameter.setPaging(false);
-                return response.writeWith(Mono.create(sink -> {
-                    ByteArrayOutputStream out = new ByteArrayOutputStream();
-                    ExcelWriter excelWriter = EasyExcel.write(out, DeviceInstanceImportExportEntity.class).build();
-                    WriteSheet writeSheet = EasyExcel.writerSheet().build();
-                    service.query(parameter)
-                        .map(entity -> FastBeanCopier.copy(entity, new DeviceInstanceImportExportEntity()))
-                        .buffer(100)
-                        .doOnNext(list -> excelWriter.write(list, writeSheet))
-                        .doFinally(s -> {
-                            excelWriter.finish();
-                            sink.success(bufferFactory.wrap(out.toByteArray()));
-                        })
-                        .doOnError(sink::error)
-                        .subscribe();
-                }));
-            });
+        parameter.setPaging(false);
+
+        return StreamUtils.buffer(
+            512 * 1024,
+            output -> {
+                ExcelWriter excelWriter = EasyExcel.write(output, DeviceInstanceImportExportEntity.class).build();
+                WriteSheet writeSheet = EasyExcel.writerSheet().build();
+                return service.query(parameter)
+                    .map(entity -> FastBeanCopier.copy(entity, new DeviceInstanceImportExportEntity()))
+                    .buffer(100)
+                    .doOnNext(list -> excelWriter.write(list, writeSheet))
+                    .doOnComplete(excelWriter::finish)
+                    .then();
+            })
+            .map(bufferFactory::wrap)
+            .as(response::writeWith);
     }
     }
 }
 }

+ 1 - 1
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -191,7 +191,7 @@ public class JetLinksConfiguration {
             .flatMap(session -> {
             .flatMap(session -> {
                 DeviceOfflineMessage message = new DeviceOfflineMessage();
                 DeviceOfflineMessage message = new DeviceOfflineMessage();
                 message.setDeviceId(session.getDeviceId());
                 message.setDeviceId(session.getDeviceId());
-                message.setTimestamp(session.connectTime());
+                message.setTimestamp(System.currentTimeMillis());
                 return messageConnector.onMessage(message);
                 return messageConnector.onMessage(message);
             })
             })
             .onErrorContinue((err, r) -> log.error(err.getMessage(), err))
             .onErrorContinue((err, r) -> log.error(err.getMessage(), err))

+ 9 - 6
jetlinks-standalone/src/main/resources/application.yml

@@ -42,6 +42,9 @@ elasticsearch:
     connection-request-timeout: 8000
     connection-request-timeout: 8000
   index:
   index:
     default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引.
     default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引.
+    settings:
+      number-of-shards: 1 # es 分片数量
+      number-of-replicas: 0 # 副本数量
 device:
 device:
   message:
   message:
     writer:
     writer:
@@ -65,12 +68,12 @@ hsweb:
       static-location: http://localhost:8848/upload
       static-location: http://localhost:8848/upload
   webflux:
   webflux:
     response-wrapper: enabled #开启响应包装器(将返回值包装为ResponseMessage)
     response-wrapper: enabled #开启响应包装器(将返回值包装为ResponseMessage)
-#  auth:   #默认的用户配置
-#    users:
-#      admin:
-#        username: admin
-#        password: admin
-#        name: 超级管理员
+  #  auth:   #默认的用户配置
+  #    users:
+  #      admin:
+  #        username: admin
+  #        password: admin
+  #        name: 超级管理员
   authorize:
   authorize:
     auto-parse: true
     auto-parse: true
   cache:
   cache:

+ 1 - 3
jetlinks-standalone/src/main/resources/logback-spring.xml

@@ -3,10 +3,8 @@
     <appender name="LOGEventPublisher" class="org.jetlinks.community.logging.logback.SystemLoggingAppender" />
     <appender name="LOGEventPublisher" class="org.jetlinks.community.logging.logback.SystemLoggingAppender" />
 
 
     <appender name="ErrorLOGEventPublisher" class="org.jetlinks.community.logging.logback.SystemLoggingAppender">
     <appender name="ErrorLOGEventPublisher" class="org.jetlinks.community.logging.logback.SystemLoggingAppender">
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
             <level>WARN</level>
             <level>WARN</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
         </filter>
         </filter>
     </appender>
     </appender>
 
 

+ 1 - 1
simulator/start.sh

@@ -1,6 +1,6 @@
 #!/usr/bin/env bash
 #!/usr/bin/env bash
 
 
-java -jar device-simulator.jar \
+java -jar -Dfile.encoding=UTF-8 device-simulator.jar \
   mqtt.limit=1 \
   mqtt.limit=1 \
   mqtt.start=0 \
   mqtt.start=0 \
   mqtt.enableEvent=true \
   mqtt.enableEvent=true \