|
@@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel;
|
|
import com.alibaba.excel.ExcelWriter;
|
|
import com.alibaba.excel.ExcelWriter;
|
|
import com.alibaba.excel.write.metadata.WriteSheet;
|
|
import com.alibaba.excel.write.metadata.WriteSheet;
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
+import lombok.SneakyThrows;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
import org.hswebframework.ezorm.core.dsl.Query;
|
|
import org.hswebframework.ezorm.core.dsl.Query;
|
|
@@ -19,12 +20,12 @@ import org.hswebframework.web.exception.NotFoundException;
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
import org.jetlinks.community.device.message.DeviceMessageUtils;
|
|
import org.jetlinks.community.device.message.DeviceMessageUtils;
|
|
|
|
+import org.jetlinks.community.gateway.Subscription;
|
|
|
|
+import org.jetlinks.community.gateway.annotation.Subscribe;
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
import org.jetlinks.core.device.DeviceConfigKey;
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
-import org.jetlinks.core.message.DeviceMessage;
|
|
|
|
-import org.jetlinks.core.message.DeviceOfflineMessage;
|
|
|
|
-import org.jetlinks.core.message.DeviceOnlineMessage;
|
|
|
|
|
|
+import org.jetlinks.core.message.*;
|
|
import org.jetlinks.core.metadata.DataType;
|
|
import org.jetlinks.core.metadata.DataType;
|
|
import org.jetlinks.core.metadata.DeviceMetadata;
|
|
import org.jetlinks.core.metadata.DeviceMetadata;
|
|
import org.jetlinks.core.metadata.EventMetadata;
|
|
import org.jetlinks.core.metadata.EventMetadata;
|
|
@@ -100,6 +101,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
.doOnNext(instance -> instance.setState(null))
|
|
.doOnNext(instance -> instance.setState(null))
|
|
.as(super::save);
|
|
.as(super::save);
|
|
}
|
|
}
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 获取设备所有信息
|
|
* 获取设备所有信息
|
|
*
|
|
*
|
|
@@ -112,7 +114,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
.zipWhen(instance -> deviceProductService.findById(instance.getProductId()), DeviceInfo::of) //产品型号信息
|
|
.zipWhen(instance -> deviceProductService.findById(instance.getProductId()), DeviceInfo::of) //产品型号信息
|
|
.switchIfEmpty(Mono.error(NotFoundException::new))
|
|
.switchIfEmpty(Mono.error(NotFoundException::new))
|
|
.zipWhen(deviceInfo -> getDeviceRunRealInfo(id), DeviceAllInfoResponse::of) //设备运行状态
|
|
.zipWhen(deviceInfo -> getDeviceRunRealInfo(id), DeviceAllInfoResponse::of) //设备运行状态
|
|
- .zipWhen(info -> getDeviceLatestProperties( id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
|
|
|
|
|
|
+ .zipWhen(info -> getDeviceLatestProperties(id).collectList(), DeviceAllInfoResponse::ofProperties) //设备属性
|
|
.zipWhen(info -> {
|
|
.zipWhen(info -> {
|
|
DeviceMetadata deviceMetadata = new JetLinksDeviceMetadata(JSON.parseObject(info.getDeviceInfo().getDeriveMetadata()));
|
|
DeviceMetadata deviceMetadata = new JetLinksDeviceMetadata(JSON.parseObject(info.getDeviceInfo().getDeriveMetadata()));
|
|
return getEventCounts(deviceMetadata.getEvents(), id, info.getDeviceInfo().getProductId()); //事件数量统计
|
|
return getEventCounts(deviceMetadata.getEvents(), id, info.getDeviceInfo().getProductId()); //事件数量统计
|
|
@@ -169,8 +171,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
.id(instance.getId())
|
|
.id(instance.getId())
|
|
.productId(instance.getProductId())
|
|
.productId(instance.getProductId())
|
|
.build()
|
|
.build()
|
|
- .addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId())
|
|
|
|
- )
|
|
|
|
|
|
+ .addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId()))
|
|
//设置其他配置信息
|
|
//设置其他配置信息
|
|
.flatMap(deviceOperator -> deviceOperator.getState()
|
|
.flatMap(deviceOperator -> deviceOperator.getState()
|
|
.flatMap(r -> {
|
|
.flatMap(r -> {
|
|
@@ -204,6 +205,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
|
|
|
/**
|
|
/**
|
|
* 取消发布(取消激活),取消后,设备无法再连接到服务. 注册中心也无法再获取到该设备信息.
|
|
* 取消发布(取消激活),取消后,设备无法再连接到服务. 注册中心也无法再获取到该设备信息.
|
|
|
|
+ *
|
|
* @param id 设备ID
|
|
* @param id 设备ID
|
|
* @return 取消结果
|
|
* @return 取消结果
|
|
*/
|
|
*/
|
|
@@ -326,13 +328,13 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
|
|
|
|
//订阅设备上下线
|
|
//订阅设备上下线
|
|
FluxUtils.bufferRate(messageGateway
|
|
FluxUtils.bufferRate(messageGateway
|
|
- .subscribe("/device/*/online", "/device/*/offline")
|
|
|
|
|
|
+ .subscribe(Subscription.asList("/device/*/online", "/device/*/offline"), "device-state-synchronizer", false)
|
|
.flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
|
|
.flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
|
|
.map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
|
|
.map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
|
|
- .flatMap(list -> syncStateBatch(Flux.just(list), false).count())
|
|
|
|
|
|
+ .publishOn(Schedulers.parallel())
|
|
|
|
+ .concatMap(list -> syncStateBatch(Flux.just(list), false).reduce(Math::addExact))
|
|
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
|
|
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
|
|
.subscribe((i) -> log.info("同步设备状态成功:{}", i));
|
|
.subscribe((i) -> log.info("同步设备状态成功:{}", i));
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<DeviceInfo> getDeviceInfoById(String id) {
|
|
public Mono<DeviceInfo> getDeviceInfoById(String id) {
|
|
@@ -343,130 +345,90 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
|
|
}
|
|
}
|
|
|
|
|
|
public Flux<Integer> syncStateBatch(Flux<List<String>> batch, boolean force) {
|
|
public Flux<Integer> syncStateBatch(Flux<List<String>> batch, boolean force) {
|
|
|
|
+
|
|
return batch
|
|
return batch
|
|
- .flatMap(list -> Flux.fromIterable(list)
|
|
|
|
- .flatMap(registry::getDevice)
|
|
|
|
|
|
+ .concatMap(list -> Flux.fromIterable(list)
|
|
.publishOn(Schedulers.parallel())
|
|
.publishOn(Schedulers.parallel())
|
|
|
|
+ .flatMap(registry::getDevice)
|
|
.flatMap(operation -> {
|
|
.flatMap(operation -> {
|
|
Mono<Byte> state = force ? operation.checkState() : operation.getState();
|
|
Mono<Byte> state = force ? operation.checkState() : operation.getState();
|
|
return Mono.zip(
|
|
return Mono.zip(
|
|
- state,//状态
|
|
|
|
|
|
+ state.defaultIfEmpty(org.jetlinks.core.device.DeviceState.offline),//状态
|
|
Mono.just(operation.getDeviceId()), //设备id
|
|
Mono.just(operation.getDeviceId()), //设备id
|
|
- operation.getConfig(DeviceConfigKey.isGatewayDevice)//是否为网关设备
|
|
|
|
|
|
+ operation.getConfig(DeviceConfigKey.isGatewayDevice).defaultIfEmpty(false)//是否为网关设备
|
|
);
|
|
);
|
|
})
|
|
})
|
|
- .groupBy(Tuple2::getT1, Function.identity())
|
|
|
|
|
|
+ .collect(Collectors.groupingBy(Tuple2::getT1))
|
|
|
|
+ .flatMapIterable(Map::entrySet)
|
|
|
|
+ .publishOn(Schedulers.single())
|
|
.flatMap(group -> {
|
|
.flatMap(group -> {
|
|
- @SuppressWarnings("all")
|
|
|
|
- DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key());
|
|
|
|
- return group
|
|
|
|
- .collectList()
|
|
|
|
- .flatMap(idList -> Mono.zip(
|
|
|
|
- //修改设备状态
|
|
|
|
- getRepository()
|
|
|
|
- .createUpdate()
|
|
|
|
- .set(DeviceInstanceEntity::getState, state)
|
|
|
|
- .where()
|
|
|
|
- .in(DeviceInstanceEntity::getId, idList.stream().map(Tuple3::getT2).collect(Collectors.toList()))
|
|
|
|
- .execute(),
|
|
|
|
- //修改子设备状态
|
|
|
|
- Flux.fromIterable(idList)
|
|
|
|
- .filter(Tuple3::getT3)
|
|
|
|
- .map(Tuple3::getT2)
|
|
|
|
- .collectList()
|
|
|
|
- .filter(CollectionUtils::isNotEmpty)
|
|
|
|
- .flatMap(parents ->
|
|
|
|
- getRepository()
|
|
|
|
- .createUpdate()
|
|
|
|
- .set(DeviceInstanceEntity::getState, state)
|
|
|
|
- .where()
|
|
|
|
- .in(DeviceInstanceEntity::getParentId, parents)
|
|
|
|
- .execute()
|
|
|
|
- ).defaultIfEmpty(0)
|
|
|
|
- , Math::addExact));
|
|
|
|
|
|
+ DeviceState state = DeviceState.of(group.getKey());
|
|
|
|
+ return Mono.zip(
|
|
|
|
+ //修改设备状态
|
|
|
|
+ getRepository()
|
|
|
|
+ .createUpdate()
|
|
|
|
+ .set(DeviceInstanceEntity::getState, state)
|
|
|
|
+ .where()
|
|
|
|
+ .in(DeviceInstanceEntity::getId, group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList()))
|
|
|
|
+ .execute()
|
|
|
|
+ .defaultIfEmpty(0),
|
|
|
|
+ Flux.fromIterable(group.getValue())
|
|
|
|
+ .filter(Tuple3::getT3)
|
|
|
|
+ .map(Tuple3::getT2)
|
|
|
|
+ .collectList()
|
|
|
|
+ .filter(CollectionUtils::isNotEmpty)
|
|
|
|
+ .flatMap(parents ->
|
|
|
|
+ getRepository()
|
|
|
|
+ .createUpdate()
|
|
|
|
+ .set(DeviceInstanceEntity::getState, state)
|
|
|
|
+ .where()
|
|
|
|
+ .in(DeviceInstanceEntity::getParentId, parents)
|
|
|
|
+ .execute()
|
|
|
|
+ ).defaultIfEmpty(0), Math::addExact);
|
|
}));
|
|
}));
|
|
}
|
|
}
|
|
|
|
|
|
- public Flux<ImportDeviceInstanceResult> doBatchImport(String fileUrl) {
|
|
|
|
- return deviceProductService
|
|
|
|
- .createQuery()
|
|
|
|
- .fetch()
|
|
|
|
- .collectList()
|
|
|
|
- .flatMapMany(productEntities -> {
|
|
|
|
- Map<String, String> productNameMap = productEntities.stream()
|
|
|
|
- .collect(Collectors.toMap(DeviceProductEntity::getName, DeviceProductEntity::getId, (_1, _2) -> _1));
|
|
|
|
- return importExportService
|
|
|
|
- .doImport(DeviceInstanceImportExportEntity.class, fileUrl)
|
|
|
|
- .map(result -> {
|
|
|
|
- try {
|
|
|
|
- DeviceInstanceImportExportEntity importExportEntity = result.getResult();
|
|
|
|
- DeviceInstanceEntity entity = FastBeanCopier.copy(importExportEntity, new DeviceInstanceEntity());
|
|
|
|
- String productId = productNameMap.get(importExportEntity.getProductName());
|
|
|
|
- if (StringUtils.isEmpty(productId)) {
|
|
|
|
- throw new BusinessException("设备型号不存在");
|
|
|
|
- }
|
|
|
|
- if (StringUtils.isEmpty(entity.getId())) {
|
|
|
|
- throw new BusinessException("设备ID不能为空");
|
|
|
|
- }
|
|
|
|
|
|
|
|
- entity.setProductId(productId);
|
|
|
|
- entity.setState(DeviceState.notActive);
|
|
|
|
- return entity;
|
|
|
|
- } catch (Throwable e) {
|
|
|
|
- throw new BusinessException("第" +
|
|
|
|
- (result.getRowIndex() + 2)
|
|
|
|
- + "行:" + e.getMessage());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- })
|
|
|
|
- .buffer(50)
|
|
|
|
- .flatMap(list -> this.save(Flux.fromIterable(list)))
|
|
|
|
- .map(ImportDeviceInstanceResult::success)
|
|
|
|
- .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)))
|
|
|
|
- .doOnEach(ReactiveLogger.on(SignalType.CANCEL, (ctx, signal) -> {
|
|
|
|
- log.warn("用户取消导入设备实例:{}", fileUrl);
|
|
|
|
- }))
|
|
|
|
- ;
|
|
|
|
|
|
+ @Subscribe("/device/*/message/children/*/register")
|
|
|
|
+ public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
|
|
|
|
+ String childId = message.getChildDeviceId();
|
|
|
|
+ Message childMessage = message.getChildDeviceMessage();
|
|
|
|
+ if (childMessage instanceof DeviceRegisterMessage) {
|
|
|
|
+ return registry.getDevice(message.getDeviceId())
|
|
|
|
+ .flatMap(DeviceOperator::getState)
|
|
|
|
+ .flatMap(state -> createUpdate()
|
|
|
|
+ .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
|
|
|
|
+ .set(DeviceInstanceEntity::getState, DeviceState.of(state))
|
|
|
|
+ .where(DeviceInstanceEntity::getId, childId)
|
|
|
|
+ .execute()
|
|
|
|
+ .then(registry
|
|
|
|
+ .getDevice(childId)
|
|
|
|
+ .flatMap(dev -> dev.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId())))
|
|
|
|
+ .then());
|
|
|
|
+ }
|
|
|
|
+ return Mono.empty();
|
|
}
|
|
}
|
|
|
|
|
|
- private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
|
|
|
|
-
|
|
|
|
- public Mono<Void> doExport(ServerHttpResponse response, QueryParam queryParam, String fileName) throws IOException {
|
|
|
|
- response.getHeaders()
|
|
|
|
- .set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=".concat(
|
|
|
|
- URLEncoder.encode(fileName, StandardCharsets.UTF_8.displayName())));
|
|
|
|
- queryParam.setPaging(false);
|
|
|
|
|
|
+ @Subscribe("/device/*/message/children/*/unregister")
|
|
|
|
+ public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage message) {
|
|
|
|
+ String childId = message.getChildDeviceId();
|
|
|
|
+ Message childMessage = message.getChildDeviceMessage();
|
|
|
|
+ if (childMessage instanceof DeviceUnRegisterMessage) {
|
|
|
|
+
|
|
|
|
+ return registry.getDevice(childId)
|
|
|
|
+ .flatMap(dev -> dev
|
|
|
|
+ .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
|
|
|
|
+ .then(dev.checkState()))
|
|
|
|
+ .flatMap(state -> createUpdate()
|
|
|
|
+ .setNull(DeviceInstanceEntity::getParentId)
|
|
|
|
+ .set(DeviceInstanceEntity::getState, DeviceState.of(state))
|
|
|
|
+ .where(DeviceInstanceEntity::getId, childId)
|
|
|
|
+ .execute()
|
|
|
|
+ .then());
|
|
|
|
|
|
- return response.writeWith(Flux.create(sink -> {
|
|
|
|
- OutputStream outputStream = new OutputStream() {
|
|
|
|
- @Override
|
|
|
|
- public void write(byte[] b) {
|
|
|
|
- sink.next(bufferFactory.wrap(b));
|
|
|
|
- }
|
|
|
|
|
|
|
|
- @Override
|
|
|
|
- public void write(int b) {
|
|
|
|
- sink.next(bufferFactory.wrap(new byte[]{(byte) b}));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void close() {
|
|
|
|
- sink.complete();
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
- ExcelWriter excelWriter = EasyExcel.write(outputStream, DeviceInstanceImportExportEntity.class).build();
|
|
|
|
- WriteSheet writeSheet = EasyExcel.writerSheet().build();
|
|
|
|
- sink.onCancel(query(queryParam)
|
|
|
|
- .map(entity -> {
|
|
|
|
- return FastBeanCopier.copy(entity, new DeviceInstanceImportExportEntity());
|
|
|
|
- })
|
|
|
|
- .buffer(100)
|
|
|
|
- .doOnNext(list -> excelWriter.write(list, writeSheet))
|
|
|
|
- .doFinally(s -> excelWriter.finish())
|
|
|
|
- .doOnError(sink::error)
|
|
|
|
- .subscribe());
|
|
|
|
- })
|
|
|
|
- );
|
|
|
|
|
|
+ }
|
|
|
|
+ return Mono.empty();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|