teldrive/internal/tgc/workers.go

121 lines
2.8 KiB
Go
Raw Normal View History

2023-09-20 03:20:44 +08:00
package tgc
import (
2023-11-25 12:31:29 +08:00
"context"
2023-09-20 03:20:44 +08:00
"sync"
2023-09-24 04:26:04 +08:00
"github.com/divyam234/teldrive/internal/config"
"github.com/divyam234/teldrive/internal/kv"
2023-09-24 04:26:04 +08:00
"github.com/gotd/td/telegram"
2023-09-20 03:20:44 +08:00
)
type UploadWorker struct {
2023-11-16 23:21:35 +08:00
mu sync.Mutex
bots map[int64][]string
currIdx map[int64]int
2023-09-20 03:20:44 +08:00
}
func (w *UploadWorker) Set(bots []string, channelId int64) {
2023-11-16 23:21:35 +08:00
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.bots[channelId]
if !ok {
w.bots = make(map[int64][]string)
w.currIdx = make(map[int64]int)
w.bots[channelId] = bots
w.currIdx[channelId] = 0
2023-09-20 03:20:44 +08:00
}
}
func (w *UploadWorker) Next(channelId int64) (string, int) {
2023-11-16 23:21:35 +08:00
w.mu.Lock()
defer w.mu.Unlock()
index := w.currIdx[channelId]
w.currIdx[channelId] = (index + 1) % len(w.bots[channelId])
return w.bots[channelId][index], index
2023-09-20 03:20:44 +08:00
}
func NewUploadWorker() *UploadWorker {
return &UploadWorker{}
}
2023-09-24 04:26:04 +08:00
type Client struct {
Tg *telegram.Client
Stop StopFunc
2023-09-24 04:26:04 +08:00
Status string
}
type StreamWorker struct {
2023-11-16 23:21:35 +08:00
mu sync.Mutex
bots map[int64][]string
clients map[int64][]*Client
currIdx map[int64]int
2024-02-13 00:02:55 +08:00
cnf *config.TGConfig
kv kv.KV
2024-04-19 04:46:47 +08:00
ctx context.Context
2023-09-24 04:26:04 +08:00
}
func (w *StreamWorker) Set(bots []string, channelId int64) {
2023-11-16 23:21:35 +08:00
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.bots[channelId]
if !ok {
w.bots = make(map[int64][]string)
w.clients = make(map[int64][]*Client)
w.currIdx = make(map[int64]int)
w.bots[channelId] = bots
2023-09-24 04:26:04 +08:00
for _, token := range bots {
client, _, _ := BotClient(w.ctx, w.kv, w.cnf, token, 5, true)
2023-11-16 23:21:35 +08:00
w.clients[channelId] = append(w.clients[channelId], &Client{Tg: client, Status: "idle"})
2023-09-24 04:26:04 +08:00
}
2023-11-16 23:21:35 +08:00
w.currIdx[channelId] = 0
2023-09-24 04:26:04 +08:00
}
2023-11-16 23:21:35 +08:00
2023-09-24 04:26:04 +08:00
}
func (w *StreamWorker) Next(channelId int64) (*Client, int, error) {
2023-11-16 23:21:35 +08:00
w.mu.Lock()
defer w.mu.Unlock()
index := w.currIdx[channelId]
nextClient := w.clients[channelId][index]
w.currIdx[channelId] = (index + 1) % len(w.clients[channelId])
if nextClient.Status == "idle" {
stop, err := Connect(nextClient.Tg, WithBotToken(w.bots[channelId][index]))
2023-09-24 04:26:04 +08:00
if err != nil {
2023-11-02 21:51:30 +08:00
return nil, 0, err
2023-09-24 04:26:04 +08:00
}
2023-11-16 23:21:35 +08:00
nextClient.Stop = stop
nextClient.Status = "running"
}
return nextClient, index, nil
}
2023-12-25 06:06:14 +08:00
func (w *StreamWorker) UserWorker(client *telegram.Client, userId int64) (*Client, error) {
2023-11-16 23:21:35 +08:00
w.mu.Lock()
defer w.mu.Unlock()
2023-12-25 06:06:14 +08:00
_, ok := w.clients[userId]
2023-11-16 23:21:35 +08:00
if !ok {
w.clients = make(map[int64][]*Client)
2023-12-25 06:06:14 +08:00
w.clients[userId] = append(w.clients[userId], &Client{Tg: client, Status: "idle"})
2023-11-16 23:21:35 +08:00
}
2023-12-25 06:06:14 +08:00
nextClient := w.clients[userId][0]
2023-11-16 23:21:35 +08:00
if nextClient.Status == "idle" {
stop, err := Connect(nextClient.Tg, WithContext(w.ctx))
2023-11-16 23:21:35 +08:00
if err != nil {
return nil, err
}
nextClient.Stop = stop
nextClient.Status = "running"
2023-09-24 04:26:04 +08:00
}
2023-11-16 23:21:35 +08:00
return nextClient, nil
2023-09-24 04:26:04 +08:00
}
2024-04-19 04:46:47 +08:00
func NewStreamWorker(ctx context.Context) func(cnf *config.Config, kv kv.KV) *StreamWorker {
return func(cnf *config.Config, kv kv.KV) *StreamWorker {
return &StreamWorker{cnf: &cnf.TG, kv: kv, ctx: ctx}
}
}