Browse Source

优化websocket

zhou-hao 5 năm trước cách đây
mục cha
commit
daf640202c

+ 40 - 30
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.gateway.external.socket;
 
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
 import org.hswebframework.web.authorization.token.UserToken;
 import org.hswebframework.web.authorization.token.UserTokenManager;
@@ -23,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 @AllArgsConstructor
+@Slf4j
 public class WebSocketMessagingHandler implements WebSocketHandler {
 
     private final MessagingManager messagingManager;
@@ -57,41 +59,49 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
             .flatMap(auth -> session
                 .receive()
                 .doOnNext(message -> {
-                    MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
-                    if (StringUtils.isEmpty(request.getId())) {
-                        session
-                            .send(Mono.just(session.textMessage(JSON.toJSONString(
-                                Message.error(request.getType().name(), null, "id不能为空")
-                            )))).subscribe();
-                    }
-                    if (request.getType() == MessagingRequest.Type.sub) {
-                        //重复订阅
-                        if (subs.containsKey(request.getId())) {
-                            return;
+                    try {
+                        MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
+                        if (StringUtils.isEmpty(request.getId())) {
+                            session
+                                .send(Mono.just(session.textMessage(JSON.toJSONString(
+                                    Message.error(request.getType().name(), null, "id不能为空")
+                                )))).subscribe();
                         }
-                        subs.put(request.getId(), messagingManager
-                            .subscribe(SubscribeRequest.of(request, auth))
-                            .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
-                            .map(msg -> session.textMessage(JSON.toJSONString(msg)))
-                            .doOnComplete(() -> {
-                                subs.remove(request.getId());
-                                Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
-                                    .as(session::send)
-                                    .subscribe();
-                            })
-                            .flatMap(msg -> session.send(Mono.just(msg)))
-                            .subscribe()
-                        );
+                        if (request.getType() == MessagingRequest.Type.sub) {
+                            //重复订阅
+                            if (subs.containsKey(request.getId())) {
+                                return;
+                            }
+                            subs.put(request.getId(), messagingManager
+                                .subscribe(SubscribeRequest.of(request, auth))
+                                .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
+                                .map(msg -> session.textMessage(JSON.toJSONString(msg)))
+                                .doOnComplete(() -> {
+                                    subs.remove(request.getId());
+                                    Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
+                                        .as(session::send)
+                                        .subscribe();
+                                })
+                                .flatMap(msg -> session.send(Mono.just(msg)))
+                                .subscribe()
+                            );
 
-                    } else if (request.getType() == MessagingRequest.Type.unsub) {
-                        Optional.ofNullable(subs.remove(request.getId()))
-                            .ifPresent(Disposable::dispose);
-                    } else {
+                        } else if (request.getType() == MessagingRequest.Type.unsub) {
+                            Optional.ofNullable(subs.remove(request.getId()))
+                                .ifPresent(Disposable::dispose);
+                        } else {
+                            session.send(Mono.just(session.textMessage(JSON.toJSONString(
+                                Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
+                            )))).subscribe();
+                        }
+                    } catch (Exception e) {
+                        log.warn(e.getMessage(),e);
                         session.send(Mono.just(session.textMessage(JSON.toJSONString(
-                            Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
+                            Message.error("illegal_argument", null, "消息格式错误")
                         )))).subscribe();
                     }
-                }).then())
+                })
+                .then())
             .doFinally(r -> {
                 subs.values().forEach(Disposable::dispose);
                 subs.clear();