فهرست منبع

增加websocket订阅消息

zhou-hao 5 سال پیش
والد
کامیت
15d5e6cab7
13فایلهای تغییر یافته به همراه468 افزوده شده و 0 حذف شده
  1. 12 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java
  2. 45 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/DefaultMessagingManager.java
  3. 30 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/ErrorMessage.java
  4. 23 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java
  5. 9 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/MessagingManager.java
  6. 41 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java
  7. 15 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscriptionProvider.java
  8. 30 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SuccessMessage.java
  9. 58 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java
  10. 24 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/MessagingRequest.java
  11. 86 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java
  12. 53 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandlerConfiguration.java
  13. 42 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/messaging/RuleEngineSubscriptionProvider.java

+ 12 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.gateway;
 
 import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.rule.engine.executor.PayloadType;
 
 import javax.annotation.Nonnull;
 
@@ -26,6 +27,17 @@ public interface TopicMessage {
     @Nonnull
     EncodedMessage getMessage();
 
+    default Object convertMessage() {
+        if (getMessage() instanceof EncodableMessage) {
+            return ((EncodableMessage) getMessage()).getNativePayload();
+        }
+
+        if (getMessage().getPayloadType() == null) {
+            return getMessage().getBytes();
+        }
+        return PayloadType.valueOf(getMessage().getPayloadType().name()).read(getMessage().getPayload());
+    }
+
     static TopicMessage of(String topic, EncodedMessage message) {
         return new DefaultTopicMessage(topic, message);
     }

+ 45 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/DefaultMessagingManager.java

@@ -0,0 +1,45 @@
+package org.jetlinks.community.gateway.external;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.AntPathMatcher;
+import org.springframework.util.PathMatcher;
+import reactor.core.publisher.Flux;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+public class DefaultMessagingManager implements MessagingManager, BeanPostProcessor {
+
+    private final Map<String, SubscriptionProvider> subProvider = new ConcurrentHashMap<>();
+
+    private final static PathMatcher matcher = new AntPathMatcher();
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+
+        for (Map.Entry<String, SubscriptionProvider> entry : subProvider.entrySet()) {
+            if (matcher.match(entry.getKey(), request.getTopic())) {
+                return entry.getValue().subscribe(request);
+            }
+        }
+
+        return Flux.empty();
+    }
+
+    public void register(SubscriptionProvider provider) {
+        for (String pattern : provider.getTopicPattern()) {
+            subProvider.put(pattern, provider);
+        }
+    }
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        if (bean instanceof SubscriptionProvider) {
+            register(((SubscriptionProvider) bean));
+        }
+        return bean;
+    }
+}

+ 30 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/ErrorMessage.java

@@ -0,0 +1,30 @@
+package org.jetlinks.community.gateway.external;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+public class ErrorMessage implements Message {
+
+    private String requestId;
+
+    private String topic;
+
+    @Getter
+    private String message;
+
+    @Override
+    public Object getPayload() {
+        return null;
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return false;
+    }
+
+}

+ 23 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java

@@ -0,0 +1,23 @@
+package org.jetlinks.community.gateway.external;
+
+public interface Message {
+
+    String getRequestId();
+
+    String getTopic();
+
+    Object getPayload();
+
+    boolean isSuccess();
+
+    String getMessage();
+
+    static Message error(String id, String topic, String message) {
+        return new ErrorMessage(id, topic, message);
+    }
+
+    static Message success(String id, String topic, Object payload) {
+        return new SuccessMessage(id, topic, payload);
+    }
+
+}

+ 9 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/MessagingManager.java

@@ -0,0 +1,9 @@
+package org.jetlinks.community.gateway.external;
+
+import reactor.core.publisher.Flux;
+
+public interface MessagingManager {
+
+    Flux<Message> subscribe(SubscribeRequest request);
+
+}

+ 41 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java

@@ -0,0 +1,41 @@
+package org.jetlinks.community.gateway.external;
+
+import lombok.*;
+import org.hswebframework.web.authorization.Authentication;
+import org.jetlinks.community.ValueObject;
+import org.jetlinks.community.gateway.external.socket.MessagingRequest;
+
+import java.util.Map;
+
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SubscribeRequest implements ValueObject {
+
+    private String id;
+
+    private String topic;
+
+    private Map<String, Object> parameter;
+
+    private Authentication authentication;
+
+    @Override
+    public Map<String, Object> getAll() {
+        return parameter;
+    }
+
+
+    public static SubscribeRequest of(MessagingRequest request,
+                                      Authentication authentication) {
+        return SubscribeRequest.builder()
+            .id(request.getId())
+            .topic(request.getTopic())
+            .parameter(request.getParameter())
+            .authentication(authentication)
+            .build();
+
+    }
+}

