|
@@ -4,46 +4,30 @@ import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.collections4.MapUtils;
|
|
|
-import org.hswebframework.ezorm.core.dsl.Query;
|
|
|
-import org.hswebframework.ezorm.core.param.TermType;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
|
|
|
-import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
-import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
|
|
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
import org.hswebframework.web.exception.BusinessException;
|
|
|
-import org.hswebframework.web.exception.NotFoundException;
|
|
|
import org.hswebframework.web.id.IDGenerator;
|
|
|
import org.jetlinks.community.device.entity.*;
|
|
|
import org.jetlinks.community.device.enums.DeviceState;
|
|
|
import org.jetlinks.community.device.response.DeviceDeployResult;
|
|
|
import org.jetlinks.community.device.response.DeviceDetail;
|
|
|
-import org.jetlinks.community.device.response.DeviceInfo;
|
|
|
-import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
|
|
|
-import org.jetlinks.community.gateway.annotation.Subscribe;
|
|
|
-import org.jetlinks.community.timeseries.TimeSeriesManager;
|
|
|
import org.jetlinks.community.utils.ErrorUtils;
|
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
|
import org.jetlinks.core.enums.ErrorCode;
|
|
|
-import org.jetlinks.core.event.EventBus;
|
|
|
-import org.jetlinks.core.event.Subscription;
|
|
|
import org.jetlinks.core.exception.DeviceOperationException;
|
|
|
import org.jetlinks.core.message.*;
|
|
|
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
|
|
|
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
|
|
|
import org.jetlinks.core.message.property.WritePropertyMessageReply;
|
|
|
-import org.jetlinks.core.metadata.DataType;
|
|
|
+import org.jetlinks.core.metadata.ConfigMetadata;
|
|
|
import org.jetlinks.core.metadata.PropertyMetadata;
|
|
|
-import org.jetlinks.core.metadata.types.ObjectType;
|
|
|
import org.jetlinks.core.metadata.types.StringType;
|
|
|
-import org.jetlinks.core.utils.FluxUtils;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-import org.springframework.transaction.annotation.Propagation;
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
@@ -51,30 +35,32 @@ import reactor.util.function.Tuple2;
|
|
|
import reactor.util.function.Tuple3;
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
-import java.time.Duration;
|
|
|
import java.util.*;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
|
|
|
-
|
|
|
@Service
|
|
|
@Slf4j
|
|
|
public class LocalDeviceInstanceService extends GenericReactiveCrudService<DeviceInstanceEntity, String> {
|
|
|
|
|
|
- @Autowired
|
|
|
- private DeviceRegistry registry;
|
|
|
+ private final DeviceRegistry registry;
|
|
|
|
|
|
- @Autowired
|
|
|
- private LocalDeviceProductService deviceProductService;
|
|
|
+ private final LocalDeviceProductService deviceProductService;
|
|
|
|
|
|
- @Autowired
|
|
|
- private EventBus eventBus;
|
|
|
+ private final DeviceConfigMetadataManager metadataManager;
|
|
|
|
|
|
- @Autowired
|
|
|
@SuppressWarnings("all")
|
|
|
- private ReactiveRepository<DeviceTagEntity, String> tagRepository;
|
|
|
+ private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
|
|
|
+
|
|
|
+ public LocalDeviceInstanceService(DeviceRegistry registry,
|
|
|
+ LocalDeviceProductService deviceProductService,
|
|
|
+ DeviceConfigMetadataManager metadataManager,
|
|
|
+ ReactiveRepository<DeviceTagEntity, String> tagRepository) {
|
|
|
+ this.registry = registry;
|
|
|
+ this.deviceProductService = deviceProductService;
|
|
|
+ this.metadataManager = metadataManager;
|
|
|
+ this.tagRepository = tagRepository;
|
|
|
+ }
|
|
|
|
|
|
|
|
|
@Override
|
|
@@ -235,25 +221,34 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
List<DeviceTagEntity> tags) {
|
|
|
|
|
|
DeviceDetail detail = new DeviceDetail().with(product).with(device).with(tags);
|
|
|
-
|
|
|
- return registry
|
|
|
- .getDevice(device.getId())
|
|
|
- .flatMap(operator -> operator
|
|
|
- //检查设备的真实状态,可能出现设备已经离线,但是数据库状态未及时更新的.
|
|
|
- .checkState()
|
|
|
- .map(DeviceState::of)
|
|
|
- //检查失败,则返回原始状态
|
|
|
- .onErrorReturn(device.getState())
|
|
|
- //如果状态不一致,则需要更新数据库中的状态
|
|
|
- .filter(state -> state != detail.getState())
|
|
|
- .doOnNext(detail::setState)
|
|
|
- .flatMap(state -> createUpdate()
|
|
|
- .set(DeviceInstanceEntity::getState, state)
|
|
|
- .where(DeviceInstanceEntity::getId, device.getId())
|
|
|
- .execute())
|
|
|
- .thenReturn(operator))
|
|
|
+ return Mono
|
|
|
+ .zip(
|
|
|
+ //设备信息
|
|
|
+ registry
|
|
|
+ .getDevice(device.getId())
|
|
|
+ .flatMap(operator -> operator
|
|
|
+ //检查设备的真实状态,可能出现设备已经离线,但是数据库状态未及时更新的.
|
|
|
+ .checkState()
|
|
|
+ .map(DeviceState::of)
|
|
|
+ //检查失败,则返回原始状态
|
|
|
+ .onErrorReturn(device.getState())
|
|
|
+ //如果状态不一致,则需要更新数据库中的状态
|
|
|
+ .filter(state -> state != detail.getState())
|
|
|
+ .doOnNext(detail::setState)
|
|
|
+ .flatMap(state -> createUpdate()
|
|
|
+ .set(DeviceInstanceEntity::getState, state)
|
|
|
+ .where(DeviceInstanceEntity::getId, device.getId())
|
|
|
+ .execute())
|
|
|
+ .thenReturn(operator)),
|
|
|
+ //配置定义
|
|
|
+ metadataManager
|
|
|
+ .getDeviceConfigMetadata(device.getId())
|
|
|
+ .flatMapIterable(ConfigMetadata::getProperties)
|
|
|
+ .collectList(),
|
|
|
+ detail::with
|
|
|
+ )
|
|
|
//填充详情信息
|
|
|
- .flatMap(detail::with)
|
|
|
+ .flatMap(Function.identity())
|
|
|
.switchIfEmpty(
|
|
|
Mono.defer(() -> {
|
|
|
//如果设备注册中心里没有设备信息,并且数据库里的状态不是未激活.
|
|
@@ -270,8 +265,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
.onErrorResume(err -> {
|
|
|
log.warn("get device detail error", err);
|
|
|
return Mono.just(detail);
|
|
|
- })
|
|
|
- ;
|
|
|
+ });
|
|
|
|
|
|
}
|
|
|
|