From a1440b1d7a06b2f2d8e135b14ffe3bc75ed18b52 Mon Sep 17 00:00:00 2001 From: divyam234 <47589864+divyam234@users.noreply.github.com> Date: Tue, 10 Sep 2024 13:00:47 +0530 Subject: [PATCH] refactor: remove bg bots --- cmd/run.go | 3 - go.mod | 12 +- go.sum | 20 ++-- internal/cache/cache.go | 5 +- internal/config/config.go | 3 - internal/reader/decrypted_reader.go | 141 ------------------------ internal/reader/reader.go | 42 +++++-- internal/reader/tg_multi_reader.go | 17 +-- internal/reader/tg_multi_reader_test.go | 1 - internal/tgc/helpers.go | 2 +- pkg/services/file.go | 35 ++---- 11 files changed, 64 insertions(+), 217 deletions(-) delete mode 100644 internal/reader/decrypted_reader.go diff --git a/cmd/run.go b/cmd/run.go index 16f5216..295cf74 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -64,7 +64,6 @@ func NewRun() *cobra.Command { duration.DurationVar(runCmd.Flags(), &config.CronJobs.CleanUploadsInterval, "cronjobs-clean-uploads-interval", 12*time.Hour, "Clean uploads interval") duration.DurationVar(runCmd.Flags(), &config.CronJobs.FolderSizeInterval, "cronjobs-folder-size-interval", 2*time.Hour, "Folder size update interval") - runCmd.Flags().StringVar(&config.Cache.Type, "cache-type", "memory", "Cache type redis or memory") runCmd.Flags().IntVar(&config.Cache.MaxSize, "cache-max-size", 10*1024*1024, "Max Cache max size (memory)") runCmd.Flags().StringVar(&config.Cache.RedisAddr, "cache-redis-addr", "localhost:6379", "Redis address") runCmd.Flags().StringVar(&config.Cache.RedisPass, "cache-redis-pass", "", "Redis password") @@ -99,7 +98,6 @@ func NewRun() *cobra.Command { runCmd.Flags().StringVar(&config.TG.SystemLangCode, "tg-system-lang-code", "en-US", "System language code") runCmd.Flags().StringVar(&config.TG.LangPack, "tg-lang-pack", "webk", "Language pack") runCmd.Flags().StringVar(&config.TG.Proxy, "tg-proxy", "", "HTTP OR SOCKS5 proxy URL") - runCmd.Flags().BoolVar(&config.TG.DisableBgBots, "tg-disable-bg-bots", false, "Disable Background bots") runCmd.Flags().BoolVar(&config.TG.DisableStreamBots, "tg-disable-stream-bots", false, "Disable Stream bots") runCmd.Flags().BoolVar(&config.TG.EnableLogging, "tg-enable-logging", false, "Enable telegram client logging") runCmd.Flags().StringVar(&config.TG.Uploads.EncryptionKey, "tg-uploads-encryption-key", "", "Uploads encryption key") @@ -111,7 +109,6 @@ func NewRun() *cobra.Command { duration.DurationVar(runCmd.Flags(), &config.TG.BgBotsCheckInterval, "tg-bg-bots-check-interval", 4*time.Hour, "Interval for checking Idle background bots") runCmd.Flags().IntVar(&config.TG.Stream.MultiThreads, "tg-stream-multi-threads", 0, "Stream multi-threads") runCmd.Flags().IntVar(&config.TG.Stream.Buffers, "tg-stream-buffers", 8, "No of Stream buffers") - runCmd.Flags().IntVar(&config.TG.Stream.BotsLimit, "tg-stream-bots-limit", 5, "Stream bots limit") duration.DurationVar(runCmd.Flags(), &config.TG.Stream.ChunkTimeout, "tg-stream-chunk-timeout", 20*time.Second, "Chunk Fetch Timeout") runCmd.MarkFlagRequired("tg-app-id") runCmd.MarkFlagRequired("tg-app-hash") diff --git a/go.mod b/go.mod index 2287cd7..9f5346a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/tgdrive/teldrive -go 1.22 +go 1.22.0 require ( github.com/WinterYukky/gorm-extra-clause-plugin v0.2.1 @@ -12,7 +12,7 @@ require ( github.com/go-co-op/gocron v1.37.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/gotd/contrib v0.20.0 - github.com/gotd/td v0.108.0 + github.com/gotd/td v0.109.0 github.com/iyear/connectproxy v0.1.1 github.com/magiconair/properties v1.8.7 github.com/mitchellh/go-homedir v1.1.0 @@ -46,7 +46,7 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mattn/go-sqlite3 v1.14.23 // indirect github.com/mfridman/interpolate v0.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -60,7 +60,7 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.uber.org/dig v1.18.0 // indirect - golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gorm.io/driver/mysql v1.5.7 // indirect @@ -79,7 +79,7 @@ require ( github.com/go-faster/xor v1.0.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.22.0 // indirect + github.com/go-playground/validator/v10 v10.22.1 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/gorilla/websocket v1.5.3 github.com/gotd/ige v0.2.2 // indirect @@ -106,7 +106,7 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/arch v0.9.0 // indirect + golang.org/x/arch v0.10.0 // indirect golang.org/x/crypto v0.27.0 golang.org/x/net v0.29.0 golang.org/x/sync v0.8.0 diff --git a/go.sum b/go.sum index 5bf79f8..b9038bf 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao= -github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA= +github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= @@ -98,8 +98,8 @@ github.com/gotd/ige v0.2.2 h1:XQ9dJZwBfDnOGSTxKXBGP4gMud3Qku2ekScRjDWWfEk= github.com/gotd/ige v0.2.2/go.mod h1:tuCRb+Y5Y3eNTo3ypIfNpQ4MFjrnONiL2jN2AKZXmb0= github.com/gotd/neo v0.1.5 h1:oj0iQfMbGClP8xI59x7fE/uHoTJD7NZH9oV1WNuPukQ= github.com/gotd/neo v0.1.5/go.mod h1:9A2a4bn9zL6FADufBdt7tZt+WMhvZoc5gWXihOPoiBQ= -github.com/gotd/td v0.108.0 h1:aU4ishpnJYmkEf8hdNw8O4CRKdpT9pK2lDIIIjm+ADA= -github.com/gotd/td v0.108.0/go.mod h1:rHtaG0hd4EY0ice4f9CVH/JxsA7ZICqkcH3aFSVZplg= +github.com/gotd/td v0.109.0 h1:aWV9NgnlHl+4AiOGVh+KQSnlqaYpO483hUdRshkGxFA= +github.com/gotd/td v0.109.0/go.mod h1:Fh4Y7cb3DWhTFZiHnShWOmXK3l9W1HZorfGaxxA7wuE= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= @@ -144,8 +144,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= -github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0= +github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg= github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA= @@ -242,12 +242,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k= -golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/arch v0.10.0 h1:S3huipmSclq3PJMNe76NGwkBR504WFkQ5dhzWzP8ZW8= +golang.org/x/arch v0.10.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= -golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= -golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 2d85893..ea06f7c 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -25,10 +25,9 @@ type MemoryCache struct { func NewCache(ctx context.Context, conf *config.Config) Cacher { var cacher Cacher - switch conf.Cache.Type { - case "memory": + if conf.Cache.RedisAddr == "" { cacher = NewMemoryCache(conf.Cache.MaxSize) - case "redis": + } else { cacher = NewRedisCache(ctx, redis.NewClient(&redis.Options{ Addr: conf.Cache.RedisAddr, Password: conf.Cache.RedisPass, diff --git a/internal/config/config.go b/internal/config/config.go index 36cde70..ba3e474 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,7 +12,6 @@ type Config struct { TG TGConfig CronJobs CronJobConfig Cache struct { - Type string MaxSize int RedisAddr string RedisPass string @@ -47,7 +46,6 @@ type TGConfig struct { SystemLangCode string LangPack string SessionFile string - DisableBgBots bool DisableStreamBots bool BgBotsCheckInterval time.Duration Proxy string @@ -61,7 +59,6 @@ type TGConfig struct { Retention time.Duration } Stream struct { - BotsLimit int MultiThreads int Buffers int ChunkTimeout time.Duration diff --git a/internal/reader/decrypted_reader.go b/internal/reader/decrypted_reader.go deleted file mode 100644 index 702858b..0000000 --- a/internal/reader/decrypted_reader.go +++ /dev/null @@ -1,141 +0,0 @@ -package reader - -import ( - "context" - "fmt" - "io" - - "github.com/gotd/td/tg" - "github.com/tgdrive/teldrive/internal/cache" - "github.com/tgdrive/teldrive/internal/config" - "github.com/tgdrive/teldrive/internal/crypt" - "github.com/tgdrive/teldrive/internal/tgc" - "github.com/tgdrive/teldrive/pkg/schemas" - "github.com/tgdrive/teldrive/pkg/types" -) - -type DecrpytedReader struct { - ctx context.Context - file *schemas.FileOutFull - parts []types.Part - ranges []Range - pos int - reader io.ReadCloser - remaining int64 - config *config.TGConfig - worker *tgc.StreamWorker - client *tg.Client - concurrency int - cache cache.Cacher -} - -func NewDecryptedReader( - ctx context.Context, - client *tg.Client, - worker *tgc.StreamWorker, - cache cache.Cacher, - file *schemas.FileOutFull, - parts []types.Part, - start, - end int64, - config *config.TGConfig, - concurrency int, -) (*DecrpytedReader, error) { - - r := &DecrpytedReader{ - ctx: ctx, - parts: parts, - file: file, - remaining: end - start + 1, - ranges: calculatePartByteRanges(start, end, parts[0].DecryptedSize), - config: config, - client: client, - worker: worker, - concurrency: concurrency, - cache: cache, - } - if err := r.initializeReader(); err != nil { - return nil, err - } - return r, nil - -} - -func (r *DecrpytedReader) Read(p []byte) (int, error) { - if r.remaining <= 0 { - return 0, io.EOF - } - - n, err := r.reader.Read(p) - r.remaining -= int64(n) - - if err == io.EOF && r.remaining > 0 { - if err := r.moveToNextPart(); err != nil { - return n, err - } - err = nil - } - - return n, err -} - -func (r *DecrpytedReader) Close() error { - if r.reader != nil { - err := r.reader.Close() - r.reader = nil - return err - } - return nil -} - -func (r *DecrpytedReader) initializeReader() error { - reader, err := r.getPartReader() - if err != nil { - return err - } - r.reader = reader - return nil -} - -func (r *DecrpytedReader) moveToNextPart() error { - r.reader.Close() - r.pos++ - if r.pos < len(r.ranges) { - return r.initializeReader() - } - return io.EOF -} - -func (r *DecrpytedReader) getPartReader() (io.ReadCloser, error) { - currentRange := r.ranges[r.pos] - salt := r.parts[r.ranges[r.pos].PartNo].Salt - cipher, _ := crypt.NewCipher(r.config.Uploads.EncryptionKey, salt) - partID := r.parts[currentRange.PartNo].ID - - chunkSrc := &chunkSource{ - channelID: r.file.ChannelID, - partID: partID, - client: r.client, - concurrency: r.concurrency, - cache: r.cache, - key: fmt.Sprintf("files:location:%s:%d", r.file.Id, partID), - worker: r.worker, - } - - return cipher.DecryptDataSeek(r.ctx, - func(ctx context.Context, - underlyingOffset, - underlyingLimit int64) (io.ReadCloser, error) { - var end int64 - - if underlyingLimit >= 0 { - end = min(r.parts[r.ranges[r.pos].PartNo].Size-1, underlyingOffset+underlyingLimit-1) - } - - if r.concurrency < 2 { - return newTGReader(r.ctx, underlyingOffset, end, chunkSrc) - } - return newTGMultiReader(r.ctx, underlyingOffset, end, r.config, chunkSrc) - - }, currentRange.Start, currentRange.End-currentRange.Start+1) -} diff --git a/internal/reader/reader.go b/internal/reader/reader.go index d3bd9e0..2b21f50 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -8,7 +8,7 @@ import ( "github.com/gotd/td/tg" "github.com/tgdrive/teldrive/internal/cache" "github.com/tgdrive/teldrive/internal/config" - "github.com/tgdrive/teldrive/internal/tgc" + "github.com/tgdrive/teldrive/internal/crypt" "github.com/tgdrive/teldrive/pkg/schemas" "github.com/tgdrive/teldrive/pkg/types" ) @@ -27,7 +27,6 @@ type LinearReader struct { reader io.ReadCloser remaining int64 config *config.TGConfig - worker *tgc.StreamWorker client *tg.Client concurrency int cache cache.Cacher @@ -52,7 +51,6 @@ func calculatePartByteRanges(start, end, partSize int64) []Range { func NewLinearReader(ctx context.Context, client *tg.Client, - worker *tgc.StreamWorker, cache cache.Cacher, file *schemas.FileOutFull, parts []types.Part, @@ -70,7 +68,6 @@ func NewLinearReader(ctx context.Context, ranges: calculatePartByteRanges(start, end, parts[0].Size), config: config, client: client, - worker: worker, concurrency: concurrency, cache: cache, } @@ -137,11 +134,40 @@ func (r *LinearReader) getPartReader() (io.ReadCloser, error) { concurrency: r.concurrency, cache: r.cache, key: fmt.Sprintf("files:location:%s:%d", r.file.Id, partID), - worker: r.worker, } - if r.concurrency < 2 { - return newTGReader(r.ctx, currentRange.Start, currentRange.End, chunkSrc) + var ( + reader io.ReadCloser + err error + ) + if r.file.Encrypted { + salt := r.parts[r.ranges[r.pos].PartNo].Salt + cipher, _ := crypt.NewCipher(r.config.Uploads.EncryptionKey, salt) + reader, err = cipher.DecryptDataSeek(r.ctx, + func(ctx context.Context, + underlyingOffset, + underlyingLimit int64) (io.ReadCloser, error) { + var end int64 + + if underlyingLimit >= 0 { + end = min(r.parts[r.ranges[r.pos].PartNo].Size-1, underlyingOffset+underlyingLimit-1) + } + + if r.concurrency < 2 { + return newTGReader(r.ctx, underlyingOffset, end, chunkSrc) + } + return newTGMultiReader(r.ctx, underlyingOffset, end, r.config, chunkSrc) + + }, currentRange.Start, currentRange.End-currentRange.Start+1) + + } else { + if r.concurrency < 2 { + reader, err = newTGReader(r.ctx, currentRange.Start, currentRange.End, chunkSrc) + } else { + reader, err = newTGMultiReader(r.ctx, currentRange.Start, currentRange.End, r.config, chunkSrc) + } + } - return newTGMultiReader(r.ctx, currentRange.Start, currentRange.End, r.config, chunkSrc) + return reader, err + } diff --git a/internal/reader/tg_multi_reader.go b/internal/reader/tg_multi_reader.go index 364620f..7856159 100644 --- a/internal/reader/tg_multi_reader.go +++ b/internal/reader/tg_multi_reader.go @@ -31,7 +31,6 @@ type chunkSource struct { client *tg.Client key string cache cache.Cacher - worker *tgc.StreamWorker } func (c *chunkSource) ChunkSize(start, end int64) int64 { @@ -42,31 +41,19 @@ func (c *chunkSource) Chunk(ctx context.Context, offset int64, limit int64) ([]b var ( location *tg.InputDocumentFileLocation err error - client *tg.Client ) err = c.cache.Get(c.key, location) - client = c.client - - if c.concurrency > 0 { - tc, err := c.worker.Next(c.channelID) - if err != nil { - return nil, err - } - client = tc.Tg.API() - - } - if err != nil { - location, err = tgc.GetLocation(ctx, client, c.channelID, c.partID) + location, err = tgc.GetLocation(ctx, c.client, c.channelID, c.partID) if err != nil { return nil, err } c.cache.Set(c.key, location, 30*time.Minute) } - return tgc.GetChunk(ctx, client, location, offset, limit) + return tgc.GetChunk(ctx, c.client, location, offset, limit) } diff --git a/internal/reader/tg_multi_reader_test.go b/internal/reader/tg_multi_reader_test.go index 9d9b603..fc1e6b3 100644 --- a/internal/reader/tg_multi_reader_test.go +++ b/internal/reader/tg_multi_reader_test.go @@ -46,7 +46,6 @@ type TestSuite struct { func (suite *TestSuite) SetupTest() { suite.config = &config.TGConfig{Stream: struct { - BotsLimit int MultiThreads int Buffers int ChunkTimeout time.Duration diff --git a/internal/tgc/helpers.go b/internal/tgc/helpers.go index d8012f6..a462753 100644 --- a/internal/tgc/helpers.go +++ b/internal/tgc/helpers.go @@ -234,7 +234,7 @@ func GetLocation(ctx context.Context, client *tg.Client, channelId int64, partId } func CalculateChunkSize(start, end int64) int64 { - chunkSize := int64(1024 * 1024) + chunkSize := int64(512 * 1024) for chunkSize > 1024 && chunkSize > (end-start) { chunkSize /= 2 diff --git a/pkg/services/file.go b/pkg/services/file.go index 7206698..b53bd86 100644 --- a/pkg/services/file.go +++ b/pkg/services/file.go @@ -79,7 +79,6 @@ func randInt64() (int64, error) { type FileService struct { db *gorm.DB cnf *config.Config - worker *tgc.StreamWorker botWorker *tgc.BotWorker cache cache.Cacher kv kv.KV @@ -94,7 +93,7 @@ func NewFileService( kv kv.KV, cache cache.Cacher, logger *zap.SugaredLogger) *FileService { - return &FileService{db: db, cnf: cnf, worker: worker, botWorker: botWorker, cache: cache, kv: kv, logger: logger} + return &FileService{db: db, cnf: cnf, botWorker: botWorker, cache: cache, kv: kv, logger: logger} } func (fs *FileService) CreateFile(c *gin.Context, userId int64, fileIn *schemas.FileIn) (*schemas.FileOut, *types.AppError) { @@ -464,6 +463,7 @@ func (fs *FileService) UpdateParts(c *gin.Context, id string, userId int64, payl fs.cache.Delete(keys...) } + fs.cache.Delete(fmt.Sprintf("files:%s", id)) return &schemas.Message{Message: "file updated"}, nil } @@ -749,25 +749,18 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) { } multiThreads = 0 - } else if fs.cnf.TG.DisableBgBots && len(tokens) > 0 { + } else { fs.botWorker.Set(tokens, file.ChannelID) + token, _ = fs.botWorker.Next(file.ChannelID) + middlewares := tgc.Middlewares(&fs.cnf.TG, 5) client, err = tgc.BotClient(c, fs.kv, &fs.cnf.TG, token, middlewares...) - if err != nil { - fs.handleError(err, w) - } - multiThreads = 0 - } else { - fs.worker.Set(tokens[0:min(len(tokens), fs.cnf.TG.Stream.BotsLimit)], file.ChannelID) - c, err := fs.worker.Next(file.ChannelID) if err != nil { fs.handleError(err, w) return } - client = c.Tg } - if download { multiThreads = 0 } @@ -779,11 +772,7 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) { fs.handleError(err, w) return nil } - if file.Encrypted { - lr, err = reader.NewDecryptedReader(c, client.API(), fs.worker, fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads) - } else { - lr, err = reader.NewLinearReader(c, client.API(), fs.worker, fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads) - } + lr, err = reader.NewLinearReader(c, client.API(), fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads) if err != nil { fs.handleError(err, w) @@ -799,15 +788,9 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) { } return nil } - if fs.cnf.TG.DisableBgBots { - tgc.RunWithAuth(c, client, token, func(ctx context.Context) error { - return handleStream() - }) - } else { - fs.worker.IncActiveStream() - defer fs.worker.DecActiveStreams() - handleStream() - } + tgc.RunWithAuth(c, client, token, func(ctx context.Context) error { + return handleStream() + }) } }