diff --git a/.gitignore b/.gitignore index 649c85b..232ed26 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,5 @@ sslcerts *.env.example *.env.local *.env.staging -*.db \ No newline at end of file +*.db +logs \ No newline at end of file diff --git a/README.md b/README.md index d302103..fbda6b3 100644 --- a/README.md +++ b/README.md @@ -129,14 +129,16 @@ In addition to the mandatory variables, you can also set the following optional - `COOKIE_SAME_SITE` : Only needed when frontend is on other domain (Default true). -- `LAZY_STREAM_BOTS` : If set to true start Bot session and close immediately when stream or download request is over otherwise run bots forever till server stops (Default false). - - `BG_BOTS_LIMIT` : If LAZY_STREAM_BOTS is set to false it start atmost BG_BOTS_LIMIT no of bots in background to prevent connection recreation on every request (Default 5). - `UPLOAD_RETENTION` : No of days to keep incomplete uploads parts in channel afterwards these parts are deleted (Default 15). - `ENCRYPTION_KEY` : Password for Encryption. +- `DEV` : DEV mode to enable debug logging(Default false). + +- `LOG_SQL` : Log sql queries (Default false). + > [!WARNING] > Keep your Password safe once generated teldrive uses same encryption as of rclone internally so you don't need to enable crypt in rclone.**Teldrive generates random salt for each file part and saves in database so its more secure than rclone crypt whereas in rclone same salt value is used for all files which can be compromised easily**. Enabling crypt in rclone makes UI reduntant so encrypting files in teldrive internally is better way to encrypt files and more secure encryption than rclone.To encrypt files see more about teldrive rclone config. diff --git a/api/router.go b/api/router.go index 1d13778..c8862c4 100644 --- a/api/router.go +++ b/api/router.go @@ -1,21 +1,27 @@ package api import ( + "time" + "github.com/divyam234/teldrive/pkg/controller" "github.com/divyam234/teldrive/pkg/middleware" "github.com/divyam234/teldrive/ui" + ginzap "github.com/gin-contrib/zap" "github.com/gin-gonic/gin" + "go.uber.org/zap" ) -func InitRouter() *gin.Engine { +func InitRouter(logger *zap.Logger) *gin.Engine { - r := gin.Default() + r := gin.New() - r.Use(gin.Recovery()) + r.Use(ginzap.Ginzap(logger, time.RFC3339, true)) + + r.Use(ginzap.RecoveryWithZap(logger, true)) r.Use(middleware.Cors()) - c := controller.NewController() + c := controller.NewController(logger) api := r.Group("/api") { diff --git a/cmd/teldrive/main.go b/cmd/teldrive/main.go index 5863f64..84c14e9 100644 --- a/cmd/teldrive/main.go +++ b/cmd/teldrive/main.go @@ -3,43 +3,39 @@ package main import ( "fmt" "mime" - "path/filepath" "github.com/divyam234/teldrive/api" "github.com/divyam234/teldrive/internal/cron" "github.com/divyam234/teldrive/internal/logger" - "github.com/divyam234/teldrive/internal/utils" "github.com/divyam234/teldrive/pkg/database" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/cache" "github.com/gin-gonic/gin" ) func main() { - gin.SetMode(gin.ReleaseMode) + config.InitConfig() - cnf.InitConfig() + if config.GetConfig().Dev { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) + } - logger.InitLogger() + log := logger.InitLogger() database.InitDB() cache.InitCache() - cron.StartCronJobs() + cron.StartCronJobs(log) mime.AddExtensionType(".js", "application/javascript") - r := api.InitRouter() + r := api.InitRouter(log) + + r.Run(fmt.Sprintf(":%d", config.GetConfig().Port)) - config := cnf.GetConfig() - certDir := filepath.Join(config.ExecDir, "sslcerts") - ok, _ := utils.PathExists(certDir) - if ok && config.Https { - r.RunTLS(fmt.Sprintf(":%d", config.Port), filepath.Join(certDir, "cert.pem"), filepath.Join(certDir, "key.pem")) - } else { - r.Run(fmt.Sprintf(":%d", config.Port)) - } } diff --git a/config/config.go b/config/config.go index 3137069..2ff5255 100644 --- a/config/config.go +++ b/config/config.go @@ -29,10 +29,11 @@ type Config struct { TgClientLangPack string `envconfig:"TG_CLIENT_LANG_PACK" default:"webk"` RunMigrations bool `envconfig:"RUN_MIGRATIONS" default:"true"` Port int `envconfig:"PORT" default:"8080"` - LazyStreamBots bool `envconfig:"LAZY_STREAM_BOTS" default:"false"` BgBotsLimit int `envconfig:"BG_BOTS_LIMIT" default:"5"` UploadRetention int `envconfig:"UPLOAD_RETENTION" default:"15"` DisableStreamBots bool `envconfig:"DISABLE_STREAM_BOTS" default:"false"` + Dev bool `envconfig:"DEV" default:"false"` + LogSql bool `envconfig:"LOG_SQL" default:"false"` EncryptionKey string `envconfig:"ENCRYPTION_KEY"` ExecDir string } diff --git a/docker-compose.yml b/docker-compose.yml index c0842c6..f885ba8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,6 +7,7 @@ services: container_name: teldrive volumes: - ./teldrive.db:/app/teldrive.db:rw + - ./logs:/app/logs:rw env_file: teldrive.env ports: - 8080:8080 diff --git a/go.mod b/go.mod index 5fb184b..a285b34 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/coocood/freecache v1.2.4 github.com/divyam234/cors v1.4.2 + github.com/gin-contrib/zap v0.2.0 github.com/gin-gonic/gin v1.9.1 github.com/go-co-op/gocron v1.37.0 github.com/go-jose/go-jose/v3 v3.0.1 @@ -18,6 +19,7 @@ require ( go.etcd.io/bbolt v1.3.8 go.uber.org/zap v1.26.0 golang.org/x/time v0.5.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gorm.io/driver/postgres v1.5.4 gorm.io/gorm v1.25.5 ) @@ -29,6 +31,7 @@ require ( github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sethvargo/go-retry v0.2.4 // indirect + ) require ( diff --git a/go.sum b/go.sum index 068c2a9..97ea541 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/gin-contrib/secure v0.0.1 h1:DMMx3xXDY+MLA9kzIPHksyzC5/V5J6014c/WAmdS github.com/gin-contrib/secure v0.0.1/go.mod h1:6kseOBFrSR3Is/kM1jDhCg/WsXAMvKJkuPvG9dGph/c= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-contrib/zap v0.2.0 h1:HLvt3rZXyC8XC+s2lHzMFow3UDqiEbfrBWJyHHS6L8A= +github.com/gin-contrib/zap v0.2.0/go.mod h1:eqfbe9ZmI+GgTZF6nRiC2ZwDeM4DK1Viwc8OxTCphh0= github.com/gin-gonic/contrib v0.0.0-20221130124618-7e01895a63f2 h1:dyuNlYlG1faymw39NdJddnzJICy6587tiGSVioWhYoE= github.com/gin-gonic/contrib v0.0.0-20221130124618-7e01895a63f2/go.mod h1:iqneQ2Df3omzIVTkIfn7c1acsVnMGiSLn4XF5Blh3Yg= github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do= @@ -331,6 +333,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/cron/cron.go b/internal/cron/cron.go index ddc40a8..8c409f2 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -14,6 +14,7 @@ import ( "github.com/divyam234/teldrive/pkg/services" "github.com/go-co-op/gocron" "github.com/gotd/td/tg" + "go.uber.org/zap" ) type Files []File @@ -60,7 +61,7 @@ type UploadResult struct { ChannelId int64 } -func deleteTGMessages(ctx context.Context, result Result) error { +func deleteTGMessages(ctx context.Context, logger *zap.Logger, result Result) error { db := database.DB @@ -82,7 +83,7 @@ func deleteTGMessages(ctx context.Context, result Result) error { } - err = tgc.RunWithAuth(ctx, client, "", func(ctx context.Context) error { + err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error { channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10)) @@ -102,10 +103,10 @@ func deleteTGMessages(ctx context.Context, result Result) error { db.Where("id = any($1)", fileIds).Delete(&models.File{}) } - return nil + return err } -func cleanUploadsMessages(ctx context.Context, result UploadResult) error { +func cleanUploadsMessages(ctx context.Context, logger *zap.Logger, result UploadResult) error { db := database.DB @@ -115,7 +116,7 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error { return err } - err = tgc.RunWithAuth(ctx, client, "", func(ctx context.Context) error { + err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error { channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10)) @@ -139,10 +140,10 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error { db.Where("part_id = any($1)", parts).Delete(&models.Upload{}) } - return nil + return err } -func filesDeleteJob() { +func filesDeleteJob(logger *zap.Logger) { db := database.DB ctx, cancel := context.WithCancel(context.Background()) @@ -161,11 +162,16 @@ func filesDeleteJob() { } for _, row := range results { - deleteTGMessages(ctx, row) + err := deleteTGMessages(ctx, logger, row) + if err != nil { + logger.Error("failed to clean pending files", zap.Int64("user", row.UserId), zap.Error(err)) + } + logger.Info("cleaned pending files", zap.Int64("user", row.UserId), + zap.Int64("channel", row.ChannelId)) } } -func uploadCleanJob() { +func uploadCleanJob(logger *zap.Logger) { db := database.DB ctx, cancel := context.WithCancel(context.Background()) c := config.GetConfig() @@ -183,22 +189,29 @@ func uploadCleanJob() { return } for _, row := range upResults { - cleanUploadsMessages(ctx, row) + err := cleanUploadsMessages(ctx, logger, row) + if err != nil { + logger.Error("failed to clean orpahan file parts", zap.Int64("user", row.UserId), zap.Error(err)) + } + logger.Info("cleaned orpahan file parts", zap.Int64("user", row.UserId), + zap.Int64("channel", row.ChannelId)) } } -func folderSizeUpdate() { +func folderSizeUpdate(logger *zap.Logger) { database.DB.Exec("call teldrive.update_size();") + logger.Info("updates folder sizes") + } -func StartCronJobs() { +func StartCronJobs(logger *zap.Logger) { scheduler := gocron.NewScheduler(time.UTC) - scheduler.Every(1).Hour().Do(filesDeleteJob) + scheduler.Every(1).Hour().Do(filesDeleteJob, logger) - scheduler.Every(12).Hour().Do(uploadCleanJob) + scheduler.Every(12).Hour().Do(uploadCleanJob, logger) - scheduler.Every(2).Hour().Do(folderSizeUpdate) + scheduler.Every(2).Hour().Do(folderSizeUpdate, logger) scheduler.StartAsync() } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 0ac8e35..c69f80a 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -2,18 +2,51 @@ package logger import ( "os" + "time" + "github.com/divyam234/teldrive/config" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "gopkg.in/natefinch/lumberjack.v2" ) -var Logger *zap.Logger +func InitLogger() *zap.Logger { + customTimeEncoder := func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(t.Format("02/01/2006 03:04 PM")) + } + var ( + consoleConfig zapcore.EncoderConfig + logLevel zapcore.Level + ) -func InitLogger() { - config := zap.NewProductionEncoderConfig() - config.EncodeTime = zapcore.ISO8601TimeEncoder - consoleEncoder := zapcore.NewConsoleEncoder(config) - defaultLogLevel := zapcore.InfoLevel - core := zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), defaultLogLevel) - Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)) + if config.GetConfig().Dev { + consoleConfig = zap.NewDevelopmentEncoderConfig() + logLevel = zap.DebugLevel + } else { + + consoleConfig = zap.NewProductionEncoderConfig() + logLevel = zap.InfoLevel + } + consoleConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder + consoleConfig.EncodeTime = customTimeEncoder + consoleEncoder := zapcore.NewConsoleEncoder(consoleConfig) + + fileEncoderConfig := zap.NewProductionEncoderConfig() + fileEncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + fileEncoder := zapcore.NewJSONEncoder(fileEncoderConfig) + + fileWriter := zapcore.AddSync(&lumberjack.Logger{ + Filename: "logs/teldrive.log", + MaxSize: 10, + MaxBackups: 3, + MaxAge: 7, + Compress: true, + }) + + core := zapcore.NewTee( + zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), logLevel), + zapcore.NewCore(fileEncoder, fileWriter, zapcore.DebugLevel), + ) + + return zap.New(core, zap.AddStacktrace(zapcore.FatalLevel)) } diff --git a/internal/tgc/run.go b/internal/tgc/run.go index 871c084..8cf62cc 100644 --- a/internal/tgc/run.go +++ b/internal/tgc/run.go @@ -3,13 +3,12 @@ package tgc import ( "context" - "github.com/divyam234/teldrive/internal/logger" "github.com/gotd/td/telegram" "github.com/pkg/errors" "go.uber.org/zap" ) -func RunWithAuth(ctx context.Context, client *telegram.Client, token string, f func(ctx context.Context) error) error { +func RunWithAuth(ctx context.Context, logger *zap.Logger, client *telegram.Client, token string, f func(ctx context.Context) error) error { return client.Run(ctx, func(ctx context.Context) error { status, err := client.Auth().Status(ctx) if err != nil { @@ -20,18 +19,18 @@ func RunWithAuth(ctx context.Context, client *telegram.Client, token string, f f if !status.Authorized { return errors.Errorf("not authorized. please login first") } - logger.Logger.Info("User Session", + logger.Debug("User Session", zap.Int64("id", status.User.ID), zap.String("username", status.User.Username)) } else { if !status.Authorized { - logger.Logger.Info("creating bot session") + logger.Debug("creating bot session") _, err := client.Auth().Bot(ctx, token) if err != nil { return err } status, _ = client.Auth().Status(ctx) - logger.Logger.Info("Bot Session", + logger.Debug("Bot Session", zap.Int64("id", status.User.ID), zap.String("username", status.User.Username)) } diff --git a/internal/tgc/tgc.go b/internal/tgc/tgc.go index e1a56cf..77c2ed8 100644 --- a/internal/tgc/tgc.go +++ b/internal/tgc/tgc.go @@ -5,7 +5,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/kv" "github.com/divyam234/teldrive/internal/recovery" "github.com/divyam234/teldrive/internal/retry" @@ -18,7 +18,8 @@ import ( "golang.org/x/time/rate" ) -func deviceConfig(appConfig *cnf.Config) telegram.DeviceConfig { +func deviceConfig() telegram.DeviceConfig { + appConfig := config.GetConfig() config := telegram.DeviceConfig{ DeviceModel: appConfig.TgClientDeviceModel, SystemVersion: appConfig.TgClientSystemVersion, @@ -42,8 +43,6 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St _clock := tdclock.System - config := cnf.GetConfig() - noUpdates := true if handler != nil { @@ -54,7 +53,7 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St ReconnectionBackoff: func() backoff.BackOff { return Backoff(_clock) }, - Device: deviceConfig(config), + Device: deviceConfig(), SessionStorage: storage, RetryInterval: time.Second, MaxRetries: 10, @@ -65,7 +64,7 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St UpdateHandler: handler, } - return telegram.NewClient(config.AppId, config.AppHash, opts) + return telegram.NewClient(config.GetConfig().AppId, config.GetConfig().AppHash, opts) } func NoLogin(ctx context.Context, handler telegram.UpdateHandler, storage session.Storage) *telegram.Client { @@ -90,17 +89,15 @@ func UserLogin(ctx context.Context, sessionStr string) (*telegram.Client, error) return nil, err } middlewares, _ := NewDefaultMiddlewares(ctx) - config := cnf.GetConfig() - middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.Rate)), config.RateBurst)) + middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.GetConfig().Rate)), config.GetConfig().RateBurst)) return New(ctx, nil, storage, middlewares...), nil } func BotLogin(ctx context.Context, token string) (*telegram.Client, error) { - config := cnf.GetConfig() storage := kv.NewSession(database.KV, kv.Key("botsession", token)) middlewares, _ := NewDefaultMiddlewares(ctx) - if config.RateLimit { - middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.Rate)), config.RateBurst)) + if config.GetConfig().RateLimit { + middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.GetConfig().Rate)), config.GetConfig().RateBurst)) } return New(ctx, nil, storage, middlewares...), nil diff --git a/internal/tgc/workers.go b/internal/tgc/workers.go index d51c46d..5c1710a 100644 --- a/internal/tgc/workers.go +++ b/internal/tgc/workers.go @@ -8,13 +8,13 @@ import ( "github.com/gotd/td/telegram" ) -type BotWorkers struct { +type UploadWorker struct { mu sync.Mutex bots map[int64][]string currIdx map[int64]int } -func (w *BotWorkers) Set(bots []string, channelId int64) { +func (w *UploadWorker) Set(bots []string, channelId int64) { w.mu.Lock() defer w.mu.Unlock() _, ok := w.bots[channelId] @@ -26,30 +26,28 @@ func (w *BotWorkers) Set(bots []string, channelId int64) { } } -func (w *BotWorkers) Next(channelId int64) string { +func (w *UploadWorker) Next(channelId int64) (string, int) { w.mu.Lock() defer w.mu.Unlock() index := w.currIdx[channelId] w.currIdx[channelId] = (index + 1) % len(w.bots[channelId]) - return w.bots[channelId][index] + return w.bots[channelId][index], index } -var Workers = &BotWorkers{} - type Client struct { Tg *telegram.Client Stop bg.StopFunc Status string } -type streamWorkers struct { +type StreamWorker struct { mu sync.Mutex bots map[int64][]string clients map[int64][]*Client currIdx map[int64]int } -func (w *streamWorkers) Set(bots []string, channelId int64) { +func (w *StreamWorker) Set(bots []string, channelId int64) { w.mu.Lock() defer w.mu.Unlock() _, ok := w.bots[channelId] @@ -67,7 +65,7 @@ func (w *streamWorkers) Set(bots []string, channelId int64) { } -func (w *streamWorkers) Next(channelId int64) (*Client, int, error) { +func (w *StreamWorker) Next(channelId int64) (*Client, int, error) { w.mu.Lock() defer w.mu.Unlock() index := w.currIdx[channelId] @@ -84,7 +82,7 @@ func (w *streamWorkers) Next(channelId int64) (*Client, int, error) { return nextClient, index, nil } -func (w *streamWorkers) UserWorker(client *telegram.Client) (*Client, error) { +func (w *StreamWorker) UserWorker(client *telegram.Client) (*Client, error) { w.mu.Lock() defer w.mu.Unlock() @@ -108,5 +106,3 @@ func (w *streamWorkers) UserWorker(client *telegram.Client) (*Client, error) { } return nextClient, nil } - -var StreamWorkers = &streamWorkers{} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 51040fc..36a45b2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,6 +3,7 @@ package controller import ( "github.com/divyam234/teldrive/pkg/database" "github.com/divyam234/teldrive/pkg/services" + "go.uber.org/zap" ) type Controller struct { @@ -12,11 +13,11 @@ type Controller struct { AuthService *services.AuthService } -func NewController() *Controller { +func NewController(logger *zap.Logger) *Controller { return &Controller{ - FileService: services.NewFileService(database.DB), - UserService: services.NewUserService(database.DB), - UploadService: services.NewUploadService(database.DB), - AuthService: services.NewAuthService(database.DB), + FileService: services.NewFileService(database.DB, logger), + UserService: services.NewUserService(database.DB, logger), + UploadService: services.NewUploadService(database.DB, logger), + AuthService: services.NewAuthService(database.DB, logger), } } diff --git a/pkg/controller/file.go b/pkg/controller/file.go index df64cc2..98d8af0 100644 --- a/pkg/controller/file.go +++ b/pkg/controller/file.go @@ -30,7 +30,7 @@ func (fc *Controller) UpdateFile(c *gin.Context) { func (fc *Controller) GetFileByID(c *gin.Context) { res, err := fc.FileService.GetFileByID(c) if err != nil { - httputil.NewError(c, http.StatusNotFound, err) + httputil.NewError(c, http.StatusNotFound, err.Error) return } diff --git a/pkg/database/database.go b/pkg/database/database.go index 0372306..776d7fe 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -7,7 +7,7 @@ import ( "path/filepath" "time" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/kv" "github.com/pressly/goose/v3" "go.etcd.io/bbolt" @@ -26,20 +26,24 @@ func InitDB() { var err error + logLevel := logger.Silent + + if config.GetConfig().LogSql { + logLevel = logger.Info + + } + newLogger := logger.New( log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{ - SlowThreshold: time.Second, - LogLevel: logger.Silent, - IgnoreRecordNotFoundError: true, - ParameterizedQueries: true, - Colorful: false, + SlowThreshold: time.Second, + LogLevel: logLevel, + ParameterizedQueries: true, + Colorful: true, }, ) - config := cnf.GetConfig() - - DB, err = gorm.Open(postgres.Open(config.DatabaseUrl), &gorm.Config{ + DB, err = gorm.Open(postgres.Open(config.GetConfig().DatabaseUrl), &gorm.Config{ NamingStrategy: schema.NamingStrategy{ TablePrefix: "teldrive.", SingularTable: false, @@ -63,12 +67,12 @@ func InitDB() { sqlDB.SetConnMaxIdleTime(10 * time.Minute) go func() { - if config.RunMigrations { + if config.GetConfig().RunMigrations { migrate() } }() - boltDB, err := bbolt.Open(filepath.Join(config.ExecDir, "teldrive.db"), 0666, &bbolt.Options{ + boltDB, err := bbolt.Open(filepath.Join(config.GetConfig().ExecDir, "teldrive.db"), 0666, &bbolt.Options{ Timeout: time.Second, NoGrowSync: false, }) diff --git a/pkg/schemas/upload.go b/pkg/schemas/upload.go index 0e320cc..965257f 100644 --- a/pkg/schemas/upload.go +++ b/pkg/schemas/upload.go @@ -1,7 +1,8 @@ package schemas type UploadQuery struct { - Filename string `form:"fileName" binding:"required"` + PartName string `form:"partName" binding:"required"` + FileName string `form:"fileName" binding:"required"` PartNo int `form:"partNo" binding:"required"` ChannelID int64 `form:"channelId"` Encrypted bool `form:"encrypted"` diff --git a/pkg/services/auth.go b/pkg/services/auth.go index 0e0e618..6cb1ede 100644 --- a/pkg/services/auth.go +++ b/pkg/services/auth.go @@ -16,7 +16,7 @@ import ( "strconv" "time" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/auth" "github.com/divyam234/teldrive/internal/tgc" "github.com/divyam234/teldrive/internal/utils" @@ -31,6 +31,7 @@ import ( "github.com/gotd/td/telegram/auth/qrlogin" "github.com/gotd/td/tg" "github.com/gotd/td/tgerr" + "go.uber.org/zap" "gorm.io/gorm" ) @@ -38,13 +39,15 @@ type AuthService struct { Db *gorm.DB SessionMaxAge int SessionCookieName string + log *zap.Logger } -func NewAuthService(db *gorm.DB) *AuthService { +func NewAuthService(db *gorm.DB, logger *zap.Logger) *AuthService { return &AuthService{ Db: db, SessionMaxAge: 30 * 24 * 60 * 60, - SessionCookieName: "user-session"} + SessionCookieName: "user-session", + log: logger.Named("auth")} } func ip4toInt(IPv4Address net.IP) int64 { @@ -88,22 +91,21 @@ func generateTgSession(dcID int, authKey []byte, port int) string { func setCookie(c *gin.Context, key string, value string, age int) { - config := cnf.GetConfig() - - if config.CookieSameSite { + if config.GetConfig().CookieSameSite { c.SetSameSite(2) } else { c.SetSameSite(4) } - c.SetCookie(key, value, age, "/", c.Request.Host, config.Https, true) + + c.SetCookie(key, value, age, "/", "", config.GetConfig().Https, true) } func checkUserIsAllowed(userName string) bool { - config := cnf.GetConfig() found := false - if len(config.AllowedUsers) > 0 { - for _, user := range config.AllowedUsers { + allowedUsers := config.GetConfig().AllowedUsers + if len(allowedUsers) > 0 { + for _, user := range allowedUsers { if user == userName { found = true break @@ -234,7 +236,7 @@ func (as *AuthService) Logout(c *gin.Context) (*schemas.Message, *types.AppError jwtUser := val.(*types.JWTClaims) client, _ := tgc.UserLogin(c, jwtUser.TgSession) - tgc.RunWithAuth(c, client, "", func(ctx context.Context) error { + tgc.RunWithAuth(c, as.log, client, "", func(ctx context.Context) error { _, err := client.API().AuthLogOut(c) return err }) diff --git a/pkg/services/common.go b/pkg/services/common.go index dca0fce..532bb76 100644 --- a/pkg/services/common.go +++ b/pkg/services/common.go @@ -21,6 +21,7 @@ import ( "github.com/gotd/td/tg" "github.com/pkg/errors" "github.com/thoas/go-funk" + "go.uber.org/zap" ) type buffer struct { @@ -99,10 +100,10 @@ func getUserAuth(c *gin.Context) (int64, string) { return userId, jwtUser.TgSession } -func getBotInfo(ctx context.Context, token string) (*types.BotInfo, error) { +func getBotInfo(ctx context.Context, logger *zap.Logger, token string) (*types.BotInfo, error) { client, _ := tgc.BotLogin(ctx, token) var user *tg.User - err := tgc.RunWithAuth(ctx, client, token, func(ctx context.Context) error { + err := tgc.RunWithAuth(ctx, logger, client, token, func(ctx context.Context) error { user, _ = client.Self(ctx) return nil }) diff --git a/pkg/services/file.go b/pkg/services/file.go index 088170f..c1a6fa9 100644 --- a/pkg/services/file.go +++ b/pkg/services/file.go @@ -11,7 +11,7 @@ import ( "strconv" "strings" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/cache" "github.com/divyam234/teldrive/internal/http_range" "github.com/divyam234/teldrive/internal/md5" @@ -22,6 +22,7 @@ import ( "github.com/divyam234/teldrive/pkg/models" "github.com/divyam234/teldrive/pkg/schemas" "github.com/gotd/td/tg" + "go.uber.org/zap" "github.com/divyam234/teldrive/pkg/types" @@ -32,19 +33,41 @@ import ( "gorm.io/gorm/clause" ) +const ( + updateFileContext = "file update" + getFileByIDContext = "getting file by ID" + listFilesContext = "listing files" + getPathIDContext = "getting path ID" + makeDirectoryContext = "making directory" + copyFileContext = "copying file" + moveFilesContext = "moving files" + deleteFilesContext = "deleting files" + moveDirectoryContext = "moving directory" + bindJSONContext = "binding JSON" + bindQueryContext = "binding query" +) + type FileService struct { - Db *gorm.DB + Db *gorm.DB + log *zap.Logger + worker *tgc.StreamWorker } -func NewFileService(db *gorm.DB) *FileService { - return &FileService{Db: db} +func NewFileService(db *gorm.DB, logger *zap.Logger) *FileService { + return &FileService{Db: db, log: logger.Named("files"), + worker: &tgc.StreamWorker{}} +} + +func (fs *FileService) logAndReturn(context string, err error, errCode int) *types.AppError { + fs.log.Error(context, zap.Error(err)) + return &types.AppError{Error: err, Code: errCode} } func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppError) { userId, _ := getUserAuth(c) var fileIn schemas.CreateFile if err := c.ShouldBindJSON(&fileIn); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(bindJSONContext, err, http.StatusBadRequest) } var fileDB models.File @@ -54,7 +77,7 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE if fileIn.Path != "" { var parent models.File if err := fs.Db.Where("type = ? AND path = ?", "folder", fileIn.Path).First(&parent).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusNotFound} + return nil, fs.logAndReturn(bindJSONContext, err, http.StatusInternalServerError) } fileDB.ParentID = parent.ID } @@ -76,7 +99,7 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE if fileIn.ChannelID == 0 { channelId, err = GetDefaultChannel(c, userId) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn("default channel", err, http.StatusInternalServerError) } } fileDB.ChannelID = utils.Int64Pointer(channelId) @@ -102,10 +125,9 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE if err := fs.Db.Create(&fileDB).Error; err != nil { pgErr := err.(*pgconn.PgError) if pgErr.Code == "23505" { - return nil, &types.AppError{Error: errors.New("file exists"), Code: http.StatusInternalServerError} + return nil, fs.logAndReturn("file exists", err, http.StatusInternalServerError) } - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} - + return nil, fs.logAndReturn("file create", err, http.StatusInternalServerError) } res := mapper.ToFileOut(fileDB) @@ -127,16 +149,16 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE if fileUpdate.Type == "folder" && fileUpdate.Name != "" { if err := fs.Db.Raw("select * from teldrive.update_folder(?, ?)", fileID, fileUpdate.Name).Scan(&files).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(updateFileContext, err, http.StatusInternalServerError) } } else { if err := fs.Db.Model(&files).Clauses(clause.Returning{}).Where("id = ?", fileID).Updates(fileUpdate).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(updateFileContext, err, http.StatusInternalServerError) } } if len(files) == 0 { - return nil, &types.AppError{Error: errors.New("file not updated"), Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(updateFileContext, errors.New("update failed"), http.StatusInternalServerError) } file := mapper.ToFileOut(files[0]) @@ -149,8 +171,7 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE } -func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, error) { - +func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, *types.AppError) { fileID := c.Param("fileID") var file []models.File @@ -158,14 +179,14 @@ func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, error) fs.Db.Model(&models.File{}).Where("id = ?", fileID).Find(&file) if len(file) == 0 { - return nil, errors.New("file not found") + err := errors.New("file not found") + return nil, fs.logAndReturn(getFileByIDContext, err, http.StatusNotFound) } return mapper.ToFileOutFull(file[0]), nil } func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.AppError) { - userId, _ := getUserAuth(c) var ( @@ -181,15 +202,15 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types. fileQuery.UserID = userId if err := c.ShouldBindQuery(&pagingParams); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest) } if err := c.ShouldBindQuery(&sortingParams); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest) } if err := c.ShouldBindQuery(&fileQuery); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest) } var ( @@ -199,7 +220,7 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types. if fileQuery.Path != "" { pathId, err = fs.getPathId(fileQuery.Path) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusNotFound} + return nil, fs.logAndReturn(listFilesContext, err, http.StatusNotFound) } } @@ -219,7 +240,7 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types. err := mapstructure.Decode(fileQuery, &filterQuery) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest) } delete(filterQuery, "op") @@ -243,7 +264,6 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types. setOrderFilter(query, &pagingParams, &sortingParams) query.Order(getOrder(sortingParams)) - } var results []schemas.FileOut @@ -276,33 +296,29 @@ func (fs *FileService) getPathId(path string) (string, error) { } func (fs *FileService) MakeDirectory(c *gin.Context) (*schemas.FileOut, *types.AppError) { - var payload schemas.MkDir - var files []models.File if err := c.ShouldBindJSON(&payload); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(makeDirectoryContext, err, http.StatusBadRequest) } userId, _ := getUserAuth(c) if err := fs.Db.Raw("select * from teldrive.create_directories(?, ?)", userId, payload.Path). Scan(&files).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(makeDirectoryContext, err, http.StatusInternalServerError) } file := mapper.ToFileOut(files[0]) return &file, nil - } func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppError) { - var payload schemas.Copy if err := c.ShouldBindJSON(&payload); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(copyFileContext, err, http.StatusBadRequest) } userId, session := getUserAuth(c) @@ -317,7 +333,7 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr newIds := models.Parts{} - err := tgc.RunWithAuth(c, client, "", func(ctx context.Context) error { + err := tgc.RunWithAuth(c, fs.log, client, "", func(ctx context.Context) error { user := strconv.FormatInt(userId, 10) messages, err := getTGMessages(c, client, file.Parts, file.ChannelID, user) if err != nil { @@ -365,13 +381,13 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr }) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(copyFileContext, err, http.StatusBadRequest) } var destRes []models.File if err := fs.Db.Raw("select * from teldrive.create_directories(?, ?)", userId, payload.Destination).Scan(&destRes).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(copyFileContext, err, http.StatusInternalServerError) } dest := destRes[0] @@ -390,72 +406,65 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr dbFile.ChannelID = &file.ChannelID if err := fs.Db.Create(&dbFile).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} - + return nil, fs.logAndReturn(copyFileContext, err, http.StatusInternalServerError) } out := mapper.ToFileOut(dbFile) return &out, nil - } func (fs *FileService) MoveFiles(c *gin.Context) (*schemas.Message, *types.AppError) { - var payload schemas.FileOperation if err := c.ShouldBindJSON(&payload); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(moveFilesContext, err, http.StatusBadRequest) } var destination models.File if err := fs.Db.Model(&models.File{}).Select("id").Where("path = ?", payload.Destination).First(&destination).Error; errors.Is(err, gorm.ErrRecordNotFound) { - return nil, &types.AppError{Error: err, Code: http.StatusNotFound} - + return nil, fs.logAndReturn(moveFilesContext, err, http.StatusNotFound) } if err := fs.Db.Model(&models.File{}).Where("id IN ?", payload.Files).UpdateColumn("parent_id", destination.ID).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(moveFilesContext, err, http.StatusInternalServerError) } return &schemas.Message{Message: "files moved"}, nil } func (fs *FileService) DeleteFiles(c *gin.Context) (*schemas.Message, *types.AppError) { - var payload schemas.FileOperation if err := c.ShouldBindJSON(&payload); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(deleteFilesContext, err, http.StatusBadRequest) } if err := fs.Db.Exec("call teldrive.delete_files($1)", payload.Files).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(deleteFilesContext, err, http.StatusInternalServerError) } return &schemas.Message{Message: "files deleted"}, nil } func (fs *FileService) MoveDirectory(c *gin.Context) (*schemas.Message, *types.AppError) { - var payload schemas.DirMove if err := c.ShouldBindJSON(&payload); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, fs.logAndReturn(moveDirectoryContext, err, http.StatusBadRequest) } userId, _ := getUserAuth(c) if err := fs.Db.Exec("select * from teldrive.move_directory(? , ? , ?)", payload.Source, payload.Destination, userId).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, fs.logAndReturn(moveDirectoryContext, err, http.StatusInternalServerError) } return &schemas.Message{Message: "directory moved"}, nil } func (fs *FileService) GetFileStream(c *gin.Context) { - w := c.Writer r := c.Request @@ -464,7 +473,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) { authHash := c.Query("hash") if authHash == "" { - http.Error(w, "misssing hash param", http.StatusBadRequest) + http.Error(w, "missing hash param", http.StatusBadRequest) return } @@ -480,11 +489,12 @@ func (fs *FileService) GetFileStream(c *gin.Context) { key := fmt.Sprintf("files:%s", fileID) err = cache.GetCache().Get(key, file) + var appErr *types.AppError if err != nil { - file, err = fs.GetFileByID(c) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + file, appErr = fs.GetFileByID(c) + if appErr != nil { + http.Error(w, appErr.Error.Error(), http.StatusBadRequest) return } cache.GetCache().Set(key, file, 0) @@ -518,6 +528,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) { start = ranges[0].Start end = ranges[0].End c.Header("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, file.Size)) + w.WriteHeader(http.StatusPartialContent) } @@ -546,86 +557,68 @@ func (fs *FileService) GetFileStream(c *gin.Context) { tokens, err := getBotsToken(c, session.UserId, file.ChannelID) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + fs.log.Error("failed to get bots", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - config := cnf.GetConfig() - var ( - token, channelUser string - lr io.ReadCloser + channelUser string + lr io.ReadCloser ) - if config.LazyStreamBots { - tgc.Workers.Set(tokens, file.ChannelID) - token = tgc.Workers.Next(file.ChannelID) - client, _ := tgc.BotLogin(c, token) - channelUser = strings.Split(token, ":")[0] - if r.Method != "HEAD" { - tgc.RunWithAuth(c, client, token, func(ctx context.Context) error { - parts, err := getParts(c, client, file, channelUser) - if err != nil { - return err - } - parts = rangedParts(parts, start, end) - if file.Encrypted { - lr, _ = reader.NewDecryptedReader(c, client, parts, contentLength) - } else { - lr, _ = reader.NewLinearReader(c, client, parts, contentLength) - } - io.CopyN(w, lr, contentLength) - return nil - }) + var client *tgc.Client + + if config.GetConfig().DisableStreamBots || len(tokens) == 0 { + tgClient, _ := tgc.UserLogin(c, session.Session) + client, err = fs.worker.UserWorker(tgClient) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + channelUser = strconv.FormatInt(session.UserId, 10) + + fs.log.Debug("requesting file", zap.String("name", file.Name), + zap.String("user", channelUser), zap.Int64("start", start), + zap.Int64("end", end), zap.Int64("fileSize", file.Size)) } else { + var index int + limit := min(len(tokens), config.GetConfig().BgBotsLimit) - var client *tgc.Client + fs.worker.Set(tokens[:limit], file.ChannelID) - if config.DisableStreamBots || len(tokens) == 0 { - tgClient, _ := tgc.UserLogin(c, session.Session) - client, err = tgc.StreamWorkers.UserWorker(tgClient) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - channelUser = strconv.FormatInt(session.UserId, 10) - } else { - var index int - limit := min(len(tokens), config.BgBotsLimit) - - tgc.StreamWorkers.Set(tokens[:limit], file.ChannelID) - - client, index, err = tgc.StreamWorkers.Next(file.ChannelID) - - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - channelUser = strings.Split(tokens[index], ":")[0] + client, index, err = fs.worker.Next(file.ChannelID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } - - if r.Method != "HEAD" { - parts, err := getParts(c, client.Tg, file, channelUser) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - parts = rangedParts(parts, start, end) - - if file.Encrypted { - lr, _ = reader.NewDecryptedReader(c, client.Tg, parts, contentLength) - } else { - lr, _ = reader.NewLinearReader(c, client.Tg, parts, contentLength) - } - - io.CopyN(w, lr, contentLength) - } + channelUser = strings.Split(tokens[index], ":")[0] + fs.log.Debug("requesting file", zap.String("name", file.Name), + zap.String("bot", channelUser), zap.Int("botNo", index), zap.Int64("start", start), + zap.Int64("end", end), zap.Int64("fileSize", file.Size)) } + if r.Method != "HEAD" { + parts, err := getParts(c, client.Tg, file, channelUser) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + parts = rangedParts(parts, start, end) + + if file.Encrypted { + lr, _ = reader.NewDecryptedReader(c, client.Tg, parts, contentLength) + } else { + lr, _ = reader.NewLinearReader(c, client.Tg, parts, contentLength) + } + + if _, err := io.CopyN(w, lr, contentLength); err != nil { + fs.log.Debug("closed file stream", zap.Error(err)) + } + } } func setOrderFilter(query *gorm.DB, pagingParams *schemas.PaginationQuery, sortingParams *schemas.SortingQuery) *gorm.DB { diff --git a/pkg/services/upload.go b/pkg/services/upload.go index 38c9c8c..418e0cf 100644 --- a/pkg/services/upload.go +++ b/pkg/services/upload.go @@ -5,17 +5,17 @@ import ( "crypto/rand" "crypto/sha256" "encoding/base64" - "errors" "net/http" "strconv" "strings" "time" - cnf "github.com/divyam234/teldrive/config" + "github.com/divyam234/teldrive/config" "github.com/divyam234/teldrive/internal/crypt" "github.com/divyam234/teldrive/internal/tgc" "github.com/divyam234/teldrive/pkg/mapper" "github.com/divyam234/teldrive/pkg/schemas" + "go.uber.org/zap" "github.com/divyam234/teldrive/pkg/types" @@ -31,11 +31,14 @@ import ( const saltLength = 32 type UploadService struct { - Db *gorm.DB + Db *gorm.DB + log *zap.Logger + worker *tgc.UploadWorker } -func NewUploadService(db *gorm.DB) *UploadService { - return &UploadService{Db: db} +func NewUploadService(db *gorm.DB, logger *zap.Logger) *UploadService { + return &UploadService{Db: db, log: logger.Named("uploads"), + worker: &tgc.UploadWorker{}} } func generateRandomSalt() (string, error) { @@ -52,14 +55,18 @@ func generateRandomSalt() (string, error) { return hashedSalt, nil } +func (us *UploadService) logAndReturn(context string, err error, errCode int) *types.AppError { + us.log.Error(context, zap.Error(err)) + return &types.AppError{Error: err, Code: errCode} +} + func (us *UploadService) GetUploadFileById(c *gin.Context) (*schemas.UploadOut, *types.AppError) { uploadId := c.Param("id") parts := []schemas.UploadPartOut{} - config := cnf.GetConfig() if err := us.Db.Model(&models.Upload{}).Order("part_no").Where("upload_id = ?", uploadId). - Where("created_at >= ?", time.Now().UTC().AddDate(0, 0, -config.UploadRetention)). + Where("created_at >= ?", time.Now().UTC().AddDate(0, 0, -config.GetConfig().UploadRetention)). Find(&parts).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, us.logAndReturn("get upload", err, http.StatusInternalServerError) } return &schemas.UploadOut{Parts: parts}, nil @@ -68,7 +75,7 @@ func (us *UploadService) GetUploadFileById(c *gin.Context) (*schemas.UploadOut, func (us *UploadService) DeleteUploadFile(c *gin.Context) (*schemas.Message, *types.AppError) { uploadId := c.Param("id") if err := us.Db.Where("upload_id = ?", uploadId).Delete(&models.Upload{}).Error; err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, us.logAndReturn("delete upload", err, http.StatusInternalServerError) } return &schemas.Message{Message: "upload deleted"}, nil @@ -104,13 +111,13 @@ func (us *UploadService) CreateUploadPart(c *gin.Context) (*schemas.UploadPartOu } func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *types.AppError) { - var ( uploadQuery schemas.UploadQuery channelId int64 err error client *telegram.Client token string + index int channelUser string out *schemas.UploadPartOut ) @@ -118,7 +125,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty uploadQuery.PartNo = 1 if err := c.ShouldBindQuery(&uploadQuery); err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusBadRequest} + return nil, us.logAndReturn("UploadFile", err, http.StatusBadRequest) } userId, session := getUserAuth(c) @@ -129,12 +136,10 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty fileSize := c.Request.ContentLength - fileName := uploadQuery.Filename - if uploadQuery.ChannelID == 0 { channelId, err = GetDefaultChannel(c, userId) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError) } } else { channelId = uploadQuery.ChannelID @@ -143,20 +148,25 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty tokens, err := getBotsToken(c, userId, channelId) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError) } if len(tokens) == 0 { client, _ = tgc.UserLogin(c, session) channelUser = strconv.FormatInt(userId, 10) } else { - tgc.Workers.Set(tokens, channelId) - token = tgc.Workers.Next(channelId) + us.worker.Set(tokens, channelId) + token, index = us.worker.Next(channelId) client, _ = tgc.BotLogin(c, token) channelUser = strings.Split(token, ":")[0] } - err = tgc.RunWithAuth(c, client, token, func(ctx context.Context) error { + us.log.Debug("uploading file", zap.String("fileName", uploadQuery.FileName), + zap.String("partName", uploadQuery.PartName), + zap.String("bot", channelUser), zap.Int("botNo", index), + zap.Int("chunkNo", uploadQuery.PartNo), zap.Int64("partSize", fileSize)) + + err = tgc.RunWithAuth(c, us.log, client, token, func(ctx context.Context) error { channel, err := GetChannelById(ctx, client, channelId, channelUser) @@ -164,15 +174,13 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty return err } - config := cnf.GetConfig() - var salt string if uploadQuery.Encrypted { //gen random Salt salt, _ = generateRandomSalt() - cipher, _ := crypt.NewCipher(config.EncryptionKey, salt) + cipher, _ := crypt.NewCipher(config.GetConfig().EncryptionKey, salt) fileSize = crypt.EncryptedSize(fileSize) fileStream, _ = cipher.EncryptData(fileStream) } @@ -181,13 +189,13 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty u := uploader.NewUploader(api).WithThreads(16).WithPartSize(512 * 1024) - upload, err := u.Upload(c, uploader.NewUpload(fileName, fileStream, fileSize)) + upload, err := u.Upload(c, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize)) if err != nil { return err } - document := message.UploadedDocument(upload).Filename(fileName).ForceFile(true) + document := message.UploadedDocument(upload).Filename(uploadQuery.PartName).ForceFile(true) sender := message.NewSender(client.API()) @@ -210,15 +218,10 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty message = channelMsg.Message.(*tg.Message) break } - - } - - if message.ID == 0 { - return errors.New("failed to upload part") } partUpload := &models.Upload{ - Name: fileName, + Name: uploadQuery.PartName, UploadId: uploadId, PartId: message.ID, ChannelID: channelId, @@ -239,8 +242,12 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty }) if err != nil { - return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError} + return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError) } + us.log.Debug("upload finished", zap.String("fileName", uploadQuery.FileName), + zap.String("partName", uploadQuery.PartName), + zap.Int("chunkNo", uploadQuery.PartNo)) + return out, nil } diff --git a/pkg/services/user.go b/pkg/services/user.go index 7ad4f9e..d6f43bf 100644 --- a/pkg/services/user.go +++ b/pkg/services/user.go @@ -18,6 +18,7 @@ import ( "github.com/gotd/td/telegram/query" "github.com/gotd/td/tg" "github.com/thoas/go-funk" + "go.uber.org/zap" "github.com/gin-gonic/gin" "gorm.io/gorm" @@ -25,11 +26,12 @@ import ( ) type UserService struct { - Db *gorm.DB + Db *gorm.DB + log *zap.Logger } -func NewUserService(db *gorm.DB) *UserService { - return &UserService{Db: db} +func NewUserService(db *gorm.DB, logger *zap.Logger) *UserService { + return &UserService{Db: db, log: logger.Named("users")} } func (us *UserService) GetProfilePhoto(c *gin.Context) { @@ -42,7 +44,7 @@ func (us *UserService) GetProfilePhoto(c *gin.Context) { return } - err = tgc.RunWithAuth(c, client, "", func(ctx context.Context) error { + err = tgc.RunWithAuth(c, us.log, client, "", func(ctx context.Context) error { self, err := client.Self(c) if err != nil { return err @@ -210,7 +212,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI var wg sync.WaitGroup - err := tgc.RunWithAuth(c, client, "", func(ctx context.Context) error { + err := tgc.RunWithAuth(c, us.log, client, "", func(ctx context.Context) error { channel, err := GetChannelById(ctx, client, channelId, strconv.FormatInt(userId, 10)) if err != nil { @@ -229,7 +231,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI waitChan <- struct{}{} wg.Add(1) go func(t string) { - info, err := getBotInfo(c, t) + info, err := getBotInfo(c, us.log, t) if err != nil { return }