فهرست منبع

Merge remote-tracking branch 'origin/master'

zhouhao 5 سال پیش
والد
کامیت
c0f32fb378

+ 1 - 0
README.md

@@ -64,6 +64,7 @@ JetLinks 是一个物联网基础平台,用于快速建立物联网相关业务
 | CoAP(DTLS)    |  ⭕  |  ✅ |     ✅       |
 | Http,WebSocket(TLS) |  ⭕  |  ✅ |     ✅ |
 | 规则引擎-数据转发 |  ⭕  |  ✅ |     ✅ |
+| Geo地理位置支持     | ⭕   |  ✅ |  ✅    |
 | 可视化图表配置   |  ⭕  |  ✅ |     ✅    |
 | OpenAPI    |  ⭕  |  ✅ |     ✅     |
 | 集群支持    |  ⭕  |  ✅ |     ✅     |

+ 13 - 43
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java

@@ -19,17 +19,16 @@ import reactor.core.publisher.Mono;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 class DevicePropertiesMeasurement extends StaticMeasurement {
 
-    private MessageGateway messageGateway;
+    private final MessageGateway messageGateway;
 
-    private TimeSeriesService timeSeriesService;
+    private final TimeSeriesService timeSeriesService;
 
-    private DeviceMetadata metadata;
-    private String productId;
+    private final DeviceMetadata metadata;
+
+    private final String productId;
 
     public DevicePropertiesMeasurement(String productId,
                                        MessageGateway messageGateway,
@@ -42,6 +41,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
         this.metadata = deviceMetadata;
         addDimension(new RealTimeDevicePropertyDimension());
         addDimension(new HistoryDevicePropertyDimension());
+
     }
 
     static AtomicLong num = new AtomicLong();
@@ -103,7 +103,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
+        .add("deviceId", "设备",  "指定设备", new StringType().expand("selector", "device-selector"));
 
     /**
      * 历史设备事件
@@ -117,25 +117,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
 
         @Override
         public DataType getValueType() {
-            SimplePropertyMetadata property = new SimplePropertyMetadata();
-            property.setId("property");
-            property.setName("属性");
-            property.setValueType(new StringType());
-
-            SimplePropertyMetadata value = new SimplePropertyMetadata();
-            value.setId("value");
-            value.setName("值");
-            value.setValueType(new StringType());
-
-            SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
-            value.setId("formatValue");
-            value.setName("格式化值");
-            value.setValueType(new StringType());
-
             return new ObjectType()
-                .addPropertyMetadata(property)
-                .addPropertyMetadata(value)
-                .addPropertyMetadata(formatValue);
+                .addProperty("property","属性", StringType.GLOBAL)
+                .addProperty("value","值", StringType.GLOBAL)
+                .addProperty("formatValue","格式化值", StringType.GLOBAL);
         }
 
         @Override
@@ -171,25 +156,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
 
         @Override
         public DataType getValueType() {
-            SimplePropertyMetadata property = new SimplePropertyMetadata();
-            property.setId("property");
-            property.setName("属性");
-            property.setValueType(new StringType());
-
-            SimplePropertyMetadata value = new SimplePropertyMetadata();
-            value.setId("value");
-            value.setName("值");
-            value.setValueType(new StringType());
-
-            SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
-            value.setId("formatValue");
-            value.setName("格式化值");
-            value.setValueType(new StringType());
-
             return new ObjectType()
-                .addPropertyMetadata(property)
-                .addPropertyMetadata(value)
-                .addPropertyMetadata(formatValue);
+                .addProperty("property","属性", StringType.GLOBAL)
+                .addProperty("value","值", StringType.GLOBAL)
+                .addProperty("formatValue","格式化值", StringType.GLOBAL);
         }
 
         @Override

+ 127 - 23
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java

@@ -5,18 +5,25 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.ReportPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.*;
+import org.jetlinks.core.metadata.types.IntType;
+import org.jetlinks.core.metadata.types.NumberType;
+import org.jetlinks.core.metadata.types.ObjectType;
+import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.community.Interval;
 import org.jetlinks.community.dashboard.*;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
 import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.timeseries.TimeSeriesService;
-import org.jetlinks.core.metadata.types.IntType;
-import org.jetlinks.core.metadata.types.ObjectType;
-import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.community.timeseries.query.Aggregation;
+import org.jetlinks.community.timeseries.query.AggregationQueryParam;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -24,24 +31,28 @@ import java.util.stream.Stream;
 
 class DevicePropertyMeasurement extends StaticMeasurement {
 
-    private PropertyMetadata metadata;
+    private final PropertyMetadata metadata;
 
-    private MessageGateway messageGateway;
+    private final MessageGateway messageGateway;
 
-    private TimeSeriesService timeSeriesService;
+    private final TimeSeriesService timeSeriesService;
 
-    private String productId;
+    private final String productId;
 
     public DevicePropertyMeasurement(String productId,
                                      MessageGateway messageGateway,
                                      PropertyMetadata metadata,
                                      TimeSeriesService timeSeriesService) {
         super(MetadataMeasurementDefinition.of(metadata));
-        this.productId=productId;
+        this.productId = productId;
         this.messageGateway = messageGateway;
         this.metadata = metadata;
         this.timeSeriesService = timeSeriesService;
         addDimension(new RealTimeDevicePropertyDimension());
+        addDimension(new HistoryDevicePropertyDimension());
+        if (metadata.getValueType() instanceof NumberType) {
+            addDimension(new AggDevicePropertyDimension());
+        }
     }
 
 
@@ -67,8 +78,8 @@ class DevicePropertyMeasurement extends StaticMeasurement {
     Flux<MeasurementValue> fromRealTime(String deviceId) {
         return messageGateway
             .subscribe(Stream.of(
-                "/device/"+productId+"/" + deviceId + "/message/property/report"
-                , "/device/"+productId+"/" + deviceId + "/message/property/*/reply")
+                "/device/" + productId + "/" + deviceId + "/message/property/report"
+                , "/device/" + productId + "/" + deviceId + "/message/property/*/reply")
                 .map(Subscription::new)
                 .collect(Collectors.toList()), true)
             .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
