Procházet zdrojové kódy

优化http接入,增加websocket支持

zhouhao před 2 roky
rodič
revize
c1198ebdfe
10 změnil soubory, kde provedl 873 přidání a 87 odebrání
  1. 180 0
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/AsyncHttpExchangeMessage.java
  2. 20 1
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java
  3. 165 18
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java
  4. 2 6
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGatewayProvider.java
  5. 124 0
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/WebSocketDeviceSession.java
  6. 2 0
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/HttpServer.java
  7. 43 0
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/WebSocketExchange.java
  8. 12 40
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java
  9. 58 22
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpServer.java
  10. 267 0
      jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxWebSocketExchange.java

+ 180 - 0
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/AsyncHttpExchangeMessage.java

@@ -0,0 +1,180 @@
+package org.jetlinks.community.network.http.device;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.message.codec.http.*;
+import org.jetlinks.community.network.http.server.HttpExchange;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.util.CollectionUtils;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class AsyncHttpExchangeMessage implements HttpExchangeMessage {
+
+    private static final AtomicReferenceFieldUpdater<AsyncHttpExchangeMessage, Boolean>
+        RESPONDED = AtomicReferenceFieldUpdater.newUpdater(AsyncHttpExchangeMessage.class, Boolean.class, "responded");
+
+    private final HttpExchange exchange;
+
+    private volatile Boolean responded = false;
+
+    public AsyncHttpExchangeMessage(HttpExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<Void> response(@Nonnull HttpResponseMessage message) {
+        return Mono
+            .defer(() -> {
+                if (!RESPONDED.getAndSet(this, true) && !exchange.isClosed()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("响应HTTP请求:\n{}", message.print());
+                    }
+                    return exchange.response(message);
+                }
+                return Mono.empty();
+            });
+    }
+
+    @Override
+    public Mono<ByteBuf> payload() {
+        return exchange
+            .request()
+            .getBody();
+    }
+
+    @Nonnull
+    @Override
+    public String getUrl() {
+        return exchange.request().getUrl();
+    }
+
+    @Nonnull
+    @Override
+    public HttpMethod getMethod() {
+        return exchange.request().getMethod();
+    }
+
+    @Nullable
+    @Override
+    public MediaType getContentType() {
+        return exchange.request().getContentType();
+    }
+
+    @Nonnull
+    @Override
+    public List<Header> getHeaders() {
+        return exchange.request().getHeaders();
+    }
+
+    @Nullable
+    @Override
+    public Map<String, String> getQueryParameters() {
+        return exchange.request().getQueryParameters();
+    }
+
+    @Override
+    public Mono<MultiPart> multiPartAsync() {
+        return exchange.request().multiPart();
+    }
+
+    @Override
+    @Deprecated
+    @Nonnull
+    public Optional<MultiPart> multiPart() {
+        //log.warn("该方法已过时,请使用multiPartAsync()方法替代");
+        return Optional.ofNullable(
+            multiPartAsync()
+                .toFuture()
+                .getNow(null)
+        );
+    }
+
+    @Nonnull
+    @Override
+    public ByteBuf getPayload() {
+        //log.warn("该方法已过时,请使用payload()方法替代");
+        return payload()
+            .toFuture()
+            .getNow(Unpooled.EMPTY_BUFFER);
+    }
+
+    @Override
+    public String toString() {
+        return print();
+    }
+
+    @Override
+    public String print() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(getMethod()).append(" ").append(getPath());
+        if (!CollectionUtils.isEmpty(getQueryParameters())) {
+            builder.append("?")
+                   .append(getQueryParameters()
+                               .entrySet().stream()
+                               .map(e -> e.getKey().concat("=").concat(e.getValue()))
+                               .collect(Collectors.joining("&")))
+                   .append("\n");
+        } else {
+            builder.append("\n");
+        }
+        for (Header header : getHeaders()) {
+            builder
+                .append(header.getName()).append(": ").append(String.join(",", header.getValue()))
+                .append("\n");
+        }
+
+        if (MultiPart.isMultiPart(getContentType())) {
+            MultiPart multiPart = multiPartAsync()
+                .toFuture()
+                .getNow(null);
+            builder.append("\n");
+            if (multiPart != null) {
+                builder.append("\n");
+                for (Part part : multiPart.getParts()) {
+                    builder.append(part).append("\n");
+                }
+            } else {
+                builder.append("\n")
+                       .append("<unread multiPart>\n");
+            }
+        } else if (getMethod() != HttpMethod.GET && getMethod() != HttpMethod.DELETE) {
+            ByteBuf payload = payload().toFuture().getNow(null);
+            if (payload == null) {
+                return builder.append("\n")
+                              .append("<unread payload>\n")
+                              .toString();
+            }
+            if(payload.refCnt()==0){
+                return builder.append("\n")
+                              .append("<payload released>\n")
+                              .toString();
+            }
+            if (payload.readableBytes() == 0) {
+                return builder.toString();
+            }
+            builder.append("\n");
+            if (ByteBufUtil.isText(payload, StandardCharsets.UTF_8)) {
+                builder.append(payload.toString(StandardCharsets.UTF_8));
+            } else {
+                ByteBufUtil.appendPrettyHexDump(builder, payload);
+            }
+        }
+
+        return builder.toString();
+    }
+}

+ 20 - 1
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java

@@ -1,10 +1,15 @@
 package org.jetlinks.community.network.http.device;
 
+import lombok.Setter;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.EncodedMessage;
 import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
 import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.utils.Reactors;
+import org.jetlinks.community.network.http.server.WebSocketExchange;
 import reactor.core.publisher.Mono;
 
 import javax.annotation.Nullable;
@@ -24,6 +29,9 @@ class HttpDeviceSession implements DeviceSession {
 
     private final InetSocketAddress address;
 
+    @Setter
+    private WebSocketExchange websocket;
+
     private long lastPingTime = System.currentTimeMillis();
 
     //默认永不超时
@@ -62,7 +70,18 @@ class HttpDeviceSession implements DeviceSession {
 
     @Override
     public Mono<Boolean> send(EncodedMessage encodedMessage) {
-        return Mono.just(false);
+        if(websocket==null){
+            return Reactors.ALWAYS_FALSE;
+        }
+        if (encodedMessage instanceof WebSocketMessage) {
+            return websocket
+                .send(((WebSocketMessage) encodedMessage))
+                .thenReturn(true);
+        } else {
+            return websocket
+                .send(DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, encodedMessage.getPayload()))
+                .thenReturn(true);
+        }
     }
 
     @Override

+ 165 - 18
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java

@@ -1,29 +1,35 @@
 package org.jetlinks.community.network.http.device;
 
+import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
 import lombok.AllArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.core.ProtocolSupport;
-import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.device.*;
 import org.jetlinks.core.device.session.DeviceSessionManager;
 import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
 import org.jetlinks.core.route.HttpRoute;
+import org.jetlinks.core.route.WebsocketRoute;
 import org.jetlinks.core.trace.MonoTracer;
 import org.jetlinks.core.utils.TopicUtils;
 import org.jetlinks.community.gateway.AbstractDeviceGateway;
+import org.jetlinks.community.gateway.DeviceGatewayHelper;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.http.server.HttpExchange;
 import org.jetlinks.community.network.http.server.HttpServer;
-import org.jetlinks.community.gateway.DeviceGatewayHelper;
+import org.jetlinks.community.network.http.server.WebSocketExchange;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.HttpStatus;
+import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 
@@ -52,6 +58,7 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
     private final DeviceGatewayHelper helper;
 
     private final Map<RouteKey, Disposable> handlers = new ConcurrentHashMap<>();
+    private final Map<String, Disposable> websocketHandlers = new ConcurrentHashMap<>();
 
 
     public HttpServerDeviceGateway(String id,
@@ -68,28 +75,133 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
     }
 
 
-    public Transport getTransport() {
-        return DefaultTransport.HTTP;
-    }
-
     private Disposable handleRequest(HttpMethod method, String url) {
         return httpServer
             .handleRequest(method, url)
-            .filterWhen(exchange -> {
-                if (!isStarted()) {
-                    return Mono
-                        .defer(() -> exchange
-                            .error(HttpStatus.SERVICE_UNAVAILABLE)
-                            .thenReturn(false))
-                        .onErrorReturn(false);
-                }
-                return Mono.just(true);
-            })
+            //todo 背压处理
             .flatMap(this::handleHttpRequest, Integer.MAX_VALUE)
             .subscribe();
     }
 
+    private Disposable handleWebsocketRequest(String url) {
+        return httpServer
+            .handleWebsocket(url)
+            //todo 背压处理
+            .flatMap(this::handleWebsocketRequest, Integer.MAX_VALUE)
+            .subscribe();
+    }
+
+
+    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange) {
+
+        return protocol
+            .flatMap(protocol -> protocol
+                .authenticate(WebsocketAuthenticationRequest.of(exchange), registry)
+                .onErrorResume(err -> Mono.just(AuthenticationResponse.error(500, err.getMessage())))
+                .flatMap(result -> {
+                    if (result.isSuccess()) {
+                        String deviceId = result.getDeviceId();
+                        if (StringUtils.hasText(deviceId)) {
+                            DeviceOnlineMessage message = new DeviceOnlineMessage();
+                            message.setDeviceId(deviceId);
+                            return this
+                                .handleWebsocketMessage(message, exchange)
+                                .flatMap(device -> exchange
+                                    .receive()
+                                    .flatMap(msg -> handleWebsocketRequest(exchange, msg, device))
+                                    .then());
+                        } else {
+                            return exchange
+                                .receive()
+                                .flatMap(msg -> handleWebsocketRequest(exchange, msg, null))
+                                .then();
+                        }
+                    } else {
+                        log.warn("设备[{}] Websocket 认证失败:{}", exchange
+                            .getRemoteAddress()
+                            .orElse(null), result.getMessage());
+                        return exchange.close(HttpStatus.UNAUTHORIZED);
+                    }
+                }));
+
+    }
+
+    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange, WebSocketMessage msg, DeviceOperator device) {
+        if (!isStarted()) {
+            return exchange
+                .close(WebSocketCloseStatus.BAD_GATEWAY.code())
+                .onErrorResume((error) -> {
+                    log.error(error.getMessage(), error);
+                    return Mono.empty();
+                });
+        }
+        WebSocketDeviceSession session = new WebSocketDeviceSession(device, exchange);
+
+        return protocol
+            .flatMap(protocol -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("收到HTTP请求\n{}", msg);
+                }
+                //调用协议执行解码
+                return protocol
+                    .getMessageCodec(DefaultTransport.WebSocket)
+                    .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, msg, registry)))
+                    .cast(DeviceMessage.class)
+                    .flatMap(deviceMessage -> {
+                        monitor.receivedMessage();
+                        if (!StringUtils.hasText(deviceMessage.getDeviceId())) {
+                            deviceMessage.thingId(DeviceThingType.device, session.getDeviceId());
+                        }
+                        return handleWebsocketMessage(deviceMessage, exchange);
+                    })
+                    .doOnNext(session::setOperator)
+                    .onErrorResume(err -> {
+                        log.error("处理http请求失败:\n{}", msg, err);
+                        return exchange
+                            .close(HttpStatus.BAD_REQUEST)
+                            .then(Mono.empty());
+                    })
+                    .then();
+            })
+            .as(MonoTracer.create("http-device-gateway/" + getId() + exchange.getPath()))
+            .onErrorResume((error) -> {
+                log.error(error.getMessage(), error);
+                return exchange
+                    .close(HttpStatus.INTERNAL_SERVER_ERROR.value())
+                    .then(Mono.empty());
+            });
+    }
+
+    private Mono<DeviceOperator> handleWebsocketMessage(DeviceMessage message, WebSocketExchange exchange) {
+        return helper
+            .handleDeviceMessage(
+                message,
+                device -> new WebSocketDeviceSession(device, exchange),
+                deviceSession -> {
+                    if (deviceSession.isWrapFrom(WebSocketDeviceSession.class)) {
+                        deviceSession
+                            .unwrap(WebSocketDeviceSession.class)
+                            .setExchange(exchange);
+                    } else if (deviceSession.isWrapFrom(HttpDeviceSession.class)) {
+                        deviceSession
+                            .unwrap(HttpDeviceSession.class)
+                            .setWebsocket(exchange);
+                    }
+                },
+                () -> exchange
+                    .close(HttpStatus.NOT_FOUND)
+                    .then(Mono.empty()));
+    }
+
     private Mono<Void> handleHttpRequest(HttpExchange exchange) {
+        if (!isStarted()) {
+            return exchange
+                .error(HttpStatus.SERVICE_UNAVAILABLE)
+                .onErrorResume((error) -> {
+                    log.error(error.getMessage(), error);
+                    return Mono.empty();
+                });
+        }
         return protocol
             .flatMap(protocol -> exchange
                 .toExchangeMessage()
@@ -101,7 +213,7 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
                     UnknownHttpDeviceSession session = new UnknownHttpDeviceSession(exchange);
                     //调用协议执行解码
                     return protocol
-                        .getMessageCodec(getTransport())
+                        .getMessageCodec(DefaultTransport.HTTP)
                         .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, httpMessage, registry)))
                         .cast(DeviceMessage.class)
                         .flatMap(deviceMessage -> {
@@ -160,8 +272,26 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
 
     }
 
-    final Mono<Void> reload() {
+    private void doReloadRouteWebsocket(List<WebsocketRoute> routes) {
+
+        Map<String, Disposable> readyToRemove = new HashMap<>(websocketHandlers);
+        for (WebsocketRoute route : routes) {
+            String addr = TopicUtils
+                .convertToMqttTopic(route.getAddress())
+                .replace("+", "*")
+                .replace("#", "**");
+
+            readyToRemove.remove(addr);
+            websocketHandlers.computeIfAbsent(addr, this::handleWebsocketRequest);
+        }
+        //取消处理被移除的url信息
+        for (Disposable value : readyToRemove.values()) {
+            value.dispose();
+        }
 
+    }
+
+    final Mono<Void> reloadHttp() {
         return protocol
             .flatMap(support -> support
                 .getRoutes(DefaultTransport.HTTP)
@@ -179,6 +309,23 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
             .then();
     }
 
+    final Mono<Void> reloadWebsocket() {
+        return protocol
+            .flatMap(support -> support
+                .getRoutes(DefaultTransport.WebSocket)
+                .filter(WebsocketRoute.class::isInstance)
+                .cast(WebsocketRoute.class)
+                .collectList()
+                .doOnNext(this::doReloadRouteWebsocket))
+            .then();
+    }
+
+    final Mono<Void> reload() {
+
+        return reloadHttp()
+            .then(reloadWebsocket());
+    }
+
 
     @AllArgsConstructor(staticName = "of")
     private static class RouteKey {

+ 2 - 6
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGatewayProvider.java

@@ -3,6 +3,7 @@ package org.jetlinks.community.network.http.device;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.network.http.server.HttpServer;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.session.DeviceSessionManager;
@@ -14,7 +15,6 @@ import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.community.network.http.server.HttpServer;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
@@ -67,14 +67,10 @@ public class HttpServerDeviceGatewayProvider implements DeviceGatewayProvider {
         return DefaultTransport.HTTP;
     }
 
-    public NetworkType getNetworkType() {
-        return DefaultNetworkType.HTTP_SERVER;
-    }
-
     @Override
     public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
         return networkManager
-            .<HttpServer>getNetwork(getNetworkType(), properties.getChannelId())
+            .<HttpServer>getNetwork(DefaultNetworkType.HTTP_SERVER, properties.getChannelId())
             .map(server -> {
                 String protocol = properties.getProtocol();
                 return new HttpServerDeviceGateway(properties.getId(),

+ 124 - 0
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/WebSocketDeviceSession.java

@@ -0,0 +1,124 @@
+package org.jetlinks.community.network.http.device;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.community.network.http.server.WebSocketExchange;
+import reactor.core.publisher.Mono;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+
+/**
+ * WebSocket设备会话
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
+@AllArgsConstructor
+class WebSocketDeviceSession implements DeviceSession {
+
+    @Getter
+    @Setter
+    private volatile DeviceOperator operator;
+
+    @Setter
+    private WebSocketExchange exchange;
+
+    private final long connectTime = System.currentTimeMillis();
+
+    private Duration keepAliveTimeout;
+
+    public WebSocketDeviceSession(DeviceOperator device, WebSocketExchange exchange) {
+        this.operator = device;
+        this.exchange = exchange;
+    }
+
+    @Override
+    public String getId() {
+        return getDeviceId();
+    }
+
+    @Override
+    public String getDeviceId() {
+        return operator == null ? "unknown" : operator.getDeviceId();
+    }
+
+    @Override
+    public long lastPingTime() {
+        return exchange.getLastKeepAliveTime();
+    }
+
+    @Override
+    public long connectTime() {
+        return connectTime;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        if (encodedMessage instanceof WebSocketMessage) {
+            return exchange
+                .send(((WebSocketMessage) encodedMessage))
+                .thenReturn(true);
+        } else {
+            return exchange
+                .send(DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, encodedMessage.getPayload()))
+                .thenReturn(true);
+        }
+
+    }
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.WebSocket;
+    }
+
+    @Override
+    public void close() {
+        exchange.close()
+                .subscribe();
+    }
+
+    @Override
+    public void ping() {
+    }
+
+    @Override
+    public boolean isAlive() {
+        return exchange.isAlive();
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+        exchange.closeHandler(call);
+    }
+
+    public InetSocketAddress getAddress() {
+        return exchange.getRemoteAddress().orElse(null);
+    }
+
+    @Override
+    public void setKeepAliveTimeout(Duration timeout) {
+        keepAliveTimeout = timeout;
+        exchange.setKeepAliveTimeout(timeout);
+    }
+
+    @Override
+    public Duration getKeepAliveTimeout() {
+        return keepAliveTimeout;
+    }
+
+    public WebSocketDeviceSession copy() {
+        WebSocketDeviceSession session = new WebSocketDeviceSession(operator, exchange);
+
+        session.setKeepAliveTimeout(keepAliveTimeout);
+        return session;
+    }
+}

+ 2 - 0
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/HttpServer.java

@@ -38,6 +38,8 @@ public interface HttpServer extends ServerNetwork {
      */
     Flux<HttpExchange> handleRequest(String method, String... urlPattern);
 
+    Flux<WebSocketExchange> handleWebsocket(String urlPattern);
+
     default Flux<HttpExchange> handleRequest(HttpMethod method, String... urlPattern) {
         return handleRequest(method.name().toLowerCase(Locale.ROOT), urlPattern);
     }

+ 43 - 0
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/WebSocketExchange.java

@@ -0,0 +1,43 @@
+package org.jetlinks.community.network.http.server;
+
+import org.jetlinks.core.message.codec.http.websocket.WebSocketSession;
+
+import java.time.Duration;
+
+/**
+ * WebSocket客户端
+ *
+ * @author zhouhao
+ * @since 1.0
+ */
+public interface WebSocketExchange extends WebSocketSession {
+
+    /**
+     * @return 客户端ID
+     */
+    String getId();
+
+    /**
+     * @return 连接是否正常
+     */
+    boolean isAlive();
+
+    /**
+     * 设置心跳超时间隔
+     *
+     * @param duration 间隔
+     */
+    void setKeepAliveTimeout(Duration duration);
+
+    /**
+     * 监听断开连接事件
+     *
+     * @param handler 监听器
+     */
+    void closeHandler(Runnable handler);
+
+    /**
+     * @return 最后一次心跳时间
+     */
+    long getLastKeepAliveTime();
+}

+ 12 - 40
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java

@@ -14,18 +14,19 @@ import lombok.Generated;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.message.codec.http.Header;
+import org.jetlinks.core.message.codec.http.HttpRequestMessage;
+import org.jetlinks.core.message.codec.http.HttpResponseMessage;
+import org.jetlinks.core.message.codec.http.MultiPart;
 import org.jetlinks.community.network.http.DefaultHttpRequestMessage;
 import org.jetlinks.community.network.http.VertxWebUtils;
 import org.jetlinks.community.network.http.server.HttpExchange;
 import org.jetlinks.community.network.http.server.HttpRequest;
 import org.jetlinks.community.network.http.server.HttpResponse;
-import org.jetlinks.core.message.codec.http.*;
-import org.jetlinks.core.utils.Reactors;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -72,50 +73,21 @@ public class VertxHttpExchange implements HttpExchange, HttpResponse, HttpReques
         if (httpServerRequest.method() == HttpMethod.GET) {
             body = Mono.just(Unpooled.EMPTY_BUFFER);
         } else {
-            log.debug("create multi part");
-            if (MultiPart.isMultiPart(getContentType())) {
-
-                Sinks.Many<ByteBuf> sink = Reactors.createMany(false);
 
-                this.httpServerRequest
-                    .handler(buf -> sink.emitNext(buf.getByteBuf(), (s, result) -> {
-                        if (result != Sinks.EmitResult.OK) {
-                            response
-                                .putHeader("X-Server-Error", result.toString())
-                                .setStatusCode(500)
-                                .end();
-                        }
-                        return false;
-                    }))
-                    .endHandler(ignore -> sink.tryEmitComplete());
+            Mono<ByteBuf> buffer = Mono
+                .fromCompletionStage(this.httpServerRequest.body().toCompletionStage())
+                .map(Buffer::getByteBuf);
 
+            if (MultiPart.isMultiPart(getContentType())) {
                 body = MultiPart
-                    .parse(getSpringHttpHeaders(), sink.asFlux())
+                    .parse(getSpringHttpHeaders(), buffer.flux())
                     .doOnNext(this::setMultiPart)
                     .thenReturn(Unpooled.EMPTY_BUFFER)
                     .cache();
             } else {
-                body = Mono
-                    .<ByteBuf>create(sink -> {
-                        if (this.httpServerRequest.isEnded()) {
-                            sink.success();
-                        } else {
-                            this.httpServerRequest
-                                .bodyHandler(buffer -> {
-                                    sink.success(buffer.getByteBuf());
-                                });
-                        }
-                    })
-                    .cache();
+                body = buffer;
             }
-            body
-                .doOnError(err -> {
-                    response
-                        .putHeader("X-Server-Error", err.getMessage())
-                        .setStatusCode(500)
-                        .end();
-                })
-                .subscribe();
+
         }
     }
 
@@ -228,7 +200,7 @@ public class VertxHttpExchange implements HttpExchange, HttpResponse, HttpReques
                 setResponseDefaultLength(buf.length());
                 response.write(buf, v -> {
                     sink.success();
-                    if(!(buffer instanceof UnpooledHeapByteBuf)){
+                    if (!(buffer instanceof UnpooledHeapByteBuf)) {
                         ReferenceCountUtil.safeRelease(buffer);
                     }
                 });

+ 58 - 22
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpServer.java

@@ -7,6 +7,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.http.server.HttpExchange;
 import org.jetlinks.community.network.http.server.HttpServer;
+import org.jetlinks.community.network.http.server.WebSocketExchange;
 import org.springframework.http.HttpStatus;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
@@ -36,6 +37,7 @@ public class VertxHttpServer implements HttpServer {
     private String id;
 
     private final Topic<FluxSink<HttpExchange>> route = Topic.createRoot();
+    private final Topic<FluxSink<WebSocketExchange>> websocketRoute = Topic.createRoot();
 
     @Getter
     @Setter
@@ -60,31 +62,57 @@ public class VertxHttpServer implements HttpServer {
         }
         this.httpServers = httpServers;
         for (io.vertx.core.http.HttpServer server : this.httpServers) {
-            server.requestHandler(request -> {
-                request.exceptionHandler(err -> {
-                    log.error(err.getMessage(), err);
-                });
-                VertxHttpExchange exchange = new VertxHttpExchange(request, config);
+            server
+                .webSocketHandler(socket -> {
+                    socket.exceptionHandler(err -> {
+                        log.error(err.getMessage(), err);
+                    });
+
+                    String url = socket.path();
+                    if (url.endsWith("/")) {
+                        url = url.substring(0, url.length() - 1);
+                    }
+                    VertxWebSocketExchange exchange = new VertxWebSocketExchange(socket);
 
-                String url = exchange.getUrl();
-                if (url.endsWith("/")) {
-                    url = url.substring(0, url.length() - 1);
-                }
+                    websocketRoute
+                        .findTopic("/ws" + url)
+                        .flatMapIterable(Topic::getSubscribers)
+                        .doOnNext(sink -> sink.next(exchange))
+                        .switchIfEmpty(Mono.fromRunnable(() -> {
+
+                            log.warn("http server no handler for:[{}://{}{}]", socket.scheme(), socket.host(), socket.path());
+                            socket.reject(404);
+
+                        }))
+                        .subscribe();
+
+                })
+                .requestHandler(request -> {
+                    request.exceptionHandler(err -> {
+                        log.error(err.getMessage(), err);
+                    });
+
+                    VertxHttpExchange exchange = new VertxHttpExchange(request, config);
 
-                route.findTopic("/" + exchange.request().getMethod().name().toLowerCase() + url)
-                     .flatMapIterable(Topic::getSubscribers)
-                     .doOnNext(sink -> sink.next(exchange))
-                     .switchIfEmpty(Mono.fromRunnable(() -> {
+                    String url = exchange.getUrl();
+                    if (url.endsWith("/")) {
+                        url = url.substring(0, url.length() - 1);
+                    }
+
+                    route.findTopic("/" + exchange.request().getMethod().name().toLowerCase() + url)
+                         .flatMapIterable(Topic::getSubscribers)
+                         .doOnNext(sink -> sink.next(exchange))
+                         .switchIfEmpty(Mono.fromRunnable(() -> {
 
-                         log.warn("http server no handler for:[{} {}://{}{}]", request.method(), request.scheme(), request.host(), request.path());
-                         request.response()
-                                .setStatusCode(HttpStatus.NOT_FOUND.value())
-                                .end();
+                             log.warn("http server no handler for:[{} {}://{}{}]", request.method(), request.scheme(), request.host(), request.path());
+                             request.response()
+                                    .setStatusCode(HttpStatus.NOT_FOUND.value())
+                                    .end();
 
-                     }))
-                     .subscribe();
+                         }))
+                         .subscribe();
 
-            });
+                });
             server.exceptionHandler(err -> log.error(err.getMessage(), err));
         }
     }
@@ -94,9 +122,17 @@ public class VertxHttpServer implements HttpServer {
         return handleRequest("*", "/**");
     }
 
+    @Override
+    public Flux<WebSocketExchange> handleWebsocket(String urlPattern) {
+        return createRoute(websocketRoute, "ws", urlPattern);
+    }
 
     @Override
     public Flux<HttpExchange> handleRequest(String method, String... urlPatterns) {
+        return createRoute(route, method, urlPatterns);
+    }
+
+    private <T> Flux<T> createRoute(Topic<FluxSink<T>> root, String prefix, String... urlPatterns) {
         return Flux.create(sink -> {
             Disposable.Composite disposable = Disposables.composite();
             for (String urlPattern : urlPatterns) {
@@ -116,9 +152,9 @@ public class VertxHttpServer implements HttpServer {
                 if (!pattern.startsWith("/")) {
                     pattern = "/".concat(pattern);
                 }
-                pattern = "/" + method + pattern;
+                pattern = "/" + prefix + pattern;
                 log.debug("handle http request : {}", pattern);
-                Topic<FluxSink<HttpExchange>> sub = route.append(pattern);
+                Topic<FluxSink<T>> sub = root.append(pattern);
                 sub.subscribe(sink);
                 disposable.add(() -> sub.unsubscribe(sink));
             }

+ 267 - 0
jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxWebSocketExchange.java

@@ -0,0 +1,267 @@
+package org.jetlinks.community.network.http.server.vertx;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
+import io.netty.util.ReferenceCountUtil;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClosedException;
+import io.vertx.core.http.ServerWebSocket;
+import io.vertx.core.net.SocketAddress;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.community.network.http.server.WebSocketExchange;
+import org.jetlinks.core.message.codec.http.Header;
+import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketSession;
+import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessageWrapper;
+import org.jetlinks.core.utils.Reactors;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import javax.annotation.Nonnull;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * @author wangzheng
+ * @author zhouhao
+ * @see WebSocketSession
+ * @since 1.0
+ */
+@Slf4j
+public class VertxWebSocketExchange implements WebSocketExchange {
+
+    private final ServerWebSocket serverWebSocket;
+
+    private final InetSocketAddress address;
+
+    private final Sinks.Many<WebSocketMessage> sink = Reactors.createMany();
+
+    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
+
+    private long keepAliveTimeOutMs = Duration.ofHours(1).toMillis();
+
+    private long lastKeepAliveTime = System.currentTimeMillis();
+
+    private final List<Runnable> closeHandler = new CopyOnWriteArrayList<>();
+
+    @Getter
+    private final String id;
+
+    public VertxWebSocketExchange(ServerWebSocket serverWebSocket) {
+        this.serverWebSocket = serverWebSocket;
+        doReceived();
+        SocketAddress socketAddress = serverWebSocket.remoteAddress();
+        address = new InetSocketAddress(socketAddress.host(), socketAddress.port());
+        this.id = IDGenerator.RANDOM.generate();
+    }
+
+    @Override
+    public Optional<InetSocketAddress> getRemoteAddress() {
+        return Optional.of(address);
+    }
+
+    private void doReceived() {
+        serverWebSocket
+            .textMessageHandler(text -> handle(textMessage(text)))
+            .binaryMessageHandler(msg -> handle(binaryMessage(msg.getByteBuf())))
+            .pongHandler(buf -> handle(pongMessage(buf.getByteBuf())))
+            .closeHandler((nil) -> doClose())
+            .exceptionHandler(err -> {
+                if (err instanceof HttpClosedException) {
+                    return;
+                }
+                log.error(err.getMessage(), err);
+            })
+        ;
+    }
+
+    private void doClose() {
+        sink.emitComplete(Reactors.emitFailureHandler());
+        for (Runnable runnable : closeHandler) {
+            runnable.run();
+        }
+        closeHandler.clear();
+
+    }
+
+    private void handle(WebSocketMessage message) {
+        this.lastKeepAliveTime = System.currentTimeMillis();
+        if (sink.currentSubscriberCount() > 0) {
+            sink.emitNext(WebSocketSessionMessageWrapper.of(message, this), Reactors.emitFailureHandler());
+        } else {
+            log.warn("websocket client[{}] session no handler", address);
+        }
+    }
+
+    @Override
+    public String getUri() {
+        return serverWebSocket.uri();
+    }
+
+    @Override
+    @Nonnull
+    public List<Header> getHeaders() {
+        return serverWebSocket
+            .headers()
+            .entries()
+            .stream()
+            .map(entry -> {
+                Header header = new Header();
+                header.setName(entry.getKey());
+                header.setValue(new String[]{entry.getValue()});
+                return header;
+            }).collect(Collectors.toList());
+    }
+
+    @Override
+    public Optional<Header> getHeader(String s) {
+        return Optional.ofNullable(serverWebSocket.headers().getAll(s))
+                       .map(list -> new Header(s, list.toArray(new String[0])))
+            ;
+    }
+
+    @Override
+    public Mono<Void> close() {
+        if (serverWebSocket.isClosed()) {
+            return Mono.empty();
+        }
+        return Mono.fromRunnable(serverWebSocket::close);
+    }
+
+    @Override
+    public Mono<Void> close(int i) {
+        return close(i, "Closed");
+    }
+
+    @Override
+    public Mono<Void> close(int status, String reason) {
+        if (serverWebSocket.isClosed()) {
+            return Mono.empty();
+        }
+        return Mono.defer(() -> {
+            short code = WebSocketCloseStatus.isValidStatusCode(status) ? (short) status : (short) WebSocketCloseStatus.BAD_GATEWAY.code();
+
+            return Mono.fromCompletionStage(serverWebSocket
+                                                .close(code, reason)
+                                                .toCompletionStage());
+        });
+    }
+
+    @Override
+    public Map<String, Object> getAttributes() {
+        return attributes;
+    }
+
+    @Override
+    public Optional<Object> getAttribute(String s) {
+        return Optional.ofNullable(attributes.get(s));
+    }
+
+    @Override
+    public void setAttribute(String s, Object o) {
+        attributes.put(s, o);
+    }
+
+    @Override
+    public Flux<WebSocketMessage> receive() {
+        return sink.asFlux();
+    }
+
+    @Override
+    public Mono<Void> send(WebSocketMessage webSocketMessage) {
+        ByteBuf payload = webSocketMessage.getPayload();
+        return this
+            .doWrite(handler -> {
+                switch (webSocketMessage.getType()) {
+                    case TEXT:
+                        serverWebSocket.writeTextMessage(webSocketMessage.payloadAsString(), handler);
+                        return;
+                    case BINARY:
+                        serverWebSocket.writeBinaryMessage(Buffer.buffer(payload), handler);
+                        return;
+                    case PING:
+                        serverWebSocket.writePing(Buffer.buffer(payload));
+                        handler.handle(Future.succeededFuture());
+                        return;
+                }
+                throw new UnsupportedOperationException("unsupported message type" + webSocketMessage.getType());
+            })
+            .doAfterTerminate(() -> ReferenceCountUtil.safeRelease(payload));
+    }
+
+    protected Mono<Void> doWrite(Consumer<Handler<AsyncResult<Void>>> handler) {
+        this.lastKeepAliveTime = System.currentTimeMillis();
+        return Mono.<Void>create(sink -> {
+            try {
+                handler.accept(result -> {
+                    if (result.succeeded()) {
+                        sink.success();
+                    } else {
+                        sink.error(result.cause());
+                    }
+                });
+            } catch (Throwable e) {
+                sink.error(e);
+            }
+        });
+    }
+
+    @Override
+    public WebSocketMessage textMessage(String s) {
+        return DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, Unpooled.wrappedBuffer(s.getBytes()));
+    }
+
+    @Override
+    public WebSocketMessage binaryMessage(ByteBuf byteBuf) {
+        return DefaultWebSocketMessage.of(WebSocketMessage.Type.BINARY, byteBuf);
+    }
+
+    @Override
+    public WebSocketMessage pingMessage(ByteBuf byteBuf) {
+        return DefaultWebSocketMessage.of(WebSocketMessage.Type.PING, byteBuf);
+    }
+
+    @Override
+    public WebSocketMessage pongMessage(ByteBuf byteBuf) {
+        return DefaultWebSocketMessage.of(WebSocketMessage.Type.PONG, byteBuf);
+    }
+
+    @Override
+    public boolean isAlive() {
+        return !serverWebSocket.isClosed() &&
+            (
+                keepAliveTimeOutMs <= 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeOutMs
+            );
+    }
+
+    @Override
+    public void setKeepAliveTimeout(Duration duration) {
+        this.keepAliveTimeOutMs = duration.toMillis();
+        this.lastKeepAliveTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void closeHandler(Runnable handler) {
+        closeHandler.add(handler);
+    }
+
+    @Override
+    public long getLastKeepAliveTime() {
+        return lastKeepAliveTime;
+    }
+}