|
@@ -65,10 +65,10 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
public Mono<Void> registerMetadata(@Nonnull String productId, @Nonnull DeviceMetadata metadata) {
|
|
public Mono<Void> registerMetadata(@Nonnull String productId, @Nonnull DeviceMetadata metadata) {
|
|
return Flux
|
|
return Flux
|
|
.concat(Flux
|
|
.concat(Flux
|
|
- .fromIterable(metadata.getEvents())
|
|
|
|
- .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
|
|
|
|
- timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
|
|
|
|
- timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
|
|
|
|
|
|
+ .fromIterable(metadata.getEvents())
|
|
|
|
+ .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
|
|
|
|
+ timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
|
|
|
|
+ timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
|
|
.then();
|
|
.then();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -104,7 +104,9 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
: Stream.of(properties).map(tp2.getT2()::getPropertyOrNull).filter(Objects::nonNull))
|
|
: Stream.of(properties).map(tp2.getT2()::getPropertyOrNull).filter(Objects::nonNull))
|
|
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
|
|
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
|
|
|
|
|
|
- return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query.clone().doPaging(0, 1));
|
|
|
|
|
|
+ return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query
|
|
|
|
+ .clone()
|
|
|
|
+ .doPaging(0, 1));
|
|
}));
|
|
}));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -117,18 +119,20 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
.getDevice(deviceId)
|
|
.getDevice(deviceId)
|
|
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
|
|
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
|
|
.flatMap(tp2 -> {
|
|
.flatMap(tp2 -> {
|
|
- PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
|
|
|
|
-
|
|
|
|
- return param.toQuery()
|
|
|
|
- .includes(property)
|
|
|
|
- .execute(query -> timeSeriesManager
|
|
|
|
- .getService(devicePropertyMetric(tp2.getT1().getId()))
|
|
|
|
- .queryPager(query,
|
|
|
|
- data -> DeviceProperty
|
|
|
|
- .of(data, data.get(property).orElse(0), prop)
|
|
|
|
- .property(property)
|
|
|
|
- ));
|
|
|
|
- }
|
|
|
|
|
|
+ PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
|
|
|
|
+
|
|
|
|
+ return param
|
|
|
|
+ .toQuery()
|
|
|
|
+ .includes(property)
|
|
|
|
+ .where("deviceId", deviceId)
|
|
|
|
+ .execute(query -> timeSeriesManager
|
|
|
|
+ .getService(devicePropertyMetric(tp2.getT1().getId()))
|
|
|
|
+ .queryPager(query,
|
|
|
|
+ data -> DeviceProperty
|
|
|
|
+ .of(data, data.get(property).orElse(0), prop)
|
|
|
|
+ .property(property)
|
|
|
|
+ ));
|
|
|
|
+ }
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -143,11 +147,13 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
.zip(device.getProduct(), device.getMetadata())
|
|
.zip(device.getProduct(), device.getMetadata())
|
|
.flatMapMany(tp2 -> {
|
|
.flatMapMany(tp2 -> {
|
|
Set<String> includes = new HashSet<>(Arrays.asList(property));
|
|
Set<String> includes = new HashSet<>(Arrays.asList(property));
|
|
- Map<String, PropertyMetadata> propertiesMap = tp2.getT2()
|
|
|
|
|
|
+ Map<String, PropertyMetadata> propertiesMap = tp2
|
|
|
|
+ .getT2()
|
|
.getProperties()
|
|
.getProperties()
|
|
.stream()
|
|
.stream()
|
|
.filter(prop -> includes.size() > 0 && includes.contains(prop.getId()))
|
|
.filter(prop -> includes.size() > 0 && includes.contains(prop.getId()))
|
|
- .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
|
|
|
|
|
|
+ .collect(Collectors.toMap(PropertyMetadata::getId, Function
|
|
|
|
+ .identity(), (a, b) -> a));
|
|
|
|
|
|
return query
|
|
return query
|
|
.toQuery()
|
|
.toQuery()
|
|
@@ -176,10 +182,12 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
.zip(device.getProduct(), device.getMetadata())
|
|
.zip(device.getProduct(), device.getMetadata())
|
|
.flatMapMany(tp2 -> {
|
|
.flatMapMany(tp2 -> {
|
|
|
|
|
|
- Map<String, PropertyMetadata> propertiesMap = tp2.getT2()
|
|
|
|
|
|
+ Map<String, PropertyMetadata> propertiesMap = tp2
|
|
|
|
+ .getT2()
|
|
.getProperties()
|
|
.getProperties()
|
|
.stream()
|
|
.stream()
|
|
- .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
|
|
|
|
|
|
+ .collect(Collectors.toMap(PropertyMetadata::getId, Function
|
|
|
|
+ .identity(), (a, b) -> a));
|
|
|
|
|
|
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
|
|
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
|
|
}));
|
|
}));
|
|
@@ -192,7 +200,8 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
@Nonnull DeviceDataService.DevicePropertyAggregation... properties) {
|
|
@Nonnull DeviceDataService.DevicePropertyAggregation... properties) {
|
|
org.joda.time.format.DateTimeFormatter formatter = DateTimeFormat.forPattern(request.getFormat());
|
|
org.joda.time.format.DateTimeFormatter formatter = DateTimeFormat.forPattern(request.getFormat());
|
|
|
|
|
|
- return AggregationQueryParam.of()
|
|
|
|
|
|
+ return AggregationQueryParam
|
|
|
|
+ .of()
|
|
.as(param -> {
|
|
.as(param -> {
|
|
for (DeviceDataService.DevicePropertyAggregation property : properties) {
|
|
for (DeviceDataService.DevicePropertyAggregation property : properties) {
|
|
param.agg(property.getProperty(), property.getAlias(), property.getAgg());
|
|
param.agg(property.getProperty(), property.getAlias(), property.getAgg());
|
|
@@ -218,7 +227,9 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
|
|
return a;
|
|
return a;
|
|
})
|
|
})
|
|
.map(AggregationData::of))
|
|
.map(AggregationData::of))
|
|
- .sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime.parse(agg.getString("time", ""), formatter).toDate()).reversed())
|
|
|
|
|
|
+ .sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime
|
|
|
|
+ .parse(agg.getString("time", ""), formatter)
|
|
|
|
+ .toDate()).reversed())
|
|
;
|
|
;
|
|
}
|
|
}
|
|
|
|
|