Browse Source

Merge remote-tracking branch 'origin/master'

wangzheng 5 years ago
parent
commit
a8b7d5ea35

+ 2 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java

@@ -20,6 +20,7 @@ import javax.persistence.GeneratedValue;
 import javax.persistence.Index;
 import javax.persistence.Index;
 import javax.persistence.Table;
 import javax.persistence.Table;
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
 import java.sql.JDBCType;
 import java.sql.JDBCType;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map;
@@ -33,6 +34,7 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
 
 
     @Override
     @Override
     @GeneratedValue(generator = Generators.SNOW_FLAKE)
     @GeneratedValue(generator = Generators.SNOW_FLAKE)
+    @Pattern(regexp = "^[0-9a-zA-Z_\\-]+$", message = "ID只能由英文下划线和中划线组成",groups = CreateGroup.class)
     public String getId() {
     public String getId() {
         return super.getId();
         return super.getId();
     }
     }

+ 5 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java

@@ -15,6 +15,7 @@ import javax.persistence.Column;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GeneratedValue;
 import javax.persistence.Table;
 import javax.persistence.Table;
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
 import java.sql.JDBCType;
 import java.sql.JDBCType;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -25,6 +26,10 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
 
 
     @Override
     @Override
     @GeneratedValue(generator = Generators.SNOW_FLAKE)
     @GeneratedValue(generator = Generators.SNOW_FLAKE)
+    @Pattern(
+        regexp = "^[0-9a-zA-Z_\\-]+$",
+        message = "ID只能由英文下划线和中划线组成",
+        groups = CreateGroup.class)
     public String getId() {
     public String getId() {
         return super.getId();
         return super.getId();
     }
     }
@@ -61,7 +66,6 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
     @Comment("协议元数据")
     @Comment("协议元数据")
     @Column(name = "metadata")
     @Column(name = "metadata")
     @ColumnType(jdbcType = JDBCType.CLOB)
     @ColumnType(jdbcType = JDBCType.CLOB)
-    @NotBlank(message = "元数据不能为空",groups = CreateGroup.class)
     private String metadata;
     private String metadata;
 
 
     @Comment("传输协议: MQTT,COAP,UDP")
     @Comment("传输协议: MQTT,COAP,UDP")

+ 15 - 77
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -101,50 +101,6 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .as(super::save);
             .as(super::save);
     }
     }
 
 
