refactor: Simplify process data handling and improve performance

- Replaced goroutine-based processing with a direct loop for handling process data.
- Introduced context support for process and connection retrieval.
- Enhanced error handling and data structuring for process information.
- Improved SSH session retrieval by mapping users by host for better efficiency.
This commit is contained in:
HynoR 2025-12-08 11:04:47 +08:00
parent f9a20ec443
commit 8fda4d5bc0

View file

@ -1,14 +1,14 @@
package websocket
import (
"context"
"encoding/json"
"fmt"
"github.com/1Panel-dev/1Panel/agent/utils/common"
"sort"
"strings"
"sync"
"time"
"github.com/1Panel-dev/1Panel/agent/utils/common"
"github.com/1Panel-dev/1Panel/agent/global"
"github.com/1Panel-dev/1Panel/agent/utils/files"
"github.com/shirou/gopsutil/v4/host"
@ -155,93 +155,85 @@ func getDownloadProcess(progress DownloadProgress) (res []byte, err error) {
return
}
func handleProcessData(proc *process.Process, processConfig *PsProcessConfig, pidConnections map[int32][]net.ConnectionStat) *PsProcessData {
if processConfig.Pid > 0 && processConfig.Pid != proc.Pid {
return nil
}
procData := PsProcessData{
PID: proc.Pid,
}
if procName, err := proc.Name(); err == nil {
procData.Name = procName
} else {
procData.Name = "<UNKNOWN>"
}
if processConfig.Name != "" && !strings.Contains(procData.Name, processConfig.Name) {
return nil
}
if username, err := proc.Username(); err == nil {
procData.Username = username
}
if processConfig.Username != "" && !strings.Contains(procData.Username, processConfig.Username) {
return nil
}
procData.PPID, _ = proc.Ppid()
statusArray, _ := proc.Status()
if len(statusArray) > 0 {
procData.Status = strings.Join(statusArray, ",")
}
createTime, procErr := proc.CreateTime()
if procErr == nil {
t := time.Unix(createTime/1000, 0)
procData.StartTime = t.Format("2006-1-2 15:04:05")
}
procData.NumThreads, _ = proc.NumThreads()
procData.CpuValue, _ = proc.CPUPercent()
procData.CpuPercent = fmt.Sprintf("%.2f%%", procData.CpuValue)
if memInfo, err := proc.MemoryInfo(); err == nil {
procData.RssValue = memInfo.RSS
procData.Rss = common.FormatBytes(memInfo.RSS)
} else {
procData.RssValue = 0
}
if connections, ok := pidConnections[proc.Pid]; ok {
procData.NumConnections = len(connections)
}
return &procData
}
func getProcessData(processConfig PsProcessConfig) (res []byte, err error) {
var processes []*process.Process
processes, err = process.Processes()
ctx := context.Background()
processes, err := process.ProcessesWithContext(ctx)
if err != nil {
return
}
var (
result []PsProcessData
resultMutex sync.Mutex
wg sync.WaitGroup
numWorkers = 4
)
handleData := func(proc *process.Process) {
procData := PsProcessData{
PID: proc.Pid,
}
if processConfig.Pid > 0 && processConfig.Pid != proc.Pid {
return
}
if procName, err := proc.Name(); err == nil {
procData.Name = procName
} else {
procData.Name = "<UNKNOWN>"
}
if processConfig.Name != "" && !strings.Contains(procData.Name, processConfig.Name) {
return
}
if username, err := proc.Username(); err == nil {
procData.Username = username
}
if processConfig.Username != "" && !strings.Contains(procData.Username, processConfig.Username) {
return
}
procData.PPID, _ = proc.Ppid()
statusArray, _ := proc.Status()
if len(statusArray) > 0 {
procData.Status = strings.Join(statusArray, ",")
}
createTime, procErr := proc.CreateTime()
if procErr == nil {
t := time.Unix(createTime/1000, 0)
procData.StartTime = t.Format("2006-1-2 15:04:05")
}
procData.NumThreads, _ = proc.NumThreads()
procData.CpuValue, _ = proc.CPUPercent()
procData.CpuPercent = fmt.Sprintf("%.2f", procData.CpuValue) + "%"
if memInfo, err := proc.MemoryInfo(); err == nil {
procData.RssValue = memInfo.RSS
procData.Rss = common.FormatBytes(memInfo.RSS)
} else {
procData.RssValue = 0
}
if connections, err := proc.Connections(); err == nil {
procData.NumConnections = len(connections)
}
resultMutex.Lock()
result = append(result, procData)
resultMutex.Unlock()
connections, err := net.ConnectionsMaxWithContext(ctx, "all", 32768)
if err != nil {
return
}
chunkSize := (len(processes) + numWorkers - 1) / numWorkers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
start := i * chunkSize
end := (i + 1) * chunkSize
if end > len(processes) {
end = len(processes)
pidConnections := make(map[int32][]net.ConnectionStat, len(processes))
for _, conn := range connections {
if conn.Pid == 0 {
continue
}
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
handleData(processes[j])
}
}(start, end)
pidConnections[conn.Pid] = append(pidConnections[conn.Pid], conn)
}
wg.Wait()
result := make([]PsProcessData, 0, len(processes))
for _, proc := range processes {
procData := handleProcessData(proc, &processConfig, pidConnections)
if procData != nil {
result = append(result, *procData)
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].PID < result[j].PID
})
res, err = json.Marshal(result)
return
}
@ -252,46 +244,67 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) {
users []host.UserStat
processes []*process.Process
)
processes, err = process.Processes()
if err != nil {
return
}
users, err = host.Users()
if err != nil {
res, err = json.Marshal(result)
return
}
usersByHost := make(map[string][]host.UserStat)
for _, user := range users {
if user.Host == "" {
continue
}
if config.LoginUser != "" && !strings.Contains(user.User, config.LoginUser) {
continue
}
if config.LoginIP != "" && !strings.Contains(user.Host, config.LoginIP) {
continue
}
usersByHost[user.Host] = append(usersByHost[user.Host], user)
}
if len(usersByHost) == 0 {
res, err = json.Marshal(result)
return
}
processes, err = process.Processes()
if err != nil {
return
}
for _, proc := range processes {
name, _ := proc.Name()
if name != "sshd" || proc.Pid == 0 {
continue
}
connections, _ := proc.Connections()
if len(connections) == 0 {
continue
}
cmdline, cmdErr := proc.Cmdline()
if cmdErr != nil {
continue
}
for _, conn := range connections {
for _, user := range users {
if user.Host == "" {
continue
}
if conn.Raddr.IP == user.Host {
if config.LoginUser != "" && !strings.Contains(user.User, config.LoginUser) {
continue
}
if config.LoginIP != "" && !strings.Contains(user.Host, config.LoginIP) {
continue
}
if terminal, err := proc.Cmdline(); err == nil {
if strings.Contains(terminal, user.Terminal) {
session := sshSession{
Username: user.User,
Host: user.Host,
Terminal: user.Terminal,
PID: proc.Pid,
}
t := time.Unix(int64(user.Started), 0)
session.LoginTime = t.Format("2006-1-2 15:04:05")
result = append(result, session)
}
}
matchedUsers, exists := usersByHost[conn.Raddr.IP]
if !exists {
continue
}
for _, user := range matchedUsers {
if strings.Contains(cmdline, user.Terminal) {
t := time.Unix(int64(user.Started), 0)
result = append(result, sshSession{
Username: user.User,
Host: user.Host,
Terminal: user.Terminal,
PID: proc.Pid,
LoginTime: t.Format("2006-1-2 15:04:05"),
})
}
}
}