|
@@ -16,6 +16,8 @@ import org.jetlinks.community.timeseries.TimeSeriesService;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
@@ -36,18 +38,23 @@ class DevicePropertyMeasurement extends StaticMeasurement {
|
|
|
}
|
|
|
|
|
|
|
|
|
- Flux<MeasurementValue> fromHistory(String deviceId, int history, boolean format) {
|
|
|
+ Map<String, Object> createValue(Object value) {
|
|
|
+ Map<String, Object> values = new HashMap<>();
|
|
|
+ values.put("value", value);
|
|
|
+ values.put("formatValue", metadata.getValueType().format(value));
|
|
|
+ return values;
|
|
|
+ }
|
|
|
+
|
|
|
+ Flux<MeasurementValue> fromHistory(String deviceId, int history) {
|
|
|
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
|
|
|
.doPaging(0, history)
|
|
|
.where("deviceId", deviceId)
|
|
|
- .and("property",metadata.getId())
|
|
|
+ .and("property", metadata.getId())
|
|
|
.execute(timeSeriesService::query)
|
|
|
- .map(data -> SimpleMeasurementValue.of(format ?
|
|
|
- data.get("formatValue").orElse("/") :
|
|
|
- data.get("value").orElse(null), data.getTimestamp()));
|
|
|
+ .map(data -> SimpleMeasurementValue.of(createValue(data.get("value").orElse(null)), data.getTimestamp()));
|
|
|
}
|
|
|
|
|
|
- Flux<MeasurementValue> fromRealTime(String deviceId, boolean format) {
|
|
|
+ Flux<MeasurementValue> fromRealTime(String deviceId) {
|
|
|
return messageGateway
|
|
|
.subscribe(Stream.of(
|
|
|
"/device/" + deviceId + "/message/property/report"
|
|
@@ -68,7 +75,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
|
|
|
return Mono.empty();
|
|
|
})
|
|
|
.filter(msg -> msg.containsKey(metadata.getId()))
|
|
|
- .map(msg -> SimpleMeasurementValue.of(format ? metadata.getValueType().format(msg.get(metadata.getId())) : msg.get(metadata.getId()), System.currentTimeMillis()));
|
|
|
+ .map(msg -> SimpleMeasurementValue.of(createValue(msg.get(metadata.getId())), System.currentTimeMillis()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -99,18 +106,16 @@ class DevicePropertyMeasurement extends StaticMeasurement {
|
|
|
|
|
|
@Override
|
|
|
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
|
|
|
- boolean format = parameter.getBoolean("format").orElse(true);
|
|
|
-
|
|
|
return Mono.justOrEmpty(parameter.getString("deviceId"))
|
|
|
.flatMapMany(deviceId -> {
|
|
|
int history = parameter.getInt("history").orElse(0);
|
|
|
//合并历史数据和实时数据
|
|
|
return Flux.concat(
|
|
|
//查询历史数据
|
|
|
- fromHistory(deviceId, history, format)
|
|
|
+ fromHistory(deviceId, history)
|
|
|
,
|
|
|
//从消息网关订阅实时事件消息
|
|
|
- fromRealTime(deviceId, format)
|
|
|
+ fromRealTime(deviceId)
|
|
|
);
|
|
|
});
|
|
|
}
|