+ 15 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscriptionProvider.java

@@ -0,0 +1,15 @@
+package org.jetlinks.community.gateway.external;
+
+import reactor.core.publisher.Flux;
+
+public interface SubscriptionProvider {
+
+    String id();
+
+    String name();
+
+    String[] getTopicPattern();
+
+    Flux<Message> subscribe(SubscribeRequest request);
+
+}

+ 30 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SuccessMessage.java

@@ -0,0 +1,30 @@
+package org.jetlinks.community.gateway.external;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class SuccessMessage implements Message {
+
+    private String requestId;
+
+    private String topic;
+
+    private Object payload;
+
+
+    @Override
+    public boolean isSuccess() {
+        return true;
+    }
+
+    @Override
+    public String getMessage() {
+        return null;
+    }
+}

+ 58 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java

@@ -0,0 +1,58 @@
+package org.jetlinks.community.gateway.external.dashboard;
+
+import org.jetlinks.community.dashboard.DashboardManager;
+import org.jetlinks.community.dashboard.MeasurementParameter;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import org.springframework.util.AntPathMatcher;
+import org.springframework.util.PathMatcher;
+import reactor.core.publisher.Flux;
+
+import java.util.Map;
+
+@Component
+public class DashBoardSubscriptionProvider implements SubscriptionProvider {
+
+    private final DashboardManager dashboardManager;
+
+    public DashBoardSubscriptionProvider(DashboardManager dashboardManager) {
+        this.dashboardManager = dashboardManager;
+    }
+
+    private static final PathMatcher pathMatcher = new AntPathMatcher();
+
+    @Override
+    public String id() {
+        return "dashboard";
+    }
+
+    @Override
+    public String name() {
+        return "仪表盘";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{"/dashboard/**"};
+    }
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+        return Flux.defer(() -> {
+            try {
+                Map<String, String> variables = pathMatcher.extractUriTemplateVariables(
+                    "/dashboard/{dashboard}/{object}/{measurement}/{dimension}", request.getTopic());
+                return dashboardManager.getDashboard(variables.get("dashboard"))
+                    .flatMap(dashboard -> dashboard.getObject(variables.get("object")))
+                    .flatMap(object -> object.getMeasurement(variables.get("measurement")))
+                    .flatMap(measurement -> measurement.getDimension(variables.get("dimension")))
+                    .flatMapMany(dimension -> dimension.getValue(MeasurementParameter.of(request.getParameter())))
+                    .map(val -> Message.success(request.getId(), request.getTopic(), val));
+            } catch (Exception e) {
+                return Flux.error(new IllegalArgumentException("topic格式错误,正确格式:/dashboard/{dashboard}/{object}/{measurement}/{dimension}", e));
+            }
+        });
+    }
+}

+ 24 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/MessagingRequest.java

@@ -0,0 +1,24 @@
+package org.jetlinks.community.gateway.external.socket;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+
+@Getter
+@Setter
+public class MessagingRequest {
+
+    private String id;
+
+    private Type type;
+
+    private String topic;
+
+    private Map<String,Object> parameter;
+
+
+    public enum Type{
+        pub,sub,unsub
+    }
+}