@@ -92,32 +103,123 @@ class DevicePropertyMeasurement extends StaticMeasurement {
         .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
         .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
 
+    static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
+        .add("deviceId", "设备ID", "", StringType.GLOBAL)
+        .add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
+        .add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
+        .add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
+        .add("limit", "最大数据量", "", StringType.GLOBAL)
+        .add("from", "时间从", "", StringType.GLOBAL)
+        .add("to", "时间至", "", StringType.GLOBAL);
+
     /**
-     * 实时设备事件
+     * 聚合数据
      */
-    private class RealTimeDevicePropertyDimension implements MeasurementDimension {
+    private class AggDevicePropertyDimension implements MeasurementDimension {
 
         @Override
         public DimensionDefinition getDefinition() {
-            return CommonDimensionDefinition.realTime;
+            return CommonDimensionDefinition.agg;
+        }
+
+        @Override
+        public DataType getValueType() {
+            return new ObjectType()
+                .addProperty("value", "数据", new ObjectType()
+                    .addProperty("property", StringType.GLOBAL)
+                    .addProperty("value", metadata.getValueType())
+                    .addProperty("formatValue", StringType.GLOBAL))
+                .addProperty("timeString", "时间", StringType.GLOBAL);
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return aggConfigMetadata;
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return false;
+        }
+
+        @Override
+        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
+
+            return AggregationQueryParam.of()
+                .agg("numberValue", "value", parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG))
+                .filter(query -> query
+                    .where("property", metadata.getId())
+                    .and("deviceId", parameter.getString("deviceId").orElse(null))
+                )
+                .limit(parameter.getInt("limit", 10))
+                .groupBy(parameter.getInterval("time", Interval.ofSeconds(10)), parameter.getString("format", "HH:mm:ss"))
+                .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
+                .to(parameter.getDate("to").orElse(new Date()))
+                .execute(timeSeriesService::aggregation)
+                .index((index, data) -> SimpleMeasurementValue.of(
+                    createValue(data.getInt("value").orElse(0)),
+                    data.getString("time").orElse(""),
+                    index))
+                .sort();
+
+        }
+    }
+
+    /**
+     * 历史设备数据
+     */
+    private class HistoryDevicePropertyDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.history;
         }
 
         @Override
         public DataType getValueType() {
+            return new ObjectType()
+                .addProperty("property", "属性", StringType.GLOBAL)
+                .addProperty("value", "值", metadata.getValueType())
+                .addProperty("formatValue", "格式化值", StringType.GLOBAL);
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return configMetadata;
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return false;
+        }
+
+        @Override
+        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
+            return Mono.justOrEmpty(parameter.getString("deviceId"))
+                .flatMapMany(deviceId -> {
+                    int history = parameter.getInt("history").orElse(1);
+                    //合并历史数据和实时数据
+                    return fromHistory(deviceId, history);
+                });
+        }
+    }
 
