Selaa lähdekoodia

优化子设备消息处理

zhou-hao 4 vuotta sitten
vanhempi
commit
c919a42f19

+ 58 - 30
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -255,47 +255,75 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             ;
     }
 
-    private Mono<Void> handleMessage(String deviceId,
-                                     DeviceOperator device,
-                                     DeviceMessage message,
-                                     MqttConnectionSession firstSession) {
-        DeviceSession managedSession = sessionManager.getSession(deviceId);
-
-        //主动离线
-        if (message instanceof DeviceOfflineMessage) {
-            sessionManager.unregister(deviceId);
-            return Mono.empty();
+    protected Mono<Boolean> handleOnlineOffline(DeviceMessage message, MqttConnectionSession firstSession) {
+        if (message == null || message.getDeviceId() == null) {
+            return Mono.just(false);
         }
-
-        //session 不存在,可能是同一个mqtt返回多个设备消息
-        if (managedSession == null) {
-            firstSession = new MqttConnectionSession(deviceId, device, getTransport(), firstSession.getConnection(), gatewayMonitor);
-            sessionManager.register(managedSession = firstSession);
+        String deviceId = message.getDeviceId();
+        Mono<Boolean> then = Mono.empty();
+        if (message instanceof ChildDeviceMessage) {
+            then = handleOnlineOffline((DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(), firstSession);
+        } else if (message instanceof ChildDeviceMessageReply) {
+            then = handleOnlineOffline((DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage(), firstSession);
+        } else if (message instanceof DeviceOnlineMessage) {
+            then = Mono.just(true);
+        } else if (message instanceof DeviceOfflineMessage) {
+            sessionManager.unregister(deviceId);
+            return Mono.just(true);
         }
+        if (firstSession.isAlive()) {
+            DeviceSession[] managedSession = new DeviceSession[]{
+                sessionManager.getSession(deviceId)
+            };
+            //session 不存在,可能是同一个mqtt返回多个设备消息
+            if (managedSession[0] == null) {
+                then = registry
+                    .getDevice(deviceId)
+                    .doOnNext(device -> sessionManager
+                        .register(managedSession[0] =
+                                      new MqttConnectionSession(deviceId,
+                                                                device,
+                                                                getTransport(),
+                                                                firstSession.getConnection(),
+                                                                gatewayMonitor)))
+                    .then(then);
+            } else {
+                managedSession[0] = firstSession;
+            }
 
-        //保持会话,在低功率设备上,可能无法保持mqtt长连接.
-        if (message.getHeader(Headers.keepOnline).orElse(false)) {
-            if (!managedSession.isWrapFrom(KeepOnlineSession.class)) {
-                int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
-                KeepOnlineSession keepOnlineSession = new KeepOnlineSession(firstSession, Duration.ofSeconds(timeout));
-                //替换会话
-                managedSession = sessionManager.replace(firstSession, keepOnlineSession);
+            //保持会话,在低功率设备上,可能无法保持mqtt长连接.
+            if (message.getHeader(Headers.keepOnline).orElse(false)) {
+                then = Mono
+                    .fromRunnable(() -> {
+                        DeviceSession session = managedSession[0];
+                        if (!session.isWrapFrom(KeepOnlineSession.class)) {
+                            int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
+                            KeepOnlineSession keepOnlineSession = new KeepOnlineSession(session, Duration.ofSeconds(timeout));
+                            //替换会话
+                            session = sessionManager.replace(session, keepOnlineSession);
+                            session.keepAlive();
+                        }
+                    })
+                    .then(then);
+            }else if(managedSession[0]!=null) {
+                managedSession[0].keepAlive();
             }
-        } else {
-            managedSession = firstSession;
         }
 
-        managedSession.keepAlive();
+        return then;
+    }
 
+    private Mono<Void> handleMessage(String deviceId,
+                                     DeviceOperator device,
+                                     DeviceMessage message,
+                                     MqttConnectionSession firstSession) {
         if (messageProcessor.hasDownstreams()) {
             sink.next(message);
         }
-        if (message instanceof DeviceOnlineMessage) {
-            return Mono.empty();
-        }
 
-        return messageHandler
-            .handleMessage(device, message)
+        return handleOnlineOffline(message, firstSession)
+            //只有empty才转发消息
+            .switchIfEmpty(messageHandler.handleMessage(device, message))
             .then();
     }
 

+ 215 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java

@@ -0,0 +1,215 @@
+package org.jetlinks.community.device.service;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
+import org.jetlinks.core.device.DeviceConfigKey;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.message.*;
+import org.jetlinks.core.utils.FluxUtils;
+import org.jetlinks.community.device.entity.DeviceInstanceEntity;
+import org.jetlinks.community.device.entity.DeviceTagEntity;
+import org.jetlinks.community.device.enums.DeviceState;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+import reactor.core.publisher.BufferOverflowStrategy;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+@AllArgsConstructor
+@Slf4j
+public class DeviceMessageBusinessHandler {
+
+    private final LocalDeviceInstanceService deviceService;
+
+    private final LocalDeviceProductService productService;
+
+    private final DeviceRegistry registry;
+
+    private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
+
+    private final EventBus eventBus;
+
+    private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage message) {
+        //自动注册
+        return Mono
+            .zip(
+                //T1. 设备ID
+                Mono.justOrEmpty(message.getDeviceId()),
+                //T2. 设备名称
+                Mono.justOrEmpty(message.getHeader("deviceName")).map(String::valueOf),
+                //T3. 产品ID
+                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)),
+                //T4. 产品
+                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf))
+                    .flatMap(productService::findById),
+                //T5. 配置信息
+                Mono.justOrEmpty(message.getHeader("configuration").map(Map.class::cast).orElse(new HashMap()))
+            ).flatMap(tps -> {
+                DeviceInstanceEntity instance = new DeviceInstanceEntity();
+                instance.setId(tps.getT1());
+                instance.setName(tps.getT2());
+                instance.setProductId(tps.getT3());
+                instance.setProductName(tps.getT4().getName());
+                instance.setConfiguration(tps.getT5());
+                instance.setRegistryTime(message.getTimestamp());
+                instance.setCreateTimeNow();
+                instance.setCreatorId(tps.getT4().getCreatorId());
+                instance.setOrgId(tps.getT4().getOrgId());
+                instance.setState(DeviceState.online);
+                return deviceService
+                    .save(Mono.just(instance))
+                    .thenReturn(instance)
+                    .flatMap(device -> registry
+                        .register(device.toDeviceInfo().addConfig("state", DeviceState.online)));
+            });
+    }
+
+    @Subscribe("/device/*/*/register")
+    @Transactional(propagation = Propagation.NEVER)
+    public Mono<Void> autoRegisterDevice(DeviceRegisterMessage message) {
+        return registry
+            .getDevice(message.getDeviceId())
+            .switchIfEmpty(Mono.defer(() -> {
+                //自动注册
+                return doAutoRegister(message);
+            }))
+            .then();
+    }
+
+    /**
+     * 通过订阅子设备注册消息,自动绑定子设备到网关设备
+     *
+     * @param message 子设备消息
+     * @return void
+     */
+    @Subscribe("/device/*/*/message/children/*/register")
+    @Transactional(propagation = Propagation.NEVER)
+    public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
+        String childId = message.getChildDeviceId();
+        Message childMessage = message.getChildDeviceMessage();
+        if (childMessage instanceof DeviceRegisterMessage) {
+
+            return registry
+                .getDevice(childId)
+                .switchIfEmpty(Mono.defer(() -> doAutoRegister(((DeviceRegisterMessage) childMessage))))
+                .flatMap(dev -> dev
+                    .setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId())
+                    .thenReturn(dev))
+                .flatMap(DeviceOperator::getState)
+                .flatMap(state -> deviceService
+                    .createUpdate()
+                    .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
+                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
+                    .where(DeviceInstanceEntity::getId, childId)
+                    .execute()
+                ).then();
+        }
+        return Mono.empty();
+    }
+
+    /**
+     * 通过订阅子设备注销消息,自动解绑子设备
+     *
+     * @param message 子设备消息
+     * @return void
+     */
+    @Subscribe("/device/*/*/message/children/*/unregister")
+    public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage message) {
+        String childId = message.getChildDeviceId();
+        Message childMessage = message.getChildDeviceMessage();
+        if (childMessage instanceof DeviceUnRegisterMessage) {
+
+            return registry
+                .getDevice(childId)
+                .flatMap(dev -> dev
+                    .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
+                    .then(dev.checkState()))
+                .flatMap(state -> deviceService
+                    .createUpdate()
+                    .setNull(DeviceInstanceEntity::getParentId)
+                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
+                    .where(DeviceInstanceEntity::getId, childId)
+                    .execute()
+                    .then());
+        }
+        return Mono.empty();
+    }
+
+    @Subscribe("/device/*/*/message/tags/update")
+    public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
+        Map<String, Object> tags = message.getTags();
+        String deviceId = message.getDeviceId();
+
+        return registry
+            .getDevice(deviceId)
+            .flatMap(DeviceOperator::getMetadata)
+            .flatMapMany(metadata -> Flux
+                .fromIterable(tags.entrySet())
+                .map(e -> {
+                    DeviceTagEntity tagEntity = metadata
+                        .getTag(e.getKey())
+                        .map(DeviceTagEntity::of)
+                        .orElseGet(() -> {
+                            DeviceTagEntity entity = new DeviceTagEntity();
+                            entity.setKey(e.getKey());
+                            entity.setType("string");
+                            entity.setName(e.getKey());
+                            entity.setCreateTime(new Date());
+                            entity.setDescription("设备上报");
+                            return entity;
+                        });
+                    tagEntity.setValue(String.valueOf(e.getValue()));
+                    tagEntity.setDeviceId(deviceId);
+                    tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
+                    return tagEntity;
+                }))
+            .as(tagRepository::save)
+            .then();
+    }
+
+
+    @PostConstruct
+    public void init() {
+
+        Subscription subscription = Subscription
+            .builder()
+            .subscriberId("device-state-synchronizer")
+            .topics("/device/*/*/online", "/device/*/*/offline")
+            .justLocal()//只订阅本地
+            .build();
+
+        //订阅设备上下线消息,同步数据库中的设备状态,
+        //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
+        //如果2条消息间隔大于0.8秒则不缓冲直接更新
+        //否则缓冲,数量超过500后批量更新
+        //无论缓冲区是否超过500条,都每2秒更新一次.
+        FluxUtils.bufferRate(eventBus
+                                 .subscribe(subscription, DeviceMessage.class)
+                                 .map(DeviceMessage::getDeviceId),
+                             800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
+                 .onBackpressureBuffer(64,
+                                       list -> log.warn("无法处理更多设备状态同步!"),
+                                       BufferOverflowStrategy.DROP_OLDEST)
+                 .publishOn(Schedulers.boundedElastic(), 64)
+                 .concatMap(list -> deviceService.syncStateBatch(Flux.just(list), false).map(List::size))
+                 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
+                 .subscribe((i) -> log.info("同步设备状态成功:{}", i));
+
+    }
+
+}

