From 8b22acce2776ebc959d0b9b07e9fb2a2d3569fd8 Mon Sep 17 00:00:00 2001 From: CityFun <31820853+zhengkunwang223@users.noreply.github.com> Date: Wed, 18 Jun 2025 15:14:56 +0800 Subject: [PATCH] fix: fix issue with open container log failed (#9163) --- agent/app/service/container.go | 202 ++++++++---------- .../src/components/log/container/index.vue | 4 +- 2 files changed, 97 insertions(+), 109 deletions(-) diff --git a/agent/app/service/container.go b/agent/app/service/container.go index 9b2bd91de..d9c142d45 100644 --- a/agent/app/service/container.go +++ b/agent/app/service/container.go @@ -880,116 +880,102 @@ func (u *ContainerService) ContainerLogClean(req dto.OperationWithName) error { } func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) { - messageChan := make(chan string, 1024) - errorChan := make(chan error, 1) - doneChan := make(chan struct{}) - - // 监听客户端连接状态 - go func() { - <-ctx.Request.Context().Done() - close(doneChan) // 通知 collectLogs 停止 - }() - // 启动日志收集协程 - go collectLogs(doneChan, params, messageChan, errorChan) - // 流式发送日志到客户端 - ctx.Stream(func(w io.Writer) bool { - select { - case msg, ok := <-messageChan: - if !ok { - return false - } - _, err := fmt.Fprintf(w, "data: %s\n\n", msg) - if err != nil { - return false - } - return true - case err := <-errorChan: - if err != nil { - _, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) - } - return false - } - }) + messageChan := make(chan string, 1024) + errorChan := make(chan error, 1) + doneChan := make(chan struct{}) + + go func() { + <-ctx.Request.Context().Done() + close(doneChan) + }() + go collectLogs(doneChan, params, messageChan, errorChan) + ctx.Stream(func(w io.Writer) bool { + select { + case msg, ok := <-messageChan: + if !ok { + return false + } + _, err := fmt.Fprintf(w, "data: %s\n\n", msg) + if err != nil { + return false + } + return true + case err := <-errorChan: + if err != nil { + _, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) + } + return false + } + }) } -func collectLogs( - done <-chan struct{}, - params dto.StreamLog, - messageChan chan<- string, - errorChan chan<- error, -) { - defer close(messageChan) - defer close(errorChan) - var cmdArgs []string - if params.Type == "compose" { - cmdArgs = []string{"compose", "-f", params.Compose} - } - cmdArgs = append(cmdArgs, "logs") - if params.Follow { - cmdArgs = append(cmdArgs, "-f") - } - if params.Tail != "0" { - cmdArgs = append(cmdArgs, "--tail", params.Tail) - } - if params.Since != "all" { - cmdArgs = append(cmdArgs, "--since", params.Since) - } - if params.Container != "" { - cmdArgs = append(cmdArgs, params.Container) - } - - dockerCmd := exec.Command("docker", cmdArgs...) - - stdout, err := dockerCmd.StdoutPipe() - if err != nil { - errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err) - return - } - - dockerCmd.Stderr = dockerCmd.Stdout - - if err = dockerCmd.Start(); err != nil { - errorChan <- fmt.Errorf("failed to start docker logs command: %v", err) - return - } - - // 确保在函数退出时清理进程 - defer func() { - if dockerCmd.Process != nil { - _ = dockerCmd.Process.Kill() - } - }() - - // 创建一个scanner来读取stdout - scanner := bufio.NewScanner(stdout) - - // 启动一个goroutine监听done信号 - processKilled := false - go func() { - <-done - if !processKilled && dockerCmd.Process != nil { - processKilled = true - _ = dockerCmd.Process.Kill() - } - }() - - // 读取日志输出 - for scanner.Scan() { - message := scanner.Text() - select { - case messageChan <- message: - // 消息发送成功 - case <-done: - return - } - } - - if err = scanner.Err(); err != nil && err != io.EOF { - errorChan <- fmt.Errorf("scanner error: %v", err) - } - - // 等待命令完成 - _ = dockerCmd.Wait() +func collectLogs(done <-chan struct{}, params dto.StreamLog, messageChan chan<- string, errorChan chan<- error) { + defer close(messageChan) + defer close(errorChan) + var cmdArgs []string + if params.Type == "compose" { + cmdArgs = []string{"compose", "-f", params.Compose} + } + cmdArgs = append(cmdArgs, "logs") + if params.Follow { + cmdArgs = append(cmdArgs, "-f") + } + if params.Tail != "0" { + cmdArgs = append(cmdArgs, "--tail", params.Tail) + } + if params.Since != "all" { + cmdArgs = append(cmdArgs, "--since", params.Since) + } + if params.Container != "" { + cmdArgs = append(cmdArgs, params.Container) + } + + dockerCmd := exec.Command("docker", cmdArgs...) + + stdout, err := dockerCmd.StdoutPipe() + if err != nil { + errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err) + return + } + + dockerCmd.Stderr = dockerCmd.Stdout + + if err = dockerCmd.Start(); err != nil { + errorChan <- fmt.Errorf("failed to start docker logs command: %v", err) + return + } + + defer func() { + if dockerCmd.Process != nil { + _ = dockerCmd.Process.Kill() + } + }() + + scanner := bufio.NewScanner(stdout) + + processKilled := false + go func() { + <-done + if !processKilled && dockerCmd.Process != nil { + processKilled = true + _ = dockerCmd.Process.Kill() + } + }() + + for scanner.Scan() { + message := scanner.Text() + select { + case messageChan <- message: + case <-done: + return + } + } + + if err = scanner.Err(); err != nil && err != io.EOF { + errorChan <- fmt.Errorf("scanner error: %v", err) + } + + _ = dockerCmd.Wait() } func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error { diff --git a/frontend/src/components/log/container/index.vue b/frontend/src/components/log/container/index.vue index 8dfaa8614..d589a1df5 100644 --- a/frontend/src/components/log/container/index.vue +++ b/frontend/src/components/log/container/index.vue @@ -153,7 +153,9 @@ const searchLogs = async () => { }; eventSource.onerror = (event: MessageEvent) => { stopListening(); - MsgError(event.data); + if (event.data && event.data != '') { + MsgError(event.data); + } }; };