mirror of
https://github.com/1Panel-dev/1Panel.git
synced 2025-09-05 06:04:35 +08:00
fix: Optimize duplicate execution prevention for cronjob
This commit is contained in:
parent
5a0c50b9a2
commit
0c5abd3dc6
5 changed files with 30 additions and 12 deletions
|
@ -4,8 +4,6 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// BackupAccounts ---> SourceAccountIDs
|
||||
// BackupAccounts ---> DownloadAccountID
|
||||
type Cronjob struct {
|
||||
BaseModel
|
||||
|
||||
|
@ -40,10 +38,11 @@ type Cronjob struct {
|
|||
IgnoreErr bool `json:"ignoreErr"`
|
||||
RetainCopies uint64 `json:"retainCopies"`
|
||||
|
||||
Status string `json:"status"`
|
||||
EntryIDs string `json:"entryIDs"`
|
||||
Records []JobRecords `json:"records"`
|
||||
Secret string `json:"secret"`
|
||||
IsExecuting bool `json:"isExecuting"`
|
||||
Status string `json:"status"`
|
||||
EntryIDs string `json:"entryIDs"`
|
||||
Records []JobRecords `json:"records"`
|
||||
Secret string `json:"secret"`
|
||||
}
|
||||
|
||||
type JobRecords struct {
|
||||
|
|
|
@ -29,9 +29,10 @@ type ICronjobRepo interface {
|
|||
Update(id uint, vars map[string]interface{}) error
|
||||
Delete(opts ...DBOption) error
|
||||
DeleteRecord(opts ...DBOption) error
|
||||
StartRecords(cronjobID uint, targetPath, cronjobType string) model.JobRecords
|
||||
StartRecords(cronjobID uint) model.JobRecords
|
||||
UpdateRecords(id uint, vars map[string]interface{}) error
|
||||
EndRecords(record model.JobRecords, status, message, records string)
|
||||
AddFailedRecord(cronjobID uint)
|
||||
PageRecords(page, size int, opts ...DBOption) (int64, []model.JobRecords, error)
|
||||
}
|
||||
|
||||
|
@ -143,7 +144,7 @@ func (c *CronjobRepo) WithByRecordDropID(id int) DBOption {
|
|||
}
|
||||
}
|
||||
|
||||
func (u *CronjobRepo) StartRecords(cronjobID uint, targetPath, cronjobType string) model.JobRecords {
|
||||
func (u *CronjobRepo) StartRecords(cronjobID uint) model.JobRecords {
|
||||
var record model.JobRecords
|
||||
record.StartTime = time.Now()
|
||||
record.CronjobID = cronjobID
|
||||
|
@ -152,6 +153,7 @@ func (u *CronjobRepo) StartRecords(cronjobID uint, targetPath, cronjobType strin
|
|||
if err := global.DB.Create(&record).Error; err != nil {
|
||||
global.LOG.Errorf("create record status failed, err: %v", err)
|
||||
}
|
||||
_ = u.Update(cronjobID, map[string]interface{}{"is_executing": true})
|
||||
return record
|
||||
}
|
||||
func (u *CronjobRepo) EndRecords(record model.JobRecords, status, message, records string) {
|
||||
|
@ -165,6 +167,17 @@ func (u *CronjobRepo) EndRecords(record model.JobRecords, status, message, recor
|
|||
if err := global.DB.Model(&model.JobRecords{}).Where("id = ?", record.ID).Updates(errMap).Error; err != nil {
|
||||
global.LOG.Errorf("update record status failed, err: %v", err)
|
||||
}
|
||||
_ = u.Update(record.CronjobID, map[string]interface{}{"is_executing": false})
|
||||
}
|
||||
func (u *CronjobRepo) AddFailedRecord(cronjobID uint) {
|
||||
var record model.JobRecords
|
||||
record.StartTime = time.Now()
|
||||
record.CronjobID = cronjobID
|
||||
record.Status = constant.StatusFailed
|
||||
record.Message = "The current cronjob is being executed"
|
||||
if err := global.DB.Create(&record).Error; err != nil {
|
||||
global.LOG.Errorf("create record status failed, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *CronjobRepo) Save(id uint, cronjob model.Cronjob) error {
|
||||
|
|
|
@ -25,7 +25,12 @@ import (
|
|||
)
|
||||
|
||||
func (u *CronjobService) HandleJob(cronjob *model.Cronjob) {
|
||||
record := cronjobRepo.StartRecords(cronjob.ID, "", cronjob.Type)
|
||||
cronjobItem, _ := cronjobRepo.Get(repo.WithByID(cronjob.ID))
|
||||
if cronjobItem.IsExecuting {
|
||||
cronjobRepo.AddFailedRecord(cronjob.ID)
|
||||
return
|
||||
}
|
||||
record := cronjobRepo.StartRecords(cronjob.ID)
|
||||
taskItem, err := task.NewTaskWithOps(fmt.Sprintf("cronjob-%s", cronjob.Name), task.TaskHandle, task.TaskScopeCronjob, record.TaskID, cronjob.ID)
|
||||
if err != nil {
|
||||
global.LOG.Errorf("new task for exec shell failed, err: %v", err)
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
package hook
|
||||
|
||||
import (
|
||||
"github.com/1Panel-dev/1Panel/agent/app/service"
|
||||
"github.com/1Panel-dev/1Panel/agent/utils/alert_push"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/1Panel-dev/1Panel/agent/app/dto"
|
||||
"github.com/1Panel-dev/1Panel/agent/app/model"
|
||||
"github.com/1Panel-dev/1Panel/agent/app/repo"
|
||||
"github.com/1Panel-dev/1Panel/agent/app/service"
|
||||
"github.com/1Panel-dev/1Panel/agent/constant"
|
||||
"github.com/1Panel-dev/1Panel/agent/global"
|
||||
"github.com/1Panel-dev/1Panel/agent/utils/alert_push"
|
||||
"github.com/1Panel-dev/1Panel/agent/utils/cmd"
|
||||
"github.com/1Panel-dev/1Panel/agent/utils/xpack"
|
||||
)
|
||||
|
@ -68,6 +68,7 @@ func handleSnapStatus() {
|
|||
|
||||
func handleCronjobStatus() {
|
||||
var jobRecords []model.JobRecords
|
||||
_ = global.DB.Model(&model.Cronjob{}).Where("is_executing = ?", true).Updates(map[string]interface{}{"is_executing": false}).Error
|
||||
_ = global.DB.Where("status = ?", constant.StatusWaiting).Find(&jobRecords).Error
|
||||
for _, record := range jobRecords {
|
||||
err := global.DB.Model(&model.JobRecords{}).Where("status = ?", constant.StatusWaiting).
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
var AddTable = &gormigrate.Migration{
|
||||
ID: "20250805-add-table",
|
||||
ID: "20250828-add-table",
|
||||
Migrate: func(tx *gorm.DB) error {
|
||||
return tx.AutoMigrate(
|
||||
&model.AppDetail{},
|
||||
|
|
Loading…
Add table
Reference in a new issue