Browse Source

优化websocket

周浩 9 years ago
parent
commit
2043597da0

+ 43 - 0
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/CMDWebSocketAutoConfiguration.java

@@ -0,0 +1,43 @@
+package org.hsweb.web.socket;
+
+import org.hsweb.web.socket.cmd.support.SystemMonitorProcessor;
+import org.hsweb.web.socket.message.SimpleWebSocketMessageManager;
+import org.hsweb.web.socket.message.WebSocketMessageManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurationSupport;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+/**
+ * Created by zhouhao on 16-5-29.
+ */
+@ComponentScan(basePackages = "org.hsweb.web.socket")
+@Configuration
+public class CMDWebSocketAutoConfiguration extends WebSocketConfigurationSupport {
+    @Autowired
+    private CmdWebSocketHandler cmdWebSocketHandler;
+    @Bean
+    @ConditionalOnMissingBean(WebSocketMessageManager.class)
+    public SimpleWebSocketMessageManager simpleWebSocketMessageManager() {
+        return new SimpleWebSocketMessageManager();
+    }
+
+    @Bean
+    @ConditionalOnClass(name = "org.hyperic.sigar.Sigar")
+    public SystemMonitorProcessor systemMonitorProcessor() {
+        SystemMonitorProcessor processor = new SystemMonitorProcessor();
+        processor.setWebSocketMessageManager(simpleWebSocketMessageManager());
+        return processor;
+    }
+
+    @Override
+    protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        //绑定到 /socket
+        registry.addHandler(cmdWebSocketHandler, "/socket");
+        registry.addHandler(cmdWebSocketHandler, "/socket/js").withSockJS();
+    }
+}

+ 17 - 6
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/CmdWebSocketHandler.java

@@ -2,9 +2,12 @@ package org.hsweb.web.socket;
 
 import com.alibaba.fastjson.JSON;
 import org.hsweb.web.bean.po.user.User;
+import org.hsweb.web.core.session.HttpSessionManager;
 import org.hsweb.web.socket.cmd.CMD;
 import org.hsweb.web.socket.cmd.CmdProcessor;
 import org.hsweb.web.socket.cmd.CmdProcessorContainer;
