mirror of
https://github.com/1Panel-dev/1Panel.git
synced 2025-10-16 18:36:21 +08:00
fix: terminate Docker logs process when client disconnects (#9091)
* Update container.go * Update container.go
This commit is contained in:
parent
8f7b026f0e
commit
5e549fdb71
1 changed files with 108 additions and 76 deletions
|
@ -882,16 +882,23 @@ func (u *ContainerService) ContainerLogClean(req dto.OperationWithName) error {
|
||||||
func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) {
|
func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) {
|
||||||
messageChan := make(chan string, 1024)
|
messageChan := make(chan string, 1024)
|
||||||
errorChan := make(chan error, 1)
|
errorChan := make(chan error, 1)
|
||||||
|
doneChan := make(chan struct{})
|
||||||
|
|
||||||
go collectLogs(params, messageChan, errorChan)
|
// 监听客户端连接状态
|
||||||
|
go func() {
|
||||||
|
<-ctx.Request.Context().Done()
|
||||||
|
close(doneChan) // 通知 collectLogs 停止
|
||||||
|
}()
|
||||||
|
// 启动日志收集协程
|
||||||
|
go collectLogs(doneChan, params, messageChan, errorChan)
|
||||||
|
// 流式发送日志到客户端
|
||||||
ctx.Stream(func(w io.Writer) bool {
|
ctx.Stream(func(w io.Writer) bool {
|
||||||
select {
|
select {
|
||||||
case msg, ok := <-messageChan:
|
case msg, ok := <-messageChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
return msg == ""
|
return false
|
||||||
}
|
}
|
||||||
_, err := fmt.Fprintf(w, "data: %v\n\n", msg)
|
_, err := fmt.Fprintf(w, "data: %s\n\n", msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -900,17 +907,19 @@ func (u *ContainerService) StreamLogs(ctx *gin.Context, params dto.StreamLog) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
|
_, _ = fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
|
||||||
}
|
}
|
||||||
return true
|
|
||||||
case <-ctx.Request.Context().Done():
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan<- error) {
|
func collectLogs(
|
||||||
|
done <-chan struct{},
|
||||||
|
params dto.StreamLog,
|
||||||
|
messageChan chan<- string,
|
||||||
|
errorChan chan<- error,
|
||||||
|
) {
|
||||||
defer close(messageChan)
|
defer close(messageChan)
|
||||||
defer close(errorChan)
|
defer close(errorChan)
|
||||||
|
|
||||||
var cmdArgs []string
|
var cmdArgs []string
|
||||||
if params.Type == "compose" {
|
if params.Type == "compose" {
|
||||||
cmdArgs = []string{"compose", "-f", params.Compose}
|
cmdArgs = []string{"compose", "-f", params.Compose}
|
||||||
|
@ -928,6 +937,7 @@ func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan
|
||||||
if params.Container != "" {
|
if params.Container != "" {
|
||||||
cmdArgs = append(cmdArgs, params.Container)
|
cmdArgs = append(cmdArgs, params.Container)
|
||||||
}
|
}
|
||||||
|
|
||||||
dockerCmd := exec.Command("docker", cmdArgs...)
|
dockerCmd := exec.Command("docker", cmdArgs...)
|
||||||
|
|
||||||
stdout, err := dockerCmd.StdoutPipe()
|
stdout, err := dockerCmd.StdoutPipe()
|
||||||
|
@ -935,29 +945,51 @@ func collectLogs(params dto.StreamLog, messageChan chan<- string, errorChan chan
|
||||||
errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err)
|
errorChan <- fmt.Errorf("failed to get stdout pipe: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
dockerCmd.Stderr = dockerCmd.Stdout
|
dockerCmd.Stderr = dockerCmd.Stdout
|
||||||
|
|
||||||
if err = dockerCmd.Start(); err != nil {
|
if err = dockerCmd.Start(); err != nil {
|
||||||
errorChan <- fmt.Errorf("failed to start command: %v", err)
|
errorChan <- fmt.Errorf("failed to start docker logs command: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 确保在函数退出时清理进程
|
||||||
|
defer func() {
|
||||||
|
if dockerCmd.Process != nil {
|
||||||
|
_ = dockerCmd.Process.Kill()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 创建一个scanner来读取stdout
|
||||||
scanner := bufio.NewScanner(stdout)
|
scanner := bufio.NewScanner(stdout)
|
||||||
|
|
||||||
|
// 启动一个goroutine监听done信号
|
||||||
|
processKilled := false
|
||||||
|
go func() {
|
||||||
|
<-done
|
||||||
|
if !processKilled && dockerCmd.Process != nil {
|
||||||
|
processKilled = true
|
||||||
|
_ = dockerCmd.Process.Kill()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 读取日志输出
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
message := scanner.Text()
|
message := scanner.Text()
|
||||||
select {
|
select {
|
||||||
case messageChan <- message:
|
case messageChan <- message:
|
||||||
case <-time.After(5 * time.Second):
|
// 消息发送成功
|
||||||
errorChan <- fmt.Errorf("message channel blocked")
|
case <-done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
|
||||||
|
if err = scanner.Err(); err != nil && err != io.EOF {
|
||||||
errorChan <- fmt.Errorf("scanner error: %v", err)
|
errorChan <- fmt.Errorf("scanner error: %v", err)
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = dockerCmd.Wait(); err != nil {
|
|
||||||
errorChan <- fmt.Errorf("%v", err)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 等待命令完成
|
||||||
|
_ = dockerCmd.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error {
|
func (u *ContainerService) DownloadContainerLogs(containerType, container, since, tail string, c *gin.Context) error {
|
||||||
|
|
Loading…
Add table
Reference in a new issue