|
@@ -34,6 +34,7 @@ import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
@Component
|
|
|
@AllArgsConstructor
|
|
@@ -111,6 +112,11 @@ public class DeviceMessageBusinessHandler {
|
|
|
@Subscribe("/device/*/*/register")
|
|
|
@Transactional(propagation = Propagation.NEVER)
|
|
|
public Mono<Void> autoRegisterDevice(DeviceRegisterMessage message) {
|
|
|
+ if (message.getHeader(Headers.force).orElse(false)) {
|
|
|
+ return this
|
|
|
+ .doAutoRegister(message)
|
|
|
+ .then();
|
|
|
+ }
|
|
|
return registry
|
|
|
.getDevice(message.getDeviceId())
|
|
|
.flatMap(device -> {
|
|
@@ -140,8 +146,9 @@ public class DeviceMessageBusinessHandler {
|
|
|
* @return void
|
|
|
*/
|
|
|
@Subscribe("/device/*/*/message/children/*/register")
|
|
|
- @Transactional(propagation = Propagation.NEVER)
|
|
|
+ @Transactional(propagation = Propagation.REQUIRES_NEW)
|
|
|
public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
|
|
|
+
|
|
|
Message childMessage = message.getChildDeviceMessage();
|
|
|
if (childMessage instanceof DeviceRegisterMessage) {
|
|
|
String childId = ((DeviceRegisterMessage) childMessage).getDeviceId();
|
|
@@ -149,20 +156,27 @@ public class DeviceMessageBusinessHandler {
|
|
|
log.warn("子设备注册消息循环依赖:{}", message);
|
|
|
return Mono.empty();
|
|
|
}
|
|
|
+ //网关设备添加到header中
|
|
|
+ childMessage.addHeaderIfAbsent(DeviceConfigKey.parentGatewayId.getKey(), message.getDeviceId());
|
|
|
+
|
|
|
return registry
|
|
|
.getDevice(childId)
|
|
|
- .switchIfEmpty(Mono.defer(() -> doAutoRegister(((DeviceRegisterMessage) childMessage))))
|
|
|
- .flatMap(dev -> dev
|
|
|
- .setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId())
|
|
|
- .thenReturn(dev))
|
|
|
- .flatMap(DeviceOperator::getState)
|
|
|
- .flatMap(state -> deviceService
|
|
|
- .createUpdate()
|
|
|
- .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
|
|
|
- .set(DeviceInstanceEntity::getState, DeviceState.of(state))
|
|
|
- .where(DeviceInstanceEntity::getId, childId)
|
|
|
- .execute()
|
|
|
- ).then();
|
|
|
+ .map(device -> device
|
|
|
+ .getState()
|
|
|
+ //更新数据库
|
|
|
+ .flatMap(state -> deviceService
|
|
|
+ .createUpdate()
|
|
|
+ .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
|
|
|
+ //状态还有更好的更新方式?
|
|
|
+ .set(DeviceInstanceEntity::getState, DeviceState.of(state))
|
|
|
+ .where(DeviceInstanceEntity::getId, childId)
|
|
|
+ .execute())
|
|
|
+ //更新缓存
|
|
|
+ .then(device.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId()))
|
|
|
+ .thenReturn(device))
|
|
|
+ .defaultIfEmpty(Mono.defer(() -> doAutoRegister(((DeviceRegisterMessage) childMessage))))
|
|
|
+ .flatMap(Function.identity())
|
|
|
+ .then();
|
|
|
}
|
|
|
return Mono.empty();
|
|
|
}
|