浏览代码

优化处理器

周浩 9 年之前
父节点
当前提交
d3bd7e4b7f

+ 7 - 1
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/cmd/support/AbstractCmdProcessor.java

@@ -3,6 +3,7 @@ package org.hsweb.web.socket.cmd.support;
 import org.hsweb.web.bean.po.user.User;
 import org.hsweb.web.core.session.HttpSessionManager;
 import org.hsweb.web.socket.cmd.CmdProcessor;
+import org.hsweb.web.socket.message.WebSocketMessageManager;
 import org.hsweb.web.socket.utils.SessionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,13 +16,18 @@ import org.springframework.web.socket.WebSocketSession;
 public abstract class AbstractCmdProcessor implements CmdProcessor {
     protected Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private HttpSessionManager httpSessionManager;
+    protected HttpSessionManager httpSessionManager;
+    protected WebSocketMessageManager webSocketMessageManager;
 
     @Autowired
     public void setHttpSessionManager(HttpSessionManager httpSessionManager) {
         this.httpSessionManager = httpSessionManager;
     }
 
+    public void setWebSocketMessageManager(WebSocketMessageManager webSocketMessageManager) {
+        this.webSocketMessageManager = webSocketMessageManager;
+    }
+
     public User getUser(WebSocketSession socketSession) {
         return SessionUtils.getUser(socketSession, httpSessionManager);
     }

+ 78 - 16
hsweb-web-websocket/src/main/java/org/hsweb/web/socket/cmd/support/SystemMonitorProcessor.java

@@ -18,11 +18,13 @@ import java.util.concurrent.*;
  */
 public class SystemMonitorProcessor extends AbstractCmdProcessor {
 
-    private WebSocketMessageManager webSocketMessageManager;
+
     private Sigar sigar;
     private ExecutorService exec = Executors.newCachedThreadPool();
     private Map<String, Publish> cpuPublish = new ConcurrentHashMap<>();
-    private boolean cpuMonitorIsStarted;
+    private Map<String, Publish> memPublish = new ConcurrentHashMap<>();
+
+    private boolean cpuMonitorIsStarted, memMonitorIsStarted;
 
     public SystemMonitorProcessor() {
         sigar = new Sigar();
@@ -41,9 +43,9 @@ public class SystemMonitorProcessor extends AbstractCmdProcessor {
     public void exec(CMD cmd) throws Exception {
         String type = ((String) cmd.getParams().get("type"));
         if (type == null) return;
+        String userId = getUser(cmd).getId();
         switch (type) {
             case "cpu":
-                String userId = getUser(cmd).getId();
                 Publish publish = cpuPublish.get(userId);
                 if (publish == null) {
                     publish = new Publish();
@@ -58,19 +60,43 @@ public class SystemMonitorProcessor extends AbstractCmdProcessor {
                 }
                 webSocketMessageManager.subscribe(getName(), userId, cmd.getSession());
                 break;
-            case "cpu-cancel":
-                userId = getUser(cmd).getId();
-                publish = cpuPublish.get(userId);
-                if (publish != null) {
-                    publish.removeSession(cmd.getSession());
-                    if (publish.getSessionMap().isEmpty())
-                        cpuPublish.remove(userId);
+            case "mem":
+                publish = memPublish.get(userId);
+                if (publish == null) {
+                    publish = new Publish();
+                    publish.setUserId(userId);
+                    publish.setCallback((String) cmd.getParams().get("callback"));
+                    memPublish.put(userId, publish);
+                }
+                publish.addSession(cmd.getSession());
+                if (!memMonitorIsStarted) {
+                    startPublishMem();
+                    memMonitorIsStarted = true;
                 }
+                webSocketMessageManager.subscribe(getName(), userId, cmd.getSession());
+                break;
+            case "mem-cancel":
+                cancelPublish(memPublish, userId, cmd.getSession());
+                break;
+            case "cpu-cancel":
+                cancelPublish(cpuPublish, userId, cmd.getSession());
+                break;
+            case "cancel":
+                cancelPublish(memPublish, userId, cmd.getSession());
+                cancelPublish(cpuPublish, userId, cmd.getSession());
                 webSocketMessageManager.deSubscribe(getName(), userId, cmd.getSession());
                 break;
         }
     }
 
+    protected void cancelPublish(Map<String, Publish> publishMap, String userId, WebSocketSession socketSession) {
+        Publish publish = publishMap.get(userId);
+        if (publish != null) {
+            publish.removeSession(socketSession);
+            if (publish.getSessionMap().isEmpty())
+                publishMap.remove(userId);
+        }
+    }
 
     public User getUser(CMD cmd) {
         return getUser(cmd.getSession());
@@ -150,6 +176,43 @@ public class SystemMonitorProcessor extends AbstractCmdProcessor {
         });
     }
 
+    public Future startPublishMem() throws Exception {
+        return exec.submit((Callable) () -> {
+            for (; ; ) {
+                try {
+                    if (memPublish.isEmpty()) {
+                        Thread.sleep(1000);
+                        if (memPublish.isEmpty()) {
+                            memMonitorIsStarted = false;
+                            return null;
+                        }
+                    }
+                    Map<String, Object> map = sigar.getMem().toMap();
+                    Runtime runtime=  Runtime.getRuntime();
+                    map.put("jvmTotal", runtime.totalMemory());
+                    map.put("jvmMax",runtime.maxMemory());
+                    map.put("jvmFree",runtime.freeMemory());
+                    memPublish.values().forEach(publish -> {
+                        WebSocketMessage msg = new WebSocketMessage();
+                        msg.setTo(publish.getUserId());
+                        msg.setContent(map);
+                        msg.setType(getName());
+                        msg.setCallBack(publish.getCallback());
+                        msg.setFrom("system");
+                        try {
+                            webSocketMessageManager.publish(msg);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+
+                    });
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                }
+            }
+        });
+    }
+
     @Override
     public void init() throws Exception {
 
@@ -162,13 +225,12 @@ public class SystemMonitorProcessor extends AbstractCmdProcessor {
     @Override
     public void onSessionClose(WebSocketSession session) throws Exception {
         User user = getUser(session);
-        synchronized (cpuPublish) {
-            Publish publish = cpuPublish.get(user.getId());
-            if (publish != null) {
-                publish.removeSession(session);
-                if (publish.getSessionMap().isEmpty()) cpuPublish.remove(user.getId());
-            }
+        if (user != null) {
+            cancelPublish(cpuPublish, user.getId(), session);
+            cancelPublish(memPublish, user.getId(), session);
+            webSocketMessageManager.deSubscribe(getName(), user.getId(),session);
         }
+
     }
 
 }