|
@@ -5,6 +5,7 @@ import com.alibaba.excel.ExcelWriter;
|
|
|
import com.alibaba.excel.write.metadata.WriteSheet;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.hswebframework.ezorm.core.dsl.Query;
|
|
|
import org.hswebframework.ezorm.core.param.QueryParam;
|
|
|
import org.hswebframework.ezorm.core.param.TermType;
|
|
@@ -17,9 +18,11 @@ import org.hswebframework.web.exception.BusinessException;
|
|
|
import org.hswebframework.web.exception.NotFoundException;
|
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
|
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
|
+import org.jetlinks.community.device.message.DeviceMessageUtils;
|
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
|
+import org.jetlinks.core.message.DeviceMessage;
|
|
|
import org.jetlinks.core.message.DeviceOfflineMessage;
|
|
|
import org.jetlinks.core.message.DeviceOnlineMessage;
|
|
|
import org.jetlinks.core.metadata.DataType;
|
|
@@ -53,6 +56,7 @@ import reactor.core.publisher.Mono;
|
|
|
import reactor.core.publisher.SignalType;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
import reactor.util.function.Tuple2;
|
|
|
+import reactor.util.function.Tuple3;
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
@@ -65,6 +69,7 @@ import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
|
|
@@ -163,7 +168,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
registry.registry(org.jetlinks.core.device.DeviceInfo.builder()
|
|
|
.id(instance.getId())
|
|
|
.productId(instance.getProductId())
|
|
|
- .build())
|
|
|
+ .build()
|
|
|
+ .addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId())
|
|
|
+ )
|
|
|
//设置其他配置信息
|
|
|
.flatMap(deviceOperator -> deviceOperator.getState()
|
|
|
.flatMap(r -> {
|
|
@@ -320,19 +327,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
//订阅设备上下线
|
|
|
FluxUtils.bufferRate(messageGateway
|
|
|
.subscribe("/device/*/online", "/device/*/offline")
|
|
|
- .flatMap(message -> Mono.fromCallable(() -> {
|
|
|
-
|
|
|
- if (message.getMessage() instanceof EncodableMessage) {
|
|
|
- Object msg = ((EncodableMessage) message.getMessage()).getNativePayload();
|
|
|
- if (msg instanceof DeviceOnlineMessage) {
|
|
|
- return ((DeviceOnlineMessage) msg).getDeviceId();
|
|
|
- }
|
|
|
- if (msg instanceof DeviceOfflineMessage) {
|
|
|
- return ((DeviceOfflineMessage) msg).getDeviceId();
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- })), 800, 200, Duration.ofSeconds(2))
|
|
|
+ .flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
|
|
|
+ .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
|
|
|
.flatMap(list -> syncStateBatch(Flux.just(list), false).count())
|
|
|
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
|
|
|
.subscribe((i) -> log.info("同步设备状态成功:{}", i));
|
|
@@ -347,30 +343,50 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
}
|
|
|
|
|
|
public Flux<Integer> syncStateBatch(Flux<List<String>> batch, boolean force) {
|
|
|
- return batch.flatMap(list -> Flux.fromIterable(list)
|
|
|
- .flatMap(registry::getDevice)
|
|
|
- .publishOn(Schedulers.parallel())
|
|
|
- .flatMap(operation -> {
|
|
|
- if (force) {
|
|
|
- return operation.checkState().zipWith(Mono.just(operation.getDeviceId()));
|
|
|
- }
|
|
|
- return operation.getState().zipWith(Mono.just(operation.getDeviceId()));
|
|
|
- })
|
|
|
- .groupBy(Tuple2::getT1, Tuple2::getT2)
|
|
|
- .flatMap(group -> {
|
|
|
- @SuppressWarnings("all")
|
|
|
- DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key());
|
|
|
- return group.collectList()
|
|
|
- .flatMap(idList -> getRepository()
|
|
|
- .createUpdate()
|
|
|
- .set(DeviceInstanceEntity::getState, state)
|
|
|
- .where()
|
|
|
- .in(DeviceInstanceEntity::getId, idList)
|
|
|
- .execute());
|
|
|
- }));
|
|
|
+ return batch
|
|
|
+ .flatMap(list -> Flux.fromIterable(list)
|
|
|
+ .flatMap(registry::getDevice)
|
|
|
+ .publishOn(Schedulers.parallel())
|
|
|
+ .flatMap(operation -> {
|
|
|
+ Mono<Byte> state = force ? operation.checkState() : operation.getState();
|
|
|
+ return Mono.zip(
|
|
|
+ state,//状态
|
|
|
+ Mono.just(operation.getDeviceId()), //设备id
|
|
|
+ operation.getConfig(DeviceConfigKey.isGatewayDevice)//是否为网关设备
|
|
|
+ );
|
|
|
+ })
|
|
|
+ .groupBy(Tuple2::getT1, Function.identity())
|
|
|
+ .flatMap(group -> {
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key());
|
|
|
+ return group
|
|
|
+ .collectList()
|
|
|
+ .flatMap(idList -> Mono.zip(
|
|
|
+ //修改设备状态
|
|
|
+ getRepository()
|
|
|
+ .createUpdate()
|
|
|
+ .set(DeviceInstanceEntity::getState, state)
|
|
|
+ .where()
|
|
|
+ .in(DeviceInstanceEntity::getId, idList.stream().map(Tuple3::getT2).collect(Collectors.toList()))
|
|
|
+ .execute(),
|
|
|
+ //修改子设备状态
|
|
|
+ Flux.fromIterable(idList)
|
|
|
+ .filter(Tuple3::getT3)
|
|
|
+ .map(Tuple3::getT2)
|
|
|
+ .collectList()
|
|
|
+ .filter(CollectionUtils::isNotEmpty)
|
|
|
+ .flatMap(parents ->
|
|
|
+ getRepository()
|
|
|
+ .createUpdate()
|
|
|
+ .set(DeviceInstanceEntity::getState, state)
|
|
|
+ .where()
|
|
|
+ .in(DeviceInstanceEntity::getParentId, parents)
|
|
|
+ .execute()
|
|
|
+ ).defaultIfEmpty(0)
|
|
|
+ , Math::addExact));
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public Flux<ImportDeviceInstanceResult> doBatchImport(String fileUrl) {
|
|
|
return deviceProductService
|
|
|
.createQuery()
|