|
@@ -15,7 +15,9 @@ import org.jetlinks.community.gateway.DeviceMessageUtils;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
+import java.util.Comparator;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
@@ -82,12 +84,20 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
|
|
},
|
|
},
|
|
Subscription.Feature.local, Subscription.Feature.broker
|
|
Subscription.Feature.local, Subscription.Feature.broker
|
|
);
|
|
);
|
|
-
|
|
|
|
|
|
+ List<PropertyMetadata> props = metadata.getProperties();
|
|
|
|
+ Map<String, Integer> index = new HashMap<>();
|
|
|
|
+ int idx = 0;
|
|
|
|
+ for (PropertyMetadata prop : props) {
|
|
|
|
+ index.put(prop.getId(), idx++);
|
|
|
|
+ }
|
|
return
|
|
return
|
|
eventBus
|
|
eventBus
|
|
.subscribe(subscription, DeviceMessage.class)
|
|
.subscribe(subscription, DeviceMessage.class)
|
|
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
|
|
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
|
|
- .flatMap(map -> Flux.fromIterable(map.entrySet()))
|
|
|
|
|
|
+ .flatMap(map -> Flux
|
|
|
|
+ .fromIterable(map.entrySet())
|
|
|
|
+ //对本次上报的属性进行排序
|
|
|
|
+ .sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0))))
|
|
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
|
|
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
|
|
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
|
|
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
|
|
;
|
|
;
|