diff --git a/README.md b/README.md index a4c4552..a709db2 100644 --- a/README.md +++ b/README.md @@ -190,7 +190,7 @@ teldrive run --help | --tg-rate | Limiting rate | No | 100 | | --tg-session-file | Bot session file. | No | $HOME/.teldrive/session.db | | --tg-bg-bots-limit | Start at most this no of bots in the background to prevent connection recreation on every request.Increase this if you are streaming or downloading large no of files simultaneously. | No | 5 -| --tg-uploads-threads | Concurrent Uploads threads for uploading file | No | 16 | +| --tg-uploads-threads | Concurrent Uploads threads for uploading file | No | 8 | | --tg-uploads-retention | Uploads retention duration.Duration to keep failed uploaded chunks in db for resuming uploads. | No | 7d | | --tg-proxy | Socks5 or HTTP proxy for telegram client. | No | "" | diff --git a/cmd/run.go b/cmd/run.go index e8b86e2..6f66582 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -84,10 +84,10 @@ func NewRun() *cobra.Command { runCmd.Flags().IntVar(&config.TG.BgBotsLimit, "tg-bg-bots-limit", 5, "Background bots limit") runCmd.Flags().BoolVar(&config.TG.DisableStreamBots, "tg-disable-stream-bots", false, "Disable stream bots") runCmd.Flags().StringVar(&config.TG.Uploads.EncryptionKey, "tg-uploads-encryption-key", "", "Uploads encryption key") - runCmd.Flags().IntVar(&config.TG.Uploads.Threads, "tg-uploads-threads", 16, "Uploads threads") - duration.DurationVar(runCmd.Flags(), &config.TG.Uploads.Retention, "tg-uploads-retention", (24*7)*time.Hour, - "Uploads retention duration") - + runCmd.Flags().IntVar(&config.TG.Uploads.Threads, "tg-uploads-threads", 8, "Uploads threads") + runCmd.Flags().IntVar(&config.TG.Uploads.MaxRetries, "tg-uploads-max-retries", 10, "Uploads Retries") + duration.DurationVar(runCmd.Flags(), &config.TG.ReconnectTimeout, "tg-reconnect-timeout", 5*time.Minute, "Reconnect Timeout") + duration.DurationVar(runCmd.Flags(), &config.TG.Uploads.Retention, "tg-uploads-retention", (24*7)*time.Hour, "Uploads retention duration") runCmd.MarkFlagRequired("tg-app-id") runCmd.MarkFlagRequired("tg-app-hash") runCmd.MarkFlagRequired("db-data-source") diff --git a/config.sample.toml b/config.sample.toml index 65a2528..23ce29f 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -43,4 +43,4 @@ [tg.uploads] encryption-key = "" retention = "7d" - threads = 16 + threads = 8 diff --git a/internal/config/config.go b/internal/config/config.go index b6df1a9..002c752 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,9 +33,11 @@ type TGConfig struct { BgBotsLimit int DisableStreamBots bool Proxy string + ReconnectTimeout time.Duration Uploads struct { EncryptionKey string Threads int + MaxRetries int Retention time.Duration } } diff --git a/internal/recovery/recovery.go b/internal/recovery/recovery.go index e312fa1..f193980 100644 --- a/internal/recovery/recovery.go +++ b/internal/recovery/recovery.go @@ -49,5 +49,5 @@ func (r *recovery) shouldRecover(err error) bool { _, ok := tgerr.As(err) - return !errors.Is(err, context.Canceled) && !ok + return !ok } diff --git a/internal/tgc/tgc.go b/internal/tgc/tgc.go index 9e63089..02ac1ff 100644 --- a/internal/tgc/tgc.go +++ b/internal/tgc/tgc.go @@ -7,12 +7,9 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/divyam234/teldrive/internal/config" "github.com/divyam234/teldrive/internal/kv" - "github.com/divyam234/teldrive/internal/recovery" - "github.com/divyam234/teldrive/internal/retry" "github.com/divyam234/teldrive/internal/utils" "github.com/gotd/contrib/middleware/floodwait" "github.com/gotd/contrib/middleware/ratelimit" - tdclock "github.com/gotd/td/clock" "github.com/gotd/td/session" "github.com/gotd/td/telegram" "github.com/gotd/td/telegram/dcs" @@ -21,13 +18,10 @@ import ( "golang.org/x/time/rate" ) -func defaultMiddlewares(ctx context.Context) ([]telegram.Middleware, error) { - +func defaultMiddlewares() []telegram.Middleware { return []telegram.Middleware{ - recovery.New(ctx, Backoff(tdclock.System)), - retry.New(5), floodwait.NewSimpleWaiter(), - }, nil + } } func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage, middlewares ...telegram.Middleware) (*telegram.Client, error) { @@ -45,6 +39,9 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa Resolver: dcs.Plain(dcs.PlainOptions{ Dial: dialer, }), + ReconnectionBackoff: func() backoff.BackOff { + return NewBackoff(config.ReconnectTimeout) + }, Device: telegram.DeviceConfig{ DeviceModel: config.DeviceModel, SystemVersion: config.SystemVersion, @@ -54,8 +51,8 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa LangCode: config.LangCode, }, SessionStorage: storage, - RetryInterval: time.Second, - MaxRetries: 10, + RetryInterval: 5 * time.Second, + MaxRetries: -1, DialTimeout: 10 * time.Second, Middlewares: middlewares, UpdateHandler: handler, @@ -65,7 +62,7 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa } func NoAuthClient(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage) (*telegram.Client, error) { - middlewares, _ := defaultMiddlewares(ctx) + middlewares := defaultMiddlewares() middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*100), 5)) return New(ctx, config, handler, storage, middlewares...) } @@ -85,7 +82,7 @@ func AuthClient(ctx context.Context, config *config.TGConfig, sessionStr string) if err := loader.Save(context.TODO(), data); err != nil { return nil, err } - middlewares, _ := defaultMiddlewares(ctx) + middlewares := defaultMiddlewares() middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond* time.Duration(config.Rate)), config.RateBurst)) return New(ctx, config, nil, storage, middlewares...) @@ -93,7 +90,7 @@ func AuthClient(ctx context.Context, config *config.TGConfig, sessionStr string) func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string) (*telegram.Client, error) { storage := kv.NewSession(KV, kv.Key("botsession", token)) - middlewares, _ := defaultMiddlewares(ctx) + middlewares := defaultMiddlewares() if config.RateLimit { middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond* time.Duration(config.Rate)), config.RateBurst)) @@ -101,11 +98,22 @@ func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token str } return New(ctx, config, nil, storage, middlewares...) } -func Backoff(_clock tdclock.Clock) backoff.BackOff { - b := backoff.NewExponentialBackOff() +func UploadClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string, middlewares ...telegram.Middleware) (*telegram.Client, error) { + storage := kv.NewSession(KV, kv.Key("botsession", token)) + middlewares = append(middlewares, defaultMiddlewares()...) + if config.RateLimit { + middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond* + time.Duration(config.Rate)), config.RateBurst)) + + } + return New(ctx, config, nil, storage, middlewares...) +} + +func NewBackoff(timeout time.Duration) backoff.BackOff { + b := backoff.NewExponentialBackOff() b.Multiplier = 1.1 - b.MaxElapsedTime = time.Duration(120) * time.Second - b.Clock = _clock + b.MaxElapsedTime = timeout + b.MaxInterval = 10 * time.Second return b } diff --git a/pkg/services/upload.go b/pkg/services/upload.go index 81e6efe..9d60658 100644 --- a/pkg/services/upload.go +++ b/pkg/services/upload.go @@ -15,6 +15,8 @@ import ( "github.com/divyam234/teldrive/internal/crypt" "github.com/divyam234/teldrive/internal/kv" + "github.com/divyam234/teldrive/internal/recovery" + "github.com/divyam234/teldrive/internal/retry" "github.com/divyam234/teldrive/internal/tgc" "github.com/divyam234/teldrive/pkg/logging" "github.com/divyam234/teldrive/pkg/mapper" @@ -145,13 +147,14 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty } else { us.worker.Set(tokens, channelId) token, index = us.worker.Next(channelId) - client, _ = tgc.BotClient(c, us.kv, us.cnf, token) + client, _ = tgc.UploadClient(c, us.kv, us.cnf, token, recovery.New(c, tgc.NewBackoff(us.cnf.ReconnectTimeout)), + retry.New(us.cnf.Uploads.MaxRetries)) channelUser = strings.Split(token, ":")[0] } logger := logging.FromContext(c) - logger.Debugw("uploading file", "fileName", uploadQuery.FileName, + logger.Debugw("uploading chunk", "fileName", uploadQuery.FileName, "partName", uploadQuery.PartName, "bot", channelUser, "botNo", index, "chunkNo", uploadQuery.PartNo, "partSize", fileSize) @@ -161,7 +164,6 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty channel, err := GetChannelById(ctx, client, channelId, channelUser) if err != nil { - logger.Error("error", err) return err } @@ -179,7 +181,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty u := uploader.NewUploader(api).WithThreads(us.cnf.Uploads.Threads).WithPartSize(512 * 1024) - upload, err := u.Upload(c, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize)) + upload, err := u.Upload(ctx, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize)) if err != nil { return err @@ -192,12 +194,9 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty target := sender.To(&tg.InputPeerChannel{ChannelID: channel.ChannelID, AccessHash: channel.AccessHash}) - res, err := target.Media(c, document) + res, err := target.Media(ctx, document) if err != nil { - logger.Debugw("upload failed", "fileName", uploadQuery.FileName, - "partName", uploadQuery.PartName, - "chunkNo", uploadQuery.PartNo) return err } @@ -214,9 +213,6 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty } if message.ID == 0 { - logger.Debugw("upload failed", "fileName", uploadQuery.FileName, - "partName", uploadQuery.PartName, - "chunkNo", uploadQuery.PartNo) return fmt.Errorf("upload failed") } @@ -246,14 +242,16 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty }) if err != nil { + logger.Debugw("upload failed", "fileName", uploadQuery.FileName, + "partName", uploadQuery.PartName, + "chunkNo", uploadQuery.PartNo) return nil, &types.AppError{Error: err} } - logger.Debugw("upload finished", "fileName", uploadQuery.FileName, "partName", uploadQuery.PartName, "chunkNo", uploadQuery.PartNo) - return out, nil + } func generateRandomSalt() (string, error) {