-            SimplePropertyMetadata value = new SimplePropertyMetadata();
-            value.setId("value");
-            value.setName("值");
-            value.setValueType(metadata.getValueType());
+    /**
+     * 实时设备事件
+     */
+    private class RealTimeDevicePropertyDimension implements MeasurementDimension {
 
-            SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
-            value.setId("formatValue");
-            value.setName("格式化值");
-            value.setValueType(new StringType());
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.realTime;
+        }
 
+        @Override
+        public DataType getValueType() {
             return new ObjectType()
-                .addPropertyMetadata(value)
-                .addPropertyMetadata(formatValue);
+                .addProperty("property", "属性", StringType.GLOBAL)
+                .addProperty("value", "值", metadata.getValueType())
+                .addProperty("formatValue", "格式化值", StringType.GLOBAL);
         }
 
         @Override
@@ -146,4 +248,6 @@ class DevicePropertyMeasurement extends StaticMeasurement {
                 });
         }
     }
+
+
 }

+ 15 - 15
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -1,16 +1,17 @@
 package org.jetlinks.community.device.measurements.message;
 
-import org.jetlinks.community.Interval;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.types.DateTimeType;
 import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.community.Interval;
 import org.jetlinks.community.dashboard.*;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.jetlinks.community.timeseries.query.AggregationQueryParam;
 import reactor.core.publisher.Flux;
@@ -18,13 +19,14 @@ import reactor.core.publisher.Flux;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.Date;
 
 class DeviceMessageMeasurement extends StaticMeasurement {
 
-    private MessageGateway messageGateway;
+    private final MessageGateway messageGateway;
 
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
 
     static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
 
@@ -37,8 +39,6 @@ class DeviceMessageMeasurement extends StaticMeasurement {
 
     }
 
-    static DataType valueType = new IntType();
-
     static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata()
         .add("interval", "数据统计周期", "例如: 1s,10s", new StringType());
 
@@ -51,7 +51,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
 
         @Override
         public DataType getValueType() {
-            return valueType;
+            return IntType.GLOBAL;
         }
 
         @Override
@@ -68,7 +68,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
             //通过订阅消息来统计实时数据量
             return messageGateway
-                .subscribe("/device/**")
+                .subscribe(Collections.singleton(new Subscription("/device/**")),true)
                 .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1)))
                 .flatMap(Flux::count)
                 .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis()));
@@ -95,7 +95,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
 
         @Override
         public DataType getValueType() {
-            return valueType;
+            return IntType.GLOBAL;
         }
 
         @Override
