mirror of
https://github.com/tgdrive/teldrive.git
synced 2025-09-03 21:14:28 +08:00
refactor: remove bg bots
This commit is contained in:
parent
2f34b8096c
commit
a1440b1d7a
11 changed files with 64 additions and 217 deletions
|
@ -64,7 +64,6 @@ func NewRun() *cobra.Command {
|
|||
duration.DurationVar(runCmd.Flags(), &config.CronJobs.CleanUploadsInterval, "cronjobs-clean-uploads-interval", 12*time.Hour, "Clean uploads interval")
|
||||
duration.DurationVar(runCmd.Flags(), &config.CronJobs.FolderSizeInterval, "cronjobs-folder-size-interval", 2*time.Hour, "Folder size update interval")
|
||||
|
||||
runCmd.Flags().StringVar(&config.Cache.Type, "cache-type", "memory", "Cache type redis or memory")
|
||||
runCmd.Flags().IntVar(&config.Cache.MaxSize, "cache-max-size", 10*1024*1024, "Max Cache max size (memory)")
|
||||
runCmd.Flags().StringVar(&config.Cache.RedisAddr, "cache-redis-addr", "localhost:6379", "Redis address")
|
||||
runCmd.Flags().StringVar(&config.Cache.RedisPass, "cache-redis-pass", "", "Redis password")
|
||||
|
@ -99,7 +98,6 @@ func NewRun() *cobra.Command {
|
|||
runCmd.Flags().StringVar(&config.TG.SystemLangCode, "tg-system-lang-code", "en-US", "System language code")
|
||||
runCmd.Flags().StringVar(&config.TG.LangPack, "tg-lang-pack", "webk", "Language pack")
|
||||
runCmd.Flags().StringVar(&config.TG.Proxy, "tg-proxy", "", "HTTP OR SOCKS5 proxy URL")
|
||||
runCmd.Flags().BoolVar(&config.TG.DisableBgBots, "tg-disable-bg-bots", false, "Disable Background bots")
|
||||
runCmd.Flags().BoolVar(&config.TG.DisableStreamBots, "tg-disable-stream-bots", false, "Disable Stream bots")
|
||||
runCmd.Flags().BoolVar(&config.TG.EnableLogging, "tg-enable-logging", false, "Enable telegram client logging")
|
||||
runCmd.Flags().StringVar(&config.TG.Uploads.EncryptionKey, "tg-uploads-encryption-key", "", "Uploads encryption key")
|
||||
|
@ -111,7 +109,6 @@ func NewRun() *cobra.Command {
|
|||
duration.DurationVar(runCmd.Flags(), &config.TG.BgBotsCheckInterval, "tg-bg-bots-check-interval", 4*time.Hour, "Interval for checking Idle background bots")
|
||||
runCmd.Flags().IntVar(&config.TG.Stream.MultiThreads, "tg-stream-multi-threads", 0, "Stream multi-threads")
|
||||
runCmd.Flags().IntVar(&config.TG.Stream.Buffers, "tg-stream-buffers", 8, "No of Stream buffers")
|
||||
runCmd.Flags().IntVar(&config.TG.Stream.BotsLimit, "tg-stream-bots-limit", 5, "Stream bots limit")
|
||||
duration.DurationVar(runCmd.Flags(), &config.TG.Stream.ChunkTimeout, "tg-stream-chunk-timeout", 20*time.Second, "Chunk Fetch Timeout")
|
||||
runCmd.MarkFlagRequired("tg-app-id")
|
||||
runCmd.MarkFlagRequired("tg-app-hash")
|
||||
|
|
12
go.mod
12
go.mod
|
@ -1,6 +1,6 @@
|
|||
module github.com/tgdrive/teldrive
|
||||
|
||||
go 1.22
|
||||
go 1.22.0
|
||||
|
||||
require (
|
||||
github.com/WinterYukky/gorm-extra-clause-plugin v0.2.1
|
||||
|
@ -12,7 +12,7 @@ require (
|
|||
github.com/go-co-op/gocron v1.37.0
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1
|
||||
github.com/gotd/contrib v0.20.0
|
||||
github.com/gotd/td v0.108.0
|
||||
github.com/gotd/td v0.109.0
|
||||
github.com/iyear/connectproxy v0.1.1
|
||||
github.com/magiconair/properties v1.8.7
|
||||
github.com/mitchellh/go-homedir v1.1.0
|
||||
|
@ -46,7 +46,7 @@ require (
|
|||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.22 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.23 // indirect
|
||||
github.com/mfridman/interpolate v0.0.2 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
|
@ -60,7 +60,7 @@ require (
|
|||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
go.uber.org/dig v1.18.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
|
||||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gorm.io/driver/mysql v1.5.7 // indirect
|
||||
|
||||
|
@ -79,7 +79,7 @@ require (
|
|||
github.com/go-faster/xor v1.0.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.22.0 // indirect
|
||||
github.com/go-playground/validator/v10 v10.22.1 // indirect
|
||||
github.com/goccy/go-json v0.10.3 // indirect
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/gotd/ige v0.2.2 // indirect
|
||||
|
@ -106,7 +106,7 @@ require (
|
|||
go.opentelemetry.io/otel/trace v1.29.0 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/arch v0.9.0 // indirect
|
||||
golang.org/x/arch v0.10.0 // indirect
|
||||
golang.org/x/crypto v0.27.0
|
||||
golang.org/x/net v0.29.0
|
||||
golang.org/x/sync v0.8.0
|
||||
|
|
20
go.sum
20
go.sum
|
@ -71,8 +71,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
|
|||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
|
||||
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
|
||||
github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao=
|
||||
github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA=
|
||||
github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
|
||||
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
|
||||
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
|
||||
|
@ -98,8 +98,8 @@ github.com/gotd/ige v0.2.2 h1:XQ9dJZwBfDnOGSTxKXBGP4gMud3Qku2ekScRjDWWfEk=
|
|||
github.com/gotd/ige v0.2.2/go.mod h1:tuCRb+Y5Y3eNTo3ypIfNpQ4MFjrnONiL2jN2AKZXmb0=
|
||||
github.com/gotd/neo v0.1.5 h1:oj0iQfMbGClP8xI59x7fE/uHoTJD7NZH9oV1WNuPukQ=
|
||||
github.com/gotd/neo v0.1.5/go.mod h1:9A2a4bn9zL6FADufBdt7tZt+WMhvZoc5gWXihOPoiBQ=
|
||||
github.com/gotd/td v0.108.0 h1:aU4ishpnJYmkEf8hdNw8O4CRKdpT9pK2lDIIIjm+ADA=
|
||||
github.com/gotd/td v0.108.0/go.mod h1:rHtaG0hd4EY0ice4f9CVH/JxsA7ZICqkcH3aFSVZplg=
|
||||
github.com/gotd/td v0.109.0 h1:aWV9NgnlHl+4AiOGVh+KQSnlqaYpO483hUdRshkGxFA=
|
||||
github.com/gotd/td v0.109.0/go.mod h1:Fh4Y7cb3DWhTFZiHnShWOmXK3l9W1HZorfGaxxA7wuE=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
|
@ -144,8 +144,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
|
|||
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0=
|
||||
github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
|
||||
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
|
||||
github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA=
|
||||
|
@ -242,12 +242,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
|||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
|
||||
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
|
||||
golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k=
|
||||
golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/arch v0.10.0 h1:S3huipmSclq3PJMNe76NGwkBR504WFkQ5dhzWzP8ZW8=
|
||||
golang.org/x/arch v0.10.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
|
||||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
|
||||
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA=
|
||||
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
|
||||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
|
||||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
|
||||
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
|
5
internal/cache/cache.go
vendored
5
internal/cache/cache.go
vendored
|
@ -25,10 +25,9 @@ type MemoryCache struct {
|
|||
|
||||
func NewCache(ctx context.Context, conf *config.Config) Cacher {
|
||||
var cacher Cacher
|
||||
switch conf.Cache.Type {
|
||||
case "memory":
|
||||
if conf.Cache.RedisAddr == "" {
|
||||
cacher = NewMemoryCache(conf.Cache.MaxSize)
|
||||
case "redis":
|
||||
} else {
|
||||
cacher = NewRedisCache(ctx, redis.NewClient(&redis.Options{
|
||||
Addr: conf.Cache.RedisAddr,
|
||||
Password: conf.Cache.RedisPass,
|
||||
|
|
|
@ -12,7 +12,6 @@ type Config struct {
|
|||
TG TGConfig
|
||||
CronJobs CronJobConfig
|
||||
Cache struct {
|
||||
Type string
|
||||
MaxSize int
|
||||
RedisAddr string
|
||||
RedisPass string
|
||||
|
@ -47,7 +46,6 @@ type TGConfig struct {
|
|||
SystemLangCode string
|
||||
LangPack string
|
||||
SessionFile string
|
||||
DisableBgBots bool
|
||||
DisableStreamBots bool
|
||||
BgBotsCheckInterval time.Duration
|
||||
Proxy string
|
||||
|
@ -61,7 +59,6 @@ type TGConfig struct {
|
|||
Retention time.Duration
|
||||
}
|
||||
Stream struct {
|
||||
BotsLimit int
|
||||
MultiThreads int
|
||||
Buffers int
|
||||
ChunkTimeout time.Duration
|
||||
|
|
|
@ -1,141 +0,0 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/gotd/td/tg"
|
||||
"github.com/tgdrive/teldrive/internal/cache"
|
||||
"github.com/tgdrive/teldrive/internal/config"
|
||||
"github.com/tgdrive/teldrive/internal/crypt"
|
||||
"github.com/tgdrive/teldrive/internal/tgc"
|
||||
"github.com/tgdrive/teldrive/pkg/schemas"
|
||||
"github.com/tgdrive/teldrive/pkg/types"
|
||||
)
|
||||
|
||||
type DecrpytedReader struct {
|
||||
ctx context.Context
|
||||
file *schemas.FileOutFull
|
||||
parts []types.Part
|
||||
ranges []Range
|
||||
pos int
|
||||
reader io.ReadCloser
|
||||
remaining int64
|
||||
config *config.TGConfig
|
||||
worker *tgc.StreamWorker
|
||||
client *tg.Client
|
||||
concurrency int
|
||||
cache cache.Cacher
|
||||
}
|
||||
|
||||
func NewDecryptedReader(
|
||||
ctx context.Context,
|
||||
client *tg.Client,
|
||||
worker *tgc.StreamWorker,
|
||||
cache cache.Cacher,
|
||||
file *schemas.FileOutFull,
|
||||
parts []types.Part,
|
||||
start,
|
||||
end int64,
|
||||
config *config.TGConfig,
|
||||
concurrency int,
|
||||
) (*DecrpytedReader, error) {
|
||||
|
||||
r := &DecrpytedReader{
|
||||
ctx: ctx,
|
||||
parts: parts,
|
||||
file: file,
|
||||
remaining: end - start + 1,
|
||||
ranges: calculatePartByteRanges(start, end, parts[0].DecryptedSize),
|
||||
config: config,
|
||||
client: client,
|
||||
worker: worker,
|
||||
concurrency: concurrency,
|
||||
cache: cache,
|
||||
}
|
||||
if err := r.initializeReader(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *DecrpytedReader) Read(p []byte) (int, error) {
|
||||
if r.remaining <= 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
n, err := r.reader.Read(p)
|
||||
r.remaining -= int64(n)
|
||||
|
||||
if err == io.EOF && r.remaining > 0 {
|
||||
if err := r.moveToNextPart(); err != nil {
|
||||
return n, err
|
||||
}
|
||||
err = nil
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *DecrpytedReader) Close() error {
|
||||
if r.reader != nil {
|
||||
err := r.reader.Close()
|
||||
r.reader = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *DecrpytedReader) initializeReader() error {
|
||||
reader, err := r.getPartReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.reader = reader
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *DecrpytedReader) moveToNextPart() error {
|
||||
r.reader.Close()
|
||||
r.pos++
|
||||
if r.pos < len(r.ranges) {
|
||||
return r.initializeReader()
|
||||
}
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
func (r *DecrpytedReader) getPartReader() (io.ReadCloser, error) {
|
||||
currentRange := r.ranges[r.pos]
|
||||
salt := r.parts[r.ranges[r.pos].PartNo].Salt
|
||||
cipher, _ := crypt.NewCipher(r.config.Uploads.EncryptionKey, salt)
|
||||
partID := r.parts[currentRange.PartNo].ID
|
||||
|
||||
chunkSrc := &chunkSource{
|
||||
channelID: r.file.ChannelID,
|
||||
partID: partID,
|
||||
client: r.client,
|
||||
concurrency: r.concurrency,
|
||||
cache: r.cache,
|
||||
key: fmt.Sprintf("files:location:%s:%d", r.file.Id, partID),
|
||||
worker: r.worker,
|
||||
}
|
||||
|
||||
return cipher.DecryptDataSeek(r.ctx,
|
||||
func(ctx context.Context,
|
||||
underlyingOffset,
|
||||
underlyingLimit int64) (io.ReadCloser, error) {
|
||||
var end int64
|
||||
|
||||
if underlyingLimit >= 0 {
|
||||
end = min(r.parts[r.ranges[r.pos].PartNo].Size-1, underlyingOffset+underlyingLimit-1)
|
||||
}
|
||||
|
||||
if r.concurrency < 2 {
|
||||
return newTGReader(r.ctx, underlyingOffset, end, chunkSrc)
|
||||
}
|
||||
return newTGMultiReader(r.ctx, underlyingOffset, end, r.config, chunkSrc)
|
||||
|
||||
}, currentRange.Start, currentRange.End-currentRange.Start+1)
|
||||
}
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/gotd/td/tg"
|
||||
"github.com/tgdrive/teldrive/internal/cache"
|
||||
"github.com/tgdrive/teldrive/internal/config"
|
||||
"github.com/tgdrive/teldrive/internal/tgc"
|
||||
"github.com/tgdrive/teldrive/internal/crypt"
|
||||
"github.com/tgdrive/teldrive/pkg/schemas"
|
||||
"github.com/tgdrive/teldrive/pkg/types"
|
||||
)
|
||||
|
@ -27,7 +27,6 @@ type LinearReader struct {
|
|||
reader io.ReadCloser
|
||||
remaining int64
|
||||
config *config.TGConfig
|
||||
worker *tgc.StreamWorker
|
||||
client *tg.Client
|
||||
concurrency int
|
||||
cache cache.Cacher
|
||||
|
@ -52,7 +51,6 @@ func calculatePartByteRanges(start, end, partSize int64) []Range {
|
|||
|
||||
func NewLinearReader(ctx context.Context,
|
||||
client *tg.Client,
|
||||
worker *tgc.StreamWorker,
|
||||
cache cache.Cacher,
|
||||
file *schemas.FileOutFull,
|
||||
parts []types.Part,
|
||||
|
@ -70,7 +68,6 @@ func NewLinearReader(ctx context.Context,
|
|||
ranges: calculatePartByteRanges(start, end, parts[0].Size),
|
||||
config: config,
|
||||
client: client,
|
||||
worker: worker,
|
||||
concurrency: concurrency,
|
||||
cache: cache,
|
||||
}
|
||||
|
@ -137,11 +134,40 @@ func (r *LinearReader) getPartReader() (io.ReadCloser, error) {
|
|||
concurrency: r.concurrency,
|
||||
cache: r.cache,
|
||||
key: fmt.Sprintf("files:location:%s:%d", r.file.Id, partID),
|
||||
worker: r.worker,
|
||||
}
|
||||
|
||||
if r.concurrency < 2 {
|
||||
return newTGReader(r.ctx, currentRange.Start, currentRange.End, chunkSrc)
|
||||
var (
|
||||
reader io.ReadCloser
|
||||
err error
|
||||
)
|
||||
if r.file.Encrypted {
|
||||
salt := r.parts[r.ranges[r.pos].PartNo].Salt
|
||||
cipher, _ := crypt.NewCipher(r.config.Uploads.EncryptionKey, salt)
|
||||
reader, err = cipher.DecryptDataSeek(r.ctx,
|
||||
func(ctx context.Context,
|
||||
underlyingOffset,
|
||||
underlyingLimit int64) (io.ReadCloser, error) {
|
||||
var end int64
|
||||
|
||||
if underlyingLimit >= 0 {
|
||||
end = min(r.parts[r.ranges[r.pos].PartNo].Size-1, underlyingOffset+underlyingLimit-1)
|
||||
}
|
||||
|
||||
if r.concurrency < 2 {
|
||||
return newTGReader(r.ctx, underlyingOffset, end, chunkSrc)
|
||||
}
|
||||
return newTGMultiReader(r.ctx, underlyingOffset, end, r.config, chunkSrc)
|
||||
|
||||
}, currentRange.Start, currentRange.End-currentRange.Start+1)
|
||||
|
||||
} else {
|
||||
if r.concurrency < 2 {
|
||||
reader, err = newTGReader(r.ctx, currentRange.Start, currentRange.End, chunkSrc)
|
||||
} else {
|
||||
reader, err = newTGMultiReader(r.ctx, currentRange.Start, currentRange.End, r.config, chunkSrc)
|
||||
}
|
||||
|
||||
}
|
||||
return newTGMultiReader(r.ctx, currentRange.Start, currentRange.End, r.config, chunkSrc)
|
||||
return reader, err
|
||||
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ type chunkSource struct {
|
|||
client *tg.Client
|
||||
key string
|
||||
cache cache.Cacher
|
||||
worker *tgc.StreamWorker
|
||||
}
|
||||
|
||||
func (c *chunkSource) ChunkSize(start, end int64) int64 {
|
||||
|
@ -42,31 +41,19 @@ func (c *chunkSource) Chunk(ctx context.Context, offset int64, limit int64) ([]b
|
|||
var (
|
||||
location *tg.InputDocumentFileLocation
|
||||
err error
|
||||
client *tg.Client
|
||||
)
|
||||
|
||||
err = c.cache.Get(c.key, location)
|
||||
|
||||
client = c.client
|
||||
|
||||
if c.concurrency > 0 {
|
||||
tc, err := c.worker.Next(c.channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client = tc.Tg.API()
|
||||
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
location, err = tgc.GetLocation(ctx, client, c.channelID, c.partID)
|
||||
location, err = tgc.GetLocation(ctx, c.client, c.channelID, c.partID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.cache.Set(c.key, location, 30*time.Minute)
|
||||
}
|
||||
|
||||
return tgc.GetChunk(ctx, client, location, offset, limit)
|
||||
return tgc.GetChunk(ctx, c.client, location, offset, limit)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,6 @@ type TestSuite struct {
|
|||
|
||||
func (suite *TestSuite) SetupTest() {
|
||||
suite.config = &config.TGConfig{Stream: struct {
|
||||
BotsLimit int
|
||||
MultiThreads int
|
||||
Buffers int
|
||||
ChunkTimeout time.Duration
|
||||
|
|
|
@ -234,7 +234,7 @@ func GetLocation(ctx context.Context, client *tg.Client, channelId int64, partId
|
|||
}
|
||||
|
||||
func CalculateChunkSize(start, end int64) int64 {
|
||||
chunkSize := int64(1024 * 1024)
|
||||
chunkSize := int64(512 * 1024)
|
||||
|
||||
for chunkSize > 1024 && chunkSize > (end-start) {
|
||||
chunkSize /= 2
|
||||
|
|
|
@ -79,7 +79,6 @@ func randInt64() (int64, error) {
|
|||
type FileService struct {
|
||||
db *gorm.DB
|
||||
cnf *config.Config
|
||||
worker *tgc.StreamWorker
|
||||
botWorker *tgc.BotWorker
|
||||
cache cache.Cacher
|
||||
kv kv.KV
|
||||
|
@ -94,7 +93,7 @@ func NewFileService(
|
|||
kv kv.KV,
|
||||
cache cache.Cacher,
|
||||
logger *zap.SugaredLogger) *FileService {
|
||||
return &FileService{db: db, cnf: cnf, worker: worker, botWorker: botWorker, cache: cache, kv: kv, logger: logger}
|
||||
return &FileService{db: db, cnf: cnf, botWorker: botWorker, cache: cache, kv: kv, logger: logger}
|
||||
}
|
||||
|
||||
func (fs *FileService) CreateFile(c *gin.Context, userId int64, fileIn *schemas.FileIn) (*schemas.FileOut, *types.AppError) {
|
||||
|
@ -464,6 +463,7 @@ func (fs *FileService) UpdateParts(c *gin.Context, id string, userId int64, payl
|
|||
fs.cache.Delete(keys...)
|
||||
|
||||
}
|
||||
fs.cache.Delete(fmt.Sprintf("files:%s", id))
|
||||
|
||||
return &schemas.Message{Message: "file updated"}, nil
|
||||
}
|
||||
|
@ -749,25 +749,18 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) {
|
|||
}
|
||||
multiThreads = 0
|
||||
|
||||
} else if fs.cnf.TG.DisableBgBots && len(tokens) > 0 {
|
||||
} else {
|
||||
fs.botWorker.Set(tokens, file.ChannelID)
|
||||
|
||||
token, _ = fs.botWorker.Next(file.ChannelID)
|
||||
|
||||
middlewares := tgc.Middlewares(&fs.cnf.TG, 5)
|
||||
client, err = tgc.BotClient(c, fs.kv, &fs.cnf.TG, token, middlewares...)
|
||||
if err != nil {
|
||||
fs.handleError(err, w)
|
||||
}
|
||||
multiThreads = 0
|
||||
} else {
|
||||
fs.worker.Set(tokens[0:min(len(tokens), fs.cnf.TG.Stream.BotsLimit)], file.ChannelID)
|
||||
c, err := fs.worker.Next(file.ChannelID)
|
||||
if err != nil {
|
||||
fs.handleError(err, w)
|
||||
return
|
||||
}
|
||||
client = c.Tg
|
||||
}
|
||||
|
||||
if download {
|
||||
multiThreads = 0
|
||||
}
|
||||
|
@ -779,11 +772,7 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) {
|
|||
fs.handleError(err, w)
|
||||
return nil
|
||||
}
|
||||
if file.Encrypted {
|
||||
lr, err = reader.NewDecryptedReader(c, client.API(), fs.worker, fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads)
|
||||
} else {
|
||||
lr, err = reader.NewLinearReader(c, client.API(), fs.worker, fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads)
|
||||
}
|
||||
lr, err = reader.NewLinearReader(c, client.API(), fs.cache, file, parts, start, end, &fs.cnf.TG, multiThreads)
|
||||
|
||||
if err != nil {
|
||||
fs.handleError(err, w)
|
||||
|
@ -799,15 +788,9 @@ func (fs *FileService) GetFileStream(c *gin.Context, download bool) {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
if fs.cnf.TG.DisableBgBots {
|
||||
tgc.RunWithAuth(c, client, token, func(ctx context.Context) error {
|
||||
return handleStream()
|
||||
})
|
||||
} else {
|
||||
fs.worker.IncActiveStream()
|
||||
defer fs.worker.DecActiveStreams()
|
||||
handleStream()
|
||||
}
|
||||
tgc.RunWithAuth(c, client, token, func(ctx context.Context) error {
|
||||
return handleStream()
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue