Kaynağa Gözat

websocket支持ping pong

zhou-hao 4 yıl önce
ebeveyn
işleme
efe03f7318

+ 7 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java

@@ -35,11 +35,16 @@ public interface Message {
         return new SimpleMessage(id, null, null, Type.complete, null);
     }
 
+    static Message pong(String id) {
+        return new SimpleMessage(id, null, null, Type.pong, null);
+    }
+
     enum Type {
         authError,
         result,
         error,
-        complete
+        complete,
+        ping,
+        pong
     }
-
 }

+ 20 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -11,9 +11,11 @@ import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.MessagingManager;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.springframework.core.io.buffer.DataBufferFactory;
 import org.springframework.util.StringUtils;
 import org.springframework.web.reactive.socket.CloseStatus;
 import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.WebSocketMessage;
 import org.springframework.web.reactive.socket.WebSocketSession;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
@@ -64,8 +66,25 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
                 .receive()
                 .doOnNext(message -> {
                     try {
+                        if (message.getType() == WebSocketMessage.Type.PONG) {
+                            return;
+                        }
+                        if (message.getType() == WebSocketMessage.Type.PING) {
+                            session
+                                .send(Mono.just(session.pongMessage(DataBufferFactory::allocateBuffer)))
+                                .subscribe();
+                            return;
+                        }
                         MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
-                        if (request == null || request.getType() == MessagingRequest.Type.ping) {
+                        if (request == null) {
+                            return;
+                        }
+                        if (request.getType() == MessagingRequest.Type.ping) {
+                            session
+                                .send(Mono.just(session.textMessage(JSON.toJSONString(
+                                    Message.pong(request.getId())
+                                ))))
+                                .subscribe();
                             return;
                         }
                         if (StringUtils.isEmpty(request.getId())) {