|
@@ -7,11 +7,15 @@ import com.alibaba.fastjson.JSON;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.hswebframework.ezorm.core.dsl.Query;
|
|
|
import org.hswebframework.ezorm.core.param.QueryParam;
|
|
|
+import org.hswebframework.ezorm.core.param.TermType;
|
|
|
+import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
|
|
import org.hswebframework.web.bean.FastBeanCopier;
|
|
|
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
import org.hswebframework.web.exception.BusinessException;
|
|
|
import org.hswebframework.web.exception.NotFoundException;
|
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
|
+import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
@@ -58,6 +62,8 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
|
|
|
+
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
public class LocalDeviceInstanceService extends GenericReactiveCrudService<DeviceInstanceEntity, String> {
|
|
@@ -78,6 +84,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
private TimeSeriesManager timeSeriesManager;
|
|
|
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* 获取设备所有信息
|
|
|
*
|
|
@@ -90,7 +97,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
.zipWhen(instance -> deviceProductService.findById(instance.getProductId()), DeviceInfo::of) //产品型号信息
|
|
|
.switchIfEmpty(Mono.error(NotFoundException::new))
|
|
|
.zipWhen(deviceInfo -> getDeviceRunRealInfo(id), DeviceAllInfoResponse::of) //设备运行状态
|
|
|
- .zipWhen(info -> getProperties(info.getDeviceInfo().getProductId(), id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
|
|
|
+ .zipWhen(info -> getDeviceLatestProperties( id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
|
|
|
.zipWhen(info -> {
|
|
|
DeviceMetadata deviceMetadata = new JetLinksDeviceMetadata(JSON.parseObject(info.getDeviceInfo().getDeriveMetadata()));
|
|
|
return getEventCounts(deviceMetadata.getEvents(), id, info.getDeviceInfo().getProductId()); //事件数量统计
|
|
@@ -224,35 +231,50 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
}
|
|
|
|
|
|
|
|
|
- public Mono<DevicePropertiesEntity> getProperty(String deviceId, String property) {
|
|
|
- return createQuery()
|
|
|
- .where(DeviceInstanceEntity::getId, deviceId)
|
|
|
- .fetchOne()
|
|
|
- .map(DeviceInstanceEntity::getProductId)
|
|
|
- .flatMap(productId -> doGetDeviceProperty(productId, deviceId, property));
|
|
|
+ public Mono<PagerResult<DevicePropertiesEntity>> queryDeviceProperties(String deviceId, QueryParamEntity entity) {
|
|
|
+ return registry.getDevice(deviceId)
|
|
|
+ .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
|
|
|
+ .flatMap(productId -> timeSeriesManager
|
|
|
+ .getService(devicePropertyMetric(productId))
|
|
|
+ .queryPager(entity.and("deviceId", TermType.eq, deviceId), data -> data.as(DevicePropertiesEntity.class)))
|
|
|
+ .defaultIfEmpty(PagerResult.empty());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<DevicePropertiesEntity> getDeviceLatestProperty(String deviceId, String property) {
|
|
|
+ return registry
|
|
|
+ .getDevice(deviceId)
|
|
|
+ .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
|
|
|
+ .flatMap(productId -> doGetLatestDeviceProperty(productId, deviceId, property));
|
|
|
}
|
|
|
|
|
|
- public Flux<DevicePropertiesEntity> getProperties(String productId, String deviceId) {
|
|
|
+ public Flux<DevicePropertiesEntity> getDeviceLatestProperties(String deviceId) {
|
|
|
return registry.getDevice(deviceId)
|
|
|
- .flatMap(DeviceOperator::getMetadata)
|
|
|
- .flatMapMany(metadata -> Flux.merge(metadata.getProperties()
|
|
|
+ .flatMap(operator -> Mono.zip(operator.getMetadata(), operator.getSelfConfig(DeviceConfigKey.productId)))
|
|
|
+ .flatMapMany(zip -> Flux.merge(zip.getT1().getProperties()
|
|
|
.stream()
|
|
|
- .map(property -> doGetDeviceProperty(productId, deviceId, property.getId()))
|
|
|
- .collect(Collectors.toList())))
|
|
|
- ;
|
|
|
+ .map(property -> doGetLatestDeviceProperty(zip.getT2(), deviceId, property.getId()))
|
|
|
+ .collect(Collectors.toList())));
|
|
|
}
|
|
|
|
|
|
- private Mono<DevicePropertiesEntity> doGetDeviceProperty(String productId, String deviceId, String property) {
|
|
|
+ private Mono<DevicePropertiesEntity> doGetLatestDeviceProperty(String productId, String deviceId, String property) {
|
|
|
return Query.of()
|
|
|
.and(DevicePropertiesEntity::getDeviceId, deviceId)
|
|
|
.and(DevicePropertiesEntity::getProperty, property)
|
|
|
.doPaging(0, 1)
|
|
|
- .execute(timeSeriesManager
|
|
|
- .getService(DeviceTimeSeriesMetric.devicePropertyMetric(productId))::query)
|
|
|
+ .execute(timeSeriesManager.getService(devicePropertyMetric(productId))::query)
|
|
|
.map(data -> data.as(DevicePropertiesEntity.class))
|
|
|
.singleOrEmpty();
|
|
|
}
|
|
|
|
|
|
+ public Mono<PagerResult<DeviceOperationLogEntity>> queryDeviceLog(String deviceId, QueryParamEntity entity) {
|
|
|
+ return registry.getDevice(deviceId)
|
|
|
+ .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
|
|
|
+ .flatMap(productId -> timeSeriesManager
|
|
|
+ .getService(DeviceTimeSeriesMetric.deviceLogMetric(productId))
|
|
|
+ .queryPager(entity.and("deviceId", TermType.eq, deviceId),
|
|
|
+ data -> data.as(DeviceOperationLogEntity.class)))
|
|
|
+ .defaultIfEmpty(PagerResult.empty());
|
|
|
+ }
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|