feat: added dc pooler and support 0 length file size

This commit is contained in:
divyam234 2024-06-02 23:11:18 +05:30
parent 1c6c1c8ea5
commit 943a5012b9
17 changed files with 181 additions and 49 deletions

View file

@ -16,12 +16,12 @@ import (
"github.com/divyam234/teldrive/internal/database"
"github.com/divyam234/teldrive/internal/duration"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/internal/middleware"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/internal/utils"
"github.com/divyam234/teldrive/pkg/controller"
"github.com/divyam234/teldrive/pkg/cron"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/services"
"github.com/gin-contrib/gzip"
ginzap "github.com/gin-contrib/zap"
@ -86,6 +86,7 @@ func NewRun() *cobra.Command {
runCmd.Flags().StringVar(&config.TG.Uploads.EncryptionKey, "tg-uploads-encryption-key", "", "Uploads encryption key")
runCmd.Flags().IntVar(&config.TG.Uploads.Threads, "tg-uploads-threads", 8, "Uploads threads")
runCmd.Flags().IntVar(&config.TG.Uploads.MaxRetries, "tg-uploads-max-retries", 10, "Uploads Retries")
runCmd.Flags().Int64Var(&config.TG.PoolSize, "tg-pool-size", 8, "Telegram Session pool size")
duration.DurationVar(runCmd.Flags(), &config.TG.ReconnectTimeout, "tg-reconnect-timeout", 5*time.Minute, "Reconnect Timeout")
duration.DurationVar(runCmd.Flags(), &config.TG.Uploads.Retention, "tg-uploads-retention", (24*7)*time.Hour, "Uploads retention duration")
runCmd.MarkFlagRequired("tg-app-id")

View file

@ -34,6 +34,7 @@ type TGConfig struct {
DisableStreamBots bool
Proxy string
ReconnectTimeout time.Duration
PoolSize int64
Uploads struct {
EncryptionKey string
Threads int

View file

@ -4,7 +4,7 @@ import (
"time"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/internal/logging"
extraClausePlugin "github.com/WinterYukky/gorm-extra-clause-plugin"
"go.uber.org/zap/zapcore"

View file

@ -6,7 +6,7 @@ import (
"fmt"
"time"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/internal/logging"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

96
internal/pool/pool.go Normal file
View file

@ -0,0 +1,96 @@
// implementation taken from iyear/tdl
package pool
import (
"context"
"sync"
"github.com/divyam234/teldrive/internal/logging"
"github.com/gotd/td/telegram"
"github.com/gotd/td/tg"
"go.uber.org/multierr"
"go.uber.org/zap"
)
type Pool interface {
Client(ctx context.Context, dc int) *tg.Client
Default(ctx context.Context) *tg.Client
Close() error
}
type pool struct {
api *telegram.Client
size int64
mu *sync.Mutex
middlewares []telegram.Middleware
invoke tg.Invoker
close func() error
}
func chainMiddlewares(invoker tg.Invoker, chain ...telegram.Middleware) tg.Invoker {
if len(chain) == 0 {
return invoker
}
for i := len(chain) - 1; i >= 0; i-- {
invoker = chain[i].Handle(invoker)
}
return invoker
}
func NewPool(c *telegram.Client, size int64, middlewares ...telegram.Middleware) Pool {
return &pool{
api: c,
size: size,
mu: &sync.Mutex{},
middlewares: middlewares,
}
}
func (p *pool) current() int {
return p.api.Config().ThisDC
}
func (p *pool) Client(ctx context.Context, dc int) *tg.Client {
return tg.NewClient(p.invoker(ctx, dc))
}
func (p *pool) invoker(ctx context.Context, dc int) tg.Invoker {
p.mu.Lock()
defer p.mu.Unlock()
if p.invoke != nil {
return p.invoke
}
var (
invoker telegram.CloseInvoker
err error
)
if dc == p.current() {
invoker, err = p.api.Pool(p.size)
} else {
invoker, err = p.api.DC(ctx, dc, p.size)
}
if err != nil {
logging.FromContext(ctx).Error("create invoker", zap.Error(err))
return p.api
}
p.close = invoker.Close
p.invoke = chainMiddlewares(invoker, p.middlewares...)
return p.invoke
}
func (p *pool) Default(ctx context.Context) *tg.Client {
return p.Client(ctx, p.current())
}
func (p *pool) Close() (err error) {
err = multierr.Append(err, p.close())
return err
}

View file

@ -3,7 +3,7 @@ package tgc
import (
"context"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/internal/logging"
"github.com/gotd/td/telegram"
"github.com/pkg/errors"
"go.uber.org/zap"

View file

@ -7,6 +7,8 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/recovery"
"github.com/divyam234/teldrive/internal/retry"
"github.com/divyam234/teldrive/internal/utils"
"github.com/gotd/contrib/middleware/floodwait"
"github.com/gotd/contrib/middleware/ratelimit"
@ -18,12 +20,6 @@ import (
"golang.org/x/time/rate"
)
func defaultMiddlewares() []telegram.Middleware {
return []telegram.Middleware{
floodwait.NewSimpleWaiter(),
}
}
func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage, middlewares ...telegram.Middleware) (*telegram.Client, error) {
var dialer dcs.DialFunc = proxy.Direct.DialContext
@ -40,7 +36,7 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
Dial: dialer,
}),
ReconnectionBackoff: func() backoff.BackOff {
return NewBackoff(config.ReconnectTimeout)
return newBackoff(config.ReconnectTimeout)
},
Device: telegram.DeviceConfig{
DeviceModel: config.DeviceModel,
@ -52,7 +48,7 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
},
SessionStorage: storage,
RetryInterval: 5 * time.Second,
MaxRetries: -1,
MaxRetries: 5,
DialTimeout: 10 * time.Second,
Middlewares: middlewares,
UpdateHandler: handler,
@ -62,7 +58,9 @@ func New(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHa
}
func NoAuthClient(ctx context.Context, config *config.TGConfig, handler telegram.UpdateHandler, storage session.Storage) (*telegram.Client, error) {
middlewares := defaultMiddlewares()
middlewares := []telegram.Middleware{
floodwait.NewSimpleWaiter(),
}
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*100), 5))
return New(ctx, config, handler, storage, middlewares...)
}
@ -82,35 +80,47 @@ func AuthClient(ctx context.Context, config *config.TGConfig, sessionStr string)
if err := loader.Save(context.TODO(), data); err != nil {
return nil, err
}
middlewares := defaultMiddlewares()
middlewares := []telegram.Middleware{
floodwait.NewSimpleWaiter(),
}
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
return New(ctx, config, nil, storage, middlewares...)
}
func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string) (*telegram.Client, error) {
func BotClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string, retries int, passMiddleware bool) (*telegram.Client, []telegram.Middleware, error) {
storage := kv.NewSession(KV, kv.Key("botsession", token))
middlewares := defaultMiddlewares()
middlewares := []telegram.Middleware{
floodwait.NewSimpleWaiter(),
recovery.New(ctx, newBackoff(config.ReconnectTimeout)),
retry.New(retries),
}
if config.RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
}
return New(ctx, config, nil, storage, middlewares...)
if passMiddleware {
client, err := New(ctx, config, nil, storage, middlewares...)
if err != nil {
return nil, nil, err
}
return client, nil, nil
} else {
client, err := New(ctx, config, nil, storage)
if err != nil {
return nil, nil, err
}
return client, middlewares, nil
}
}
func UploadClient(ctx context.Context, KV kv.KV, config *config.TGConfig, token string, middlewares ...telegram.Middleware) (*telegram.Client, error) {
storage := kv.NewSession(KV, kv.Key("botsession", token))
middlewares = append(middlewares, defaultMiddlewares()...)
if config.RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*
time.Duration(config.Rate)), config.RateBurst))
}
return New(ctx, config, nil, storage, middlewares...)
}
func NewBackoff(timeout time.Duration) backoff.BackOff {
func newBackoff(timeout time.Duration) backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.Multiplier = 1.1
b.MaxElapsedTime = timeout

View file

@ -65,7 +65,7 @@ func (w *StreamWorker) Set(bots []string, channelId int64) {
w.currIdx = make(map[int64]int)
w.bots[channelId] = bots
for _, token := range bots {
client, _ := BotClient(w.ctx, w.kv, w.cnf, token)
client, _, _ := BotClient(w.ctx, w.kv, w.cnf, token, 5, true)
w.clients[channelId] = append(w.clients[channelId], &Client{Tg: client, Status: "idle"})
}
w.currIdx[channelId] = 0

View file

@ -4,8 +4,8 @@ import (
"net/http"
"github.com/divyam234/teldrive/internal/cache"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/pkg/httputil"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/schemas"
"github.com/divyam234/teldrive/pkg/services"
"github.com/gin-gonic/gin"

View file

@ -7,7 +7,7 @@ import (
"time"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/services"
"github.com/go-co-op/gocron"

View file

@ -1,7 +1,7 @@
package httputil
import (
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/internal/logging"
"github.com/gin-gonic/gin"
)

View file

@ -111,7 +111,7 @@ func GetUserAuth(c *gin.Context) (int64, string) {
}
func getBotInfo(ctx context.Context, KV kv.KV, config *config.TGConfig, token string) (*types.BotInfo, error) {
client, _ := tgc.BotClient(ctx, KV, config, token)
client, _, _ := tgc.BotClient(ctx, KV, config, token, 5, true)
var user *tg.User
err := tgc.RunWithAuth(ctx, client, token, func(ctx context.Context) error {
user, _ = client.Self(ctx)

View file

@ -17,11 +17,11 @@ import (
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/database"
"github.com/divyam234/teldrive/internal/http_range"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/internal/md5"
"github.com/divyam234/teldrive/internal/reader"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/internal/utils"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/mapper"
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/schemas"
@ -552,6 +552,21 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
rangeHeader := r.Header.Get("Range")
if file.Size == 0 {
c.Header("Content-Type", file.MimeType)
c.Header("Content-Length", "0")
if rangeHeader != "" {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", file.Size))
http.Error(w, "Requested Range Not Satisfiable", http.StatusRequestedRangeNotSatisfiable)
return
}
c.Header("Content-Disposition", mime.FormatMediaType("inline", map[string]string{"filename": file.Name}))
w.WriteHeader(http.StatusOK)
return
}
if rangeHeader == "" {
start = 0
end = file.Size - 1

View file

@ -15,10 +15,9 @@ import (
"github.com/divyam234/teldrive/internal/crypt"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/recovery"
"github.com/divyam234/teldrive/internal/retry"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/internal/pool"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/mapper"
"github.com/divyam234/teldrive/pkg/schemas"
@ -101,6 +100,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
channelId int64
err error
client *telegram.Client
middlewares []telegram.Middleware
token string
index int
channelUser string
@ -124,7 +124,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
fileSize := c.Request.ContentLength
defer c.Request.Body.Close()
defer fileStream.Close()
if uploadQuery.ChannelID == 0 {
channelId, err = GetDefaultChannel(c, us.db, userId)
@ -142,16 +142,26 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
}
if len(tokens) == 0 {
client, _ = tgc.AuthClient(c, us.cnf, session)
client, err = tgc.AuthClient(c, us.cnf, session)
if err != nil {
return nil, &types.AppError{Error: err}
}
channelUser = strconv.FormatInt(userId, 10)
} else {
us.worker.Set(tokens, channelId)
token, index = us.worker.Next(channelId)
client, _ = tgc.UploadClient(c, us.kv, us.cnf, token, recovery.New(c, tgc.NewBackoff(us.cnf.ReconnectTimeout)),
retry.New(us.cnf.Uploads.MaxRetries))
client, middlewares, err = tgc.BotClient(c, us.kv, us.cnf, token, us.cnf.Uploads.MaxRetries, false)
if err != nil {
return nil, &types.AppError{Error: err}
}
channelUser = strings.Split(token, ":")[0]
}
uploadPool := pool.NewPool(client, int64(us.cnf.PoolSize), middlewares...)
logger := logging.FromContext(c)
logger.Debugw("uploading chunk", "fileName", uploadQuery.FileName,
@ -177,9 +187,9 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
fileStream, _ = cipher.EncryptData(fileStream)
}
api := client.API()
client := uploadPool.Default(ctx)
u := uploader.NewUploader(api).WithThreads(us.cnf.Uploads.Threads).WithPartSize(512 * 1024)
u := uploader.NewUploader(client).WithThreads(us.cnf.Uploads.Threads).WithPartSize(512 * 1024)
upload, err := u.Upload(ctx, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize))
@ -189,7 +199,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
document := message.UploadedDocument(upload).Filename(uploadQuery.PartName).ForceFile(true)
sender := message.NewSender(client.API())
sender := message.NewSender(client)
target := sender.To(&tg.InputPeerChannel{ChannelID: channel.ChannelID,
AccessHash: channel.AccessHash})
@ -229,9 +239,8 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
}
if err := us.db.Create(partUpload).Error; err != nil {
//delete uploaded part if upload fails
if message.ID != 0 {
api.ChannelsDeleteMessages(ctx, &tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: []int{message.ID}})
client.ChannelsDeleteMessages(ctx, &tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: []int{message.ID}})
}
return err
}

View file

@ -11,8 +11,8 @@ import (
"github.com/divyam234/teldrive/internal/cache"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/logging"
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/schemas"
"github.com/divyam234/teldrive/pkg/types"