+ 86 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -0,0 +1,86 @@
+package org.jetlinks.community.gateway.external.socket;
+
+import com.alibaba.fastjson.JSON;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
+import org.hswebframework.web.authorization.token.UserToken;
+import org.hswebframework.web.authorization.token.UserTokenManager;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.MessagingManager;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.springframework.util.StringUtils;
+import org.springframework.web.reactive.socket.CloseStatus;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.WebSocketSession;
+import reactor.core.publisher.Mono;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+@AllArgsConstructor
+public class WebSocketMessagingHandler implements WebSocketHandler {
+
+    private final MessagingManager messagingManager;
+
+    private final UserTokenManager userTokenManager;
+
+    private final ReactiveAuthenticationManager authenticationManager;
+
+    // /messaging/{token}
+    @Override
+    public Mono<Void> handle(WebSocketSession session) {
+        String[] path = session.getHandshakeInfo().getUri().getPath().split("[/]");
+        if (path.length == 0) {
+            return session.send(Mono.just(session.textMessage(JSON.toJSONString(
+                Message.error("auth", null, "错误的请求")
+            )))).then(session.close(CloseStatus.BAD_DATA));
+        }
+        String token = path[path.length - 1];
+
+        Set<String> disposable = new ConcurrentSkipListSet<>();
+
+        return userTokenManager.getByToken(token)
+            .map(UserToken::getUserId)
+            .flatMap(authenticationManager::getByUserId)
+            .switchIfEmpty(session
+                .send(Mono.just(session.textMessage(JSON.toJSONString(
+                    Message.error("auth", null, "认证失败")
+                ))))
+                .then(session.close(CloseStatus.BAD_DATA))
+                .then(Mono.empty()))
+            .flatMap(auth -> session
+                .receive()
+                .flatMap(message -> {
+                    MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
+                    if (StringUtils.isEmpty(request.getId())) {
+                        return session
+                            .send(Mono.just(session.textMessage(JSON.toJSONString(
+                                Message.error(request.getType().name(), null, "id不能为空")
+                            ))));
+                    }
+                    if (request.getType() == MessagingRequest.Type.sub) {
+                        //重复订阅
+                        if (disposable.contains(request.getId())) {
+                            return Mono.empty();
+                        }
+                        disposable.add(request.getId());
+                        return session.send(messagingManager
+                            .subscribe(SubscribeRequest.of(request, auth))
+                            .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
+                            .takeWhile(r -> disposable.contains(request.getId()))
+                            .switchIfEmpty(Mono.fromSupplier(() -> Message.error(request.getId(), request.getTopic(), "不支持的Topic")))
+                            .map(msg -> session.textMessage(JSON.toJSONString(msg)))
+                            .doOnComplete(() -> disposable.remove(request.getId()))
+                        );
+                    } else if (request.getType() == MessagingRequest.Type.unsub) {
+                        return Mono.fromRunnable(() -> disposable.remove(request.getId()));
+                    } else {
+                        return session.send(Mono.just(session.textMessage(JSON.toJSONString(
+                            Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
+                        ))));
+                    }
+                }).then())
+            .doFinally(r -> disposable.clear());
+
+    }
+}

+ 53 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandlerConfiguration.java

@@ -0,0 +1,53 @@
+package org.jetlinks.community.gateway.external.socket;
+
+import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
+import org.hswebframework.web.authorization.token.UserTokenManager;
+import org.jetlinks.community.gateway.external.MessagingManager;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.Ordered;
+import org.springframework.web.reactive.HandlerMapping;
+import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+//@ConditionalOnBean({
+//    ReactiveAuthenticationManager.class,
+//    UserTokenManager.class
+//})
+public class WebSocketMessagingHandlerConfiguration {
+
+
+    @Bean
+    public HandlerMapping webSocketMessagingHandlerMapping(MessagingManager messagingManager,
+                                           UserTokenManager userTokenManager,
+                                           ReactiveAuthenticationManager authenticationManager) {
+
+
+        WebSocketMessagingHandler messagingHandler=new WebSocketMessagingHandler(
+            messagingManager,
+            userTokenManager,
+            authenticationManager
+        );
+        final Map<String, WebSocketHandler> map = new HashMap<>(1);
+        map.put("/messaging/**", messagingHandler);
+
+        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
+        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
+        mapping.setUrlMap(map);
+        return mapping;
+    }
+
+    @Bean
+    @ConditionalOnMissingBean
+    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
+        return new WebSocketHandlerAdapter();
+    }
+
+
+}

+ 42 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/messaging/RuleEngineSubscriptionProvider.java

@@ -0,0 +1,42 @@
+package org.jetlinks.community.rule.engine.messaging;
+
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+
+@Component
+public class RuleEngineSubscriptionProvider implements SubscriptionProvider {
+
+    private final MessageGateway messageGateway;
+
+    public RuleEngineSubscriptionProvider(MessageGateway messageGateway) {
+        this.messageGateway = messageGateway;
+    }
+
+    @Override
+    public String id() {
+        return "rule-engine";
+    }
+
+    @Override
+    public String name() {
+        return "规则引擎";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{"/rule-engine/**"};
+    }
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+
+        return messageGateway.subscribe(Subscription.asList(request.getTopic()), "rule:sub:" + request.getId(), true)
+            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.convertMessage()));
+    }
+}