diff --git a/go.mod b/go.mod index b2162a5..521f284 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-jose/go-jose/v3 v3.0.3 github.com/gotd/contrib v0.20.0 github.com/gotd/td v0.102.0 + github.com/iyear/connectproxy v0.1.1 github.com/magiconair/properties v1.8.7 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 @@ -39,7 +40,6 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/iyear/connectproxy v0.1.1 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/mfridman/interpolate v0.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -101,7 +101,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.23.0 - golang.org/x/net v0.25.0 // indirect + golang.org/x/net v0.25.0 golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/internal/tgc/connect.go b/internal/tgc/connect.go new file mode 100644 index 0000000..cf03f80 --- /dev/null +++ b/internal/tgc/connect.go @@ -0,0 +1,78 @@ +package tgc + +import ( + "context" + "errors" + + "github.com/gotd/td/telegram" +) + +type StopFunc func() error + +type connectOptions struct { + ctx context.Context + token string +} + +type Option interface { + apply(o *connectOptions) +} + +type fnOption func(o *connectOptions) + +func (f fnOption) apply(o *connectOptions) { + f(o) +} + +func WithContext(ctx context.Context) Option { + return fnOption(func(o *connectOptions) { + o.ctx = ctx + }) +} + +func WithBotToken(token string) Option { + return fnOption(func(o *connectOptions) { + o.token = token + }) +} + +func Connect(client *telegram.Client, options ...Option) (StopFunc, error) { + opt := &connectOptions{ + ctx: context.Background(), + } + for _, o := range options { + o.apply(opt) + } + + ctx, cancel := context.WithCancel(opt.ctx) + + errC := make(chan error, 1) + initDone := make(chan struct{}) + go func() { + defer close(errC) + errC <- RunWithAuth(ctx, client, opt.token, func(ctx context.Context) error { + close(initDone) + <-ctx.Done() + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return ctx.Err() + }) + }() + + select { + case <-ctx.Done(): // context canceled + cancel() + return func() error { return nil }, ctx.Err() + case err := <-errC: // startup timeout + cancel() + return func() error { return nil }, err + case <-initDone: // init done + } + + stopFn := func() error { + cancel() + return <-errC + } + return stopFn, nil +} diff --git a/internal/tgc/workers.go b/internal/tgc/workers.go index d7748e5..f3da13b 100644 --- a/internal/tgc/workers.go +++ b/internal/tgc/workers.go @@ -6,7 +6,6 @@ import ( "github.com/divyam234/teldrive/internal/config" "github.com/divyam234/teldrive/internal/kv" - "github.com/gotd/contrib/bg" "github.com/gotd/td/telegram" ) @@ -42,7 +41,7 @@ func NewUploadWorker() *UploadWorker { type Client struct { Tg *telegram.Client - Stop bg.StopFunc + Stop StopFunc Status string } @@ -81,7 +80,7 @@ func (w *StreamWorker) Next(channelId int64) (*Client, int, error) { nextClient := w.clients[channelId][index] w.currIdx[channelId] = (index + 1) % len(w.clients[channelId]) if nextClient.Status == "idle" { - stop, err := bg.Connect(nextClient.Tg) + stop, err := Connect(nextClient.Tg, WithBotToken(w.bots[channelId][index])) if err != nil { return nil, 0, err } @@ -103,7 +102,7 @@ func (w *StreamWorker) UserWorker(client *telegram.Client, userId int64) (*Clien } nextClient := w.clients[userId][0] if nextClient.Status == "idle" { - stop, err := bg.Connect(nextClient.Tg, bg.WithContext(w.ctx)) + stop, err := Connect(nextClient.Tg, WithContext(w.ctx)) if err != nil { return nil, err }