|
@@ -124,10 +124,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
@Override
|
|
|
public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
|
|
|
return Flux.from(message)
|
|
|
- .flatMap(this::convertMessageToTimeSeriesData)
|
|
|
- .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
|
|
|
- .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
|
|
|
- .then();
|
|
|
+ .flatMap(this::convertMessageToTimeSeriesData)
|
|
|
+ .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
|
|
|
+ .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
protected String createDataId(DeviceMessage message) {
|
|
@@ -251,8 +251,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
|
|
|
.flatMap(productId -> this
|
|
|
.doQueryPager(deviceLogMetricId(productId),
|
|
|
- entity.and("deviceId", TermType.eq, deviceId),
|
|
|
- data -> data.as(DeviceOperationLogEntity.class)
|
|
|
+ entity.and("deviceId", TermType.eq, deviceId),
|
|
|
+ data -> data.as(DeviceOperationLogEntity.class)
|
|
|
))
|
|
|
.defaultIfEmpty(PagerResult.empty());
|
|
|
}
|
|
@@ -272,15 +272,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.where("deviceId", deviceId)
|
|
|
.execute(param -> this
|
|
|
.doQuery(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
- param,
|
|
|
- data -> {
|
|
|
- DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
- if (format) {
|
|
|
- deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
- }
|
|
|
- deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
- return deviceEvent;
|
|
|
- })));
|
|
|
+ param,
|
|
|
+ data -> {
|
|
|
+ DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
+ if (format) {
|
|
|
+ deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
+ }
|
|
|
+ deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
+ return deviceEvent;
|
|
|
+ })));
|
|
|
}
|
|
|
|
|
|
@Nonnull
|
|
@@ -294,18 +294,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.getDevice(deviceId)
|
|
|
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
|
|
|
.flatMap(tp2 -> query.toQuery()
|
|
|
- .where("deviceId", deviceId)
|
|
|
- .execute(param -> this
|
|
|
- .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
- param,
|
|
|
- data -> {
|
|
|
- DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
- if (format) {
|
|
|
- deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
- }
|
|
|
- deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
- return deviceEvent;
|
|
|
- }))
|
|
|
+ .where("deviceId", deviceId)
|
|
|
+ .execute(param -> this
|
|
|
+ .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
+ param,
|
|
|
+ data -> {
|
|
|
+ DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
+ if (format) {
|
|
|
+ deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
+ }
|
|
|
+ deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
+ return deviceEvent;
|
|
|
+ }))
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -415,6 +415,9 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
if (MapUtils.isEmpty(properties)) {
|
|
|
return Flux.empty();
|
|
|
}
|
|
|
+ Map<String, Long> propertySourceTimes = DeviceMessageUtils
|
|
|
+ .tryGetPropertySourceTimes(message)
|
|
|
+ .orElseGet(Collections::emptyMap);
|
|
|
return this
|
|
|
.deviceRegistry
|
|
|
.getDevice(message.getDeviceId())
|
|
@@ -425,8 +428,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.index()
|
|
|
.flatMap(entry -> {
|
|
|
String id;
|
|
|
- long ts = message.getTimestamp();
|
|
|
String property = entry.getT2().getKey();
|
|
|
+ long ts = propertySourceTimes.getOrDefault(property,message.getTimestamp());
|
|
|
//忽略存在没有的属性和忽略存储的属性
|
|
|
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property);
|
|
|
if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
|
|
@@ -441,10 +444,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
return Mono
|
|
|
.just(TimeSeriesData.of(ts, this
|
|
|
.createRowPropertyData(id,
|
|
|
- TimestampUtils.toMillis(ts),
|
|
|
- device.getDeviceId(),
|
|
|
- propertyMetadata,
|
|
|
- entry.getT2().getValue()))
|
|
|
+ TimestampUtils.toMillis(ts),
|
|
|
+ device.getDeviceId(),
|
|
|
+ propertyMetadata,
|
|
|
+ entry.getT2().getValue()))
|
|
|
);
|
|
|
})
|
|
|
.map(data -> Tuples.of(devicePropertyMetricId(productId), data)))
|
|
@@ -544,8 +547,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
}
|
|
|
if (properties.length == 1) {
|
|
|
return metadata.getProperty(properties[0])
|
|
|
- .map(Arrays::asList)
|
|
|
- .orElseGet(Collections::emptyList);
|
|
|
+ .map(Arrays::asList)
|
|
|
+ .orElseGet(Collections::emptyList);
|
|
|
}
|
|
|
Set<String> ids = new HashSet<>(Arrays.asList(properties));
|
|
|
return metadata
|