+import org.hsweb.web.socket.message.WebSocketMessageManager;
+import org.hsweb.web.socket.utils.SessionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -23,6 +26,12 @@ public class CmdWebSocketHandler extends TextWebSocketHandler {
 
     protected Logger logger = LoggerFactory.getLogger(this.getClass());
 
+    @Autowired
+    private HttpSessionManager httpSessionManager;
+
+    @Autowired
+    private WebSocketMessageManager webSocketMessageManager;
+
     @Autowired
     private CmdProcessorContainer processorContainer;
 
@@ -31,7 +40,6 @@ public class CmdWebSocketHandler extends TextWebSocketHandler {
         if (logger.isInfoEnabled())
             logger.info("handleMessage,id:{} msg={}", session.getId(), message.getPayload());
         try {
-
             CMD request = JSON.parseObject(message.getPayload(), CMD.class);
             CmdProcessor processor = processorContainer.getCmdProcessor(request.getCmd());
             if (null != processor) {
@@ -44,16 +52,18 @@ public class CmdWebSocketHandler extends TextWebSocketHandler {
     }
 
     private User getUser(WebSocketSession session) {
-        return (User) session.getAttributes().get("user");
+        return SessionUtils.getUser(session, httpSessionManager);
     }
 
     @Override
     public void afterConnectionEstablished(WebSocketSession session) throws Exception {
         User user = getUser(session);
-//        if (user == null) {
-//            session.close(CloseStatus.BAD_DATA.withReason("请登陆!"));
-//            return;
-//        }
+        if (user == null) {
+            session.close(CloseStatus.BAD_DATA.withReason("请登陆!"));
+            return;
+        }
+        session.getAttributes().put("user", user);
+        webSocketMessageManager.onSessionConnect(session);
         for (CmdProcessor processor : processorContainer.getAll()) {
             processor.onSessionConnect(session);
         }
@@ -61,6 +71,7 @@ public class CmdWebSocketHandler extends TextWebSocketHandler {
 
     @Override
     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        webSocketMessageManager.onSessionClose(session);
         for (CmdProcessor processor : processorContainer.getAll()) {
             processor.onSessionClose(session);
         }

+ 1 - 3
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/cmd/SpringCmdProcessorContainer.java

@@ -56,9 +56,7 @@ public class SpringCmdProcessorContainer implements CmdProcessorContainer {
     public void init() {
         Map<String, CmdProcessor> processorMap = applicationContext.getBeansOfType(CmdProcessor.class);
         if (processorMap != null) {
-            for (CmdProcessor cmdProcessor : processorMap.values()) {
-                registerCmdProcessor(cmdProcessor);
-            }
+            processorMap.values().forEach(this::registerCmdProcessor);
         }
     }
 }

+ 28 - 0
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/cmd/support/AbstractCmdProcessor.java

@@ -0,0 +1,28 @@
+package org.hsweb.web.socket.cmd.support;
+
+import org.hsweb.web.bean.po.user.User;
+import org.hsweb.web.core.session.HttpSessionManager;
+import org.hsweb.web.socket.cmd.CmdProcessor;
+import org.hsweb.web.socket.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.socket.WebSocketSession;
+
+/**
+ * Created by zhouhao on 16-5-30.
+ */
+public abstract class AbstractCmdProcessor implements CmdProcessor {
+    protected Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private HttpSessionManager httpSessionManager;
+
+    @Autowired
+    public void setHttpSessionManager(HttpSessionManager httpSessionManager) {
+        this.httpSessionManager = httpSessionManager;
+    }
+
+    public User getUser(WebSocketSession socketSession) {
+        return SessionUtils.getUser(socketSession, httpSessionManager);
+    }
+}

+ 174 - 0
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/cmd/support/SystemMonitorProcessor.java

@@ -0,0 +1,174 @@
+package org.hsweb.web.socket.cmd.support;
+
+import org.hsweb.web.bean.po.user.User;
+import org.hsweb.web.socket.cmd.CMD;
+import org.hsweb.web.socket.message.WebSocketMessage;
+import org.hsweb.web.socket.message.WebSocketMessageManager;
+import org.hyperic.sigar.CpuInfo;
+import org.hyperic.sigar.CpuPerc;
+import org.hyperic.sigar.Sigar;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Created by zhouhao on 16-5-29.
+ */
+public class SystemMonitorProcessor extends AbstractCmdProcessor {
+
+    private WebSocketMessageManager webSocketMessageManager;
+    private Sigar sigar;
+    private ExecutorService exec = Executors.newCachedThreadPool();
+    private Map<String, Publish> cpuPublish = new ConcurrentHashMap<>();
+    private boolean cpuMonitorIsStarted;
+
+    public SystemMonitorProcessor() {
+        sigar = new Sigar();
+    }
+
+    public void setWebSocketMessageManager(WebSocketMessageManager webSocketMessageManager) {
+        this.webSocketMessageManager = webSocketMessageManager;
+    }
+
+    @Override
+    public String getName() {
+        return "system-monitor";
+    }
+
+    @Override
+    public void exec(CMD cmd) throws Exception {
+        String type = ((String) cmd.getParams().get("type"));
+        if (type == null) return;
+        switch (type) {
+            case "cpu":
+                String userId = getUser(cmd).getId();
+                Publish publish = cpuPublish.get(userId);
+                if (publish == null) {
+                    publish = new Publish();
+                    publish.setUserId(userId);
+                    publish.setCallback((String) cmd.getParams().get("callback"));
+                    cpuPublish.put(userId, publish);
+                }
+                publish.addSession(cmd.getSession());
+                if (!cpuMonitorIsStarted) {
+                    startPublishCpu();
+                    cpuMonitorIsStarted = true;
+                }
+                webSocketMessageManager.subscribe(getName(), userId, cmd.getSession());
+                break;
+            case "cpu-cancel":
+                userId = getUser(cmd).getId();
+                publish = cpuPublish.get(userId);
+                if (publish != null) {
+                    publish.removeSession(cmd.getSession());
+                    if (publish.getSessionMap().isEmpty())
+                        cpuPublish.remove(userId);
+                }
+                webSocketMessageManager.deSubscribe(getName(), userId, cmd.getSession());
+                break;
+        }
+    }
+
+
+    public User getUser(CMD cmd) {
+        return getUser(cmd.getSession());
+    }
+
+    class Publish {
+        private String userId;
+        private String callback;
+        private Map<String, WebSocketSession> sessionMap = Collections.synchronizedMap(new HashMap<>());
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public void setUserId(String userId) {
+            this.userId = userId;
+        }
+
+        public String getCallback() {
+            return callback;
+        }
+
+        public void setCallback(String callback) {
+            this.callback = callback;
+        }
+
+        public Map<String, WebSocketSession> getSessionMap() {
+            return sessionMap;
+        }
+
+        public void removeSession(WebSocketSession socketSession) {
+            sessionMap.remove(socketSession.getId());
+        }
+
+        public void addSession(WebSocketSession socketSession) {
+            sessionMap.put(socketSession.getId(), socketSession);
+        }
+    }
+
+    public Future startPublishCpu() throws Exception {
+        return exec.submit((Callable) () -> {
+            for (; ; ) {
+                try {
+                    if (cpuPublish.isEmpty()) {
+                        Thread.sleep(1000);
+                        if (cpuPublish.isEmpty()) {
+                            cpuMonitorIsStarted = false;
+                            return null;
+                        }
+                    }
+                    List<Map> infoList = new LinkedList<>();
+                    CpuInfo[] cpuInfo = sigar.getCpuInfoList();
+                    CpuPerc[] cpuPercs = sigar.getCpuPercList();
+                    for (int i = 0; i < cpuInfo.length; i++) {
+                        Map info = cpuInfo[i].toMap();
+                        info.put("perc", cpuPercs[i]);
+                        infoList.add(info);
+                    }
+                    cpuPublish.values().forEach(publish -> {
+                        WebSocketMessage msg = new WebSocketMessage();
+                        msg.setTo(publish.getUserId());
+                        msg.setContent(infoList);
+                        msg.setType(getName());
+                        msg.setCallBack(publish.getCallback());
+                        msg.setFrom("system");
+                        try {
+                            webSocketMessageManager.publish(msg);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+
+                    });
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                }
+            }
+        });
+    }
+
+    @Override
+    public void init() throws Exception {
+
+    }
+
+    @Override
+    public void onSessionConnect(WebSocketSession session) throws Exception {
+    }
+
+    @Override
+    public void onSessionClose(WebSocketSession session) throws Exception {
+        User user = getUser(session);
+        synchronized (cpuPublish) {
+            Publish publish = cpuPublish.get(user.getId());
+            if (publish != null) {
+                publish.removeSession(session);
+                if (publish.getSessionMap().isEmpty()) cpuPublish.remove(user.getId());
+            }
+        }
+    }
+
+}

+ 172 - 0
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/message/SimpleWebSocketMessageManager.java

@@ -0,0 +1,172 @@
+package org.hsweb.web.socket.message;
+
+import org.hsweb.web.bean.po.user.User;
+import org.hsweb.web.core.exception.AuthorizeException;
+import org.hsweb.web.core.session.HttpSessionManager;
+import org.hsweb.web.socket.utils.SessionUtils;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Created by zhouhao on 16-5-29.
+ */
+public class SimpleWebSocketMessageManager implements WebSocketMessageManager {
+
+    private static final ConcurrentMap<String, Map<String, WebSocketSession>> session_map = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<String, Subscribe> subscribe_map = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<String, Map<String, Queue<WebSocketMessage>>> message_map = new ConcurrentHashMap<>();
+
+    private HttpSessionManager httpSessionManager;
+
+    public void setHttpSessionManager(HttpSessionManager httpSessionManager) {
+        this.httpSessionManager = httpSessionManager;
+    }
+
+    public Map<String, WebSocketSession> getSessionMap(String userId) {
+        Map<String, WebSocketSession> map = session_map.get(userId);
+        if (map == null) {
+            map = Collections.synchronizedMap(new HashMap<>());
+            session_map.put(userId, map);
+        }
+        return map;
+    }
+
+    @Override
+    public boolean publish(WebSocketMessage message) throws IOException {
+        String to = message.getTo();
+        Subscribe subscribe = subscribe_map.get(to);
+        Map<String, WebSocketSession> socketSession = getSessionMap(message.getTo());
+        if (!socketSession.isEmpty() && subscribe != null) {
+            if (message.getSessionId() == null)
+                socketSession.values().forEach(session -> {
+                    try {
+                        if (subscribe.getTopic(session.getId()).contains(message.getType()))
+                            session.sendMessage(new TextMessage(message.toString()));
+                        else saveMessage(message);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                });
+            else {
+                WebSocketSession session = socketSession.get(message.getSessionId());
+                if (session != null && session.isOpen()) {
+                    session.sendMessage(new TextMessage(message.toString()));
+                }
+
+            }
+            return true;
+        } else {
+            saveMessage(message);
+        }
+        return false;
+    }
+
+    protected Queue<WebSocketMessage> getMessageQueue(String userId, String type) {
+        Map<String, Queue<WebSocketMessage>> message_type_map = message_map.get(userId);
+        if (message_type_map == null) {
+            message_type_map = new ConcurrentHashMap<>();
+            message_map.putIfAbsent(userId, message_type_map);
+        }
+        Queue<WebSocketMessage> queue = message_type_map.get(type);
+        if (queue == null) {
+            queue = new ConcurrentLinkedQueue<>();
+            message_type_map.putIfAbsent(type, queue);
+        }
+        return queue;
+    }
+
+    protected void saveMessage(WebSocketMessage message) {
+        getMessageQueue(message.getTo(), message.getType()).offer(message);
+    }
+
+    @Override
+    public boolean deSubscribe(String type, String userId, WebSocketSession socketSession) {
+        return getSubscribe(userId).getTopic(socketSession.getId()).remove(type);
+    }
+
+    protected Subscribe getSubscribe(String userId) {
+        Subscribe subscribe = subscribe_map.get(userId);
+        synchronized (subscribe_map) {
+            if (subscribe == null) {
+                subscribe = new Subscribe();
+                subscribe.setUserId(userId);
+                subscribe_map.put(userId, subscribe);
+            }
+        }
+        return subscribe;
+    }
+
+    @Override
+    public boolean subscribe(String type, String userId, WebSocketSession socketSession) {
+        getSubscribe(userId).getTopic(socketSession.getId()).add(type);
+        //推送未读消息
+        Queue<WebSocketMessage> queue = getMessageQueue(userId, type);
+        while (!queue.isEmpty()) {
+            try {
+                publish(queue.poll());
+            } catch (IOException e) {
+            }
+        }
+        return true;
+    }
+
+    class Subscribe {
+        private String userId;
+        private Map<String, Set<String>> topic = Collections.synchronizedMap(new HashMap<>());
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public void setUserId(String userId) {
+            this.userId = userId;
+        }
+
+        public void cancelTopic(String sessionId) {
+            topic.remove(sessionId);
+        }
+
+        public Set<String> getTopic(String sessionId) {
+            Set<String> tp = topic.get(sessionId);
+            if (tp == null) {
+                tp = Collections.synchronizedSet(new HashSet<>());
+                topic.putIfAbsent(sessionId, tp);
+            }
+            return tp;
+        }
+
+    }
+
+    @Override
+    public void onSessionConnect(WebSocketSession session) throws Exception {
+        User user = getUser(session);
+        if (user == null) {
+            throw new AuthorizeException("未登录");
+        }
+        getSessionMap(user.getId()).put(session.getId(), session);
+    }
+
+    @Override
+    public void onSessionClose(WebSocketSession session) throws Exception {
+        User user = getUser(session);
+        if (user == null) {
+            return;
+        }
+        Subscribe subscribe = subscribe_map.get(user.getId());
+        if (subscribe != null)
+            subscribe.cancelTopic(session.getId());
+        getSessionMap(user.getId()).remove(session.getId());
+    }
+
+    protected User getUser(WebSocketSession session) {
+        return SessionUtils.getUser(session, httpSessionManager);
+    }
+}

+ 19 - 3
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/message/WebSocketMessage.java

@@ -1,5 +1,7 @@
 package org.hsweb.web.socket.message;
 
+import com.alibaba.fastjson.JSON;
+
 import java.io.Serializable;
 import java.util.Date;
 
@@ -27,13 +29,14 @@ public class WebSocketMessage implements Serializable {
     /**
      * 消息内容
      */
-    private String content;
+    private Object content;
 
     /**
      * 发送日期
      */
     private Date sendTime;
 
+    private String sessionId;
     /**
      * 前端回掉,前端注册了此消息的回掉后,在接收到消息时,会自动触发该回掉
      */
@@ -63,11 +66,11 @@ public class WebSocketMessage implements Serializable {
         this.type = type;
     }
 
-    public String getContent() {
+    public Object getContent() {
         return content;
     }
 
-    public void setContent(String content) {
+    public void setContent(Object content) {
         this.content = content;
     }
 
@@ -86,4 +89,17 @@ public class WebSocketMessage implements Serializable {
     public void setCallBack(String callBack) {
         this.callBack = callBack;
     }
+
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    public String toString() {
+        return JSON.toJSONString(this);
+    }
 }

+ 8 - 1
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/message/WebSocketMessageManager.java

@@ -1,6 +1,10 @@
 package org.hsweb.web.socket.message;
 
+import org.hsweb.web.bean.po.user.User;
 import org.hsweb.web.socket.WebSocketSessionListener;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
 
 /**
  * websocket消息管理器,用于使用websocket进行消息推送
@@ -14,6 +18,9 @@ public interface WebSocketMessageManager extends WebSocketSessionListener {
      * @param message 消息实例
      * @return 是否发送成功
      */
-    boolean send(WebSocketMessage message);
+    boolean publish(WebSocketMessage message) throws IOException;
+
+    boolean subscribe(String type, String userId,WebSocketSession socketSession);
 
+    boolean deSubscribe(String type, String userId,WebSocketSession socketSession);
 }

+ 37 - 0
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/utils/SessionUtils.java

@@ -0,0 +1,37 @@
+package org.hsweb.web.socket.utils;
+
+import org.hsweb.web.bean.po.user.User;
+import org.hsweb.web.core.session.HttpSessionManager;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by zhouhao on 16-5-30.
+ */
+public class SessionUtils {
+    public static User getUser(WebSocketSession session, HttpSessionManager sessionManager) {
+        User user = ((User) session.getAttributes().get("user"));
+        if (user != null) return user;
+        HttpHeaders headers = session.getHandshakeHeaders();
+        List<String> cookies = headers.get("Cookie");
+        if (cookies == null || cookies.isEmpty()) {
+            return null;
+        }
+        String[] cookie = cookies.get(0).split("[;]");
+        Map<String, Object> sessionId = new HashMap<>();
+        for (int i = 0; i < cookie.length; i++) {
+            String[] tmp = cookie[i].split("[=]");
+            if (tmp.length == 2)
+                sessionId.put(tmp[0].trim(), tmp[1].trim());
+        }
+        user = sessionManager.getUserBySessionId((String) sessionId.get("SESSION"));
+        if (user == null) {
+            user = sessionManager.getUserBySessionId((String) sessionId.get("JSESSIONID"));
+        }
+        return user;
+    }
+}