Browse Source

优化websocket消息订阅

zhou-hao 4 years ago
parent
commit
e0b9c4f170

+ 5 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java

@@ -22,6 +22,11 @@ public interface Message {
         return new SimpleMessage(id, topic, null, Type.error, message);
     }
 
+    static Message error(String id, String topic, Throwable message) {
+
+        return new SimpleMessage(id, topic, null, Type.error, message.getMessage() == null ? message.getClass().getSimpleName() : message.getMessage());
+    }
+
     static Message success(String id, String topic, Object payload) {
         return new SimpleMessage(id, topic, payload, Type.result, null);
     }

+ 26 - 9
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -3,9 +3,11 @@ package org.jetlinks.community.gateway.external.socket;
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
 import org.hswebframework.web.authorization.token.UserToken;
 import org.hswebframework.web.authorization.token.UserTokenManager;
+import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.MessagingManager;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
@@ -15,8 +17,10 @@ import org.springframework.web.reactive.socket.WebSocketHandler;
 import org.springframework.web.reactive.socket.WebSocketSession;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
 
 import javax.annotation.Nonnull;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -73,33 +77,46 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
                         }
                         if (request.getType() == MessagingRequest.Type.sub) {
                             //重复订阅
-                            if (subs.containsKey(request.getId())) {
+                            Disposable old = subs.get(request.getId());
+                            if (old != null && !old.isDisposed()) {
                                 return;
                             }
-                            subs.put(request.getId(), messagingManager
+                            Map<String, String> context = new HashMap<>();
+                            context.put("userId", auth.getUser().getId());
+                            context.put("userName", auth.getUser().getName());
+                            Disposable sub = messagingManager
                                 .subscribe(SubscribeRequest.of(request, auth))
-                                .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
+                                .doOnEach(ReactiveLogger.onError(err -> log.error("{}", err.getMessage(), err)))
+                                .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err)))
                                 .map(msg -> session.textMessage(JSON.toJSONString(msg)))
                                 .doOnComplete(() -> {
+                                    log.debug("complete subscription:{}", request.getTopic());
                                     subs.remove(request.getId());
                                     Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
                                         .as(session::send)
                                         .subscribe();
                                 })
-                                .flatMap(msg -> session.send(Mono.just(msg)))
-                                .subscribe()
-                            );
-
+                                .doOnCancel(() -> {
+                                    log.debug("cancel subscription:{}", request.getTopic());
+                                    subs.remove(request.getId());
+                                })
+                                .transform(session::send)
+                                .subscriberContext(ReactiveLogger.start(context))
+                                .subscriberContext(Context.of(Authentication.class, auth))
+                                .subscribe();
+                            if (!sub.isDisposed()) {
+                                subs.put(request.getId(), sub);
+                            }
                         } else if (request.getType() == MessagingRequest.Type.unsub) {
                             Optional.ofNullable(subs.remove(request.getId()))
-                                .ifPresent(Disposable::dispose);
+                                    .ifPresent(Disposable::dispose);
                         } else {
                             session.send(Mono.just(session.textMessage(JSON.toJSONString(
                                 Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
                             )))).subscribe();
                         }
                     } catch (Exception e) {
-                        log.warn(e.getMessage(),e);
+                        log.warn(e.getMessage(), e);
                         session.send(Mono.just(session.textMessage(JSON.toJSONString(
                             Message.error("illegal_argument", null, "消息格式错误")
                         )))).subscribe();