improve cache handling

This commit is contained in:
divyam234 2023-11-02 19:21:30 +05:30
parent f2ca5162a3
commit 2533234de8
13 changed files with 204 additions and 124 deletions

View file

@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE teldrive.users DROP COLUMN IF EXISTS tg_session;
CREATE TABLE teldrive.sessions (
session text NOT NULL,
user_id bigint NOT NULL,
hash text NOT NULL,
created_at timestamp null default timezone('utc'::text,now()),
PRIMARY KEY(session, hash),
FOREIGN KEY (user_id) REFERENCES teldrive.users(user_id)
);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP TABLE IF EXISTS teldrive.sessions;
-- +goose StatementEnd

2
go.mod
View file

@ -3,6 +3,7 @@ module github.com/divyam234/teldrive
go 1.21
require (
github.com/coocood/freecache v1.2.4
github.com/divyam234/cors v1.4.2
github.com/gin-gonic/gin v1.9.1
github.com/go-co-op/gocron v1.35.3
@ -24,6 +25,7 @@ require (
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect

5
go.sum
View file

@ -4,12 +4,17 @@ github.com/bytedance/sonic v1.10.0 h1:qtNZduETEIWJVIyDl01BeNxur2rW9OwTQ/yBqFRkKE
github.com/bytedance/sonic v1.10.0/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0=
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA=
github.com/chenzhuoyu/iasm v0.9.0 h1:9fhXjVzq5hUy2gkhhgHl95zG2cEAhw9OSGs8toWWAwo=
github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
github.com/coocood/freecache v1.2.4 h1:UdR6Yz/X1HW4fZOuH0Z94KwG851GWOSknua5VUbb/5M=
github.com/coocood/freecache v1.2.4/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View file

@ -12,6 +12,7 @@ import (
"github.com/divyam234/teldrive/utils"
"github.com/divyam234/cors"
"github.com/divyam234/teldrive/utils/cache"
"github.com/divyam234/teldrive/utils/cron"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
@ -29,6 +30,8 @@ func main() {
database.InitDB()
cache.InitCache()
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(1).Hour().Do(cron.FilesDeleteJob)

12
models/session.model.go Normal file
View file

@ -0,0 +1,12 @@
package models
import (
"time"
)
type Session struct {
UserId int64 `gorm:"type:bigint;primaryKey"`
Hash string `gorm:"type:text"`
Session string `gorm:"type:text"`
CreatedAt time.Time `gorm:"default:timezone('utc'::text, now())"`
}

View file

@ -96,15 +96,4 @@ func addUserRoutes(rg *gin.RouterGroup) {
c.JSON(http.StatusOK, res)
})
r.DELETE("/cache", func(c *gin.Context) {
res, err := userService.ClearCache(c)
if err != nil {
c.AbortWithError(err.Code, err.Error)
return
}
c.JSON(http.StatusOK, res)
})
}

View file

@ -16,13 +16,11 @@ import (
"strconv"
"time"
"github.com/divyam234/teldrive/database"
"github.com/divyam234/teldrive/models"
"github.com/divyam234/teldrive/schemas"
"github.com/divyam234/teldrive/types"
"github.com/divyam234/teldrive/utils"
"github.com/divyam234/teldrive/utils/auth"
"github.com/divyam234/teldrive/utils/kv"
"github.com/divyam234/teldrive/utils/tgc"
"github.com/gin-gonic/gin"
"github.com/go-jose/go-jose/v3/jwt"
@ -145,9 +143,8 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError)
IsPremium: session.IsPremium,
}
tokenBytes, _ := json.Marshal(jwtClaims)
md5hash := md5.Sum(tokenBytes)
hexToken := hex.EncodeToString(md5hash[:])
tokenhash := md5.Sum([]byte(session.Sesssion))
hexToken := hex.EncodeToString(tokenhash[:])
jwtClaims.Hash = hexToken
jweToken, err := auth.Encode(jwtClaims)
@ -168,7 +165,7 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError)
if err := as.Db.Model(&models.User{}).Where("user_id = ?", session.UserID).
Find(&result).Error; err != nil {
return nil, &types.AppError{Error: errors.New("failed to create or update user"),
return nil, &types.AppError{Error: errors.New("failed to find user"),
Code: http.StatusInternalServerError}
}
if len(result) == 0 {
@ -192,17 +189,16 @@ func (as *AuthService) LogIn(c *gin.Context) (*schemas.Message, *types.AppError)
return nil, &types.AppError{Error: errors.New("failed to create or update user"),
Code: http.StatusInternalServerError}
}
} else {
if err := as.Db.Model(&models.User{}).Where("user_id = ?", session.UserID).
Update("tg_session", session.Sesssion).Error; err != nil {
return nil, &types.AppError{Error: errors.New("failed to create or update user"),
Code: http.StatusInternalServerError}
}
}
setCookie(c, as.SessionCookieName, jweToken, as.SessionMaxAge)
database.KV.Set(kv.Key("sessions", hexToken), tokenBytes)
//create session
if err := as.Db.Create(&models.Session{UserId: session.UserID, Hash: hexToken, Session: session.Sesssion}).Error; err != nil {
return nil, &types.AppError{Error: errors.New("failed to create user session"),
Code: http.StatusInternalServerError}
}
return &schemas.Message{Status: true, Message: "login success"}, nil
}
@ -254,7 +250,7 @@ func (as *AuthService) Logout(c *gin.Context) (*schemas.Message, *types.AppError
})
setCookie(c, as.SessionCookieName, "", -1)
database.KV.Delete(kv.Key("sessions", jwtUser.Hash))
as.Db.Where("session = ?", jwtUser.TgSession).Delete(&models.Session{})
return &schemas.Message{Status: true, Message: "logout success"}, nil
}

