瀏覽代碼

优化物模型发布

zhou-hao 4 年之前
父節點
當前提交
270c1e70ee

+ 36 - 33
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/events/handler/DeviceProductDeployHandler.java

@@ -1,6 +1,9 @@
 package org.jetlinks.community.device.events.handler;
 
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.device.service.data.DeviceDataService;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.metadata.DeviceMetadataCodec;
 import org.jetlinks.community.device.events.DeviceProductDeployEvent;
@@ -13,9 +16,13 @@ import org.springframework.boot.CommandLineRunner;
 import org.springframework.context.event.EventListener;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.PreDestroy;
 
 /**
- * 处理设备型号发布事件
+ * 处理设备产品发布事件
  *
  * @author bsetfeng
  * @author zhouhao
@@ -28,52 +35,48 @@ public class DeviceProductDeployHandler implements CommandLineRunner {
 
     private final LocalDeviceProductService productService;
 
-    private DeviceMetadataCodec codec = new JetLinksDeviceMetadataCodec();
+    private final DeviceMetadataCodec codec = new JetLinksDeviceMetadataCodec();
+
+    private final DeviceDataService dataService;
 
-    private final TimeSeriesManager timeSeriesManager;
 
     @Autowired
-    public DeviceProductDeployHandler(LocalDeviceProductService productService, TimeSeriesManager timeSeriesManager) {
+    public DeviceProductDeployHandler(LocalDeviceProductService productService,
+                                      DeviceDataService dataService) {
         this.productService = productService;
-        this.timeSeriesManager = timeSeriesManager;
+        this.dataService = dataService;
     }
 
+
     @EventListener
     public void handlerEvent(DeviceProductDeployEvent event) {
-        initDeviceEventTimeSeriesMetadata(event.getMetadata(), event.getId());
-        initDevicePropertiesTimeSeriesMetadata(event.getId());
-        initDeviceLogTimeSeriesMetadata(event.getId());
-    }
-
-    private void initDeviceEventTimeSeriesMetadata(String metadata, String productId) {
-        codec.decode(metadata)
-            .flatMapIterable(DeviceMetadata::getEvents)
-            .flatMap(eventMetadata -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, eventMetadata)))
-            .doOnError(err -> log.error(err.getMessage(), err))
-            .subscribe();
+        event.async(
+            this
+                .doRegisterMetadata(event.getId(), event.getMetadata())
+        );
     }
 
-    private void initDevicePropertiesTimeSeriesMetadata(String productId) {
-        timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId))
-            .doOnError(err -> log.error(err.getMessage(), err))
-            .subscribe();
+    private Mono<Void> doRegisterMetadata(String productId, String metadataString) {
+        return codec
+            .decode(metadataString)
+            .flatMap(metadata -> dataService.registerMetadata(productId, metadata));
     }
 
-    private void initDeviceLogTimeSeriesMetadata(String productId) {
-        timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId))
-            .doOnError(err -> log.error(err.getMessage(), err))
-            .subscribe();
-    }
 
     @Override
-    public void run(String... args) throws Exception {
-        productService.createQuery()
+    public void run(String... args) {
+        //启动时发布物模型
+        productService
+            .createQuery()
             .fetch()
-            .filter(productService -> new Byte((byte) 1).equals(productService.getState()))
-            .subscribe(deviceProductEntity -> {
-                initDeviceEventTimeSeriesMetadata(deviceProductEntity.getMetadata(), deviceProductEntity.getId());
-                initDevicePropertiesTimeSeriesMetadata(deviceProductEntity.getId());
-                initDeviceLogTimeSeriesMetadata(deviceProductEntity.getId());
-            });
+            .filter(product -> new Byte((byte) 1).equals(product.getState()))
+            .flatMap(deviceProductEntity -> this
+                .doRegisterMetadata(deviceProductEntity.getId(), deviceProductEntity.getMetadata())
+                .onErrorResume(err -> {
+                    log.warn("register product metadata error", err);
+                    return Mono.empty();
+                })
+            )
+            .subscribe();
     }
 }