start bots in background

This commit is contained in:
divyam234 2023-09-24 01:56:04 +05:30
parent 7b3220732e
commit 3683c17c3c
4 changed files with 87 additions and 41 deletions

View file

@ -138,29 +138,20 @@ func rangedParts(parts []types.Part, start, end int64) []types.Part {
func GetChannelById(ctx context.Context, client *telegram.Client, channelID int64, userID string) (*tg.InputChannel, error) { func GetChannelById(ctx context.Context, client *telegram.Client, channelID int64, userID string) (*tg.InputChannel, error) {
channel := &tg.InputChannel{} channel := &tg.InputChannel{}
inputChannel := &tg.InputChannel{
key := kv.Key("channels", strconv.FormatInt(channelID, 10), userID) ChannelID: channelID,
}
err := kv.GetValue(database.KV, key, channel) channels, err := client.API().ChannelsGetChannels(ctx, []tg.InputChannelClass{inputChannel})
if err != nil { if err != nil {
inputChannel := &tg.InputChannel{ return nil, err
ChannelID: channelID,
}
channels, err := client.API().ChannelsGetChannels(ctx, []tg.InputChannelClass{inputChannel})
if err != nil {
return nil, err
}
if len(channels.GetChats()) == 0 {
return nil, errors.New("no channels found")
}
channel = channels.GetChats()[0].(*tg.Channel).AsInput()
kv.SetValue(database.KV, key, channel)
} }
if len(channels.GetChats()) == 0 {
return nil, errors.New("no channels found")
}
channel = channels.GetChats()[0].(*tg.Channel).AsInput()
return channel, nil return channel, nil
} }

View file

@ -1,7 +1,6 @@
package services package services
import ( import (
"context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
@ -21,7 +20,6 @@ import (
"github.com/divyam234/teldrive/utils/md5" "github.com/divyam234/teldrive/utils/md5"
"github.com/divyam234/teldrive/utils/reader" "github.com/divyam234/teldrive/utils/reader"
"github.com/divyam234/teldrive/utils/tgc" "github.com/divyam234/teldrive/utils/tgc"
"github.com/gotd/td/telegram"
"github.com/divyam234/teldrive/types" "github.com/divyam234/teldrive/types"
@ -413,32 +411,37 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
return return
} }
var client *telegram.Client if len(tokens) == 0 {
http.Error(w, "bots not found", http.StatusBadRequest)
return
}
var token string var token string
var channelUser string var channelUser string
if len(tokens) == 0 { limit := utils.Min(len(tokens), 10)
client, _ = tgc.UserLogin(jwtUser.TgSession)
channelUser = jwtUser.Subject tgc.StreamWorkers.Set(tokens[:limit])
} else {
tgc.Workers.Set(tokens) client, err := tgc.StreamWorkers.Next()
token = tgc.Workers.Next()
client, _ = tgc.BotLogin(token) if err != nil {
channelUser = strings.Split(token, ":")[0] http.Error(w, err.Error(), http.StatusBadRequest)
return
} }
channelUser = strings.Split(token, ":")[0]
if r.Method != "HEAD" { if r.Method != "HEAD" {
tgc.RunWithAuth(c, client, token, func(ctx context.Context) error {
parts, err := getParts(c, client, file, channelUser) parts, err := getParts(c, client.Tg, file, channelUser)
if err != nil { if err != nil {
return err http.Error(w, err.Error(), http.StatusInternalServerError)
} return
parts = rangedParts(parts, start, end) }
r, _ := reader.NewLinearReader(c, client, parts) parts = rangedParts(parts, start, end)
io.CopyN(w, r, contentLength) r, _ := reader.NewLinearReader(c, client.Tg, parts)
return nil io.CopyN(w, r, contentLength)
})
} }
} }

View file

@ -21,6 +21,13 @@ func Max[T constraints.Ordered](a, b T) T {
return b return b
} }
func Min[T constraints.Ordered](a, b T) T {
if a > b {
return b
}
return a
}
func CamelToPascalCase(input string) string { func CamelToPascalCase(input string) string {
var result strings.Builder var result strings.Builder
upperNext := true upperNext := true

View file

@ -2,6 +2,9 @@ package tgc
import ( import (
"sync" "sync"
"github.com/gotd/contrib/bg"
"github.com/gotd/td/telegram"
) )
type BotWorkers struct { type BotWorkers struct {
@ -27,3 +30,45 @@ func (w *BotWorkers) Next() string {
} }
var Workers = &BotWorkers{} var Workers = &BotWorkers{}
type Client struct {
Tg *telegram.Client
Stop bg.StopFunc
Status string
}
type streamWorkers struct {
sync.Mutex
bots []string
clients []*Client
index int
}
func (w *streamWorkers) Set(bots []string) {
w.Lock()
defer w.Unlock()
if len(w.clients) == 0 {
w.bots = bots
for _, token := range bots {
client, _ := BotLogin(token)
w.clients = append(w.clients, &Client{Tg: client, Status: "idle"})
}
}
}
func (w *streamWorkers) Next() (*Client, error) {
w.Lock()
defer w.Unlock()
w.index = (w.index + 1) % len(w.clients)
if w.clients[w.index].Status == "idle" {
stop, err := bg.Connect(w.clients[w.index].Tg)
if err != nil {
return nil, err
}
w.clients[w.index].Stop = stop
w.clients[w.index].Status = "running"
}
return w.clients[w.index], nil
}
var StreamWorkers = &streamWorkers{}