+ 0 - 167
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -305,34 +305,6 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .defaultIfEmpty(DeviceState.notActive);
     }
 
-    @PostConstruct
-    public void init() {
-
-        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
-            "device-state-synchronizer",
-            new String[]{
-                "/device/*/*/online",
-                "/device/*/*/offline"
-            },
-            Subscription.Feature.local
-        );
-
-        //订阅设备上下线消息,同步数据库中的设备状态,
-        //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
-        //如果2条消息间隔大于0.8秒则不缓冲直接更新
-        //否则缓冲,数量超过500后批量更新
-        //无论缓冲区是否超过500条,都每2秒更新一次.
-        FluxUtils.bufferRate(eventBus
-                                 .subscribe(subscription, DeviceMessage.class)
-                                 .map(DeviceMessage::getDeviceId),
-                             800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
-                 .publishOn(Schedulers.parallel())
-                 .concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size))
-                 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
-                 .subscribe((i) -> log.info("同步设备状态成功:{}", i));
-    }
-
-
     public Flux<List<DeviceStateInfo>> syncStateBatch(Flux<List<String>> batch, boolean force) {
 
         return batch
@@ -483,143 +455,4 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     }
 
 
-    @Subscribe("/device/*/*/register")
-    @Transactional(propagation = Propagation.NEVER)
-    public Mono<Void> autoRegisterDevice(DeviceRegisterMessage message) {
-        return registry
-            .getDevice(message.getDeviceId())
-            .switchIfEmpty(Mono.defer(() -> {
-                //自动注册
-                return doAutoRegister(message);
-            }))
-            .then();
-    }
-
-    private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage message) {
-        //自动注册
-        return Mono
-            .zip(
-                //T1. 设备ID
-                Mono.justOrEmpty(message.getDeviceId()),
-                //T2. 设备名称
-                Mono.justOrEmpty(message.getHeader("deviceName")).map(String::valueOf),
-                //T3. 产品ID
-                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)),
-                //T4. 产品
-                Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf))
-                    .flatMap(deviceProductService::findById),
-                //T5. 配置信息
-                Mono.justOrEmpty(message.getHeader("configuration").map(Map.class::cast).orElse(new HashMap()))
-            ).flatMap(tps -> {
-                DeviceInstanceEntity instance = new DeviceInstanceEntity();
-                instance.setId(tps.getT1());
-                instance.setName(tps.getT2());
-                instance.setProductId(tps.getT3());
-                instance.setProductName(tps.getT4().getName());
-                instance.setConfiguration(tps.getT5());
-                instance.setRegistryTime(message.getTimestamp());
-                instance.setCreateTimeNow();
-                instance.setCreatorId(tps.getT4().getCreatorId());
-                instance.setOrgId(tps.getT4().getOrgId());
-                instance.setState(DeviceState.online);
-                return super
-                    .save(Mono.just(instance))
-                    .thenReturn(instance)
-                    .flatMap(device -> registry
-                        .register(device.toDeviceInfo().addConfig("state", DeviceState.online)));
-            });
-    }
-
-    /**
-     * 通过订阅子设备注册消息,自动绑定子设备到网关设备
-     *
-     * @param message 子设备消息
-     * @return void
-     */
-    @Subscribe("/device/*/*/message/children/*/register")
-    @Transactional(propagation = Propagation.NEVER)
-    public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
-        String childId = message.getChildDeviceId();
-        Message childMessage = message.getChildDeviceMessage();
-        if (childMessage instanceof DeviceRegisterMessage) {
-
-            return registry
-                .getDevice(childId)
-                .switchIfEmpty(Mono.defer(() -> doAutoRegister(((DeviceRegisterMessage) childMessage))))
-                .flatMap(dev -> dev.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId()).thenReturn(dev))
-                .flatMap(DeviceOperator::getState)
-                .flatMap(state -> this
-                    .createUpdate()
-                    .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
-                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
-                    .where(DeviceInstanceEntity::getId, childId)
-                    .execute()
-                ).then();
-        }
-        return Mono.empty();
-    }
-
-    /**
-     * 通过订阅子设备注销消息,自动解绑子设备
-     *
-     * @param message 子设备消息
-     * @return void
-     */
-    @Subscribe("/device/*/*/message/children/*/unregister")
-    public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage message) {
-        String childId = message.getChildDeviceId();
-        Message childMessage = message.getChildDeviceMessage();
-        if (childMessage instanceof DeviceUnRegisterMessage) {
-
-            return registry
-                .getDevice(childId)
-                .flatMap(dev -> dev
-                    .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
-                    .then(dev.checkState()))
-                .flatMap(state -> createUpdate()
-                    .setNull(DeviceInstanceEntity::getParentId)
-                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
-                    .where(DeviceInstanceEntity::getId, childId)
-                    .execute()
-                    .then());
-
-
-        }
-        return Mono.empty();
-    }
-
-    //保存标签
-    @Subscribe("/device/*/*/message/tags/update")
-    public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
-        Map<String, Object> tags = message.getTags();
-        String deviceId = message.getDeviceId();
-
-        return registry
-            .getDevice(deviceId)
-            .flatMap(DeviceOperator::getMetadata)
-            .flatMapMany(metadata -> Flux
-                .fromIterable(tags.entrySet())
-                .map(e -> {
-                    DeviceTagEntity tagEntity = metadata
-                        .getTag(e.getKey())
-                        .map(DeviceTagEntity::of)
-                        .orElseGet(() -> {
-                            DeviceTagEntity entity = new DeviceTagEntity();
-                            entity.setKey(e.getKey());
-                            entity.setType("string");
-                            entity.setName(e.getKey());
-                            entity.setCreateTime(new Date());
-                            entity.setDescription("设备上报");
-                            return entity;
-                        });
-                    tagEntity.setValue(String.valueOf(e.getValue()));
-                    tagEntity.setDeviceId(deviceId);
-                    tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
-                    return tagEntity;
-                }))
-            .as(tagRepository::save)
-            .then();
-    }
-
-
 }