@@ -113,16 +113,17 @@ class DeviceMessageMeasurement extends StaticMeasurement {
 
             return AggregationQueryParam.of()
                 .sum("count")
-                .groupBy(parameter.getInterval("time", Interval.ofHours(1)),
-                    parameter.getString("format", "MM月dd日 HH时"))
+                .groupBy(
+                    parameter.getInterval("time").orElse(Interval.ofHours(1)),
+                    parameter.getString("format").orElse("MM月dd日 HH时"))
                 .filter(query ->
                     query.where("name", "message-count")
-                        .is("productId", parameter.getString("productId", null))
-                        .is("msgType", parameter.getString("msgType", null))
+                        .is("productId", parameter.getString("productId").orElse(null))
+                        .is("msgType", parameter.getString("msgType").orElse(null))
                 )
-                .limit(parameter.getInt("limit", 1))
+                .limit(parameter.getInt("limit").orElse(1))
                 .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
-                .to(parameter.getDate("to").orElseGet(Date::new))
+                .to(parameter.getDate("to").orElse(new Date()))
                 .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
                 .index((index, data) -> SimpleMeasurementValue.of(
                     data.getInt("count").orElse(0),
@@ -132,5 +133,4 @@ class DeviceMessageMeasurement extends StaticMeasurement {
         }
     }
 
-
 }

+ 4 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java

@@ -129,11 +129,13 @@ public class DeviceMessageController {
         return registry
             .getDevice(deviceId)
             .switchIfEmpty(ErrorUtils.notFound("设备不存在"))
-            .map(operator -> operator
+            .flatMap(operator -> operator
                 .messageSender()
                 .invokeFunction(functionId)
                 .messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
-                .setParameter(properties))
+                .setParameter(properties)
+                .validate()
+            )
             .flatMapMany(FunctionInvokeMessageSender::send)
             .map(mapReply(FunctionInvokeMessageReply::getOutput));
     }

+ 6 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/DeviceAlarmEntity.java

@@ -8,6 +8,7 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.crud.generator.Generators;
 import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
 import org.jetlinks.community.rule.engine.enums.AlarmState;
 import org.jetlinks.community.rule.engine.model.DeviceAlarmModelParser;
@@ -17,6 +18,7 @@ import javax.persistence.Index;
 import javax.persistence.Table;
 import javax.validation.constraints.NotBlank;
 import java.sql.JDBCType;
+import java.util.Date;
 
 @Getter
 @Setter
@@ -55,6 +57,10 @@ public class DeviceAlarmEntity extends GenericEntity<String> {
     @DefaultValue("stopped")
     private AlarmState state;
 
+    @Column
+    @DefaultValue(generator = Generators.CURRENT_TIME)
+    private Date createTime;
+
     public RuleInstanceEntity toRuleInstance() {
         RuleInstanceEntity instanceEntity = new RuleInstanceEntity();
         if (alarmRule == null) {

+ 6 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmService.java

@@ -22,7 +22,12 @@ public class DeviceAlarmService extends GenericReactiveCrudService<DeviceAlarmEn
     }
 
     public Mono<Void> stop(String id) {
-        return instanceService.stop(id);
+        return instanceService.stop(id)
+            .then(createUpdate()
+                .set(DeviceAlarmEntity::getState,AlarmState.stopped)
+                .where(DeviceAlarmEntity::getId,id)
+                .execute())
+            .then();
     }
 
     private Mono<Void> doStart(DeviceAlarmEntity entity) {

+ 7 - 0
jetlinks-standalone/pom.xml

@@ -216,6 +216,13 @@
             <version>${hsweb.framework.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>de.ruedigermoeller</groupId>
+            <artifactId>fst</artifactId>
+            <version>2.57</version>
+        </dependency>
+
+
     </dependencies>
 
 

+ 41 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksRedisConfiguration.java

@@ -0,0 +1,41 @@
+package org.jetlinks.community.standalone.configuration;
+
+import org.jetlinks.community.standalone.configuration.fst.FstSerializationRedisSerializer;
+import org.nustaq.serialization.FSTConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ResourceLoader;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.core.ReactiveRedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializationContext;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+@Configuration
+@ConditionalOnProperty(prefix = "spring.redis",name = "serializer",havingValue = "fst")
+public class JetLinksRedisConfiguration {
+
+    @Bean
+    public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(
+        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, ResourceLoader resourceLoader) {
+
+        FstSerializationRedisSerializer serializer = new FstSerializationRedisSerializer(() -> {
+            FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration()
+                .setForceSerializable(true);
+            configuration.setClassLoader(resourceLoader.getClassLoader());
+            return configuration;
+        });
+        @SuppressWarnings("all")
+        RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
+            .newSerializationContext()
+            .key((RedisSerializer)new StringRedisSerializer())
+            .value(serializer)
+            .hashKey(StringRedisSerializer.UTF_8)
+            .hashValue(serializer)
+            .build();
+
+        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
+    }
+
+}

+ 48 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/fst/FstSerializationRedisSerializer.java

@@ -0,0 +1,48 @@
+package org.jetlinks.community.standalone.configuration.fst;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import lombok.AllArgsConstructor;
+import lombok.SneakyThrows;
+import org.nustaq.serialization.FSTConfiguration;
+import org.nustaq.serialization.FSTObjectInput;
+import org.nustaq.serialization.FSTObjectOutput;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
+
+import java.io.ByteArrayOutputStream;
+import java.util.function.Supplier;
+
+@AllArgsConstructor
+public class FstSerializationRedisSerializer implements RedisSerializer<Object> {
+
+    private final FastThreadLocal<FSTConfiguration> configuration;
+
+    public FstSerializationRedisSerializer(Supplier<FSTConfiguration> supplier) {
+
+        this(new FastThreadLocal<FSTConfiguration>() {
+            @Override
+            protected FSTConfiguration initialValue() {
+                return supplier.get();
+            }
+        });
+    }
+
+    @Override
+    @SneakyThrows
+    public byte[] serialize(Object o) throws SerializationException {
+        ByteArrayOutputStream arr = new ByteArrayOutputStream(1024);
+        try (FSTObjectOutput output = configuration.get().getObjectOutput(arr)) {
+            output.writeObject(o);
+        }
+        return arr.toByteArray();
+    }
+
+    @Override
+    @SneakyThrows
+    public Object deserialize(byte[] bytes) throws SerializationException {
+
+        try (FSTObjectInput input = configuration.get().getObjectInput(bytes)) {
+            return input.readObject();
+        }
+    }
+}

+ 704 - 0
jetlinks-standalone/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java

@@ -0,0 +1,704 @@
+/*
+ * Copyright 2016-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.connection;
+
+import org.reactivestreams.Publisher;
+import org.springframework.dao.InvalidDataAccessApiUsageException;
+import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
+import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+
+/**
+ * Redis Hash commands executed using reactive infrastructure.
+ *
+ * @author Christoph Strobl
+ * @author Mark Paluch
+ * @since 2.0
+ */
+public interface ReactiveHashCommands {
+
+	/**
+	 * {@literal HSET} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	class HSetCommand extends KeyCommand {
+
+		private static final ByteBuffer SINGLE_VALUE_KEY = ByteBuffer.allocate(0);
+		private final Map<ByteBuffer, ByteBuffer> fieldValueMap;
+		private final boolean upsert;
+
+		private HSetCommand(@Nullable ByteBuffer key, Map<ByteBuffer, ByteBuffer> keyValueMap, boolean upsert) {
+
+			super(key);
+
+			this.fieldValueMap = keyValueMap;
+			this.upsert = upsert;
+		}
+
+		/**
+		 * Creates a new {@link HSetCommand} given a {@link ByteBuffer key}.
+		 *
+		 * @param value must not be {@literal null}.
+		 * @return a new {@link HSetCommand} for {@link ByteBuffer key}.
+		 */
+		public static HSetCommand value(ByteBuffer value) {
+
+			Assert.notNull(value, "Value must not be null!");
+
+			return new HSetCommand(null, Collections.singletonMap(SINGLE_VALUE_KEY, value), Boolean.TRUE);
+		}
+
+		/**
+		 * Creates a new {@link HSetCommand} given a {@link Map} of field values.
+		 *
+		 * @param fieldValueMap must not be {@literal null}.
+		 * @return a new {@link HSetCommand} for a {@link Map} of field values.
+		 */
+		public static HSetCommand fieldValues(Map<ByteBuffer, ByteBuffer> fieldValueMap) {
+
+			Assert.notNull(fieldValueMap, "Field values map must not be null!");
+
+			return new HSetCommand(null, fieldValueMap, Boolean.TRUE);
+		}
+
+		/**
+		 * Applies a field. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HSetCommand} with {@literal field} applied.
+		 */
+		public HSetCommand ofField(ByteBuffer field) {
+
+			if (!fieldValueMap.containsKey(SINGLE_VALUE_KEY)) {
+				throw new InvalidDataAccessApiUsageException("Value has not been set.");
+			}
+
+			Assert.notNull(field, "Field not be null!");
+
+			return new HSetCommand(getKey(), Collections.singletonMap(field, fieldValueMap.get(SINGLE_VALUE_KEY)), upsert);
+		}
+
+		/**
+		 * Applies the {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HSetCommand} with {@literal key} applied.
+		 */
+		public HSetCommand forKey(ByteBuffer key) {
+
+			Assert.notNull(key, "Key not be null!");
+
+			return new HSetCommand(key, fieldValueMap, upsert);
+		}
+
+		/**
+		 * Disable upsert. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @return a new {@link HSetCommand} with upsert disabled.
+		 */
+		public HSetCommand ifValueNotExists() {
+			return new HSetCommand(getKey(), fieldValueMap, Boolean.FALSE);
+		}
+
+		/**
+		 * @return
+		 */
+		public boolean isUpsert() {
+			return upsert;
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public Map<ByteBuffer, ByteBuffer> getFieldValueMap() {
+			return fieldValueMap;
+		}
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @param value must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	default Mono<Boolean> hSet(ByteBuffer key, ByteBuffer field, ByteBuffer value) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+		Assert.notNull(value, "Value must not be null!");
+
+		return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @param value must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hsetnx">Redis Documentation: HSETNX</a>
+	 */
+	default Mono<Boolean> hSetNX(ByteBuffer key, ByteBuffer field, ByteBuffer value) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+		Assert.notNull(value, "Value must not be null!");
+
+		return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key).ifValueNotExists()))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Set multiple hash fields to multiple values using data provided in {@literal fieldValueMap}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fieldValueMap must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmset">Redis Documentation: HMSET</a>
+	 */
+	default Mono<Boolean> hMSet(ByteBuffer key, Map<ByteBuffer, ByteBuffer> fieldValueMap) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fieldValueMap, "Field must not be null!");
+
+		return hSet(Mono.just(HSetCommand.fieldValues(fieldValueMap).forKey(key))).next().map(it -> true);
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands);
+
+	/**
+	 * {@literal HGET} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hget">Redis Documentation: HGET</a>
+	 */
+	class HGetCommand extends KeyCommand {
+
+		private List<ByteBuffer> fields;
+
+		private HGetCommand(@Nullable ByteBuffer key, List<ByteBuffer> fields) {
+
+			super(key);
+
+			this.fields = fields;
+		}
+
+		/**
+		 * Creates a new {@link HGetCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HGetCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HGetCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HGetCommand(null, Collections.singletonList(field));
+		}
+
+		/**
+		 * Creates a new {@link HGetCommand} given a {@link Collection} of field names.
+		 *
+		 * @param fields must not be {@literal null}.
+		 * @return a new {@link HGetCommand} for a {@link Collection} of field names.
+		 */
+		public static HGetCommand fields(Collection<ByteBuffer> fields) {
+
+			Assert.notNull(fields, "Fields must not be null!");
+
+			return new HGetCommand(null, new ArrayList<>(fields));
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HGetCommand} with {@literal key} applied.
+		 */
+		public HGetCommand from(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HGetCommand(key, fields);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public List<ByteBuffer> getFields() {
+			return fields;
+		}
+	}
+
+	/**
+	 * Get value for given {@literal field} from hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hget">Redis Documentation: HGET</a>
+	 */
+	default Mono<ByteBuffer> hGet(ByteBuffer key, ByteBuffer field) {
+		return hMGet(key, Collections.singletonList(field)).flatMapIterable(Function.identity()).next();
+	}
+
+	/**
+	 * Get values for given {@literal fields} from hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fields must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmget">Redis Documentation: HMGET</a>
+	 */
+	default Mono<List<ByteBuffer>> hMGet(ByteBuffer key, Collection<ByteBuffer> fields) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fields, "Fields must not be null!");
+
+		return hMGet(Mono.just(HGetCommand.fields(fields).from(key)))
+				.next()
+				.flatMap(res->Mono.justOrEmpty(res.getOutput()));
+	}
+
+	/**
+	 * Get values for given {@literal fields} from hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmget">Redis Documentation: HMGET</a>
+	 */
+	Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands);
+
+	/**
+	 * {@literal HEXISTS} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	class HExistsCommand extends KeyCommand {
+
+		private final ByteBuffer field;
+
+		private HExistsCommand(@Nullable ByteBuffer key, ByteBuffer field) {
+
+			super(key);
+
+			this.field = field;
+		}
+
+		/**
+		 * Creates a new {@link HExistsCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HExistsCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HExistsCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HExistsCommand(null, field);
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HExistsCommand} with {@literal key} applied.
+		 */
+		public HExistsCommand in(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HExistsCommand(key, field);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public ByteBuffer getField() {
+			return field;
+		}
+	}
+
+	/**
+	 * Determine if given hash {@literal field} exists.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	default Mono<Boolean> hExists(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+
+		return hExists(Mono.just(HExistsCommand.field(field).in(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Determine if given hash {@literal field} exists.
+	 *
+	 * @param commands
+	 * @return
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> commands);
+
+	/**
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	class HDelCommand extends KeyCommand {
+
+		private final List<ByteBuffer> fields;
+
+		private HDelCommand(@Nullable ByteBuffer key, List<ByteBuffer> fields) {
+
+			super(key);
+
+			this.fields = fields;
+		}
+
+		/**
+		 * Creates a new {@link HDelCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HDelCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HDelCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HDelCommand(null, Collections.singletonList(field));
+		}
+
+		/**
+		 * Creates a new {@link HDelCommand} given a {@link Collection} of field names.
+		 *
+		 * @param fields must not be {@literal null}.
+		 * @return a new {@link HDelCommand} for a {@link Collection} of field names.
+		 */
+		public static HDelCommand fields(Collection<ByteBuffer> fields) {
+
+			Assert.notNull(fields, "Fields must not be null!");
+
+			return new HDelCommand(null, new ArrayList<>(fields));
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HDelCommand} with {@literal key} applied.
+		 */
+		public HDelCommand from(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HDelCommand(key, fields);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public List<ByteBuffer> getFields() {
+			return fields;
+		}
+	}
+
+	/**
+	 * Delete given hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	default Mono<Boolean> hDel(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(field, "Field must not be null!");
+
+		return hDel(key, Collections.singletonList(field)).map(val -> val > 0 ? Boolean.TRUE : Boolean.FALSE);
+	}
+
+	/**
+	 * Delete given hash {@literal fields}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fields must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	default Mono<Long> hDel(ByteBuffer key, Collection<ByteBuffer> fields) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fields, "Fields must not be null!");
+
+		return hDel(Mono.just(HDelCommand.fields(fields).from(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Delete given hash {@literal fields}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> commands);
+
+	/**
+	 * Get size of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hlen">Redis Documentation: HLEN</a>
+	 */
+	default Mono<Long> hLen(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hLen(Mono.just(new KeyCommand(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Get size of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hlen">Redis Documentation: HLEN</a>
+	 */
+	Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get key set (fields) of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hkeys">Redis Documentation: HKEYS</a>
+	 */
+	default Flux<ByteBuffer> hKeys(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hKeys(Mono.just(new KeyCommand(key)))
+				.flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get key set (fields) of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hkeys">Redis Documentation: HKEYS</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get entry set (values) of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hvals">Redis Documentation: HVALS</a>
+	 */
+	default Flux<ByteBuffer> hVals(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hVals(Mono.just(new KeyCommand(key))).flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get entry set (values) of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hvals">Redis Documentation: HVALS</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get entire hash stored at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hgetall">Redis Documentation: HGETALL</a>
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hGetAll(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hGetAll(Mono.just(new KeyCommand(key))).flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get entire hash stored at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hgetall">Redis Documentation: HGETALL</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(Publisher<KeyCommand> commands);
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key}. The resulting {@link Flux} acts as a cursor
+	 * and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return the {@link Flux} emitting {@link Map.Entry entries} one by one.
+	 * @throws IllegalArgumentException in case the given key is {@literal null}.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hScan(ByteBuffer key) {
+		return hScan(key, ScanOptions.NONE);
+	}
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key} given {@link ScanOptions}. The resulting
+	 * {@link Flux} acts as a cursor and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param options must not be {@literal null}. Use {@link ScanOptions#NONE} instead.
+	 * @return the {@link Flux} emitting the raw {@link Map.Entry entries} one by one.
+	 * @throws IllegalArgumentException in case one of the required arguments is {@literal null}.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hScan(ByteBuffer key, ScanOptions options) {
+
+		return hScan(Mono.just(KeyScanCommand.key(key).withOptions(options))).map(CommandResponse::getOutput)
+				.flatMap(it -> it);
+	}
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key}. The resulting {@link Flux} acts as a cursor
+	 * and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return the {@link Flux} emitting {@link CommandResponse} one by one.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hScan(Publisher<KeyScanCommand> commands);
+
+	/**
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hstrlen">Redis Documentation: HSTRLEN</a>
+	 * @since 2.1
+	 */
+	class HStrLenCommand extends KeyCommand {
+
+		private ByteBuffer field;
+
+		/**
+		 * Creates a new {@link HStrLenCommand} given a {@code key}.
+		 *
+		 * @param key can be {@literal null}.
+		 * @param field must not be {@literal null}.
+		 */
+		private HStrLenCommand(@Nullable ByteBuffer key, ByteBuffer field) {
+
+			super(key);
+			this.field = field;
+		}
+
+		/**
+		 * Specify the {@code field} within the hash to get the length of the {@code value} of.ø
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return new instance of {@link HStrLenCommand}.
+		 */
+		public static HStrLenCommand lengthOf(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+			return new HStrLenCommand(null, field);
+		}
+
+		/**
+		 * Define the {@code key} the hash is stored at.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return new instance of {@link HStrLenCommand}.
+		 */
+		public HStrLenCommand from(ByteBuffer key) {
+			return new HStrLenCommand(key, field);
+		}
+
+		/**
+		 * @return the field.
+		 */
+		public ByteBuffer getField() {
+			return field;
+		}
+	}
+
+	/**
+	 * Get the length of the value associated with {@code field}. If either the {@code key} or the {@code field} do not
+	 * exist, {@code 0} is emitted.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return never {@literal null}.
+	 * @since 2.1
+	 */
+	default Mono<Long> hStrLen(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+
+		return hStrLen(Mono.just(HStrLenCommand.lengthOf(field).from(key))).next().map(NumericResponse::getOutput);
+	}
+
+	/**
+	 * Get the length of the value associated with {@code field}. If either the {@code key} or the {@code field} do not
+	 * exist, {@code 0} is emitted.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return never {@literal null}.
+	 * @since 2.1
+	 */
+	Flux<NumericResponse<HStrLenCommand, Long>> hStrLen(Publisher<HStrLenCommand> commands);
+}

+ 1 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -21,7 +21,7 @@ spring:
       pool:
         max-active: 1024
     timeout: 20s
-
+    serializer: jdk # 设置fst时,redis key使用string序列化,value使用 fst序列化.
   #        max-wait: 10s
   r2dbc:
     url: r2dbc:postgresql://localhost:5432/jetlinks