Преглед на файлове

查询时copy参数 防污染

zhou-hao преди 4 години
родител
ревизия
2a29778475

+ 0 - 2
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java

@@ -255,8 +255,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
             .onBackpressureBuffer(bufferBackpressure,
                 drop -> System.err.println("无法处理更多索引请求!"),
                 BufferOverflowStrategy.DROP_OLDEST)
-            .parallel()
-            .runOn(Schedulers.parallel())
             .flatMap(buffers -> {
                 long time = System.currentTimeMillis();
                 return this

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java

@@ -113,7 +113,7 @@ public class DefaultDeviceDataService implements DeviceDataService {
                                                                 @Nonnull DevicePropertyAggregation... properties) {
         return this
             .getStoreStrategy(productId)
-            .flatMapMany(strategy -> strategy.aggregationPropertiesByProduct(productId, request, properties));
+            .flatMapMany(strategy -> strategy.aggregationPropertiesByProduct(productId, request.copy(), properties));
     }
 
     @Override
@@ -122,7 +122,7 @@ public class DefaultDeviceDataService implements DeviceDataService {
                                                                @Nonnull DevicePropertyAggregation... properties) {
         return this
             .getDeviceStrategy(deviceId)
-            .flatMapMany(strategy -> strategy.aggregationPropertiesByDevice(deviceId, request, properties));
+            .flatMapMany(strategy -> strategy.aggregationPropertiesByDevice(deviceId, request.copy(), properties));
     }
 
     @Nonnull

+ 4 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java

@@ -273,5 +273,9 @@ public interface DeviceDataService {
         //过滤条件
         @Schema(description = "过滤条件")
         QueryParamEntity filter = QueryParamEntity.of();
+
+        public AggregationRequest copy() {
+            return new AggregationRequest(interval, format, from, to, limit, filter.clone());
+        }
     }
 }