From 5e549fdb71677a1d0942e6ec1ab5a8123b7cd51c Mon Sep 17 00:00:00 2001 From: Process Xie <530551426@qq.com> Date: Mon, 16 Jun 2025 16:07:13 +0800 Subject: [PATCH] fix: terminate Docker logs process when client disconnects (#9091) * Update container.go * Update container.go --- agent/app/service/container.go | 184 +++++++++++++++++++-------------- 1 file changed, 108 insertions(+), 76 deletions(-) diff --git a/agent/app/service/container.go b/agent/app/service/container.go index 5922fc8eb..9b2bd91de 100644 --- a/agent/app/service/container.go +++ b/agent/app/service/container.go @@ -880,84 +880,116 @@ func (u *ContainerService) ContainerLogClean(req dto.OperationWithName) error { } func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) { - messageChan := make(chan string, 1024) - errorChan := make(chan error, 1) - - go collectLogs(params, messageChan, errorChan) - - ctx.Stream(func(w io.Writer) bool { - select { - case msg, ok := <-messageChan: - if !ok { - return msg == "" - } - _, err := fmt.Fprintf(w, "data: %v\n\n", msg) - if err != nil { - return false - } - return true - case err := <-errorChan: - if err != nil { - _, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) - } - return true - case <-ctx.Request.Context().Done(): - return false - } - }) + messageChan := make(chan string, 1024) + errorChan := make(chan error, 1) + doneChan := make(chan struct{}) + + // 监听客户端连接状态 + go func() { + <-ctx.Request.Context().Done() + close(doneChan) // 通知 collectLogs 停止 + }() + // 启动日志收集协程 + go collectLogs(doneChan, params, messageChan, errorChan) + // 流式发送日志到客户端 + ctx.Stream(func(w io.Writer) bool { + select { + case msg, ok := <-messageChan: + if !ok { + return false + } + _, err := fmt.Fprintf(w, "data: %s\n\n", msg) + if err != nil { + return false + } + return true + case err := <-errorChan: + if err != nil { + _, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) + } + return false + } + }) } -func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan<- error) { - defer close(messageChan) - defer close(errorChan) - - var cmdArgs []string - if params.Type == "compose" { - cmdArgs = []string{"compose", "-f", params.Compose} - } - cmdArgs = append(cmdArgs, "logs") - if params.Follow { - cmdArgs = append(cmdArgs, "-f") - } - if params.Tail != "0" { - cmdArgs = append(cmdArgs, "--tail", params.Tail) - } - if params.Since != "all" { - cmdArgs = append(cmdArgs, "--since", params.Since) - } - if params.Container != "" { - cmdArgs = append(cmdArgs, params.Container) - } - dockerCmd := exec.Command("docker", cmdArgs...) - - stdout, err := dockerCmd.StdoutPipe() - if err != nil { - errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err) - return - } - dockerCmd.Stderr = dockerCmd.Stdout - if err = dockerCmd.Start(); err != nil { - errorChan <- fmt.Errorf("failed to start command: %v", err) - return - } - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - message := scanner.Text() - select { - case messageChan <- message: - case <-time.After(5 * time.Second): - errorChan <- fmt.Errorf("message channel blocked") - return - } - } - if err = scanner.Err(); err != nil { - errorChan <- fmt.Errorf("scanner error: %v", err) - return - } - if err = dockerCmd.Wait(); err != nil { - errorChan <- fmt.Errorf("%v", err) - return - } +func collectLogs( + done <-chan struct{}, + params dto.StreamLog, + messageChan chan<- string, + errorChan chan<- error, +) { + defer close(messageChan) + defer close(errorChan) + var cmdArgs []string + if params.Type == "compose" { + cmdArgs = []string{"compose", "-f", params.Compose} + } + cmdArgs = append(cmdArgs, "logs") + if params.Follow { + cmdArgs = append(cmdArgs, "-f") + } + if params.Tail != "0" { + cmdArgs = append(cmdArgs, "--tail", params.Tail) + } + if params.Since != "all" { + cmdArgs = append(cmdArgs, "--since", params.Since) + } + if params.Container != "" { + cmdArgs = append(cmdArgs, params.Container) + } + + dockerCmd := exec.Command("docker", cmdArgs...) + + stdout, err := dockerCmd.StdoutPipe() + if err != nil { + errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err) + return + } + + dockerCmd.Stderr = dockerCmd.Stdout + + if err = dockerCmd.Start(); err != nil { + errorChan <- fmt.Errorf("failed to start docker logs command: %v", err) + return + } + + // 确保在函数退出时清理进程 + defer func() { + if dockerCmd.Process != nil { + _ = dockerCmd.Process.Kill() + } + }() + + // 创建一个scanner来读取stdout + scanner := bufio.NewScanner(stdout) + + // 启动一个goroutine监听done信号 + processKilled := false + go func() { + <-done + if !processKilled && dockerCmd.Process != nil { + processKilled = true + _ = dockerCmd.Process.Kill() + } + }() + + // 读取日志输出 + for scanner.Scan() { + message := scanner.Text() + select { + case messageChan <- message: + // 消息发送成功 + case <-done: + return + } + } + + if err = scanner.Err(); err != nil && err != io.EOF { + errorChan <- fmt.Errorf("scanner error: %v", err) + } + + // 等待命令完成 + _ = dockerCmd.Wait() } func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error {