feat(cmd): add upload

This commit is contained in:
iyear 2022-09-14 13:46:57 +08:00
parent 6dad5a1fd7
commit 23090c28fd
9 changed files with 361 additions and 1 deletions

61
app/up/iter.go Normal file
View file

@ -0,0 +1,61 @@
package up
import (
"context"
"github.com/gabriel-vasile/mimetype"
"github.com/iyear/tdl/pkg/uploader"
"os"
"path/filepath"
)
type iter struct {
files []string
cur int
}
func newIter(files []string) *iter {
return &iter{
files: files,
cur: -1,
}
}
func (i *iter) Next(_ context.Context) bool {
i.cur++
if i.cur == len(i.files) {
return false
}
return true
}
func (i *iter) Value(_ context.Context) (*uploader.Item, error) {
cur := i.files[i.cur]
mime, err := mimetype.DetectFile(cur)
if err != nil {
return nil, err
}
f, err := os.Open(cur)
if err != nil {
return nil, err
}
stat, err := f.Stat()
if err != nil {
return nil, err
}
return &uploader.Item{
R: f,
Name: filepath.Base(f.Name()),
MIME: mime.String(),
Size: stat.Size(),
}, nil
}
func (i *iter) Total(_ context.Context) int {
return len(i.files)
}

42
app/up/up.go Normal file
View file

@ -0,0 +1,42 @@
package up
import (
"context"
"fmt"
"github.com/fatih/color"
"github.com/gotd/contrib/middleware/floodwait"
"github.com/iyear/tdl/app/internal/tgc"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/uploader"
)
func Run(ctx context.Context, ns, proxy string, partSize, threads, limit int, paths, excludes []string) error {
kvd, err := kv.New(kv.Options{
Path: consts.KVPath,
NS: ns,
})
if err != nil {
return err
}
files, err := walk(paths, excludes)
if err != nil {
return err
}
color.Blue("Files count: %d", len(files))
c := tgc.New(proxy, kvd, false, floodwait.NewSimpleWaiter())
return c.Run(ctx, func(ctx context.Context) error {
status, err := c.Auth().Status(ctx)
if err != nil {
return err
}
if !status.Authorized {
return fmt.Errorf("not authorized. please login first")
}
return uploader.New(c.API(), partSize, threads, newIter(files)).Upload(ctx, limit)
})
}

37
app/up/walk.go Normal file
View file

@ -0,0 +1,37 @@
package up
import (
"io/fs"
"path/filepath"
)
func walk(paths, excludes []string) ([]string, error) {
files := make([]string, 0)
excludesMap := make(map[string]struct{})
for _, exclude := range excludes {
excludesMap[exclude] = struct{}{}
}
for _, path := range paths {
err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if _, ok := excludesMap[filepath.Ext(path)]; ok {
return nil
}
files = append(files, path)
return nil
})
if err != nil {
return nil, err
}
}
return files, nil
}

View file

