refactor: improve error handling in file chunk upload

This commit is contained in:
divyam234 2024-05-28 01:13:43 +05:30
parent d60947011b
commit 6dd0fd2d80
7 changed files with 45 additions and 37 deletions

View file

@ -190,7 +190,7 @@ teldrive run --help
| --tg-rate | Limiting rate | No | 100 |
| --tg-session-file | Bot session file. | No | $HOME/.teldrive/session.db |
| --tg-bg-bots-limit | Start at most this number of bots in the background to prevent connection recreation on every request. Increase this if you are streaming or downloading a large number of files simultaneously. | No | 5
| --tg-uploads-threads | Concurrent Uploads threads for uploading file | No | 16 |
| --tg-uploads-threads | Concurrent Uploads threads for uploading file | No | 8 |
| --tg-uploads-retention | Uploads retention duration. Duration to keep failed uploaded chunks in the db for resuming uploads. | No | 7d |
| --tg-proxy | Socks5 or HTTP proxy for telegram client. | No | "" |

View file

@ -84,10 +84,10 @@ func NewRun() *cobra.Command {
runCmd.Flags().IntVar(&config.TG.BgBotsLimit, "tg-bg-bots-limit", 5, "Background bots limit")
runCmd.Flags().BoolVar(&config.TG.DisableStreamBots, "tg-disable-stream-bots", false, "Disable stream bots")
runCmd.Flags().StringVar(&config.TG.Uploads.EncryptionKey, "tg-uploads-encryption-key", "", "Uploads encryption key")
runCmd.Flags().IntVar(&config.TG.Uploads.Threads, "tg-uploads-threads", 16, "Uploads threads")
duration.DurationVar(runCmd.Flags(), &config.TG.Uploads.Retention, "tg-uploads-retention", (24*7)*time.Hour,
"Uploads retention duration")
runCmd.Flags().IntVar(&config.TG.Uploads.Threads, "tg-uploads-threads", 8, "Uploads threads")
runCmd.Flags().IntVar(&config.TG.Uploads.MaxRetries, "tg-uploads-max-retries", 10, "Uploads Retries")
duration.DurationVar(runCmd.Flags(), &config.TG.ReconnectTimeout, "tg-reconnect-timeout", 5*time.Minute, "Reconnect Timeout")
duration.DurationVar(runCmd.Flags(), &config.TG.Uploads.Retention, "tg-uploads-retention", (24*7)*time.Hour, "Uploads retention duration")
runCmd.MarkFlagRequired("tg-app-id")
runCmd.MarkFlagRequired("tg-app-hash")
runCmd.MarkFlagRequired("db-data-source")

View file

@ -43,4 +43,4 @@
[tg.uploads]
encryption-key = ""
retention = "7d"
threads = 16
threads = 8

View file

@ -33,9 +33,11 @@ type TGConfig struct {
BgBotsLimit int
DisableStreamBots bool
Proxy string
ReconnectTimeout time.Duration
Uploads struct {
EncryptionKey string
Threads int
MaxRetries int
Retention time.Duration
}
}

View file

@ -49,5 +49,5 @@ func (r *recovery) shouldRecover(err error) bool {
_, ok := tgerr.As(err)
return !errors.Is(err, context.Canceled) && !ok
return !ok
}

View file

@ -7,12 +7,9 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/recovery"
"github.com/divyam234/teldrive/internal/retry"
"github.com/divyam234/teldrive/internal/utils"
"github.com/gotd/contrib/middleware/floodwait"
"github.com/gotd/contrib/middleware/ratelimit"
tdclock "github.com/gotd/td/clock"
"github.com/gotd/td/session"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/dcs"
@ -21,13 +18,10 @@ import (
"golang.org/x/time/rate"
)
func defaultMiddlewares(ctx context.Context) ([]telegram.Middleware, error) {
func defaultMiddlewares() []telegram.Middleware {
return []telegram.Middleware{
recovery.New(ctx, Backoff(tdclock.System)),
retry.New(5),
floodwait.NewSimpleWaiter(),
}, nil
}
}
func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage, middlewares ...telegram.Middleware) (*telegram.Client, error) {
@ -45,6 +39,9 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
Resolver: dcs.Plain(dcs.PlainOptions{
Dial: dialer,
}),
ReconnectionBackoff: func() backoff.BackOff {
return NewBackoff(config.ReconnectTimeout)
},
Device: telegram.DeviceConfig{
DeviceModel: config.DeviceModel,
SystemVersion: config.SystemVersion,
@ -54,8 +51,8 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
LangCode: config.LangCode,
},
SessionStorage: storage,
RetryInterval: time.Second,
MaxRetries: 10,
RetryInterval: 5 * time.Second,
MaxRetries: -1,
DialTimeout: 10 * time.Second,
Middlewares: middlewares,
UpdateHandler: handler,
@ -65,7 +62,7 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
}
func NoAuthClient(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage) (*telegram.Client, error) {
middlewares, _ := defaultMiddlewares(ctx)
middlewares := defaultMiddlewares()
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*100), 5))
return New(ctx, config, handler, storage, middlewares...)
}
@ -85,7 +82,7 @@ func AuthClient(ctx context.Context, config *config.TGConfig, sessionStr string)
if err := loader.Save(context.TODO(), data); err != nil {
return nil, err
}
middlewares, _ := defaultMiddlewares(ctx)
middlewares := defaultMiddlewares()
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
return New(ctx, config, nil, storage, middlewares...)
@ -93,7 +90,7 @@ func AuthClient(ctx context.Context, config *config.TGConfig, sessionStr string)
func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string) (*telegram.Client, error) {
storage := kv.NewSession(KV, kv.Key("botsession", token))
middlewares, _ := defaultMiddlewares(ctx)
middlewares := defaultMiddlewares()
if config.RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
@ -101,11 +98,22 @@ func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token str
}
return New(ctx, config, nil, storage, middlewares...)
}
func Backoff(_clock tdclock.Clock) backoff.BackOff {
b := backoff.NewExponentialBackOff()
func UploadClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string, middlewares ...telegram.Middleware) (*telegram.Client, error) {
storage := kv.NewSession(KV, kv.Key("botsession", token))
middlewares = append(middlewares, defaultMiddlewares()...)
if config.RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
}
return New(ctx, config, nil, storage, middlewares...)
}
func NewBackoff(timeout time.Duration) backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.Multiplier = 1.1
b.MaxElapsedTime = time.Duration(120) * time.Second
b.Clock = _clock
b.MaxElapsedTime = timeout
b.MaxInterval = 10 * time.Second
return b
}