View file

@ -12,7 +12,7 @@ import (
"github.com/divyam234/teldrive/schemas"
"github.com/divyam234/teldrive/types"
"github.com/divyam234/teldrive/utils"
"github.com/divyam234/teldrive/utils/kv"
"github.com/divyam234/teldrive/utils/cache"
"github.com/divyam234/teldrive/utils/tgc"
"github.com/gin-gonic/gin"
"github.com/gotd/td/telegram"
@ -21,6 +21,12 @@ import (
"github.com/thoas/go-funk"
)
func CopyList[T any](list []T) []T {
newList := make([]T, len(list))
copy(newList, list)
return newList
}
func getChunk(ctx context.Context, tgClient *telegram.Client, location tg.InputFileLocationClass, offset int64, limit int64) ([]byte, error) {
req := &tg.UploadGetFileRequest{
@ -84,6 +90,16 @@ func getBotInfo(ctx context.Context, token string) (*BotInfo, error) {
func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOutFull, userID string) ([]types.Part, error) {
parts := []types.Part{}
key := fmt.Sprintf("messages:%s:%s", file.ID, userID)
err := cache.GetCache().Get(key, &parts)
if err == nil {
return parts, nil
}
ids := funk.Map(*file.Parts, func(part models.Part) tg.InputMessageClass {
return tg.InputMessageClass(&tg.InputMessageID{ID: int(part.ID)})
})
@ -104,8 +120,6 @@ func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOu
messages := res.(*tg.MessagesChannelMessages)
parts := []types.Part{}
for _, message := range messages.Messages {
item := message.(*tg.Message)
media := item.Media.(*tg.MessageMediaDocument)
@ -113,6 +127,7 @@ func getParts(ctx context.Context, client *telegram.Client, file *schemas.FileOu
location := document.AsInputDocumentFileLocation()
parts = append(parts, types.Part{Location: location, Start: 0, End: document.Size - 1, Size: document.Size})
}
cache.GetCache().Set(key, &parts, 3600)
return parts, nil
}
@ -124,7 +139,7 @@ func rangedParts(parts []types.Part, start, end int64) []types.Part {
endPartNumber := int64(math.Ceil(float64(end) / float64(chunkSize)))
partsToDownload := parts[startPartNumber:endPartNumber]
partsToDownload := CopyList(parts[startPartNumber:endPartNumber])
partsToDownload[0].Start = start % chunkSize
partsToDownload[len(partsToDownload)-1].End = end % chunkSize
@ -159,19 +174,21 @@ func GetDefaultChannel(ctx context.Context, userID int64) (int64, error) {
var channelID int64
key := kv.Key("users", strconv.FormatInt(userID, 10), "channel")
key := fmt.Sprintf("users:channel:%d", userID)
err := kv.GetValue(database.KV, key, &channelID)
err := cache.GetCache().Get(key, &channelID)
if err != nil {
var channelIds []int64
database.DB.Model(&models.Channel{}).Where("user_id = ?", userID).Where("selected = ?", true).
Pluck("channel_id", &channelIds)
if err == nil {
return channelID, nil
}
if len(channelIds) == 1 {
channelID = channelIds[0]
kv.SetValue(database.KV, key, &channelID)
}
var channelIds []int64
database.DB.Model(&models.Channel{}).Where("user_id = ?", userID).Where("selected = ?", true).
Pluck("channel_id", &channelIds)
if len(channelIds) == 1 {
channelID = channelIds[0]
cache.GetCache().Set(key, channelID, 0)
}
if channelID == 0 {
@ -190,18 +207,42 @@ func GetBotsToken(ctx context.Context, userID int64) ([]string, error) {
return nil, err
}
key := kv.Key("users", strconv.FormatInt(userID, 10), strconv.FormatInt(channelId, 10), "bots")
key := fmt.Sprintf("users:bots:%d:%d", userID, channelId)
err = kv.GetValue(database.KV, key, &bots)
err = cache.GetCache().Get(key, &bots)
if err != nil {
if err := database.DB.Model(&models.Bot{}).Where("user_id = ?", userID).
Where("channel_id = ?", channelId).Pluck("token", &bots).Error; err != nil {
return nil, err
}
kv.SetValue(database.KV, key, &bots)
if err == nil {
return bots, nil
}
if err := database.DB.Model(&models.Bot{}).Where("user_id = ?", userID).
Where("channel_id = ?", channelId).Pluck("token", &bots).Error; err != nil {
return nil, err
}
cache.GetCache().Set(key, &bots, 0)
return bots, nil
}
func GetSessionByHash(hash string) (*models.Session, error) {
var session models.Session
key := fmt.Sprintf("sessions:%s", hash)
err := cache.GetCache().Get(key, &session)
if err == nil {
return &session, nil
}
if err := database.DB.Model(&models.Session{}).Where("hash = ?", hash).First(&session).Error; err != nil {
return nil, err
}
cache.GetCache().Set(key, &session, 0)
return &session, nil
}

View file

@ -3,7 +3,6 @@ package services
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
@ -11,12 +10,11 @@ import (
"strconv"
"strings"
"github.com/divyam234/teldrive/database"
"github.com/divyam234/teldrive/mapper"
"github.com/divyam234/teldrive/models"
"github.com/divyam234/teldrive/schemas"
"github.com/divyam234/teldrive/utils"
"github.com/divyam234/teldrive/utils/kv"
"github.com/divyam234/teldrive/utils/cache"
"github.com/divyam234/teldrive/utils/md5"
"github.com/divyam234/teldrive/utils/reader"
"github.com/divyam234/teldrive/utils/tgc"
@ -128,8 +126,9 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
file := mapper.MapFileToFileOut(files[0])
key := kv.Key("files", fileID)
database.KV.Delete(key)
key := fmt.Sprintf("files:%s", fileID)
cache.GetCache().Delete(key)
return &file, nil
@ -337,34 +336,26 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
return
}
data, err := database.KV.Get(kv.Key("sessions", authHash))
session, err := GetSessionByHash(authHash)
if err != nil {
http.Error(w, "hash missing relogin", http.StatusBadRequest)
return
}
jwtUser := &types.JWTClaims{}
err = json.Unmarshal(data, jwtUser)
if err != nil {
http.Error(w, "invalid hash", http.StatusBadRequest)
return
}
file := &schemas.FileOutFull{}
key := kv.Key("files", fileID)
key := fmt.Sprintf("files:%s", fileID)
err = cache.GetCache().Get(key, file)
err = kv.GetValue(database.KV, key, file)
if err != nil {
file, err = fs.GetFileByID(c)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
kv.SetValue(database.KV, key, file)
cache.GetCache().Set(key, file, 0)
}
c.Header("Accept-Ranges", "bytes")
@ -411,9 +402,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
c.Header("Content-Disposition", fmt.Sprintf("%s; filename=\"%s\"", disposition, file.Name))
userID, _ := strconv.ParseInt(jwtUser.Subject, 10, 64)
tokens, err := GetBotsToken(c, userID)
tokens, err := GetBotsToken(c, session.UserId)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -429,8 +418,8 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
if config.LazyStreamBots || len(tokens) == 0 {
var client *telegram.Client
if len(tokens) == 0 {
client, _ = tgc.UserLogin(jwtUser.TgSession)
channelUser = jwtUser.Subject
client, _ = tgc.UserLogin(session.Session)
channelUser = strconv.FormatInt(session.UserId, 10)
} else {
tgc.Workers.Set(tokens)
token = tgc.Workers.Next()
@ -455,14 +444,14 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
tgc.StreamWorkers.Set(tokens[:limit])
client, err := tgc.StreamWorkers.Next()
client, index, err := tgc.StreamWorkers.Next()
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
channelUser = strings.Split(token, ":")[0]
channelUser = strings.Split(tokens[index], ":")[0]
if r.Method != "HEAD" {

View file

@ -13,7 +13,7 @@ import (
"github.com/divyam234/teldrive/models"
"github.com/divyam234/teldrive/schemas"
"github.com/divyam234/teldrive/types"
"github.com/divyam234/teldrive/utils/kv"
"github.com/divyam234/teldrive/utils/cache"
"github.com/divyam234/teldrive/utils/tgc"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/message/peer"
@ -121,9 +121,8 @@ func (us *UserService) UpdateChannel(c *gin.Context) (*schemas.Message, *types.A
us.Db.Model(&models.Channel{}).Where("channel_id != ?", payload.ChannelID).
Where("user_id = ?", userId).Update("selected", false)
key := kv.Key("users", strconv.FormatInt(userId, 10), "channel")
database.KV.Delete(key)
kv.SetValue(database.KV, key, payload.ChannelID)
key := fmt.Sprintf("users:channel:%d", userId)
cache.GetCache().Set(key, payload.ChannelID, 0)
return &schemas.Message{Status: true, Message: "channel updated"}, nil
}
@ -191,9 +190,8 @@ func (us *UserService) RemoveBots(c *gin.Context) (*schemas.Message, *types.AppE
Delete(&models.Bot{}).Error; err != nil {
return nil, &types.AppError{Error: errors.New("failed to delete bots"), Code: http.StatusInternalServerError}
}
key := kv.Key("users", strconv.FormatInt(userID, 10), strconv.FormatInt(channelId, 10), "bots")
database.KV.Delete(key)
cache.GetCache().Delete(fmt.Sprintf("users:bots:%d:%d", userID, channelId))
return &schemas.Message{Status: true, Message: "bots deleted"}, nil
@ -232,39 +230,6 @@ func (us *UserService) RevokeBotSession(c *gin.Context) (*schemas.Message, *type
}
func (us *UserService) ClearCache(c *gin.Context) (*schemas.Message, *types.AppError) {
pattern := []byte("users")
err := database.BoltDB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte("teldrive"))
if bucket == nil {
return errors.New("bucket not found")
}
c := bucket.Cursor()
for key, _ := c.First(); key != nil; key, _ = c.Next() {
if bytes.HasPrefix(key, pattern) {
if err := c.Delete(); err != nil {
return err
}
}
}
return nil
})
if err != nil {
return nil, &types.AppError{Error: errors.New("failed to clear cache"),
Code: http.StatusInternalServerError}
}
return &schemas.Message{Status: true, Message: "cache cleared"}, nil
}
func (us *UserService) addBots(c context.Context, client *telegram.Client, userId int64, channelId int64, botsTokens []string) (*schemas.Message, *types.AppError) {
botInfo := []BotInfo{}
@ -359,8 +324,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI
})
}
key := kv.Key("users", strconv.FormatInt(userId, 10), strconv.FormatInt(channelId, 10), "bots")
database.KV.Delete(key)
cache.GetCache().Delete(fmt.Sprintf("users:bots:%d:%d", userId, channelId))
if err := us.Db.Clauses(clause.OnConflict{DoNothing: true}).Create(&payload).Error; err != nil {
return nil, &types.AppError{Error: errors.New("failed to add bots"), Code: http.StatusInternalServerError}

57
utils/cache/main.go vendored Normal file
View file

@ -0,0 +1,57 @@
package cache
import (
"encoding/json"
"sync"
"github.com/coocood/freecache"
)
var cache *Cache
type Cache struct {
cache *freecache.Cache
mu sync.RWMutex
}
func InitCache() {
cache = &Cache{cache: freecache.NewCache(10 * 1024 * 1024)}
}
func GetCache() *Cache {
return cache
}
func (c *Cache) Get(key string, value interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()
got, err := cache.cache.Get([]byte(key))
if err != nil {
return err
}
return json.Unmarshal(got, value)
}
func (c *Cache) Set(key string, value interface{}, expireSeconds int) error {
c.mu.Lock()
defer c.mu.Unlock()
data, err := json.Marshal(value)
if err != nil {
return err
}
cache.cache.Set([]byte(key), data, expireSeconds)
return nil
}
func (c *Cache) Delete(key string) error {
c.mu.Lock()
defer c.mu.Unlock()
cache.cache.Del([]byte(key))
return nil
}

View file

@ -50,14 +50,14 @@ func (a *UpFiles) Scan(value interface{}) error {
type Result struct {
Files Files
TgSession string
Session string
UserId int64
ChannelId int64
}
type UploadResult struct {
Files UpFiles
TgSession string
Session string
UserId int64
ChannelId int64
}
@ -66,7 +66,7 @@ func deleteTGMessages(ctx context.Context, result Result) error {
db := database.DB
client, err := tgc.UserLogin(result.TgSession)
client, err := tgc.UserLogin(result.Session)
if err != nil {
return err
@ -111,7 +111,7 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error {
db := database.DB
client, err := tgc.UserLogin(result.TgSession)
client, err := tgc.UserLogin(result.Session)
if err != nil {
return err
@ -157,11 +157,12 @@ func FilesDeleteJob() {
var results []Result
if err := db.Model(&models.File{}).
Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "u.tg_session").
Select("JSONB_AGG(jsonb_build_object('id',files.id, 'parts',files.parts)) as files", "files.channel_id", "files.user_id", "s.session").
Joins("left join teldrive.users as u on u.user_id = files.user_id").
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on u.user_id = s.user_id").
Where("type = ?", "file").
Where("status = ?", "pending_deletion").
Group("files.channel_id").Group("files.user_id").Group("u.tg_session").
Group("files.channel_id").Group("files.user_id").Group("s.session").
Scan(&results).Error; err != nil {
return
}
@ -179,10 +180,11 @@ func UploadCleanJob() {
var upResults []UploadResult
if err := db.Model(&models.Upload{}).
Select("JSONB_AGG(jsonb_build_object('id',uploads.id,'partId',uploads.part_id)) as files", "uploads.channel_id", "uploads.user_id", "u.tg_session").
Select("JSONB_AGG(jsonb_build_object('id',uploads.id,'partId',uploads.part_id)) as files", "uploads.channel_id", "uploads.user_id", "s.session").
Joins("left join teldrive.users as u on u.user_id = uploads.user_id").
Joins("left join (select * from teldrive.sessions order by created_at desc limit 1) as s on s.user_id = uploads.user_id").
Where("uploads.created_at < ?", time.Now().UTC().AddDate(0, 0, -15)).
Group("uploads.channel_id").Group("uploads.user_id").Group("u.tg_session").
Group("uploads.channel_id").Group("uploads.user_id").Group("s.session").
Scan(&upResults).Error; err != nil {
return
}

View file

@ -56,19 +56,19 @@ func (w *streamWorkers) Set(bots []string) {
}
}
func (w *streamWorkers) Next() (*Client, error) {
func (w *streamWorkers) Next() (*Client, int, error) {
w.Lock()
defer w.Unlock()
w.index = (w.index + 1) % len(w.clients)
if w.clients[w.index].Status == "idle" {
stop, err := bg.Connect(w.clients[w.index].Tg)
if err != nil {
return nil, err
return nil, 0, err
}
w.clients[w.index].Stop = stop
w.clients[w.index].Status = "running"
}
return w.clients[w.index], nil
return w.clients[w.index], w.index, nil
}
var StreamWorkers = &streamWorkers{}