From 2533234de8d829b10b3a8a39c28f04f0be278850 Mon Sep 17 00:00:00 2001 From: divyam234 Date: Thu, 2 Nov 2023 19:21:30 +0530 Subject: [PATCH] improve cache handling --- database/migrations/20231102165658_tables.sql | 20 +++++ go.mod | 2 + go.sum | 5 ++ main.go | 3 + models/session.model.go | 12 +++ routes/user.go | 11 --- services/auth.service.go | 24 +++--- services/common.go | 85 ++++++++++++++----- services/file.service.go | 39 +++------ services/user.service.go | 46 ++-------- utils/cache/main.go | 57 +++++++++++++ utils/cron/cron.go | 18 ++-- utils/tgc/workers.go | 6 +- 13 files changed, 204 insertions(+), 124 deletions(-) create mode 100644 database/migrations/20231102165658_tables.sql create mode 100644 models/session.model.go create mode 100644 utils/cache/main.go diff --git a/database/migrations/20231102165658_tables.sql b/database/migrations/20231102165658_tables.sql new file mode 100644 index 0000000..a527b0a --- /dev/null +++ b/database/migrations/20231102165658_tables.sql @@ -0,0 +1,20 @@ +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE teldrive.users DROP COLUMN IF EXISTS tg_session; + +CREATE TABLE teldrive.sessions ( + session text NOT NULL, + user_id bigint NOT NULL, + hash text NOT NULL, + created_at timestamp null default timezone('utc'::text,now()), + PRIMARY KEY(session, hash), + FOREIGN KEY (user_id) REFERENCES teldrive.users(user_id) +); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS teldrive.sessions; +-- +goose StatementEnd diff --git a/go.mod b/go.mod index 6d59b51..70716c3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/divyam234/teldrive go 1.21 require ( + github.com/coocood/freecache v1.2.4 github.com/divyam234/cors v1.4.2 github.com/gin-gonic/gin v1.9.1 github.com/go-co-op/gocron v1.35.3 @@ -24,6 +25,7 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/google/uuid v1.4.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 1ee4c77..cb72288 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,17 @@ github.com/bytedance/sonic v1.10.0 h1:qtNZduETEIWJVIyDl01BeNxur2rW9OwTQ/yBqFRkKE github.com/bytedance/sonic v1.10.0/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA= github.com/chenzhuoyu/iasm v0.9.0 h1:9fhXjVzq5hUy2gkhhgHl95zG2cEAhw9OSGs8toWWAwo= github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/coocood/freecache v1.2.4 h1:UdR6Yz/X1HW4fZOuH0Z94KwG851GWOSknua5VUbb/5M= +github.com/coocood/freecache v1.2.4/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/main.go b/main.go index 70ceb93..94fb522 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "github.com/divyam234/teldrive/utils" "github.com/divyam234/cors" + "github.com/divyam234/teldrive/utils/cache" "github.com/divyam234/teldrive/utils/cron" "github.com/gin-gonic/gin" "github.com/go-co-op/gocron" @@ -29,6 +30,8 @@ func main() { database.InitDB() + cache.InitCache() + scheduler := gocron.NewScheduler(time.UTC) scheduler.Every(1).Hour().Do(cron.FilesDeleteJob) diff --git a/models/session.model.go b/models/session.model.go new file mode 100644 index 0000000..af4241d --- /dev/null +++ b/models/session.model.go @@ -0,0 +1,12 @@ +package models + +import ( + "time" +) + +type Session struct { + UserId int64 `gorm:"type:bigint;primaryKey"` + Hash string `gorm:"type:text"` + Session string `gorm:"type:text"` + CreatedAt time.Time `gorm:"default:timezone('utc'::text, now())"` +} diff --git a/routes/user.go b/routes/user.go index 719ce5c..87137cf 100644 --- a/routes/user.go +++ b/routes/user.go @@ -96,15 +96,4 @@ func addUserRoutes(rg *gin.RouterGroup) { c.JSON(http.StatusOK, res) }) - - r.DELETE("/cache", func(c *gin.Context) { - res, err := userService.ClearCache(c) - - if err != nil { - c.AbortWithError(err.Code, err.Error) - return - } - - c.JSON(http.StatusOK, res) - }) } diff --git a/services/auth.service.go b/services/auth.service.go index 9ad6a11..344994e 100644 --- a/services/auth.service.go +++ b/services/auth.service.go @@ -16,13 +16,11 @@ import ( "strconv" "time" - "github.com/divyam234/teldrive/database" "github.com/divyam234/teldrive/models" "github.com/divyam234/teldrive/schemas" "github.com/divyam234/teldrive/types" "github.com/divyam234/teldrive/utils" "github.com/divyam234/teldrive/utils/auth" - "github.com/divyam234/teldrive/utils/kv" "github.com/divyam234/teldrive/utils/tgc" "github.com/gin-gonic/gin" "github.com/go-jose/go-jose/v3/jwt" @@ -145,9 +143,8 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError) IsPremium: session.IsPremium, } - tokenBytes, _ := json.Marshal(jwtClaims) - md5hash := md5.Sum(tokenBytes) - hexToken := hex.EncodeToString(md5hash[:]) + tokenhash := md5.Sum([]byte(session.Sesssion)) + hexToken := hex.EncodeToString(tokenhash[:]) jwtClaims.Hash = hexToken jweToken, err := auth.Encode(jwtClaims) @@ -168,7 +165,7 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError) if err := as.Db.Model(&models.User{}).Where("user_id = ?", session.UserID). Find(&result).Error; err != nil { - return nil, &types.AppError{Error: errors.New("failed to create or update user"), + return nil, &types.AppError{Error: errors.New("failed to find user"), Code: http.StatusInternalServerError} } if len(result) == 0 { @@ -192,17 +189,16 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError) return nil, &types.AppError{Error: errors.New("failed to create or update user"), Code: http.StatusInternalServerError} } - } else { - if err := as.Db.Model(&models.User{}).Where("user_id = ?", session.UserID). - Update("tg_session", session.Sesssion).Error; err != nil { - return nil, &types.AppError{Error: errors.New("failed to create or update user"), - Code: http.StatusInternalServerError} - } } setCookie(c, as.SessionCookieName, jweToken, as.SessionMaxAge) - database.KV.Set(kv.Key("sessions", hexToken), tokenBytes) + //create session + + if err := as.Db.Create(&models.Session{UserId: session.UserID, Hash: hexToken, Session: session.Sesssion}).Error; err != nil { + return nil, &types.AppError{Error: errors.New("failed to create user session"), + Code: http.StatusInternalServerError} + } return &schemas.Message{Status: true, Message: "login success"}, nil } @@ -254,7 +250,7 @@ func (as *AuthService) Logout(c *gin.Context) (*schemas.Message, *types.AppError }) setCookie(c, as.SessionCookieName, "", -1) - database.KV.Delete(kv.Key("sessions", jwtUser.Hash)) + as.Db.Where("session = ?", jwtUser.TgSession).Delete(&models.Session{}) return &schemas.Message{Status: true, Message: "logout success"}, nil } diff --git a/services/common.go b/services/common.go index f8a400e..15e19d0 100644 --- a/services/common.go +++ b/services/common.go @@ -12,7 +12,7 @@ import ( "github.com/divyam234/teldrive/schemas" "github.com/divyam234/teldrive/types" "github.com/divyam234/teldrive/utils" - "github.com/divyam234/teldrive/utils/kv" + "github.com/divyam234/teldrive/utils/cache" "github.com/divyam234/teldrive/utils/tgc" "github.com/gin-gonic/gin" "github.com/gotd/td/telegram" @@ -21,6 +21,12 @@ import ( "github.com/thoas/go-funk" ) +func CopyList[T any](list []T) []T { + newList := make([]T, len(list)) + copy(newList, list) + return newList +} + func getChunk(ctx context.Context, tgClient *telegram.Client, location tg.InputFileLocationClass, offset int64, limit int64) ([]byte, error) { req := &tg.UploadGetFileRequest{ @@ -84,6 +90,16 @@ func getBotInfo(ctx context.Context, token string) (*BotInfo, error) { func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOutFull, userID string) ([]types.Part, error) { + parts := []types.Part{} + + key := fmt.Sprintf("messages:%s:%s", file.ID, userID) + + err := cache.GetCache().Get(key, &parts) + + if err == nil { + return parts, nil + } + ids := funk.Map(*file.Parts, func(part models.Part) tg.InputMessageClass { return tg.InputMessageClass(&tg.InputMessageID{ID: int(part.ID)}) }) @@ -104,8 +120,6 @@ func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOu messages := res.(*tg.MessagesChannelMessages) - parts := []types.Part{} - for _, message := range messages.Messages { item := message.(*tg.Message) media := item.Media.(*tg.MessageMediaDocument) @@ -113,6 +127,7 @@ func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOu location := document.AsInputDocumentFileLocation() parts = append(parts, types.Part{Location: location, Start: 0, End: document.Size - 1, Size: document.Size}) } + cache.GetCache().Set(key, &parts, 3600) return parts, nil } @@ -124,7 +139,7 @@ func rangedParts(parts []types.Part, start, end int64) []types.Part { endPartNumber := int64(math.Ceil(float64(end) / float64(chunkSize))) - partsToDownload := parts[startPartNumber:endPartNumber] + partsToDownload := CopyList(parts[startPartNumber:endPartNumber]) partsToDownload[0].Start = start % chunkSize partsToDownload[len(partsToDownload)-1].End = end % chunkSize @@ -159,19 +174,21 @@ func GetDefaultChannel(ctx context.Context, userID int64) (int64, error) { var channelID int64 - key := kv.Key("users", strconv.FormatInt(userID, 10), "channel") + key := fmt.Sprintf("users:channel:%d", userID) - err := kv.GetValue(database.KV, key, &channelID) + err := cache.GetCache().Get(key, &channelID) - if err != nil { - var channelIds []int64 - database.DB.Model(&models.Channel{}).Where("user_id = ?", userID).Where("selected = ?", true). - Pluck("channel_id", &channelIds) + if err == nil { + return channelID, nil + } - if len(channelIds) == 1 { - channelID = channelIds[0] - kv.SetValue(database.KV, key, &channelID) - } + var channelIds []int64 + database.DB.Model(&models.Channel{}).Where("user_id = ?", userID).Where("selected = ?", true). + Pluck("channel_id", &channelIds) + + if len(channelIds) == 1 { + channelID = channelIds[0] + cache.GetCache().Set(key, channelID, 0) } if channelID == 0 { @@ -190,18 +207,42 @@ func GetBotsToken(ctx context.Context, userID int64) ([]string, error) { return nil, err } - key := kv.Key("users", strconv.FormatInt(userID, 10), strconv.FormatInt(channelId, 10), "bots") + key := fmt.Sprintf("users:bots:%d:%d", userID, channelId) - err = kv.GetValue(database.KV, key, &bots) + err = cache.GetCache().Get(key, &bots) - if err != nil { - if err := database.DB.Model(&models.Bot{}).Where("user_id = ?", userID). - Where("channel_id = ?", channelId).Pluck("token", &bots).Error; err != nil { - return nil, err - } - kv.SetValue(database.KV, key, &bots) + if err == nil { + return bots, nil } + if err := database.DB.Model(&models.Bot{}).Where("user_id = ?", userID). + Where("channel_id = ?", channelId).Pluck("token", &bots).Error; err != nil { + return nil, err + } + + cache.GetCache().Set(key, &bots, 0) return bots, nil } + +func GetSessionByHash(hash string) (*models.Session, error) { + + var session models.Session + + key := fmt.Sprintf("sessions:%s", hash) + + err := cache.GetCache().Get(key, &session) + + if err == nil { + return &session, nil + } + + if err := database.DB.Model(&models.Session{}).Where("hash = ?", hash).First(&session).Error; err != nil { + return nil, err + } + + cache.GetCache().Set(key, &session, 0) + + return &session, nil + +} diff --git a/services/file.service.go b/services/file.service.go index 2502a74..2de89df 100644 --- a/services/file.service.go +++ b/services/file.service.go @@ -3,7 +3,6 @@ package services import ( "context" "encoding/base64" - "encoding/json" "errors" "fmt" "io" @@ -11,12 +10,11 @@ import ( "strconv" "strings" - "github.com/divyam234/teldrive/database" "github.com/divyam234/teldrive/mapper" "github.com/divyam234/teldrive/models" "github.com/divyam234/teldrive/schemas" "github.com/divyam234/teldrive/utils" - "github.com/divyam234/teldrive/utils/kv" + "github.com/divyam234/teldrive/utils/cache" "github.com/divyam234/teldrive/utils/md5" "github.com/divyam234/teldrive/utils/reader" "github.com/divyam234/teldrive/utils/tgc" @@ -128,8 +126,9 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE file := mapper.MapFileToFileOut(files[0]) - key := kv.Key("files", fileID) - database.KV.Delete(key) + key := fmt.Sprintf("files:%s", fileID) + + cache.GetCache().Delete(key) return &file, nil @@ -337,34 +336,26 @@ func (fs *FileService) GetFileStream(c *gin.Context) { return } - data, err := database.KV.Get(kv.Key("sessions", authHash)) + session, err := GetSessionByHash(authHash) if err != nil { http.Error(w, "hash missing relogin", http.StatusBadRequest) return } - jwtUser := &types.JWTClaims{} - - err = json.Unmarshal(data, jwtUser) - - if err != nil { - http.Error(w, "invalid hash", http.StatusBadRequest) - return - } - file := &schemas.FileOutFull{} - key := kv.Key("files", fileID) + key := fmt.Sprintf("files:%s", fileID) + + err = cache.GetCache().Get(key, file) - err = kv.GetValue(database.KV, key, file) if err != nil { file, err = fs.GetFileByID(c) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - kv.SetValue(database.KV, key, file) + cache.GetCache().Set(key, file, 0) } c.Header("Accept-Ranges", "bytes") @@ -411,9 +402,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) { c.Header("Content-Disposition", fmt.Sprintf("%s; filename=\"%s\"", disposition, file.Name)) - userID, _ := strconv.ParseInt(jwtUser.Subject, 10, 64) - - tokens, err := GetBotsToken(c, userID) + tokens, err := GetBotsToken(c, session.UserId) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -429,8 +418,8 @@ func (fs *FileService) GetFileStream(c *gin.Context) { if config.LazyStreamBots || len(tokens) == 0 { var client *telegram.Client if len(tokens) == 0 { - client, _ = tgc.UserLogin(jwtUser.TgSession) - channelUser = jwtUser.Subject + client, _ = tgc.UserLogin(session.Session) + channelUser = strconv.FormatInt(session.UserId, 10) } else { tgc.Workers.Set(tokens) token = tgc.Workers.Next() @@ -455,14 +444,14 @@ func (fs *FileService) GetFileStream(c *gin.Context) { tgc.StreamWorkers.Set(tokens[:limit]) - client, err := tgc.StreamWorkers.Next() + client, index, err := tgc.StreamWorkers.Next() if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - channelUser = strings.Split(token, ":")[0] + channelUser = strings.Split(tokens[index], ":")[0] if r.Method != "HEAD" { diff --git a/services/user.service.go b/services/user.service.go index 15548ac..70ccd7b 100644 --- a/services/user.service.go +++ b/services/user.service.go @@ -13,7 +13,7 @@ import ( "github.com/divyam234/teldrive/models" "github.com/divyam234/teldrive/schemas" "github.com/divyam234/teldrive/types" - "github.com/divyam234/teldrive/utils/kv" + "github.com/divyam234/teldrive/utils/cache" "github.com/divyam234/teldrive/utils/tgc" "github.com/gotd/td/telegram" "github.com/gotd/td/telegram/message/peer" @@ -121,9 +121,8 @@ func (us *UserService) UpdateChannel(c *gin.Context) (*schemas.Message, *types.A us.Db.Model(&models.Channel{}).Where("channel_id != ?", payload.ChannelID). Where("user_id = ?", userId).Update("selected", false) - key := kv.Key("users", strconv.FormatInt(userId, 10), "channel") - database.KV.Delete(key) - kv.SetValue(database.KV, key, payload.ChannelID) + key := fmt.Sprintf("users:channel:%d", userId) + cache.GetCache().Set(key, payload.ChannelID, 0) return &schemas.Message{Status: true, Message: "channel updated"}, nil } @@ -191,9 +190,8 @@ func (us *UserService) RemoveBots(c *gin.Context) (*schemas.Message, *types.AppE Delete(&models.Bot{}).Error; err != nil { return nil, &types.AppError{Error: errors.New("failed to delete bots"), Code: http.StatusInternalServerError} } - key := kv.Key("users", strconv.FormatInt(userID, 10), strconv.FormatInt(channelId, 10), "bots") - database.KV.Delete(key) + cache.GetCache().Delete(fmt.Sprintf("users:bots:%d:%d", userID, channelId)) return &schemas.Message{Status: true, Message: "bots deleted"}, nil @@ -232,39 +230,6 @@ func (us *UserService) RevokeBotSession(c *gin.Context) (*schemas.Message, *type } -func (us *UserService) ClearCache(c *gin.Context) (*schemas.Message, *types.AppError) { - - pattern := []byte("users") - - err := database.BoltDB.Update(func(tx *bbolt.Tx) error { - - bucket := tx.Bucket([]byte("teldrive")) - if bucket == nil { - return errors.New("bucket not found") - } - - c := bucket.Cursor() - - for key, _ := c.First(); key != nil; key, _ = c.Next() { - if bytes.HasPrefix(key, pattern) { - if err := c.Delete(); err != nil { - return err - } - } - } - - return nil - }) - - if err != nil { - return nil, &types.AppError{Error: errors.New("failed to clear cache"), - Code: http.StatusInternalServerError} - } - - return &schemas.Message{Status: true, Message: "cache cleared"}, nil - -} - func (us *UserService) addBots(c context.Context, client *telegram.Client, userId int64, channelId int64, botsTokens []string) (*schemas.Message, *types.AppError) { botInfo := []BotInfo{} @@ -359,8 +324,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI }) } - key := kv.Key("users", strconv.FormatInt(userId, 10), strconv.FormatInt(channelId, 10), "bots") - database.KV.Delete(key) + cache.GetCache().Delete(fmt.Sprintf("users:bots:%d:%d", userId, channelId)) if err := us.Db.Clauses(clause.OnConflict{DoNothing: true}).Create(&payload).Error; err != nil { return nil, &types.AppError{Error: errors.New("failed to add bots"), Code: http.StatusInternalServerError} diff --git a/utils/cache/main.go b/utils/cache/main.go new file mode 100644 index 0000000..5770dde --- /dev/null +++ b/utils/cache/main.go @@ -0,0 +1,57 @@ +package cache + +import ( + "encoding/json" + "sync" + + "github.com/coocood/freecache" +) + +var cache *Cache + +type Cache struct { + cache *freecache.Cache + mu sync.RWMutex +} + +func InitCache() { + cache = &Cache{cache: freecache.NewCache(10 * 1024 * 1024)} +} + +func GetCache() *Cache { + return cache +} + +func (c *Cache) Get(key string, value interface{}) error { + c.mu.RLock() + defer c.mu.RUnlock() + + got, err := cache.cache.Get([]byte(key)) + + if err != nil { + return err + } + + return json.Unmarshal(got, value) +} + +func (c *Cache) Set(key string, value interface{}, expireSeconds int) error { + c.mu.Lock() + defer c.mu.Unlock() + + data, err := json.Marshal(value) + if err != nil { + return err + } + + cache.cache.Set([]byte(key), data, expireSeconds) + return nil +} + +func (c *Cache) Delete(key string) error { + c.mu.Lock() + defer c.mu.Unlock() + + cache.cache.Del([]byte(key)) + return nil +} diff --git a/utils/cron/cron.go b/utils/cron/cron.go index 300530e..b73ea45 100644 --- a/utils/cron/cron.go +++ b/utils/cron/cron.go @@ -50,14 +50,14 @@ func (a *UpFiles) Scan(value interface{}) error { type Result struct { Files Files - TgSession string + Session string UserId int64 ChannelId int64 } type UploadResult struct { Files UpFiles - TgSession string + Session string UserId int64 ChannelId int64 } @@ -66,7 +66,7 @@ func deleteTGMessages(ctx context.Context, result Result) error { db := database.DB - client, err := tgc.UserLogin(result.TgSession) + client, err := tgc.UserLogin(result.Session) if err != nil { return err @@ -111,7 +111,7 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error { db := database.DB - client, err := tgc.UserLogin(result.TgSession) + client, err := tgc.UserLogin(result.Session) if err != nil { return err @@ -157,11 +157,12 @@ func FilesDeleteJob() { var results []Result if err := db.Model(&models.File{}). - Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "u.tg_session"). + Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "s.session"). Joins("left join teldrive.users as u on u.user_id = files.user_id"). + Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on u.user_id = s.user_id"). Where("type = ?", "file"). Where("status = ?", "pending_deletion"). - Group("files.channel_id").Group("files.user_id").Group("u.tg_session"). + Group("files.channel_id").Group("files.user_id").Group("s.session"). Scan(&results).Error; err != nil { return } @@ -179,10 +180,11 @@ func UploadCleanJob() { var upResults []UploadResult if err := db.Model(&models.Upload{}). - Select("JSONB_AGG(jsonb_build_object('id',uploads.id,'partId',uploads.part_id)) as files", "uploads.channel_id", "uploads.user_id", "u.tg_session"). + Select("JSONB_AGG(jsonb_build_object('id',uploads.id,'partId',uploads.part_id)) as files", "uploads.channel_id", "uploads.user_id", "s.session"). Joins("left join teldrive.users as u on u.user_id = uploads.user_id"). + Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on s.user_id = uploads.user_id"). Where("uploads.created_at < ?", time.Now().UTC().AddDate(0, 0, -15)). - Group("uploads.channel_id").Group("uploads.user_id").Group("u.tg_session"). + Group("uploads.channel_id").Group("uploads.user_id").Group("s.session"). Scan(&upResults).Error; err != nil { return } diff --git a/utils/tgc/workers.go b/utils/tgc/workers.go index 93251a2..9aa0fa7 100644 --- a/utils/tgc/workers.go +++ b/utils/tgc/workers.go @@ -56,19 +56,19 @@ func (w *streamWorkers) Set(bots []string) { } } -func (w *streamWorkers) Next() (*Client, error) { +func (w *streamWorkers) Next() (*Client, int, error) { w.Lock() defer w.Unlock() w.index = (w.index + 1) % len(w.clients) if w.clients[w.index].Status == "idle" { stop, err := bg.Connect(w.clients[w.index].Tg) if err != nil { - return nil, err + return nil, 0, err } w.clients[w.index].Stop = stop w.clients[w.index].Status = "running" } - return w.clients[w.index], nil + return w.clients[w.index], w.index, nil } var StreamWorkers = &streamWorkers{}