|
@@ -2,6 +2,7 @@ package org.jetlinks.community.device.service.data;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.google.common.collect.Maps;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
|
import org.apache.commons.collections.MapUtils;
|
|
|
import org.hswebframework.ezorm.core.param.TermType;
|
|
@@ -50,6 +51,7 @@ import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*;
|
|
|
* @author zhouhao
|
|
|
* @since 1.5.0
|
|
|
*/
|
|
|
+@Slf4j
|
|
|
public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
|
|
|
|
|
|
private final AtomicInteger nanoInc = new AtomicInteger();
|
|
@@ -87,8 +89,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
* @param message 设备属性消息
|
|
|
* @param properties 物模型属性
|
|
|
* @return 数据集合
|
|
|
- * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
|
|
|
- * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
|
|
|
+ * @see AbstractDeviceDataStoragePolicy#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
|
|
|
+ * @see AbstractDeviceDataStoragePolicy#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
|
|
|
*/
|
|
|
protected abstract Flux<Tuple2<String, TimeSeriesData>> convertProperties(String productId,
|
|
|
DeviceMessage message,
|
|
@@ -135,6 +137,14 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
return DigestUtils.md5Hex(String.join("_", message.getDeviceId(), String.valueOf(createUniqueNanoTime(ts))));
|
|
|
}
|
|
|
|
|
|
+ protected String getDeviceLogMetric(String productId) {
|
|
|
+ return deviceLogMetricId(productId);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String getDeviceEventMetric(String productId, String eventId) {
|
|
|
+ return deviceEventMetricId(productId, eventId);
|
|
|
+ }
|
|
|
+
|
|
|
protected Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String productId,
|
|
|
DeviceMessage message,
|
|
|
BiConsumer<DeviceMessage, DeviceOperationLogEntity> logEntityConsumer) {
|
|
@@ -211,37 +221,45 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
* @return 二元组
|
|
|
*/
|
|
|
protected Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String productId, EventMessage message) {
|
|
|
- // 设备注册中心获取设备操作接口
|
|
|
- // 获取设备元数据 物模型
|
|
|
+
|
|
|
return deviceRegistry
|
|
|
- .getDevice(message.getDeviceId())
|
|
|
- .flatMap(device -> device
|
|
|
+ .getProduct(productId)
|
|
|
+ .flatMap(product -> product
|
|
|
.getMetadata()
|
|
|
- .map(metadata -> {
|
|
|
- Object value = message.getData();
|
|
|
- DataType dataType = metadata
|
|
|
- .getEvent(message.getEvent())
|
|
|
- .map(EventMetadata::getType)
|
|
|
- .orElseGet(UnknownType::new);
|
|
|
- Object tempValue = ValueTypeTranslator.translator(value, dataType);
|
|
|
- Map<String, Object> data;
|
|
|
- if (tempValue instanceof Map) {
|
|
|
- @SuppressWarnings("all")
|
|
|
- Map<String, Object> mapValue = ((Map) tempValue);
|
|
|
- int size = mapValue.size();
|
|
|
- data = newMap(size);
|
|
|
- data.putAll(mapValue);
|
|
|
- } else {
|
|
|
- data = newMap(16);
|
|
|
- data.put("value", tempValue);
|
|
|
+ .<TimeSeriesData>handle((metadata, sink) -> {
|
|
|
+ if (metadata.getEventOrNull(message.getEvent()) == null) {
|
|
|
+ log.warn("产品[{}]物模型中未定义事件:{}", productId, message.getEvent());
|
|
|
+ return;
|
|
|
}
|
|
|
- data.put("id", createDataId(message));
|
|
|
- data.put("deviceId", device.getDeviceId());
|
|
|
- data.put("createTime", System.currentTimeMillis());
|
|
|
-
|
|
|
- return TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data);
|
|
|
+ Map<String, Object> data = createEventData(message, metadata);
|
|
|
+ sink.next(TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data));
|
|
|
}))
|
|
|
- .map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data));
|
|
|
+ .map(data -> Tuples.of(getDeviceEventMetric(productId, message.getEvent()), data));
|
|
|
+ }
|
|
|
+
|
|
|
+ protected Map<String, Object> createEventData(EventMessage message, DeviceMetadata metadata) {
|
|
|
+ Object value = message.getData();
|
|
|
+ DataType dataType = metadata
|
|
|
+ .getEvent(message.getEvent())
|
|
|
+ .map(EventMetadata::getType)
|
|
|
+ .orElseGet(UnknownType::new);
|
|
|
+ Object tempValue = ValueTypeTranslator.translator(value, dataType);
|
|
|
+ Map<String, Object> data;
|
|
|
+ if (tempValue instanceof Map) {
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ Map<String, Object> mapValue = ((Map) tempValue);
|
|
|
+ int size = mapValue.size();
|
|
|
+ data = newMap(size);
|
|
|
+ data.putAll(mapValue);
|
|
|
+ } else {
|
|
|
+ data = newMap(16);
|
|
|
+ data.put("value", tempValue);
|
|
|
+ }
|
|
|
+ data.put("id", createDataId(message));
|
|
|
+ data.put("deviceId", message.getDeviceId());
|
|
|
+ data.put("createTime", System.currentTimeMillis());
|
|
|
+
|
|
|
+ return data;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -429,7 +447,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.flatMap(entry -> {
|
|
|
String id;
|
|
|
String property = entry.getT2().getKey();
|
|
|
- long ts = propertySourceTimes.getOrDefault(property,message.getTimestamp());
|
|
|
+ long ts = propertySourceTimes.getOrDefault(property, message.getTimestamp());
|
|
|
//忽略存在没有的属性和忽略存储的属性
|
|
|
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property);
|
|
|
if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
|