|
@@ -15,6 +15,7 @@ import org.hswebframework.web.api.crud.entity.QueryNoPagingOperation;
|
|
|
import org.hswebframework.web.api.crud.entity.QueryOperation;
|
|
|
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
|
|
import org.hswebframework.web.authorization.Authentication;
|
|
|
+import org.hswebframework.web.authorization.Dimension;
|
|
|
import org.hswebframework.web.authorization.annotation.*;
|
|
|
import org.hswebframework.web.bean.FastBeanCopier;
|
|
|
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
|
|
@@ -72,6 +73,8 @@ import java.util.Map;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static org.hswebframework.reactor.excel.ReactorExcel.*;
|
|
|
+
|
|
|
@RestController
|
|
|
@RequestMapping({"/device-instance", "/device/instance"})
|
|
|
@Authorize
|
|
@@ -444,33 +447,45 @@ public class DeviceInstanceController implements
|
|
|
@Operation(summary = "导入设备数据")
|
|
|
public Flux<ImportDeviceInstanceResult> doBatchImportByProduct(@PathVariable @Parameter(description = "产品ID") String productId,
|
|
|
@RequestParam @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl) {
|
|
|
- return this
|
|
|
- .getDeviceProductDetail(productId)
|
|
|
- .map(tp4 -> Tuples.of(new DeviceWrapper(tp4.getT3().getTags(), tp4.getT4()), tp4.getT1()))
|
|
|
- .flatMapMany(wrapper -> importExportService
|
|
|
- .getInputStream(fileUrl)
|
|
|
- .flatMapMany(inputStream -> ReactorExcel.read(inputStream, FileUtils.getExtension(fileUrl), wrapper.getT1()))
|
|
|
- .doOnNext(info -> info.setProductName(wrapper.getT2().getName()))
|
|
|
- )
|
|
|
- .map(info -> {
|
|
|
- DeviceInstanceEntity entity = FastBeanCopier.copy(info, new DeviceInstanceEntity());
|
|
|
- entity.setProductId(productId);
|
|
|
- if (StringUtils.isEmpty(entity.getId())) {
|
|
|
- throw new BusinessException("第" + (info.getRowNumber() + 1) + "行:设备ID不能为空");
|
|
|
- }
|
|
|
- return Tuples.of(entity, info.getTags());
|
|
|
- })
|
|
|
- .buffer(100)//每100条数据保存一次
|
|
|
- .publishOn(Schedulers.single())
|
|
|
- .concatMap(buffer ->
|
|
|
- Mono.zip(
|
|
|
- service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
|
|
|
- tagRepository
|
|
|
- .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
|
|
|
- .defaultIfEmpty(SaveResult.of(0, 0))
|
|
|
- ))
|
|
|
- .map(res -> ImportDeviceInstanceResult.success(res.getT1()))
|
|
|
- .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
|
|
|
+ return Authentication
|
|
|
+ .currentReactive()
|
|
|
+ .flatMapMany(auth -> {
|
|
|
+
|
|
|
+ //从当前用户的维度中获取机构信息,需要将用户绑定到对应到机构.
|
|
|
+ Map<String, String> orgMapping = auth
|
|
|
+ .getDimensions("org")
|
|
|
+ .stream()
|
|
|
+ .collect(Collectors.toMap(Dimension::getName, Dimension::getId, (_1, _2) -> _1));
|
|
|
+
|
|
|
+ return this
|
|
|
+ .getDeviceProductDetail(productId)
|
|
|
+ .map(tp4 -> Tuples.of(new DeviceWrapper(tp4.getT3().getTags(), tp4.getT4()), tp4.getT1()))
|
|
|
+ .flatMapMany(wrapper -> importExportService
|
|
|
+ .getInputStream(fileUrl)
|
|
|
+ .flatMapMany(inputStream -> read(inputStream, FileUtils.getExtension(fileUrl), wrapper.getT1()))
|
|
|
+ .doOnNext(info -> info.setProductName(wrapper.getT2().getName()))
|
|
|
+ )
|
|
|
+ .map(info -> {
|
|
|
+ DeviceInstanceEntity entity = FastBeanCopier.copy(info, new DeviceInstanceEntity());
|
|
|
+ entity.setProductId(productId);
|
|
|
+ entity.setOrgId(orgMapping.get(info.getOrgName()));
|
|
|
+ if (StringUtils.isEmpty(entity.getId())) {
|
|
|
+ throw new BusinessException("第" + (info.getRowNumber() + 1) + "行:设备ID不能为空");
|
|
|
+ }
|
|
|
+ return Tuples.of(entity, info.getTags());
|
|
|
+ })
|
|
|
+ .buffer(100)//每100条数据保存一次
|
|
|
+ .publishOn(Schedulers.single())
|
|
|
+ .concatMap(buffer ->
|
|
|
+ Mono.zip(
|
|
|
+ service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)),
|
|
|
+ tagRepository
|
|
|
+ .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2))
|
|
|
+ .defaultIfEmpty(SaveResult.of(0, 0))
|
|
|
+ ))
|
|
|
+ .map(res -> ImportDeviceInstanceResult.success(res.getT1()))
|
|
|
+ .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err)));
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
//获取导出模版
|
|
@@ -509,60 +524,72 @@ public class DeviceInstanceController implements
|
|
|
.displayName())));
|
|
|
parameter.setPaging(false);
|
|
|
parameter.toNestQuery(q -> q.is(DeviceInstanceEntity::getProductId, productId));
|
|
|
- return getDeviceProductDetail(productId)
|
|
|
- .map(tp4 -> Tuples
|
|
|
- .of(
|
|
|
- //表头
|
|
|
- DeviceExcelInfo.getExportHeaderMapping(tp4.getT3().getTags(), tp4.getT4()),
|
|
|
- //配置key集合
|
|
|
- tp4
|
|
|
- .getT4()
|
|
|
- .stream()
|
|
|
- .map(ConfigPropertyMetadata::getProperty)
|
|
|
- .collect(Collectors.toList())
|
|
|
- ))
|
|
|
- .defaultIfEmpty(Tuples.of(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(),
|
|
|
- Collections.emptyList()),
|
|
|
- Collections.emptyList()))
|
|
|
- .flatMapMany(headerAndConfigKey ->
|
|
|
- ReactorExcel.<DeviceExcelInfo>writer(format)
|
|
|
- .headers(headerAndConfigKey.getT1())
|
|
|
- .converter(DeviceExcelInfo::toMap)
|
|
|
- .writeBuffer(
|
|
|
- service.query(parameter)
|
|
|
- .flatMap(entity -> {
|
|
|
- DeviceExcelInfo exportEntity = FastBeanCopier.copy(entity, new DeviceExcelInfo(), "state");
|
|
|
- exportEntity.setState(entity.getState().getText());
|
|
|
- return registry
|
|
|
- .getDevice(entity.getId())
|
|
|
- .flatMap(deviceOperator -> deviceOperator
|
|
|
- .getSelfConfigs(headerAndConfigKey.getT2())
|
|
|
- .map(Values::getAllValues))
|
|
|
- .doOnNext(configs -> exportEntity
|
|
|
- .getConfiguration()
|
|
|
- .putAll(configs))
|
|
|
- .thenReturn(exportEntity);
|
|
|
- })
|
|
|
- .buffer(200)
|
|
|
- .flatMap(list -> {
|
|
|
- Map<String, DeviceExcelInfo> importInfo = list
|
|
|
- .stream()
|
|
|
- .collect(Collectors.toMap(DeviceExcelInfo::getId, Function.identity()));
|
|
|
- return tagRepository.createQuery()
|
|
|
- .where()
|
|
|
- .in(DeviceTagEntity::getDeviceId, importInfo.keySet())
|
|
|
- .fetch()
|
|
|
- .collect(Collectors.groupingBy(DeviceTagEntity::getDeviceId))
|
|
|
- .flatMapIterable(Map::entrySet)
|
|
|
- .doOnNext(entry -> importInfo
|
|
|
- .get(entry.getKey())
|
|
|
- .setTags(entry.getValue()))
|
|
|
- .thenMany(Flux.fromIterable(list));
|
|
|
- })
|
|
|
- , 512 * 1024))//缓冲512k
|
|
|
- .doOnError(err -> log.error(err.getMessage(), err))
|
|
|
- .map(bufferFactory::wrap)
|
|
|
- .as(response::writeWith);
|
|
|
+ return Authentication
|
|
|
+ .currentReactive()
|
|
|
+ .flatMap(auth -> {
|
|
|
+ //从当前用户的维度中获取机构信息,需要将用户绑定到对应到机构.
|
|
|
+ Map<String, String> orgMapping = auth
|
|
|
+ .getDimensions("org")
|
|
|
+ .stream()
|
|
|
+ .collect(Collectors.toMap(Dimension::getId, Dimension::getName, (_1, _2) -> _1));
|
|
|
+ return this
|
|
|
+ .getDeviceProductDetail(productId)
|
|
|
+ .map(tp4 -> Tuples
|
|
|
+ .of(
|
|
|
+ //表头
|
|
|
+ DeviceExcelInfo.getExportHeaderMapping(tp4.getT3().getTags(), tp4.getT4()),
|
|
|
+ //配置key集合
|
|
|
+ tp4
|
|
|
+ .getT4()
|
|
|
+ .stream()
|
|
|
+ .map(ConfigPropertyMetadata::getProperty)
|
|
|
+ .collect(Collectors.toList())
|
|
|
+ ))
|
|
|
+ .defaultIfEmpty(Tuples.of(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList(), Collections
|
|
|
+ .emptyList()),
|
|
|
+ Collections.emptyList()))
|
|
|
+ .flatMapMany(headerAndConfigKey -> ReactorExcel
|
|
|
+ .<DeviceExcelInfo>writer(format)
|
|
|
+ .headers(headerAndConfigKey.getT1())
|
|
|
+ .converter(DeviceExcelInfo::toMap)
|
|
|
+ .writeBuffer(service
|
|
|
+ .query(parameter)
|
|
|
+ .flatMap(entity -> {
|
|
|
+ DeviceExcelInfo exportEntity = FastBeanCopier.copy(entity, new DeviceExcelInfo(), "state");
|
|
|
+ exportEntity.setOrgName(orgMapping.get(entity.getOrgId()));
|
|
|
+ exportEntity.setState(entity.getState().getText());
|
|
|
+ return registry
|
|
|
+ .getDevice(entity.getId())
|
|
|
+ .flatMap(deviceOperator -> deviceOperator
|
|
|
+ .getSelfConfigs(headerAndConfigKey.getT2())
|
|
|
+ .map(Values::getAllValues))
|
|
|
+ .doOnNext(configs -> exportEntity
|
|
|
+ .getConfiguration()
|
|
|
+ .putAll(configs))
|
|
|
+ .thenReturn(exportEntity);
|
|
|
+ })
|
|
|
+ .buffer(200)
|
|
|
+ .flatMap(list -> {
|
|
|
+ Map<String, DeviceExcelInfo> importInfo = list
|
|
|
+ .stream()
|
|
|
+ .collect(Collectors.toMap(DeviceExcelInfo::getId, Function.identity()));
|
|
|
+ return tagRepository
|
|
|
+ .createQuery()
|
|
|
+ .where()
|
|
|
+ .in(DeviceTagEntity::getDeviceId, importInfo.keySet())
|
|
|
+ .fetch()
|
|
|
+ .collect(Collectors.groupingBy(DeviceTagEntity::getDeviceId))
|
|
|
+ .flatMapIterable(Map::entrySet)
|
|
|
+ .doOnNext(entry -> importInfo
|
|
|
+ .get(entry.getKey())
|
|
|
+ .setTags(entry.getValue()))
|
|
|
+ .thenMany(Flux.fromIterable(list));
|
|
|
+ })
|
|
|
+ , 512 * 1024))//缓冲512k
|
|
|
+ .doOnError(err -> log.error(err.getMessage(), err))
|
|
|
+ .map(bufferFactory::wrap)
|
|
|
+ .as(response::writeWith);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
|