View file

@ -15,6 +15,8 @@ import (
"github.com/divyam234/teldrive/internal/crypt"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/recovery"
"github.com/divyam234/teldrive/internal/retry"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/mapper"
@ -145,13 +147,14 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
} else {
us.worker.Set(tokens, channelId)
token, index = us.worker.Next(channelId)
client, _ = tgc.BotClient(c, us.kv, us.cnf, token)
client, _ = tgc.UploadClient(c, us.kv, us.cnf, token, recovery.New(c, tgc.NewBackoff(us.cnf.ReconnectTimeout)),
retry.New(us.cnf.Uploads.MaxRetries))
channelUser = strings.Split(token, ":")[0]
}
logger := logging.FromContext(c)
logger.Debugw("uploading file", "fileName", uploadQuery.FileName,
logger.Debugw("uploading chunk", "fileName", uploadQuery.FileName,
"partName", uploadQuery.PartName,
"bot", channelUser, "botNo", index,
"chunkNo", uploadQuery.PartNo, "partSize", fileSize)
@ -161,7 +164,6 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
channel, err := GetChannelById(ctx, client, channelId, channelUser)
if err != nil {
logger.Error("error", err)
return err
}
@ -179,7 +181,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
u := uploader.NewUploader(api).WithThreads(us.cnf.Uploads.Threads).WithPartSize(512 * 1024)
upload, err := u.Upload(c, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize))
upload, err := u.Upload(ctx, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize))
if err != nil {
return err
@ -192,12 +194,9 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
target := sender.To(&tg.InputPeerChannel{ChannelID: channel.ChannelID,
AccessHash: channel.AccessHash})
res, err := target.Media(c, document)
res, err := target.Media(ctx, document)
if err != nil {
logger.Debugw("upload failed", "fileName", uploadQuery.FileName,
"partName", uploadQuery.PartName,
"chunkNo", uploadQuery.PartNo)
return err
}
@ -214,9 +213,6 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
}
if message.ID == 0 {
logger.Debugw("upload failed", "fileName", uploadQuery.FileName,
"partName", uploadQuery.PartName,
"chunkNo", uploadQuery.PartNo)
return fmt.Errorf("upload failed")
}
@ -246,14 +242,16 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
})
if err != nil {
logger.Debugw("upload failed", "fileName", uploadQuery.FileName,
"partName", uploadQuery.PartName,
"chunkNo", uploadQuery.PartNo)
return nil, &types.AppError{Error: err}
}
logger.Debugw("upload finished", "fileName", uploadQuery.FileName,
"partName", uploadQuery.PartName,
"chunkNo", uploadQuery.PartNo)
return out, nil
}
func generateRandomSalt() (string, error) {