@ -7,6 +7,7 @@ import (
"github.com/iyear/tdl/cmd/chat"
"github.com/iyear/tdl/cmd/dl"
"github.com/iyear/tdl/cmd/login"
"github.com/iyear/tdl/cmd/up"
"github.com/iyear/tdl/cmd/version"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/utils"
@ -27,7 +28,7 @@ var cmd = &cobra.Command{
}
func init() {
cmd.AddCommand(version.Cmd, login.Cmd, dl.CmdDL, chat.Cmd)
cmd.AddCommand(version.Cmd, login.Cmd, dl.CmdDL, chat.Cmd, up.Cmd)
cmd.PersistentFlags().String("proxy", "", "proxy address, only socks5 is supported, format: protocol://username:password@host:port")
cmd.PersistentFlags().StringP("ns", "n", "", "namespace for Telegram session")

34
cmd/up/up.go Normal file
View file

@ -0,0 +1,34 @@
package up
import (
"github.com/iyear/tdl/app/up"
"github.com/spf13/cobra"
)
var (
partSize int
threads int
limit int
paths []string
excludes []string
)
var Cmd = &cobra.Command{
Use: "up",
Aliases: []string{"upload"},
Short: "Upload anything to Telegram",
Example: "tdl up -h",
RunE: func(cmd *cobra.Command, args []string) error {
return up.Run(cmd.Context(), cmd.Flag("ns").Value.String(), cmd.Flag("proxy").Value.String(), partSize, threads, limit, paths, excludes)
},
}
func init() {
Cmd.PersistentFlags().IntVarP(&partSize, "part-size", "s", 512*1024, "part size for uploading, max is 512*1024")
Cmd.PersistentFlags().IntVarP(&threads, "threads", "t", 8, "threads for uploading one item")
Cmd.PersistentFlags().IntVarP(&limit, "limit", "l", 2, "max number of concurrent tasks")
Cmd.Flags().StringSliceVarP(&paths, "path", "p", []string{}, "it can be dirs or files")
Cmd.Flags().StringSliceVarP(&excludes, "excludes", "e", []string{}, "exclude the specified file extensions")
}

19
pkg/uploader/iter.go Normal file
View file

@ -0,0 +1,19 @@
package uploader
import (
"context"
"io"
)
type Iter interface {
Next(ctx context.Context) bool
Value(ctx context.Context) (*Item, error)
Total(ctx context.Context) int
}
type Item struct {
R io.ReadCloser
Name string
MIME string
Size int64
}

16
pkg/uploader/progress.go Normal file
View file

@ -0,0 +1,16 @@
package uploader
import (
"context"
"github.com/gotd/td/telegram/uploader"
"github.com/jedib0t/go-pretty/v6/progress"
)
type _progress struct {
tracker *progress.Tracker
}
func (p *_progress) Chunk(_ context.Context, state uploader.ProgressState) error {
p.tracker.SetValue(state.Uploaded)
return nil
}

115
pkg/uploader/uploader.go Normal file
View file

@ -0,0 +1,115 @@
package uploader
import (
"context"
"errors"
"github.com/fatih/color"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/telegram/message/styling"
"github.com/gotd/td/telegram/uploader"
"github.com/gotd/td/tg"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/utils"
"github.com/jedib0t/go-pretty/v6/progress"
"golang.org/x/sync/errgroup"
"io"
"time"
)
type Uploader struct {
client *tg.Client
pw progress.Writer
partSize int
threads int
iter Iter
}
func New(client *tg.Client, partSize int, threads int, iter Iter) *Uploader {
return &Uploader{
client: client,
pw: prog.New(),
partSize: partSize,
threads: threads,
iter: iter,
}
}
func (u *Uploader) Upload(ctx context.Context, limit int) error {
u.pw.Log(color.GreenString("All files will be uploaded to 'Saved Messages' dialog"))
u.pw.SetNumTrackersExpected(u.iter.Total(ctx))
go u.pw.Render()
wg, errctx := errgroup.WithContext(ctx)
wg.SetLimit(limit)
for u.iter.Next(ctx) {
item, err := u.iter.Value(ctx)
if err != nil {
u.pw.Log(color.RedString("Get item failed: %v, skip...", err))
continue
}
wg.Go(func() error {
// d.pw.Log(color.MagentaString("name: %s,size: %s", item.Name, utils.Byte.FormatBinaryBytes(item.Size)))
return u.upload(errctx, item)
})
}
err := wg.Wait()
if err != nil {
u.pw.Stop()
for u.pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 10)
}
if errors.Is(err, context.Canceled) {
color.Red("Upload aborted.")
}
return err
}
for u.pw.IsRenderInProgress() {
if u.pw.LengthActive() == 0 {
u.pw.Stop()
}
time.Sleep(10 * time.Millisecond)
}
return nil
}
func (u *Uploader) upload(ctx context.Context, item *Item) error {
defer func(R io.ReadCloser) {
_ = R.Close()
}(item.R)
tracker := prog.AppendTracker(u.pw, item.Name, item.Size)
up := uploader.NewUploader(u.client).
WithPartSize(u.partSize).WithThreads(u.threads).WithProgress(&_progress{tracker: tracker})
f, err := up.Upload(ctx, uploader.NewUpload(item.Name, item.R, item.Size))
if err != nil {
return err
}
doc := message.UploadedDocument(f,
styling.Code(item.Name),
styling.Plain(" - "),
styling.Code(item.MIME),
).MIME(item.MIME).Filename(item.Name)
var media message.MediaOption
if utils.Media.IsVideo(item.MIME) {
media = doc.Video().SupportsStreaming()
}
if utils.Media.IsAudio(item.MIME) {
media = doc.Audio().Title(utils.FS.GetNameWithoutExt(item.Name))
}
_, err = message.NewSender(u.client).WithUploader(up).Self().Media(ctx, media)
return err
}

35
pkg/utils/mime.go Normal file
View file

@ -0,0 +1,35 @@
package utils
import "strings"
type mime struct{}
var MIME = mime{}
func (m mime) split(mime string) (primary string, sub string, ok bool) {
types := strings.Split(mime, "/")
if len(types) != 2 {
return
}
return types[0], types[1], true
}
func (m mime) IsVideo(mime string) bool {
primary, _, ok := m.split(mime)
return primary == "video" && ok
}
func (m mime) IsAudio(mime string) bool {
primary, _, ok := m.split(mime)
return primary == "audio" && ok
}
func (m mime) IsImage(mime string) bool {
primary, _, ok := m.split(mime)
return primary == "image" && ok
}