teldrive/internal/cron/cron.go

218 lines
5.3 KiB
Go
Raw Normal View History

2023-08-16 20:48:32 +08:00
package cron
import (
"context"
2023-09-20 03:20:44 +08:00
"database/sql/driver"
"encoding/json"
"strconv"
2023-11-01 01:46:52 +08:00
"time"
2023-08-16 20:48:32 +08:00
2023-12-05 20:53:54 +08:00
"github.com/divyam234/teldrive/config"
2023-12-03 03:47:23 +08:00
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/database"
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/services"
2023-12-03 14:52:25 +08:00
"github.com/go-co-op/gocron"
2023-08-16 20:48:32 +08:00
"github.com/gotd/td/tg"
"go.uber.org/zap"
2023-08-16 20:48:32 +08:00
)
2023-09-20 03:20:44 +08:00
// Files is a list of File entries that is stored in, and loaded from, a
// single database value encoded as JSON.
type Files []File

// File pairs a file ID with the parts recorded for that file.
type File struct {
	ID    string        `json:"id"`
	Parts []models.Part `json:"parts"`
}

// filesScanError is returned by Files.Scan when the database value is of a Go
// type that cannot carry JSON text.
type filesScanError struct{}

// Error implements the error interface.
func (filesScanError) Error() string {
	return "cron: Files.Scan: unsupported source type, want []byte or string"
}

// Value encodes the list as JSON so it can be used as a query argument.
func (a Files) Value() (driver.Value, error) {
	return json.Marshal(a)
}

// Scan decodes a JSON database value into the receiver.
//
// A nil value (SQL NULL) resets the receiver to nil. []byte and string values
// are decoded as JSON. Any other type yields an error instead of the panic an
// unchecked type assertion would cause.
func (a *Files) Scan(value interface{}) error {
	switch src := value.(type) {
	case nil:
		*a = nil
		return nil
	case []byte:
		return json.Unmarshal(src, a)
	case string:
		return json.Unmarshal([]byte(src), a)
	default:
		return filesScanError{}
	}
}
2023-12-05 20:53:54 +08:00
// UpParts is a list of integer part IDs that is stored in, and loaded from, a
// single database value encoded as JSON.
type UpParts []int

// upPartsScanError is returned by UpParts.Scan when the database value is of a
// Go type that cannot carry JSON text.
type upPartsScanError struct{}

// Error implements the error interface.
func (upPartsScanError) Error() string {
	return "cron: UpParts.Scan: unsupported source type, want []byte or string"
}

// Value encodes the list as JSON so it can be used as a query argument.
func (a UpParts) Value() (driver.Value, error) {
	return json.Marshal(a)
}

