|
@@ -8,28 +8,22 @@ import org.hswebframework.ezorm.core.param.TermType;
|
|
|
import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
|
|
import org.hswebframework.web.id.IDGenerator;
|
|
|
+import org.jetlinks.community.device.entity.DeviceEvent;
|
|
|
+import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
|
+import org.jetlinks.community.device.entity.DeviceProperty;
|
|
|
+import org.jetlinks.community.device.enums.DeviceLogType;
|
|
|
+import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
|
|
|
import org.jetlinks.community.gateway.DeviceMessageUtils;
|
|
|
+import org.jetlinks.community.timeseries.TimeSeriesData;
|
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
|
import org.jetlinks.core.device.DeviceProductOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
|
import org.jetlinks.core.message.DeviceLogMessage;
|
|
|
import org.jetlinks.core.message.DeviceMessage;
|
|
|
-import org.jetlinks.core.message.DeviceMessageReply;
|
|
|
import org.jetlinks.core.message.Headers;
|
|
|
import org.jetlinks.core.message.event.EventMessage;
|
|
|
-import org.jetlinks.core.message.property.ReadPropertyMessageReply;
|
|
|
-import org.jetlinks.core.message.property.ReportPropertyMessage;
|
|
|
-import org.jetlinks.core.message.property.WritePropertyMessageReply;
|
|
|
import org.jetlinks.core.metadata.*;
|
|
|
import org.jetlinks.core.metadata.types.*;
|
|
|
-import org.jetlinks.community.device.entity.DeviceEvent;
|
|
|
-import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
|
-import org.jetlinks.community.device.entity.DevicePropertiesEntity;
|
|
|
-import org.jetlinks.community.device.entity.DeviceProperty;
|
|
|
-import org.jetlinks.community.device.enums.DeviceLogType;
|
|
|
-import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
|
|
|
-import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
|
|
|
-import org.jetlinks.community.timeseries.TimeSeriesData;
|
|
|
import org.jetlinks.core.utils.DeviceMessageTracer;
|
|
|
import org.jetlinks.core.utils.TimestampUtils;
|
|
|
import org.reactivestreams.Publisher;
|
|
@@ -43,7 +37,6 @@ import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.BiConsumer;
|
|
|
-import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
|
|
|
|
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage;
|
|
@@ -58,8 +51,8 @@ import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*;
|
|
|
*/
|
|
|
public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
|
|
|
|
|
|
+ private final AtomicInteger nanoInc = new AtomicInteger();
|
|
|
protected DeviceRegistry deviceRegistry;
|
|
|
-
|
|
|
protected DeviceDataStorageProperties properties;
|
|
|
|
|
|
public AbstractDeviceDataStoragePolicy(DeviceRegistry registry,
|
|
@@ -87,9 +80,11 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
protected abstract Mono<Void> doSaveData(String metric, Flux<TimeSeriesData> data);
|
|
|
|
|
|
/**
|
|
|
+ * 设备消息转换 二元组 {deviceId, tsData}
|
|
|
+ *
|
|
|
* @param productId 产品ID
|
|
|
- * @param message 原始消息
|
|
|
- * @param properties 属性
|
|
|
+ * @param message 设备属性消息
|
|
|
+ * @param properties 物模型属性
|
|
|
* @return 数据集合
|
|
|
* @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
|
|
|
* @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
|
|
@@ -106,7 +101,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
QueryParamEntity paramEntity,
|
|
|
Function<TimeSeriesData, T> mapper);
|
|
|
|
|
|
-
|
|
|
+ /**
|
|
|
+ * 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后
|
|
|
+ * 再进行批量写出,具体由不同对存储策略实现。
|
|
|
+ * <p>
|
|
|
+ * 如果保存失败,在这里不会得到错误信息.
|
|
|
+ *
|
|
|
+ * @param message 设备消息
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
@Nonnull
|
|
|
@Override
|
|
|
public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
|
|
@@ -120,10 +123,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
@Override
|
|
|
public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
|
|
|
return Flux.from(message)
|
|
|
- .flatMap(this::convertMessageToTimeSeriesData)
|
|
|
- .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
|
|
|
- .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
|
|
|
- .then();
|
|
|
+ .flatMap(this::convertMessageToTimeSeriesData)
|
|
|
+ .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
|
|
|
+ .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
protected String createDataId(DeviceMessage message) {
|
|
@@ -151,6 +154,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.toSimpleMap())));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 设备消息转换成时序数据 二元组 {deviceId, tsData}
|
|
|
+ *
|
|
|
+ * @param message 设备消息
|
|
|
+ * @return 二元组
|
|
|
+ */
|
|
|
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(DeviceMessage message) {
|
|
|
boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage);
|
|
|
boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog);
|
|
@@ -193,8 +202,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
return Flux.merge(all);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 事件消息转换成 二元组{deviceId, tsData}
|
|
|
+ *
|
|
|
+ * @param productId 产品ID
|
|
|
+ * @param message 事件消息
|
|
|
+ * @return 二元组
|
|
|
+ */
|
|
|
protected Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String productId, EventMessage message) {
|
|
|
-
|
|
|
+ // 设备注册中心获取设备操作接口
|
|
|
+ // 获取设备元数据 物模型
|
|
|
return deviceRegistry
|
|
|
.getDevice(message.getDeviceId())
|
|
|
.flatMap(device -> device
|
|
@@ -226,20 +243,19 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ @Override
|
|
|
public Mono<PagerResult<DeviceOperationLogEntity>> queryDeviceMessageLog(@Nonnull String deviceId, @Nonnull QueryParamEntity entity) {
|
|
|
return deviceRegistry
|
|
|
.getDevice(deviceId)
|
|
|
.flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
|
|
|
.flatMap(productId -> this
|
|
|
.doQueryPager(deviceLogMetricId(productId),
|
|
|
- entity.and("deviceId", TermType.eq, deviceId),
|
|
|
- data -> data.as(DeviceOperationLogEntity.class)
|
|
|
+ entity.and("deviceId", TermType.eq, deviceId),
|
|
|
+ data -> data.as(DeviceOperationLogEntity.class)
|
|
|
))
|
|
|
.defaultIfEmpty(PagerResult.empty());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Nonnull
|
|
|
@Override
|
|
|
public Flux<DeviceEvent> queryEvent(@Nonnull String deviceId,
|
|
@@ -255,15 +271,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.where("deviceId", deviceId)
|
|
|
.execute(param -> this
|
|
|
.doQuery(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
- param,
|
|
|
- data -> {
|
|
|
- DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
- if (format) {
|
|
|
- deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
- }
|
|
|
- deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
- return deviceEvent;
|
|
|
- })));
|
|
|
+ param,
|
|
|
+ data -> {
|
|
|
+ DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
+ if (format) {
|
|
|
+ deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
+ }
|
|
|
+ deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
+ return deviceEvent;
|
|
|
+ })));
|
|
|
}
|
|
|
|
|
|
@Nonnull
|
|
@@ -277,18 +293,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.getDevice(deviceId)
|
|
|
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
|
|
|
.flatMap(tp2 -> query.toQuery()
|
|
|
- .where("deviceId", deviceId)
|
|
|
- .execute(param -> this
|
|
|
- .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
- param,
|
|
|
- data -> {
|
|
|
- DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
- if (format) {
|
|
|
- deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
- }
|
|
|
- deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
- return deviceEvent;
|
|
|
- }))
|
|
|
+ .where("deviceId", deviceId)
|
|
|
+ .execute(param -> this
|
|
|
+ .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
|
|
|
+ param,
|
|
|
+ data -> {
|
|
|
+ DeviceEvent deviceEvent = new DeviceEvent(data.values());
|
|
|
+ if (format) {
|
|
|
+ deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
|
|
|
+ }
|
|
|
+ deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
|
|
|
+ return deviceEvent;
|
|
|
+ }))
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -382,6 +398,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
return Maps.newHashMapWithExpectedSize(size);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 设备消息转换 二元组{deviceId, tsData}
|
|
|
+ *
|
|
|
+ * @param productId 产品ID
|
|
|
+ * @param message 设备属性消息
|
|
|
+ * @param properties 物模型属性
|
|
|
+ * @return 数据集合
|
|
|
+ * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
|
|
|
+ * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
|
|
|
+ */
|
|
|
protected Flux<Tuple2<String, TimeSeriesData>> convertPropertiesForRowPolicy(String productId,
|
|
|
DeviceMessage message,
|
|
|
Map<String, Object> properties) {
|
|
@@ -414,10 +440,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
return Mono
|
|
|
.just(TimeSeriesData.of(ts, this
|
|
|
.createRowPropertyData(id,
|
|
|
- TimestampUtils.toMillis(ts),
|
|
|
- device.getDeviceId(),
|
|
|
- propertyMetadata,
|
|
|
- entry.getT2().getValue()))
|
|
|
+ TimestampUtils.toMillis(ts),
|
|
|
+ device.getDeviceId(),
|
|
|
+ propertyMetadata,
|
|
|
+ entry.getT2().getValue()))
|
|
|
);
|
|
|
})
|
|
|
.map(data -> Tuples.of(devicePropertyMetricId(productId), data)))
|
|
@@ -511,9 +537,6 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
|
|
|
.flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata()));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- private final AtomicInteger nanoInc = new AtomicInteger();
|
|
|
-
|
|
|
//将毫秒转为纳秒,努力让数据不重复
|
|
|
protected long createUniqueNanoTime(long millis) {
|
|
|
long nano = TimeUnit.MILLISECONDS.toNanos(millis);
|