-    /**
-     * 获取设备所有信息
-     *
-     * @param id 设备ID
-     * @return 设备详情信息
-     */
-    @Deprecated
-    public Mono<DeviceAllInfoResponse> getDeviceAllInfo(String id) {
-
-        return findById(id)//设备信息
-            .zipWhen(instance -> deviceProductService.findById(instance.getProductId()), DeviceInfo::of) //产品型号信息
-            .switchIfEmpty(Mono.error(NotFoundException::new))
-            .zipWhen(deviceInfo -> getDeviceRunRealInfo(id), DeviceAllInfoResponse::of) //设备运行状态
-            .zipWhen(info -> getDeviceLatestProperties(id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
-            .zipWhen(info -> {
-                    DeviceMetadata deviceMetadata = new JetLinksDeviceMetadata(JSON.parseObject(info.getDeviceInfo().getDeriveMetadata()));
-                    return getEventCounts(deviceMetadata.getEvents(), id, info.getDeviceInfo().getProductId()); //事件数量统计
-                },
-                DeviceAllInfoResponse::ofEventCounts)
-            ;
-    }
-
-    /**
-     * 获取设备事件上报次数
-     *
-     * @param events    设备事件元数据
-     * @param deviceId  设备Id
-     * @param productId 型号id
-     * @return
-     */
-    private Mono<Map<String, Integer>> getEventCounts(List<EventMetadata> events, String deviceId, String productId) {
-        return Flux.merge(
-            events
-                .stream()
-                .map(Metadata::getId)
-                .map(eventId -> Query.of()
-                    .where("deviceId", deviceId)
-                    .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, eventId))::count)
-                    .map(count -> Tuples.of(eventId, count)))
-                .collect(Collectors.toList())
-        ).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
-    }
-
-
     /**
     /**
      * 发布设备到设备注册中心
      * 发布设备到设备注册中心
      *
      *
@@ -214,6 +170,21 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                     .execute()));
                     .execute()));
     }
     }
 
 
+    /**
+     * 批量注销设备
+     * @param ids 设备ID
+     * @return 注销结果
+     */
+    public Mono<Integer> unregisterDevice(Publisher<String> ids) {
+        return Flux.from(ids)
+            .flatMap(id -> registry.unregisterDevice(id).thenReturn(id))
+            .collectList()
+            .flatMap(list -> createUpdate()
+                .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                .where().in(DeviceInstanceEntity::getId, list)
+                .execute());
+    }
+
     public Mono<DeviceDetail> getDeviceDetail(String deviceId) {
     public Mono<DeviceDetail> getDeviceDetail(String deviceId) {
         return this.findById(deviceId)
         return this.findById(deviceId)
             .zipWhen(
             .zipWhen(
@@ -257,39 +228,6 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .defaultIfEmpty(DeviceState.notActive);
             .defaultIfEmpty(DeviceState.notActive);
     }
     }
 
 
-    @Deprecated
-    public Mono<DeviceRunInfo> getDeviceRunInfo(String deviceId) {
-        return getDeviceRunRealInfo(deviceId);
-    }
-
-    @Deprecated
-    private Mono<DeviceRunInfo> getDeviceRunRealInfo(String deviceId) {
-        return registry.getDevice(deviceId)
-            .flatMap(deviceOperator -> Mono.zip(
-                deviceOperator.getOnlineTime().switchIfEmpty(Mono.just(0L)),// 1
-                deviceOperator.getOfflineTime().switchIfEmpty(Mono.just(0L)),// 2
-                deviceOperator.checkState()
-                    .switchIfEmpty(deviceOperator.getState())
-                    .map(DeviceState::of)
-                    .defaultIfEmpty(DeviceState.notActive),// 3
-                deviceOperator.getConfig(DeviceConfigKey.metadata).switchIfEmpty(Mono.just("")),//4
-                deviceOperator.getConfig(DeviceConfigKey.productId).switchIfEmpty(Mono.just(""))//5
-                ).map(tuple4 -> DeviceRunInfo.of(
-                tuple4.getT1(), //1. 上线时间
-                tuple4.getT2(), //2. 离线时间
-                tuple4.getT3(), //3. 状态
-                tuple4.getT4(),  //4. 设备模型元数据
-                tuple4.getT5() //5. 设备类型ID
-                )
-                ).flatMap(deviceRunInfo -> createUpdate()
-                    .set(DeviceInstanceEntity::getState, deviceRunInfo.getState())
-                    .where(DeviceInstanceEntity::getId, deviceId)
-                    .execute()
-                    .thenReturn(deviceRunInfo))
-            );
-    }
-
-
     public Mono<PagerResult<DevicePropertiesEntity>> queryDeviceProperties(String deviceId, QueryParamEntity entity) {
     public Mono<PagerResult<DevicePropertiesEntity>> queryDeviceProperties(String deviceId, QueryParamEntity entity) {
         return registry.getDevice(deviceId)
         return registry.getDevice(deviceId)
             .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
             .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java

@@ -29,7 +29,7 @@ public class LocalDeviceProductService extends GenericReactiveCrudService<Device
 
 
     public Mono<Integer> deploy(String id) {
     public Mono<Integer> deploy(String id) {
         return findById(Mono.just(id))
         return findById(Mono.just(id))
-            .flatMap(product -> registry.registry(
+            .flatMap(product -> registry.register(
                 ProductInfo.builder()
                 ProductInfo.builder()
                     .id(id)
                     .id(id)
                     .protocol(product.getMessageProtocol())
                     .protocol(product.getMessageProtocol())

+ 37 - 42
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -17,10 +17,7 @@ import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.Dimension;
 import org.hswebframework.web.authorization.Dimension;
-import org.hswebframework.web.authorization.annotation.Authorize;
-import org.hswebframework.web.authorization.annotation.QueryAction;
-import org.hswebframework.web.authorization.annotation.Resource;
-import org.hswebframework.web.authorization.annotation.SaveAction;
+import org.hswebframework.web.authorization.annotation.*;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.exception.BusinessException;
@@ -72,8 +69,6 @@ public class DeviceInstanceController implements
     @Getter
     @Getter
     private final LocalDeviceInstanceService service;
     private final LocalDeviceInstanceService service;
 
 
-    private final TimeSeriesManager timeSeriesManager;
-
     private final DeviceRegistry registry;
     private final DeviceRegistry registry;
 
 
     private final LocalDeviceProductService productService;
     private final LocalDeviceProductService productService;
@@ -84,13 +79,11 @@ public class DeviceInstanceController implements
 
 
     @SuppressWarnings("all")
     @SuppressWarnings("all")
     public DeviceInstanceController(LocalDeviceInstanceService service,
     public DeviceInstanceController(LocalDeviceInstanceService service,
-                                    TimeSeriesManager timeSeriesManager,
                                     DeviceRegistry registry,
                                     DeviceRegistry registry,
                                     LocalDeviceProductService productService,
                                     LocalDeviceProductService productService,
                                     ImportExportService importExportService,
                                     ImportExportService importExportService,
                                     ReactiveRepository<DeviceTagEntity, String> tagRepository) {
                                     ReactiveRepository<DeviceTagEntity, String> tagRepository) {
         this.service = service;
         this.service = service;
-        this.timeSeriesManager = timeSeriesManager;
         this.registry = registry;
         this.registry = registry;
         this.productService = productService;
         this.productService = productService;
         this.importExportService = importExportService;
         this.importExportService = importExportService;
@@ -112,36 +105,15 @@ public class DeviceInstanceController implements
         return service.getDeviceState(id);
         return service.getDeviceState(id);
     }
     }
 
 
-    //已弃用 下一个版本删除
-    @GetMapping("/info/{id:.+}")
-    @QueryAction
-    @Deprecated
-    public Mono<DeviceInfo> getDeviceInfoById(@PathVariable String id) {
-        return service.getDeviceInfoById(id);
-    }
-
-    //已弃用 下一个版本删除
-    @GetMapping("/run-info/{id:.+}")
-    @QueryAction
-    @Deprecated
-    public Mono<DeviceRunInfo> getRunDeviceInfoById(@PathVariable String id) {
-        return service.getDeviceRunInfo(id);
-    }
-
-
-    @PostMapping({
-        "/deploy/{deviceId:.+}",//todo 已弃用 下一个版本删除
-        "/{deviceId:.+}/deploy"
-    })
+    //激活
+    @PostMapping("/{deviceId:.+}/deploy")
     @SaveAction
     @SaveAction
     public Mono<DeviceDeployResult> deviceDeploy(@PathVariable String deviceId) {
     public Mono<DeviceDeployResult> deviceDeploy(@PathVariable String deviceId) {
         return service.deploy(deviceId);
         return service.deploy(deviceId);
     }
     }
 
 
-    @PostMapping({
-        "/cancelDeploy/{deviceId:.+}", //todo 已弃用 下一个版本删除
-        "/{deviceId:.+}/undeploy"
-    })
+    //注销
+    @PostMapping( "/{deviceId:.+}/undeploy")
     @SaveAction
     @SaveAction
     public Mono<Integer> cancelDeploy(@PathVariable String deviceId) {
     public Mono<Integer> cancelDeploy(@PathVariable String deviceId) {
         return service.cancelDeploy(deviceId);
         return service.cancelDeploy(deviceId);
@@ -192,15 +164,6 @@ public class DeviceInstanceController implements
             .defaultIfEmpty(0);
             .defaultIfEmpty(0);
     }
     }
 
 
-    //已废弃
-    @GetMapping("/{productId:.+}/{deviceId:.+}/properties")
-    @Deprecated
-    @QueryAction
-    public Flux<DevicePropertiesEntity> getDeviceLatestProperties(@PathVariable String productId,
-                                                                  @PathVariable String deviceId) {
-        return service.getDeviceLatestProperties(deviceId);
-    }
-
     //获取最新的设备属性
     //获取最新的设备属性
     @GetMapping("/{deviceId:.+}/properties/latest")
     @GetMapping("/{deviceId:.+}/properties/latest")
     @QueryAction
     @QueryAction
@@ -291,6 +254,38 @@ public class DeviceInstanceController implements
             .thenMany(getDeviceTags(deviceId));
             .thenMany(getDeviceTags(deviceId));
     }
     }
 
 
+    /**
+     * 批量删除设备,只会删除未激活的设备.
+     *
+     * @param idList ID列表
+     * @return 被删除数量
+     * @since 1.1
+     */
+    @PutMapping("/batch/_delete")
+    @DeleteAction
+    public Mono<Integer> deleteBatch(@RequestBody Flux<String> idList) {
+        return idList
+            .collectList()
+            .flatMap(list -> service.createDelete()
+                .where()
+                .in(DeviceInstanceEntity::getId, list)
+                .and(DeviceInstanceEntity::getState, DeviceState.notActive)
+                .execute());
+    }
+
+    /**
+     * 批量注销设备
+     *
+     * @param idList ID列表
+     * @return 被注销的数量
+     * @since 1.1
+     */
+    @PutMapping("/batch/_unDeploy")
+    @SaveAction
+    public Mono<Integer> unDeployBatch(@RequestBody Flux<String> idList) {
+        return service.unregisterDevice(idList);
+    }
+
 
 
     @GetMapping(value = "/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @GetMapping(value = "/import", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @ApiOperation("批量导入数据")
     @ApiOperation("批量导入数据")

+ 6 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmService.java

@@ -22,7 +22,12 @@ public class DeviceAlarmService extends GenericReactiveCrudService<DeviceAlarmEn
     }
     }
 
 
     public Mono<Void> stop(String id) {
     public Mono<Void> stop(String id) {
-        return instanceService.stop(id);
+        return instanceService.stop(id)
+            .then(createUpdate()
+                .set(DeviceAlarmEntity::getState,AlarmState.stopped)
+                .where(DeviceAlarmEntity::getId,id)
+                .execute())
+            .then();
     }
     }
 
 
     private Mono<Void> doStart(DeviceAlarmEntity entity) {
     private Mono<Void> doStart(DeviceAlarmEntity entity) {

+ 7 - 0
jetlinks-standalone/pom.xml

@@ -216,6 +216,13 @@
             <version>${hsweb.framework.version}</version>
             <version>${hsweb.framework.version}</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>de.ruedigermoeller</groupId>
+            <artifactId>fst</artifactId>
+            <version>2.57</version>
+        </dependency>
+
+
     </dependencies>
     </dependencies>
 
 
 
 

+ 41 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksRedisConfiguration.java

@@ -0,0 +1,41 @@
+package org.jetlinks.community.standalone.configuration;
+
+import org.jetlinks.community.standalone.configuration.fst.FstSerializationRedisSerializer;
+import org.nustaq.serialization.FSTConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ResourceLoader;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.core.ReactiveRedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializationContext;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+@Configuration
+@ConditionalOnProperty(prefix = "spring.redis",name = "serializer",havingValue = "fst")
+public class JetLinksRedisConfiguration {
+
+    @Bean
+    public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(
+        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, ResourceLoader resourceLoader) {
+
+        FstSerializationRedisSerializer serializer = new FstSerializationRedisSerializer(() -> {
+            FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration()
+                .setForceSerializable(true);
+            configuration.setClassLoader(resourceLoader.getClassLoader());
+            return configuration;
+        });
+        @SuppressWarnings("all")
+        RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
+            .newSerializationContext()
+            .key((RedisSerializer)new StringRedisSerializer())
+            .value(serializer)
+            .hashKey(StringRedisSerializer.UTF_8)
+            .hashValue(serializer)
+            .build();
+
+        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
+    }
+
+}

+ 48 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/fst/FstSerializationRedisSerializer.java

@@ -0,0 +1,48 @@
+package org.jetlinks.community.standalone.configuration.fst;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import lombok.AllArgsConstructor;
+import lombok.SneakyThrows;
+import org.nustaq.serialization.FSTConfiguration;
+import org.nustaq.serialization.FSTObjectInput;
+import org.nustaq.serialization.FSTObjectOutput;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
+
+import java.io.ByteArrayOutputStream;
+import java.util.function.Supplier;
+
+@AllArgsConstructor
+public class FstSerializationRedisSerializer implements RedisSerializer<Object> {
+
+    private final FastThreadLocal<FSTConfiguration> configuration;
+
+    public FstSerializationRedisSerializer(Supplier<FSTConfiguration> supplier) {
+
+        this(new FastThreadLocal<FSTConfiguration>() {
+            @Override
+            protected FSTConfiguration initialValue() {
+                return supplier.get();
+            }
+        });
+    }
+
+    @Override
+    @SneakyThrows
+    public byte[] serialize(Object o) throws SerializationException {
+        ByteArrayOutputStream arr = new ByteArrayOutputStream(1024);
+        try (FSTObjectOutput output = configuration.get().getObjectOutput(arr)) {
+            output.writeObject(o);
+        }
+        return arr.toByteArray();
+    }
+
+    @Override
+    @SneakyThrows
+    public Object deserialize(byte[] bytes) throws SerializationException {
+
+        try (FSTObjectInput input = configuration.get().getObjectInput(bytes)) {
+            return input.readObject();
+        }
+    }
+}

+ 704 - 0
jetlinks-standalone/src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java

@@ -0,0 +1,704 @@
+/*
+ * Copyright 2016-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.connection;
+
+import org.reactivestreams.Publisher;
+import org.springframework.dao.InvalidDataAccessApiUsageException;
+import org.springframework.data.redis.connection.ReactiveRedisConnection.*;
+import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+
+/**
+ * Redis Hash commands executed using reactive infrastructure.
+ *
+ * @author Christoph Strobl
+ * @author Mark Paluch
+ * @since 2.0
+ */
+public interface ReactiveHashCommands {
+
+	/**
+	 * {@literal HSET} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	class HSetCommand extends KeyCommand {
+
+		private static final ByteBuffer SINGLE_VALUE_KEY = ByteBuffer.allocate(0);
+		private final Map<ByteBuffer, ByteBuffer> fieldValueMap;
+		private final boolean upsert;
+
+		private HSetCommand(@Nullable ByteBuffer key, Map<ByteBuffer, ByteBuffer> keyValueMap, boolean upsert) {
+
+			super(key);
+
+			this.fieldValueMap = keyValueMap;
+			this.upsert = upsert;
+		}
+
+		/**
+		 * Creates a new {@link HSetCommand} given a {@link ByteBuffer key}.
+		 *
+		 * @param value must not be {@literal null}.
+		 * @return a new {@link HSetCommand} for {@link ByteBuffer key}.
+		 */
+		public static HSetCommand value(ByteBuffer value) {
+
+			Assert.notNull(value, "Value must not be null!");
+
+			return new HSetCommand(null, Collections.singletonMap(SINGLE_VALUE_KEY, value), Boolean.TRUE);
+		}
+
+		/**
+		 * Creates a new {@link HSetCommand} given a {@link Map} of field values.
+		 *
+		 * @param fieldValueMap must not be {@literal null}.
+		 * @return a new {@link HSetCommand} for a {@link Map} of field values.
+		 */
+		public static HSetCommand fieldValues(Map<ByteBuffer, ByteBuffer> fieldValueMap) {
+
+			Assert.notNull(fieldValueMap, "Field values map must not be null!");
+
+			return new HSetCommand(null, fieldValueMap, Boolean.TRUE);
+		}
+
+		/**
+		 * Applies a field. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HSetCommand} with {@literal field} applied.
+		 */
+		public HSetCommand ofField(ByteBuffer field) {
+
+			if (!fieldValueMap.containsKey(SINGLE_VALUE_KEY)) {
+				throw new InvalidDataAccessApiUsageException("Value has not been set.");
+			}
+
+			Assert.notNull(field, "Field not be null!");
+
+			return new HSetCommand(getKey(), Collections.singletonMap(field, fieldValueMap.get(SINGLE_VALUE_KEY)), upsert);
+		}
+
+		/**
+		 * Applies the {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HSetCommand} with {@literal key} applied.
+		 */
+		public HSetCommand forKey(ByteBuffer key) {
+
+			Assert.notNull(key, "Key not be null!");
+
+			return new HSetCommand(key, fieldValueMap, upsert);
+		}
+
+		/**
+		 * Disable upsert. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @return a new {@link HSetCommand} with upsert disabled.
+		 */
+		public HSetCommand ifValueNotExists() {
+			return new HSetCommand(getKey(), fieldValueMap, Boolean.FALSE);
+		}
+
+		/**
+		 * @return
+		 */
+		public boolean isUpsert() {
+			return upsert;
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public Map<ByteBuffer, ByteBuffer> getFieldValueMap() {
+			return fieldValueMap;
+		}
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @param value must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	default Mono<Boolean> hSet(ByteBuffer key, ByteBuffer field, ByteBuffer value) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+		Assert.notNull(value, "Value must not be null!");
+
+		return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @param value must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hsetnx">Redis Documentation: HSETNX</a>
+	 */
+	default Mono<Boolean> hSetNX(ByteBuffer key, ByteBuffer field, ByteBuffer value) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+		Assert.notNull(value, "Value must not be null!");
+
+		return hSet(Mono.just(HSetCommand.value(value).ofField(field).forKey(key).ifValueNotExists()))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Set multiple hash fields to multiple values using data provided in {@literal fieldValueMap}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fieldValueMap must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmset">Redis Documentation: HMSET</a>
+	 */
+	default Mono<Boolean> hMSet(ByteBuffer key, Map<ByteBuffer, ByteBuffer> fieldValueMap) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fieldValueMap, "Field must not be null!");
+
+		return hSet(Mono.just(HSetCommand.fieldValues(fieldValueMap).forKey(key))).next().map(it -> true);
+	}
+
+	/**
+	 * Set the {@literal value} of a hash {@literal field}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hset">Redis Documentation: HSET</a>
+	 */
+	Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands);
+
+	/**
+	 * {@literal HGET} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hget">Redis Documentation: HGET</a>
+	 */
+	class HGetCommand extends KeyCommand {
+
+		private List<ByteBuffer> fields;
+
+		private HGetCommand(@Nullable ByteBuffer key, List<ByteBuffer> fields) {
+
+			super(key);
+
+			this.fields = fields;
+		}
+
+		/**
+		 * Creates a new {@link HGetCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HGetCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HGetCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HGetCommand(null, Collections.singletonList(field));
+		}
+
+		/**
+		 * Creates a new {@link HGetCommand} given a {@link Collection} of field names.
+		 *
+		 * @param fields must not be {@literal null}.
+		 * @return a new {@link HGetCommand} for a {@link Collection} of field names.
+		 */
+		public static HGetCommand fields(Collection<ByteBuffer> fields) {
+
+			Assert.notNull(fields, "Fields must not be null!");
+
+			return new HGetCommand(null, new ArrayList<>(fields));
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HGetCommand} with {@literal key} applied.
+		 */
+		public HGetCommand from(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HGetCommand(key, fields);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public List<ByteBuffer> getFields() {
+			return fields;
+		}
+	}
+
+	/**
+	 * Get value for given {@literal field} from hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hget">Redis Documentation: HGET</a>
+	 */
+	default Mono<ByteBuffer> hGet(ByteBuffer key, ByteBuffer field) {
+		return hMGet(key, Collections.singletonList(field)).flatMapIterable(Function.identity()).next();
+	}
+
+	/**
+	 * Get values for given {@literal fields} from hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fields must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmget">Redis Documentation: HMGET</a>
+	 */
+	default Mono<List<ByteBuffer>> hMGet(ByteBuffer key, Collection<ByteBuffer> fields) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fields, "Fields must not be null!");
+
+		return hMGet(Mono.just(HGetCommand.fields(fields).from(key)))
+				.next()
+				.flatMap(res->Mono.justOrEmpty(res.getOutput()));
+	}
+
+	/**
+	 * Get values for given {@literal fields} from hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hmget">Redis Documentation: HMGET</a>
+	 */
+	Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands);
+
+	/**
+	 * {@literal HEXISTS} {@link Command}.
+	 *
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	class HExistsCommand extends KeyCommand {
+
+		private final ByteBuffer field;
+
+		private HExistsCommand(@Nullable ByteBuffer key, ByteBuffer field) {
+
+			super(key);
+
+			this.field = field;
+		}
+
+		/**
+		 * Creates a new {@link HExistsCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HExistsCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HExistsCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HExistsCommand(null, field);
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HExistsCommand} with {@literal key} applied.
+		 */
+		public HExistsCommand in(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HExistsCommand(key, field);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public ByteBuffer getField() {
+			return field;
+		}
+	}
+
+	/**
+	 * Determine if given hash {@literal field} exists.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	default Mono<Boolean> hExists(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+
+		return hExists(Mono.just(HExistsCommand.field(field).in(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Determine if given hash {@literal field} exists.
+	 *
+	 * @param commands
+	 * @return
+	 * @see <a href="https://redis.io/commands/hexists">Redis Documentation: HEXISTS</a>
+	 */
+	Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> commands);
+
+	/**
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	class HDelCommand extends KeyCommand {
+
+		private final List<ByteBuffer> fields;
+
+		private HDelCommand(@Nullable ByteBuffer key, List<ByteBuffer> fields) {
+
+			super(key);
+
+			this.fields = fields;
+		}
+
+		/**
+		 * Creates a new {@link HDelCommand} given a {@link ByteBuffer field name}.
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return a new {@link HDelCommand} for a {@link ByteBuffer field name}.
+		 */
+		public static HDelCommand field(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+
+			return new HDelCommand(null, Collections.singletonList(field));
+		}
+
+		/**
+		 * Creates a new {@link HDelCommand} given a {@link Collection} of field names.
+		 *
+		 * @param fields must not be {@literal null}.
+		 * @return a new {@link HDelCommand} for a {@link Collection} of field names.
+		 */
+		public static HDelCommand fields(Collection<ByteBuffer> fields) {
+
+			Assert.notNull(fields, "Fields must not be null!");
+
+			return new HDelCommand(null, new ArrayList<>(fields));
+		}
+
+		/**
+		 * Applies the hash {@literal key}. Constructs a new command instance with all previously configured properties.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return a new {@link HDelCommand} with {@literal key} applied.
+		 */
+		public HDelCommand from(ByteBuffer key) {
+
+			Assert.notNull(key, "Key must not be null!");
+
+			return new HDelCommand(key, fields);
+		}
+
+		/**
+		 * @return never {@literal null}.
+		 */
+		public List<ByteBuffer> getFields() {
+			return fields;
+		}
+	}
+
+	/**
+	 * Delete given hash {@literal field}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	default Mono<Boolean> hDel(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(field, "Field must not be null!");
+
+		return hDel(key, Collections.singletonList(field)).map(val -> val > 0 ? Boolean.TRUE : Boolean.FALSE);
+	}
+
+	/**
+	 * Delete given hash {@literal fields}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param fields must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	default Mono<Long> hDel(ByteBuffer key, Collection<ByteBuffer> fields) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(fields, "Fields must not be null!");
+
+		return hDel(Mono.just(HDelCommand.fields(fields).from(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Delete given hash {@literal fields}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hdel">Redis Documentation: HDEL</a>
+	 */
+	Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> commands);
+
+	/**
+	 * Get size of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hlen">Redis Documentation: HLEN</a>
+	 */
+	default Mono<Long> hLen(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hLen(Mono.just(new KeyCommand(key)))
+				.next()
+				.flatMap(response->Mono.justOrEmpty(response.getOutput()));
+	}
+
+	/**
+	 * Get size of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hlen">Redis Documentation: HLEN</a>
+	 */
+	Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get key set (fields) of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hkeys">Redis Documentation: HKEYS</a>
+	 */
+	default Flux<ByteBuffer> hKeys(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hKeys(Mono.just(new KeyCommand(key)))
+				.flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get key set (fields) of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hkeys">Redis Documentation: HKEYS</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get entry set (values) of hash at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hvals">Redis Documentation: HVALS</a>
+	 */
+	default Flux<ByteBuffer> hVals(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hVals(Mono.just(new KeyCommand(key))).flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get entry set (values) of hash at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hvals">Redis Documentation: HVALS</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCommand> commands);
+
+	/**
+	 * Get entire hash stored at {@literal key}.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hgetall">Redis Documentation: HGETALL</a>
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hGetAll(ByteBuffer key) {
+
+		Assert.notNull(key, "Key must not be null!");
+
+		return hGetAll(Mono.just(new KeyCommand(key))).flatMap(CommandResponse::getOutput);
+	}
+
+	/**
+	 * Get entire hash stored at {@literal key}.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return
+	 * @see <a href="https://redis.io/commands/hgetall">Redis Documentation: HGETALL</a>
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(Publisher<KeyCommand> commands);
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key}. The resulting {@link Flux} acts as a cursor
+	 * and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @return the {@link Flux} emitting {@link Map.Entry entries} one by one.
+	 * @throws IllegalArgumentException in case the given key is {@literal null}.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hScan(ByteBuffer key) {
+		return hScan(key, ScanOptions.NONE);
+	}
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key} given {@link ScanOptions}. The resulting
+	 * {@link Flux} acts as a cursor and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param options must not be {@literal null}. Use {@link ScanOptions#NONE} instead.
+	 * @return the {@link Flux} emitting the raw {@link Map.Entry entries} one by one.
+	 * @throws IllegalArgumentException in case one of the required arguments is {@literal null}.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	default Flux<Map.Entry<ByteBuffer, ByteBuffer>> hScan(ByteBuffer key, ScanOptions options) {
+
+		return hScan(Mono.just(KeyScanCommand.key(key).withOptions(options))).map(CommandResponse::getOutput)
+				.flatMap(it -> it);
+	}
+
+	/**
+	 * Use a {@link Flux} to iterate over entries in the hash at {@code key}. The resulting {@link Flux} acts as a cursor
+	 * and issues {@code HSCAN} commands itself as long as the subscriber signals demand.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return the {@link Flux} emitting {@link CommandResponse} one by one.
+	 * @see <a href="https://redis.io/commands/hscan">Redis Documentation: HSCAN</a>
+	 * @since 2.1
+	 */
+	Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hScan(Publisher<KeyScanCommand> commands);
+
+	/**
+	 * @author Christoph Strobl
+	 * @see <a href="https://redis.io/commands/hstrlen">Redis Documentation: HSTRLEN</a>
+	 * @since 2.1
+	 */
+	class HStrLenCommand extends KeyCommand {
+
+		private ByteBuffer field;
+
+		/**
+		 * Creates a new {@link HStrLenCommand} given a {@code key}.
+		 *
+		 * @param key can be {@literal null}.
+		 * @param field must not be {@literal null}.
+		 */
+		private HStrLenCommand(@Nullable ByteBuffer key, ByteBuffer field) {
+
+			super(key);
+			this.field = field;
+		}
+
+		/**
+		 * Specify the {@code field} within the hash to get the length of the {@code value} of.ø
+		 *
+		 * @param field must not be {@literal null}.
+		 * @return new instance of {@link HStrLenCommand}.
+		 */
+		public static HStrLenCommand lengthOf(ByteBuffer field) {
+
+			Assert.notNull(field, "Field must not be null!");
+			return new HStrLenCommand(null, field);
+		}
+
+		/**
+		 * Define the {@code key} the hash is stored at.
+		 *
+		 * @param key must not be {@literal null}.
+		 * @return new instance of {@link HStrLenCommand}.
+		 */
+		public HStrLenCommand from(ByteBuffer key) {
+			return new HStrLenCommand(key, field);
+		}
+
+		/**
+		 * @return the field.
+		 */
+		public ByteBuffer getField() {
+			return field;
+		}
+	}
+
+	/**
+	 * Get the length of the value associated with {@code field}. If either the {@code key} or the {@code field} do not
+	 * exist, {@code 0} is emitted.
+	 *
+	 * @param key must not be {@literal null}.
+	 * @param field must not be {@literal null}.
+	 * @return never {@literal null}.
+	 * @since 2.1
+	 */
+	default Mono<Long> hStrLen(ByteBuffer key, ByteBuffer field) {
+
+		Assert.notNull(key, "Key must not be null!");
+		Assert.notNull(field, "Field must not be null!");
+
+		return hStrLen(Mono.just(HStrLenCommand.lengthOf(field).from(key))).next().map(NumericResponse::getOutput);
+	}
+
+	/**
+	 * Get the length of the value associated with {@code field}. If either the {@code key} or the {@code field} do not
+	 * exist, {@code 0} is emitted.
+	 *
+	 * @param commands must not be {@literal null}.
+	 * @return never {@literal null}.
+	 * @since 2.1
+	 */
+	Flux<NumericResponse<HStrLenCommand, Long>> hStrLen(Publisher<HStrLenCommand> commands);
+}

+ 1 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -21,7 +21,7 @@ spring:
       pool:
       pool:
         max-active: 1024
         max-active: 1024
     timeout: 20s
     timeout: 20s
-
+    serializer: jdk # 设置fst时,redis key使用string序列化,value使用 fst序列化.
   #        max-wait: 10s
   #        max-wait: 10s
   r2dbc:
   r2dbc:
     url: r2dbc:postgresql://localhost:5432/jetlinks
     url: r2dbc:postgresql://localhost:5432/jetlinks