// Scan decodes a JSON database value into the receiver.
//
// A nil value (SQL NULL) resets the receiver to nil. []byte and string values
// are decoded as JSON. Any other type yields an error instead of the panic an
// unchecked type assertion would cause.
func (a *UpParts) Scan(value interface{}) error {
	switch src := value.(type) {
	case nil:
		*a = nil
		return nil
	case []byte:
		return json.Unmarshal(src, a)
	case string:
		return json.Unmarshal([]byte(src), a)
	default:
		return upPartsScanError{}
	}
}
2023-08-16 20:48:32 +08:00
type Result struct {
2023-09-20 03:20:44 +08:00
Files Files
2023-11-02 21:51:30 +08:00
Session string
2023-08-24 02:40:40 +08:00
UserId int64
2023-08-16 20:48:32 +08:00
ChannelId int64
}
2023-11-01 01:46:52 +08:00
type UploadResult struct {
2023-12-05 20:53:54 +08:00
Parts UpParts
2023-11-02 21:51:30 +08:00
Session string
2023-11-01 01:46:52 +08:00
UserId int64
ChannelId int64
}
func deleteTGMessages(ctx context.Context, logger *zap.Logger, result Result) error {
2023-08-16 20:48:32 +08:00
2023-09-20 03:20:44 +08:00
db := database.DB
2023-08-16 20:48:32 +08:00
2023-11-25 12:31:29 +08:00
client, err := tgc.UserLogin(ctx, result.Session)
2023-08-16 20:48:32 +08:00
if err != nil {
return err
}
2023-09-20 03:20:44 +08:00
ids := []int{}
2023-08-16 20:48:32 +08:00
2023-09-20 03:20:44 +08:00
fileIds := []string{}
2023-08-16 20:48:32 +08:00
2023-09-20 03:20:44 +08:00
for _, file := range result.Files {
fileIds = append(fileIds, file.ID)
for _, part := range file.Parts {
ids = append(ids, int(part.ID))
}
}
err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error {
2023-09-20 03:20:44 +08:00
channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10))
if err != nil {
return err
}
messageDeleteRequest := tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: ids}
_, err = client.API().ChannelsDeleteMessages(ctx, &messageDeleteRequest)
if err != nil {
return err
}
return nil
})
if err == nil {
db.Where("id = any($1)", fileIds).Delete(&models.File{})
2023-08-16 20:48:32 +08:00
}
2023-09-20 03:20:44 +08:00
return err
2023-08-16 20:48:32 +08:00
}
func cleanUploadsMessages(ctx context.Context, logger *zap.Logger, result UploadResult) error {
2023-11-01 01:46:52 +08:00
db := database.DB
2023-11-25 12:31:29 +08:00
client, err := tgc.UserLogin(ctx, result.Session)
2023-11-01 01:46:52 +08:00
if err != nil {
return err
}
err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error {
2023-11-01 01:46:52 +08:00
channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10))
if err != nil {
return err
}
2023-12-05 20:53:54 +08:00
messageDeleteRequest := tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: result.Parts}
2023-11-01 01:46:52 +08:00
_, err = client.API().ChannelsDeleteMessages(ctx, &messageDeleteRequest)
if err != nil {
return err
}
return nil
})
2023-12-05 20:53:54 +08:00
parts := []int{}
for _, id := range result.Parts {
parts = append(parts, id)
}
2023-11-01 01:46:52 +08:00
if err == nil {
2023-12-05 20:53:54 +08:00
db.Where("part_id = any($1)", parts).Delete(&models.Upload{})
2023-11-01 01:46:52 +08:00
}
return err
2023-11-01 01:46:52 +08:00
}
func filesDeleteJob(logger *zap.Logger) {
2023-08-16 20:48:32 +08:00
db := database.DB
2023-09-20 03:20:44 +08:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
2023-08-16 20:48:32 +08:00
var results []Result
2023-09-20 03:20:44 +08:00
if err := db.Model(&models.File{}).
2023-11-02 21:51:30 +08:00
Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "s.session").
2023-08-16 20:48:32 +08:00
Joins("left join teldrive.users as u on u.user_id = files.user_id").
2023-11-02 21:51:30 +08:00
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on u.user_id = s.user_id").
2023-09-20 03:20:44 +08:00
Where("type = ?", "file").
Where("status = ?", "pending_deletion").
2023-11-02 21:51:30 +08:00
Group("files.channel_id").Group("files.user_id").Group("s.session").
2023-09-20 03:20:44 +08:00
Scan(&results).Error; err != nil {
2023-08-16 20:48:32 +08:00
return
}
2023-09-20 03:20:44 +08:00
for _, row := range results {
err := deleteTGMessages(ctx, logger, row)
if err != nil {
logger.Error("failed to clean pending files", zap.Int64("user", row.UserId), zap.Error(err))
}
logger.Info("cleaned pending files", zap.Int64("user", row.UserId),
zap.Int64("channel", row.ChannelId))
2023-08-16 20:48:32 +08:00
}
}
2023-11-01 01:46:52 +08:00
func uploadCleanJob(logger *zap.Logger) {
2023-11-01 01:46:52 +08:00
db := database.DB
ctx, cancel := context.WithCancel(context.Background())
2023-12-05 20:53:54 +08:00
c := config.GetConfig()
2023-11-01 01:46:52 +08:00
defer cancel()
var upResults []UploadResult
if err := db.Model(&models.Upload{}).
2023-12-05 20:53:54 +08:00
Select("JSONB_AGG(uploads.part_id) as parts", "uploads.channel_id", "uploads.user_id", "s.session").
2023-11-01 01:46:52 +08:00
Joins("left join teldrive.users as u on u.user_id = uploads.user_id").
2023-11-02 21:51:30 +08:00
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on s.user_id = uploads.user_id").
2023-12-05 20:53:54 +08:00
Where("uploads.created_at < ?", time.Now().UTC().AddDate(0, 0, -c.UploadRetention)).
2023-11-02 21:51:30 +08:00
Group("uploads.channel_id").Group("uploads.user_id").Group("s.session").
2023-11-01 01:46:52 +08:00
Scan(&upResults).Error; err != nil {
return
}
for _, row := range upResults {
err := cleanUploadsMessages(ctx, logger, row)
if err != nil {
logger.Error("failed to clean orpahan file parts", zap.Int64("user", row.UserId), zap.Error(err))
}
logger.Info("cleaned orpahan file parts", zap.Int64("user", row.UserId),
zap.Int64("channel", row.ChannelId))
2023-11-01 01:46:52 +08:00
}
}
2023-12-03 14:52:25 +08:00
func folderSizeUpdate(logger *zap.Logger) {
2023-12-03 14:52:25 +08:00
database.DB.Exec("call teldrive.update_size();")
2024-01-20 18:15:16 +08:00
logger.Info("updated folder sizes")
2023-12-03 14:52:25 +08:00
}
func StartCronJobs(logger *zap.Logger) {
2023-12-03 14:52:25 +08:00
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(1).Hour().Do(filesDeleteJob, logger)
2023-12-03 14:52:25 +08:00
scheduler.Every(12).Hour().Do(uploadCleanJob, logger)
2023-12-03 14:52:25 +08:00
scheduler.Every(2).Hour().Do(folderSizeUpdate, logger)
2023-12-03 14:52:25 +08:00
scheduler.StartAsync()
}