2024-02-12 04:52:41 +08:00
|
|
|
package cron
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/divyam234/teldrive/internal/config"
|
2024-06-03 01:41:18 +08:00
|
|
|
"github.com/divyam234/teldrive/internal/logging"
|
2024-02-12 04:52:41 +08:00
|
|
|
"github.com/divyam234/teldrive/pkg/models"
|
2024-06-04 16:46:03 +08:00
|
|
|
"github.com/divyam234/teldrive/pkg/schemas"
|
2024-02-12 04:52:41 +08:00
|
|
|
"github.com/divyam234/teldrive/pkg/services"
|
|
|
|
"github.com/go-co-op/gocron"
|
|
|
|
"go.uber.org/zap"
|
2024-06-04 16:46:03 +08:00
|
|
|
"gorm.io/datatypes"
|
2024-02-12 04:52:41 +08:00
|
|
|
"gorm.io/gorm"
|
|
|
|
)
|
|
|
|
|
|
|
|
type File struct {
|
2024-06-04 16:46:03 +08:00
|
|
|
ID string `json:"id"`
|
|
|
|
Parts []schemas.Part `json:"parts"`
|
2024-02-12 04:52:41 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
type Result struct {
|
2024-06-04 16:46:03 +08:00
|
|
|
Files datatypes.JSONSlice[File]
|
2024-02-12 04:52:41 +08:00
|
|
|
Session string
|
|
|
|
UserId int64
|
|
|
|
ChannelId int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type UploadResult struct {
|
2024-06-04 16:46:03 +08:00
|
|
|
Parts datatypes.JSONSlice[int]
|
2024-02-12 04:52:41 +08:00
|
|
|
Session string
|
|
|
|
UserId int64
|
|
|
|
ChannelId int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type CronService struct {
|
|
|
|
db *gorm.DB
|
|
|
|
cnf *config.Config
|
|
|
|
logger *zap.SugaredLogger
|
|
|
|
}
|
|
|
|
|
|
|
|
func StartCronJobs(db *gorm.DB, cnf *config.Config) {
|
|
|
|
scheduler := gocron.NewScheduler(time.UTC)
|
|
|
|
|
2024-02-13 21:18:22 +08:00
|
|
|
ctx := context.Background()
|
|
|
|
|
2024-02-12 04:52:41 +08:00
|
|
|
cron := CronService{db: db, cnf: cnf, logger: logging.DefaultLogger()}
|
|
|
|
|
2024-02-13 21:18:22 +08:00
|
|
|
scheduler.Every(1).Hour().Do(cron.CleanFiles, ctx)
|
2024-02-12 04:52:41 +08:00
|
|
|
|
|
|
|
scheduler.Every(2).Hour().Do(cron.UpdateFolderSize)
|
|
|
|
|
2024-02-13 21:18:22 +08:00
|
|
|
scheduler.Every(12).Hour().Do(cron.CleanUploads, ctx)
|
2024-02-12 04:52:41 +08:00
|
|
|
|
|
|
|
scheduler.StartAsync()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *CronService) CleanFiles(ctx context.Context) {
|
|
|
|
|
|
|
|
var results []Result
|
|
|
|
if err := c.db.Model(&models.File{}).
|
|
|
|
Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "s.session").
|
|
|
|
Joins("left join teldrive.users as u on u.user_id = files.user_id").
|
|
|
|
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on u.user_id = s.user_id").
|
|
|
|
Where("type = ?", "file").
|
|
|
|
Where("status = ?", "pending_deletion").
|
|
|
|
Group("files.channel_id").Group("files.user_id").Group("s.session").
|
|
|
|
Scan(&results).Error; err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, row := range results {
|
2024-02-20 02:43:57 +08:00
|
|
|
|
|
|
|
if row.Session == "" {
|
|
|
|
break
|
|
|
|
}
|
2024-02-12 04:52:41 +08:00
|
|
|
ids := []int{}
|
|
|
|
|
|
|
|
fileIds := []string{}
|
|
|
|
|
|
|
|
for _, file := range row.Files {
|
|
|
|
fileIds = append(fileIds, file.ID)
|
|
|
|
for _, part := range file.Parts {
|
|
|
|
ids = append(ids, int(part.ID))
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
2024-05-26 04:26:05 +08:00
|
|
|
err := services.DeleteTGMessages(ctx, &c.cnf.TG, row.Session, row.ChannelId, row.UserId, ids)
|
2024-06-11 00:16:41 +08:00
|
|
|
|
2024-02-12 04:52:41 +08:00
|
|
|
if err != nil {
|
|
|
|
c.logger.Errorw("failed to clean files", err)
|
2024-06-11 00:16:41 +08:00
|
|
|
return
|
2024-02-12 04:52:41 +08:00
|
|
|
}
|
2024-06-11 00:16:41 +08:00
|
|
|
|
|
|
|
c.db.Where("id = any($1)", fileIds).Delete(&models.File{})
|
|
|
|
|
2024-02-12 04:52:41 +08:00
|
|
|
c.logger.Infow("cleaned files", "user", row.UserId, "channel", row.ChannelId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *CronService) CleanUploads(ctx context.Context) {
|
|
|
|
|
|
|
|
var upResults []UploadResult
|
|
|
|
if err := c.db.Model(&models.Upload{}).
|
|
|
|
Select("JSONB_AGG(uploads.part_id) as parts", "uploads.channel_id", "uploads.user_id", "s.session").
|
|
|
|
Joins("left join teldrive.users as u on u.user_id = uploads.user_id").
|
|
|
|
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on s.user_id = uploads.user_id").
|
2024-05-31 22:58:32 +08:00
|
|
|
Where("uploads.created_at < ?", time.Now().UTC().Add(-c.cnf.TG.Uploads.Retention)).
|
2024-02-12 04:52:41 +08:00
|
|
|
Group("uploads.channel_id").Group("uploads.user_id").Group("s.session").
|
|
|
|
Scan(&upResults).Error; err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, result := range upResults {
|
2024-05-31 22:58:32 +08:00
|
|
|
|
2024-06-04 16:46:03 +08:00
|
|
|
if result.Session == "" && len(result.Parts) > 0 {
|
|
|
|
c.db.Where("part_id = any($1)", result.Parts).Delete(&models.Upload{})
|
2024-06-11 00:16:41 +08:00
|
|
|
return
|
2024-05-31 22:58:32 +08:00
|
|
|
}
|
|
|
|
err := services.DeleteTGMessages(ctx, &c.cnf.TG, result.Session, result.ChannelId, result.UserId, result.Parts)
|
2024-06-11 00:16:41 +08:00
|
|
|
if err != nil {
|
|
|
|
c.logger.Errorw("failed to delete messages", err)
|
|
|
|
return
|
2024-02-12 04:52:41 +08:00
|
|
|
}
|
2024-06-11 00:16:41 +08:00
|
|
|
c.db.Where("part_id = any($1)", result.Parts).Delete(&models.Upload{})
|
|
|
|
|
2024-02-12 04:52:41 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *CronService) UpdateFolderSize() {
|
|
|
|
c.db.Exec("call teldrive.update_size();")
|
|
|
|
}
|