fix: decrypted reader

This commit is contained in:
divyam234 2024-06-22 22:27:42 +05:30
parent 825dc11fe1
commit 64102e801b
6 changed files with 51 additions and 60 deletions

1
go.mod
View file

@ -19,7 +19,6 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
github.com/thoas/go-funk v0.9.3
github.com/vmihailenco/msgpack v4.0.4+incompatible
go.etcd.io/bbolt v1.3.10
go.uber.org/fx v1.22.0

4
go.sum
View file

@ -202,7 +202,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
@ -213,8 +212,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
@ -314,7 +311,6 @@ gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View file

@ -34,7 +34,7 @@ func NewDecryptedReader(
config *config.TGConfig,
concurrency int,
client *tgc.Client,
worker *tgc.StreamWorker) (io.ReadCloser, error) {
worker *tgc.StreamWorker) (*decrpytedReader, error) {
r := &decrpytedReader{
ctx: ctx,
@ -60,14 +60,14 @@ func NewDecryptedReader(
}
func (r *decrpytedReader) Read(p []byte) (n int, err error) {
func (r *decrpytedReader) Read(p []byte) (int, error) {
if r.limit <= 0 {
return 0, io.EOF
}
n, err = r.reader.Read(p)
r.limit -= int64(n)
n, err := r.reader.Read(p)
if err == io.EOF {
if r.limit > 0 {
err = nil
@ -78,11 +78,12 @@ func (r *decrpytedReader) Read(p []byte) (n int, err error) {
r.pos++
if r.pos < len(r.ranges) {
r.reader, err = r.nextPart()
}
}
return
r.limit -= int64(n)
return n, err
}
func (r *decrpytedReader) Close() (err error) {
if r.reader != nil {
err = r.reader.Close()

View file

@ -92,8 +92,6 @@ func (r *linearReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
r.limit -= int64(n)
if err == io.EOF {
if r.limit > 0 {
err = nil
@ -107,6 +105,7 @@ func (r *linearReader) Read(p []byte) (int, error) {
}
}
r.limit -= int64(n)
return n, err
}

View file

@ -10,6 +10,8 @@ import (
"sync"
"github.com/divyam234/teldrive/internal/cache"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/pkg/types"
"github.com/gotd/td/telegram"
"github.com/gotd/td/tg"
@ -183,13 +185,13 @@ func GetMediaContent(ctx context.Context, client *tg.Client, location tg.InputFi
return buff, nil
}
func GetBotInfo(ctx context.Context, client *telegram.Client, token string) (*types.BotInfo, error) {
func GetBotInfo(ctx context.Context, KV kv.KV, config *config.TGConfig, token string) (*types.BotInfo, error) {
var user *tg.User
client, _ := BotClient(ctx, KV, config, token, Middlewares(config, 5)...)
err := RunWithAuth(ctx, client, token, func(ctx context.Context) error {
user, _ = client.Self(ctx)
return nil
})
if err != nil {
return nil, err
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
@ -14,7 +15,6 @@ import (
"github.com/divyam234/teldrive/internal/cache"
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/logging"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/schemas"
@ -24,8 +24,7 @@ import (
"github.com/gotd/td/telegram/query"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/thoas/go-funk"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
@ -209,7 +208,7 @@ func (us *UserService) RemoveSession(c *gin.Context) (*schemas.Message, *types.A
return &schemas.Message{Message: "session deleted"}, nil
}
func (us *UserService) ListChannels(c *gin.Context) (interface{}, *types.AppError) {
func (us *UserService) ListChannels(c *gin.Context) ([]schemas.Channel, *types.AppError) {
_, session := auth.GetUser(c)
client, _ := tgc.AuthClient(c, &us.cnf.TG, session)
@ -232,8 +231,17 @@ func (us *UserService) ListChannels(c *gin.Context) (interface{}, *types.AppErro
return nil
})
res := []schemas.Channel{}
return funk.Values(channels), nil
for _, channel := range channels {
res = append(res, *channel)
}
sort.Slice(res, func(i, j int) bool {
return res[i].ChannelName < res[j].ChannelName
})
return res, nil
}
func (us *UserService) AddBots(c *gin.Context) (*schemas.Message, *types.AppError) {
@ -285,64 +293,52 @@ func (us *UserService) RemoveBots(c *gin.Context) (*schemas.Message, *types.AppE
func (us *UserService) addBots(c context.Context, client *telegram.Client, userId int64, channelId int64, botsTokens []string) (*schemas.Message, *types.AppError) {
botInfo := []types.BotInfo{}
var wg sync.WaitGroup
logger := logging.FromContext(c)
cache := cache.FromContext(c)
botInfoMap := make(map[string]*types.BotInfo)
err := tgc.RunWithAuth(c, client, "", func(ctx context.Context) error {
channel, err := tgc.GetChannelById(ctx, client.API(), channelId)
if err != nil {
logger.Error("error", zap.Error(err))
return err
}
botInfoChannel := make(chan *types.BotInfo, len(botsTokens))
g, _ := errgroup.WithContext(ctx)
waitChan := make(chan struct{}, 6)
g.SetLimit(8)
mapMu := sync.Mutex{}
for _, token := range botsTokens {
waitChan <- struct{}{}
wg.Add(1)
go func(t string) {
info, err := tgc.GetBotInfo(c, client, t)
g.Go(func() error {
info, err := tgc.GetBotInfo(c, us.kv, &us.cnf.TG, token)
if err != nil {
return
return err
}
botPeerClass, err := peer.DefaultResolver(client.API()).ResolveDomain(ctx, info.UserName)
if err != nil {
logger.Error("error", zap.Error(err))
return
return err
}
botPeer := botPeerClass.(*tg.InputPeerUser)
info.AccessHash = botPeer.AccessHash
defer func() {
<-waitChan
wg.Done()
}()
botInfoChannel <- info
}(token)
}
wg.Wait()
close(botInfoChannel)
for result := range botInfoChannel {
botInfo = append(botInfo, *result)
}
if len(botsTokens) == len(botInfo) {
users := funk.Map(botInfo, func(info types.BotInfo) tg.InputUser {
return tg.InputUser{UserID: info.Id, AccessHash: info.AccessHash}
mapMu.Lock()
botInfoMap[token] = info
mapMu.Unlock()
return nil
})
botsToAdd := users.([]tg.InputUser)
for _, user := range botsToAdd {
}
if err = g.Wait(); err != nil {
return err
}
if len(botsTokens) == len(botInfoMap) {
users := []tg.InputUser{}
for _, info := range botInfoMap {
users = append(users, tg.InputUser{UserID: info.Id, AccessHash: info.AccessHash})
}
for _, user := range users {
payload := &tg.ChannelsEditAdminRequest{
Channel: channel,
UserID: tg.InputUserClass(&user),
@ -362,10 +358,8 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI
}
_, err := client.API().ChannelsEditAdmin(ctx, payload)
if err != nil {
logger.Error("error", zap.Error(err))
return err
}
}
} else {
return errors.New("failed to fetch bots")
@ -379,7 +373,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI
payload := []models.Bot{}
for _, info := range botInfo {
for _, info := range botInfoMap {
payload = append(payload, models.Bot{UserID: userId, Token: info.Token, BotID: info.Id,
BotUserName: info.UserName, ChannelID: channelId,
})