|
@@ -3,15 +3,22 @@ package org.jetlinks.community.device.message;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
|
+import org.jetlinks.community.PropertyMetadataConstants;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
|
import org.jetlinks.core.event.EventBus;
|
|
|
import org.jetlinks.core.message.ChildDeviceMessage;
|
|
|
import org.jetlinks.core.message.DeviceMessage;
|
|
|
+import org.jetlinks.core.message.Message;
|
|
|
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
|
|
|
+import org.jetlinks.core.message.property.WritePropertyMessage;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import java.util.Map;
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
/**
|
|
|
* 发送设备指令的时候,将消息推送到网关中.
|
|
|
*
|
|
@@ -27,26 +34,53 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
|
|
|
|
|
|
private final DeviceRegistry registry;
|
|
|
|
|
|
- public Mono<Void> doPublish(Mono<DeviceOperator> device, DeviceMessage message) {
|
|
|
- return device
|
|
|
- .zipWhen(DeviceOperator::getProduct)
|
|
|
- .doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId()))
|
|
|
- .flatMap(tp2 -> {
|
|
|
- String topic = DeviceMessageConnector.createDeviceMessageTopic(tp2.getT2().getId(),tp2.getT1().getDeviceId(),message);
|
|
|
-
|
|
|
+ public Mono<Void> doPublish(Message message) {
|
|
|
+ return DeviceMessageConnector
|
|
|
+ .createDeviceMessageTopic(registry, message)
|
|
|
+ .flatMap(topic -> {
|
|
|
Mono<Void> publisher = eventBus.publish(topic, message).then();
|
|
|
-
|
|
|
if (message instanceof ChildDeviceMessage) {
|
|
|
- DeviceMessage msg = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
|
|
|
- publisher = publisher.then(doPublish(registry.getDevice(msg.getDeviceId()), msg));
|
|
|
+ publisher = publisher.then(doPublish(((ChildDeviceMessage) message).getChildDeviceMessage()));
|
|
|
}
|
|
|
return publisher;
|
|
|
- });
|
|
|
+ })
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
|
|
|
+ if (message instanceof WritePropertyMessage) {
|
|
|
+ Map<String, Object> properties =((WritePropertyMessage) message).getProperties();
|
|
|
+ if (properties.size() == 1) {
|
|
|
+ String property = properties.keySet().iterator().next();
|
|
|
+ Object value = properties.values().iterator().next();
|
|
|
+ //手动写值的属性则直接返回
|
|
|
+ return device
|
|
|
+ .getMetadata()
|
|
|
+ .flatMap(metadata -> Mono
|
|
|
+ .justOrEmpty(
|
|
|
+ metadata
|
|
|
+ .getProperty(property)
|
|
|
+ .filter(PropertyMetadataConstants.Source::isManual)
|
|
|
+ .map(ignore -> ((WritePropertyMessage) message)
|
|
|
+ .newReply()
|
|
|
+ .addHeader("source", PropertyMetadataConstants.Source.manual)
|
|
|
+ .addProperty(property, value)
|
|
|
+ .success()
|
|
|
+ )
|
|
|
+ ))
|
|
|
+ .map(replyMsg -> this.doPublish(replyMsg).thenReturn((R) replyMsg).flux())
|
|
|
+ .defaultIfEmpty(reply)
|
|
|
+ .flatMapMany(Function.identity());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return reply;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
|
|
|
- return doPublish(Mono.just(device), message)
|
|
|
+ return this
|
|
|
+ .doPublish(message )
|
|
|
.thenReturn(message)
|
|
|
.doOnEach(ReactiveLogger.onComplete(() -> {
|
|
|
if (log.isDebugEnabled()) {
|