mirror of
https://github.com/tgdrive/teldrive.git
synced 2025-09-06 14:37:49 +08:00
refactor: add index for listing channel files and impove check command
This commit is contained in:
parent
303a2b681a
commit
8e4dfd859c
5 changed files with 333 additions and 237 deletions
546
cmd/check.go
546
cmd/check.go
|
@ -5,18 +5,18 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fatih/color"
|
||||
"github.com/gotd/td/telegram"
|
||||
"github.com/gotd/td/telegram/query"
|
||||
"github.com/gotd/td/telegram/query/messages"
|
||||
"github.com/gotd/td/tg"
|
||||
"github.com/k0kubun/go-ansi"
|
||||
"github.com/jedib0t/go-pretty/v6/progress"
|
||||
"github.com/jedib0t/go-pretty/v6/text"
|
||||
"github.com/manifoldco/promptui"
|
||||
"github.com/schollz/progressbar/v3"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/tgdrive/teldrive/internal/api"
|
||||
"github.com/tgdrive/teldrive/internal/config"
|
||||
|
@ -25,13 +25,19 @@ import (
|
|||
"github.com/tgdrive/teldrive/internal/tgc"
|
||||
"github.com/tgdrive/teldrive/internal/utils"
|
||||
"github.com/tgdrive/teldrive/pkg/models"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/term"
|
||||
"gorm.io/datatypes"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type channel struct {
|
||||
tg.InputPeerChannel
|
||||
ChannelName string
|
||||
var termWidth = func() (width int, err error) {
|
||||
width, _, err = term.GetSize(int(os.Stdout.Fd()))
|
||||
if err == nil {
|
||||
return width, nil
|
||||
}
|
||||
|
||||
return 0, err
|
||||
}
|
||||
|
||||
type file struct {
|
||||
|
@ -52,13 +58,20 @@ type channelExport struct {
|
|||
Files []exportFile `json:"files"`
|
||||
}
|
||||
|
||||
var termWidth = func() (width int, err error) {
|
||||
width, _, err = term.GetSize(int(os.Stdout.Fd()))
|
||||
if err == nil {
|
||||
return width, nil
|
||||
}
|
||||
|
||||
return 0, err
|
||||
type channelProcessor struct {
|
||||
id int64
|
||||
files []file
|
||||
missingFiles []file
|
||||
orphanMessages []int
|
||||
totalCount int64
|
||||
pw progress.Writer
|
||||
tracker *progress.Tracker
|
||||
channelExport *channelExport
|
||||
client *telegram.Client
|
||||
ctx context.Context
|
||||
db *gorm.DB
|
||||
userId int64
|
||||
clean bool
|
||||
}
|
||||
|
||||
func NewCheckCmd() *cobra.Command {
|
||||
|
@ -84,19 +97,18 @@ func NewCheckCmd() *cobra.Command {
|
|||
cmd.Flags().Bool("export", true, "Export incomplete files to json file")
|
||||
cmd.Flags().Bool("clean", false, "Clean missing and orphan file parts")
|
||||
cmd.Flags().String("user", "", "Telegram User Name")
|
||||
cmd.Flags().Int("concurrent", 4, "Number of concurrent channel processing")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func checkRequiredCheckFlags(cfg *config.ServerCmdConfig) error {
|
||||
var missingFields []string
|
||||
|
||||
if cfg.DB.DataSource == "" {
|
||||
missingFields = append(missingFields, "db-data-source")
|
||||
}
|
||||
if len(missingFields) > 0 {
|
||||
return fmt.Errorf("required configuration values not set: %s", strings.Join(missingFields, ", "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -131,19 +143,227 @@ func selectUser(user string, users []models.User) (*models.User, error) {
|
|||
return &users[index], nil
|
||||
}
|
||||
|
||||
func (cp *channelProcessor) updateStatus(status string, value int64) {
|
||||
cp.tracker.SetValue(value)
|
||||
cp.tracker.UpdateMessage(fmt.Sprintf("Channel %d: %s", cp.id, status))
|
||||
}
|
||||
|
||||
func (cp *channelProcessor) process() error {
|
||||
cp.updateStatus("Loading files", 0)
|
||||
files, err := cp.loadFiles()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load files: %w", err)
|
||||
}
|
||||
cp.files = files
|
||||
|
||||
if len(cp.files) == 0 {
|
||||
cp.updateStatus("No files found", 100)
|
||||
return nil
|
||||
}
|
||||
|
||||
cp.updateStatus("Loading messages from Telegram", 0)
|
||||
msgs, total, err := cp.loadChannelMessages()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load messages: %w", err)
|
||||
}
|
||||
|
||||
if total == 0 && len(msgs) == 0 {
|
||||
cp.updateStatus("No messages found", 100)
|
||||
return nil
|
||||
}
|
||||
if len(msgs) < total {
|
||||
return fmt.Errorf("found %d messages out of %d", len(msgs), total)
|
||||
}
|
||||
|
||||
cp.updateStatus("Processing messages and parts", 0)
|
||||
msgIds := utils.Map(msgs, func(m messages.Elem) int { return m.Msg.GetID() })
|
||||
uploadPartIds := []int{}
|
||||
if err := cp.db.Model(&models.Upload{}).
|
||||
Where("user_id = ?", cp.userId).
|
||||
Where("channel_id = ?", cp.id).
|
||||
Pluck("part_id", &uploadPartIds).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploadPartMap := make(map[int]bool)
|
||||
for _, partID := range uploadPartIds {
|
||||
uploadPartMap[partID] = true
|
||||
}
|
||||
|
||||
msgMap := make(map[int]bool)
|
||||
for _, m := range msgIds {
|
||||
if m > 0 && !uploadPartMap[m] {
|
||||
msgMap[m] = true
|
||||
}
|
||||
}
|
||||
|
||||
cp.updateStatus("Checking file integrity", 0)
|
||||
allPartIDs := make(map[int]bool)
|
||||
for _, f := range cp.files {
|
||||
for _, p := range f.Parts {
|
||||
if p.ID == 0 {
|
||||
cp.missingFiles = append(cp.missingFiles, f)
|
||||
break
|
||||
}
|
||||
allPartIDs[p.ID] = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(allPartIDs) == 0 {
|
||||
cp.updateStatus("No parts found", 100)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, f := range cp.files {
|
||||
for _, p := range f.Parts {
|
||||
if !msgMap[p.ID] {
|
||||
cp.missingFiles = append(cp.missingFiles, f)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for msgID := range msgMap {
|
||||
if !allPartIDs[msgID] {
|
||||
cp.orphanMessages = append(cp.orphanMessages, msgID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(cp.missingFiles) > 0 {
|
||||
cp.channelExport = &channelExport{
|
||||
ChannelID: cp.id,
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
FileCount: len(cp.missingFiles),
|
||||
Files: make([]exportFile, 0, len(cp.missingFiles)),
|
||||
}
|
||||
|
||||
for _, f := range cp.missingFiles {
|
||||
cp.channelExport.Files = append(cp.channelExport.Files, exportFile{
|
||||
ID: f.ID,
|
||||
Name: f.Name,
|
||||
})
|
||||
}
|
||||
|
||||
if cp.clean {
|
||||
cp.updateStatus("Cleaning files", 0)
|
||||
err = cp.db.Exec("call teldrive.delete_files_bulk($1 , $2)",
|
||||
utils.Map(cp.missingFiles, func(f file) string { return f.ID }), cp.userId).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cp.clean && len(cp.orphanMessages) > 0 {
|
||||
cp.updateStatus("Cleaning orphan messages", 0)
|
||||
tgc.DeleteMessages(cp.ctx, cp.client, cp.id, cp.orphanMessages)
|
||||
}
|
||||
|
||||
cp.updateStatus("Complete", 100)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *channelProcessor) loadFiles() ([]file, error) {
|
||||
var files []file
|
||||
const batchSize = 1000
|
||||
var totalFiles int64
|
||||
var lastID string
|
||||
|
||||
if err := cp.db.Model(&models.File{}).
|
||||
Where("user_id = ?", cp.userId).
|
||||
Where("channel_id = ?", cp.id).
|
||||
Where("type = ?", "file").
|
||||
Count(&totalFiles).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if totalFiles == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
processed := int64(0)
|
||||
for {
|
||||
var batch []file
|
||||
query := cp.db.WithContext(cp.ctx).Model(&models.File{}).
|
||||
Where("user_id = ?", cp.userId).
|
||||
Where("channel_id = ?", cp.id).
|
||||
Where("type = ?", "file").
|
||||
Order("id").
|
||||
Limit(batchSize)
|
||||
|
||||
if lastID != "" {
|
||||
query = query.Where("id > ?", lastID)
|
||||
}
|
||||
|
||||
result := query.Scan(&batch)
|
||||
if result.Error != nil {
|
||||
return nil, result.Error
|
||||
}
|
||||
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
files = append(files, batch...)
|
||||
processed += int64(len(batch))
|
||||
|
||||
lastID = batch[len(batch)-1].ID
|
||||
progress := (float64(processed) / float64(totalFiles)) * 100
|
||||
cp.updateStatus(fmt.Sprintf("Loading files: %d/%d", processed, totalFiles), int64(progress))
|
||||
if len(batch) < batchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (cp *channelProcessor) loadChannelMessages() (msgs []messages.Elem, total int, err error) {
|
||||
|
||||
err = tgc.RunWithAuth(cp.ctx, cp.client, "", func(ctx context.Context) error {
|
||||
var channel *tg.InputChannel
|
||||
channel, err = tgc.GetChannelById(ctx, cp.client.API(), cp.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
q := query.NewQuery(cp.client.API()).Messages().GetHistory(&tg.InputPeerChannel{
|
||||
ChannelID: cp.id,
|
||||
AccessHash: channel.AccessHash,
|
||||
})
|
||||
|
||||
msgiter := messages.NewIterator(q, 100)
|
||||
total, err = msgiter.Total(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get total messages: %w", err)
|
||||
}
|
||||
|
||||
processed := 0
|
||||
for msgiter.Next(ctx) {
|
||||
msg := msgiter.Value()
|
||||
msgs = append(msgs, msg)
|
||||
processed++
|
||||
|
||||
if processed%100 == 0 {
|
||||
progress := (float64(processed) / float64(total)) * 100
|
||||
cp.updateStatus(fmt.Sprintf("Loading messages: %d/%d", processed, total), int64(progress))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func runCheckCmd(cmd *cobra.Command, cfg *config.ServerCmdConfig) {
|
||||
|
||||
ctx := cmd.Context()
|
||||
|
||||
lg := logging.DefaultLogger().Sugar()
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
defer func() {
|
||||
stop()
|
||||
logging.DefaultLogger().Sync()
|
||||
}()
|
||||
defer logging.DefaultLogger().Sync()
|
||||
|
||||
cfg.DB.LogLevel = "fatal"
|
||||
db, err := database.NewDatabase(&cfg.DB, lg)
|
||||
|
||||
if err != nil {
|
||||
lg.Fatalw("failed to create database", "err", err)
|
||||
}
|
||||
|
@ -152,20 +372,28 @@ func runCheckCmd(cmd *cobra.Command, cfg *config.ServerCmdConfig) {
|
|||
if err := db.Model(&models.User{}).Find(&users).Error; err != nil {
|
||||
lg.Fatalw("failed to get users", "err", err)
|
||||
}
|
||||
|
||||
userName, _ := cmd.Flags().GetString("user")
|
||||
user, err := selectUser(userName, users)
|
||||
if err != nil {
|
||||
lg.Fatalw("failed to select user", "err", err)
|
||||
}
|
||||
|
||||
session := models.Session{}
|
||||
if err := db.Model(&models.Session{}).Where("user_id = ?", user.UserId).Order("created_at desc").First(&session).Error; err != nil {
|
||||
if err := db.Model(&models.Session{}).
|
||||
Where("user_id = ?", user.UserId).
|
||||
Order("created_at desc").
|
||||
First(&session).Error; err != nil {
|
||||
lg.Fatalw("failed to get session", "err", err)
|
||||
}
|
||||
channelIds := []int64{}
|
||||
|
||||
if err := db.Model(&models.Channel{}).Where("user_id = ?", user.UserId).Pluck("channel_id", &channelIds).Error; err != nil {
|
||||
channelIds := []int64{}
|
||||
if err := db.Model(&models.Channel{}).
|
||||
Where("user_id = ?", user.UserId).
|
||||
Pluck("channel_id", &channelIds).Error; err != nil {
|
||||
lg.Fatalw("failed to get channels", "err", err)
|
||||
}
|
||||
|
||||
if len(channelIds) == 0 {
|
||||
lg.Fatalw("no channels found")
|
||||
}
|
||||
|
@ -177,231 +405,101 @@ func runCheckCmd(cmd *cobra.Command, cfg *config.ServerCmdConfig) {
|
|||
}
|
||||
|
||||
middlewares := tgc.NewMiddleware(tgconfig, tgc.WithFloodWait(), tgc.WithRateLimit())
|
||||
|
||||
export, _ := cmd.Flags().GetBool("export")
|
||||
|
||||
clean, _ := cmd.Flags().GetBool("clean")
|
||||
concurrent, _ := cmd.Flags().GetInt("concurrent")
|
||||
|
||||
pw := progress.NewWriter()
|
||||
pw.SetAutoStop(false)
|
||||
width := 75
|
||||
if size, err := termWidth(); err == nil {
|
||||
width = int((float32(3) / float32(4)) * float32(size))
|
||||
}
|
||||
pw.SetTrackerLength(width / 5)
|
||||
pw.SetMessageLength(width * 3 / 5)
|
||||
pw.SetStyle(progress.StyleDefault)
|
||||
pw.SetTrackerPosition(progress.PositionRight)
|
||||
pw.SetUpdateFrequency(time.Millisecond * 100)
|
||||
pw.Style().Colors = progress.StyleColorsExample
|
||||
pw.Style().Colors.Message = text.Colors{text.FgBlue}
|
||||
pw.Style().Options.PercentFormat = "%4.1f%%"
|
||||
pw.Style().Visibility.Value = false
|
||||
pw.Style().Options.TimeInProgressPrecision = time.Millisecond
|
||||
pw.Style().Options.ErrorString = color.RedString("failed!")
|
||||
pw.Style().Options.DoneString = color.GreenString("done!")
|
||||
|
||||
var channelExports []channelExport
|
||||
var mutex sync.Mutex
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.SetLimit(concurrent)
|
||||
|
||||
go pw.Render()
|
||||
|
||||
for _, id := range channelIds {
|
||||
lg.Info("Processing channel: ", id)
|
||||
const batchSize = 1000
|
||||
var offset int
|
||||
var files []file
|
||||
|
||||
for {
|
||||
var batch []file
|
||||
result := db.Model(&models.File{}).
|
||||
Offset(offset).
|
||||
Limit(batchSize).
|
||||
Where("user_id = ?", user.UserId).
|
||||
Where("channel_id = ?", id).
|
||||
Where("type = ?", "file").
|
||||
Scan(&batch)
|
||||
if result.Error != nil {
|
||||
lg.Errorw("failed to load files", "err", result.Error)
|
||||
break
|
||||
g.Go(func() error {
|
||||
|
||||
client, err := tgc.AuthClient(ctx, tgconfig, session.Session, middlewares...)
|
||||
if err != nil {
|
||||
lg.Errorw("failed to create client", "err", err, "channel", id)
|
||||
return fmt.Errorf("failed to create client for channel %d: %w", id, err)
|
||||
}
|
||||
|
||||
files = append(files, batch...)
|
||||
if len(batch) < batchSize {
|
||||
break
|
||||
tracker := &progress.Tracker{
|
||||
Message: fmt.Sprintf("Channel %d: Initializing", id),
|
||||
Total: 100,
|
||||
Units: progress.UnitsDefault,
|
||||
}
|
||||
offset += batchSize
|
||||
}
|
||||
if len(files) == 0 {
|
||||
continue
|
||||
}
|
||||
pw.AppendTracker(tracker)
|
||||
|
||||
lg.Infof("Channel %d: %d files found", id, len(files))
|
||||
|
||||
lg.Infof("Loading messages from telegram")
|
||||
|
||||
client, err := tgc.AuthClient(ctx, tgconfig, session.Session, middlewares...)
|
||||
|
||||
if err != nil {
|
||||
lg.Fatalw("failed to create client", "err", err)
|
||||
}
|
||||
|
||||
msgs, total, err := loadChannelMessages(ctx, client, id)
|
||||
if err != nil {
|
||||
lg.Fatalw("failed to load channel messages", "err", err)
|
||||
}
|
||||
if total == 0 && len(msgs) == 0 {
|
||||
lg.Infof("Channel %d: no messages found", id)
|
||||
continue
|
||||
}
|
||||
if len(msgs) < total {
|
||||
lg.Fatalf("Channel %d: found %d messages out of %d", id, len(msgs), total)
|
||||
continue
|
||||
}
|
||||
|
||||
msgIds := utils.Map(msgs, func(m tg.NotEmptyMessage) int { return m.GetID() })
|
||||
|
||||
uploadPartIds := []int{}
|
||||
if err := db.Model(&models.Upload{}).Where("user_id = ?", user.UserId).Where("channel_id = ?", id).
|
||||
Pluck("part_id", &uploadPartIds).Error; err != nil {
|
||||
lg.Errorw("failed to get upload part ids", "err", err)
|
||||
}
|
||||
|
||||
uploadPartMap := make(map[int]bool)
|
||||
|
||||
for _, partID := range uploadPartIds {
|
||||
uploadPartMap[partID] = true
|
||||
}
|
||||
|
||||
msgMap := make(map[int]bool)
|
||||
for _, m := range msgIds {
|
||||
if m > 0 && !uploadPartMap[m] {
|
||||
msgMap[m] = true
|
||||
}
|
||||
}
|
||||
filesWithMissingParts := []file{}
|
||||
allPartIDs := make(map[int]bool)
|
||||
for _, f := range files {
|
||||
for _, p := range f.Parts {
|
||||
if p.ID == 0 {
|
||||
filesWithMissingParts = append(filesWithMissingParts, f)
|
||||
break
|
||||
}
|
||||
allPartIDs[p.ID] = true
|
||||
}
|
||||
}
|
||||
if len(allPartIDs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
for _, p := range f.Parts {
|
||||
if !msgMap[p.ID] {
|
||||
filesWithMissingParts = append(filesWithMissingParts, f)
|
||||
break
|
||||
}
|
||||
processor := &channelProcessor{
|
||||
id: id,
|
||||
client: client,
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
userId: user.UserId,
|
||||
clean: clean,
|
||||
pw: pw,
|
||||
tracker: tracker,
|
||||
totalCount: 100,
|
||||
}
|
||||
|
||||
}
|
||||
missingMsgIDs := []int{}
|
||||
for msgID := range msgMap {
|
||||
if !allPartIDs[msgID] {
|
||||
missingMsgIDs = append(missingMsgIDs, msgID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(filesWithMissingParts) > 0 {
|
||||
lg.Infof("Channel %d: found %d files with missing parts", id, len(filesWithMissingParts))
|
||||
}
|
||||
|
||||
if export && len(filesWithMissingParts) > 0 {
|
||||
channelData := channelExport{
|
||||
ChannelID: id,
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
FileCount: len(filesWithMissingParts),
|
||||
Files: make([]exportFile, 0, len(filesWithMissingParts)),
|
||||
if err := processor.process(); err != nil {
|
||||
tracker.MarkAsErrored()
|
||||
return err
|
||||
}
|
||||
|
||||
for _, f := range filesWithMissingParts {
|
||||
channelData.Files = append(channelData.Files, exportFile{
|
||||
ID: f.ID,
|
||||
Name: f.Name,
|
||||
})
|
||||
}
|
||||
if clean {
|
||||
err = db.Exec("call teldrive.delete_files_bulk($1 , $2)",
|
||||
utils.Map(filesWithMissingParts, func(f file) string { return f.ID }), user.UserId).Error
|
||||
if err != nil {
|
||||
lg.Errorw("failed to delete files", "err", err)
|
||||
}
|
||||
if processor.channelExport != nil {
|
||||
mutex.Lock()
|
||||
channelExports = append(channelExports, *processor.channelExport)
|
||||
mutex.Unlock()
|
||||
}
|
||||
|
||||
channelExports = append(channelExports, channelData)
|
||||
}
|
||||
if clean && len(missingMsgIDs) > 0 {
|
||||
lg.Infof("Channel %d: cleaning %d orphan messages", id, len(missingMsgIDs))
|
||||
tgc.DeleteMessages(ctx, client, id, missingMsgIDs)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if len(channelExports) > 0 {
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
lg.Fatal(fmt.Errorf("one or more channels failed to process"))
|
||||
}
|
||||
|
||||
pw.Stop()
|
||||
|
||||
if export && len(channelExports) > 0 {
|
||||
jsonData, err := json.MarshalIndent(channelExports, "", " ")
|
||||
if err != nil {
|
||||
lg.Errorw("failed to marshal JSON", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = os.WriteFile("missing_files.json", jsonData, 0644)
|
||||
if err != nil {
|
||||
lg.Errorw("failed to write JSON file", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
lg.Infof("Exported data to missing_files.json")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func loadChannelMessages(ctx context.Context, client *telegram.Client, channelId int64) (msgs []tg.NotEmptyMessage, total int, err error) {
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
errChan <- tgc.RunWithAuth(ctx, client, "", func(ctx context.Context) error {
|
||||
channel, err := tgc.GetChannelById(ctx, client.API(), channelId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := 0
|
||||
|
||||
q := query.NewQuery(client.API()).Messages().GetHistory(&tg.InputPeerChannel{
|
||||
ChannelID: channelId,
|
||||
AccessHash: channel.AccessHash,
|
||||
})
|
||||
|
||||
msgiter := messages.NewIterator(q, 100)
|
||||
|
||||
total, err = msgiter.Total(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get total messages: %w", err)
|
||||
}
|
||||
|
||||
width, err := termWidth()
|
||||
if err != nil {
|
||||
width = 50
|
||||
}
|
||||
bar := progressbar.NewOptions(
|
||||
total,
|
||||
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
|
||||
progressbar.OptionEnableColorCodes(true),
|
||||
progressbar.OptionThrottle(65*time.Millisecond),
|
||||
progressbar.OptionShowCount(),
|
||||
progressbar.OptionSetWidth(width/3),
|
||||
progressbar.OptionShowIts(),
|
||||
progressbar.OptionSetTheme(progressbar.Theme{
|
||||
Saucer: "[green]=[reset]",
|
||||
SaucerHead: "[green]>[reset]",
|
||||
SaucerPadding: " ",
|
||||
BarStart: "[",
|
||||
BarEnd: "]",
|
||||
}),
|
||||
)
|
||||
|
||||
defer bar.Clear()
|
||||
|
||||
for msgiter.Next(ctx) {
|
||||
msg := msgiter.Value()
|
||||
msgs = append(msgs, msg.Msg)
|
||||
count++
|
||||
bar.Set(count)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
select {
|
||||
case err = <-errChan:
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
fmt.Print("\r\033[K")
|
||||
err = ctx.Err()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
7
go.mod
7
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/Masterminds/semver/v3 v3.3.1
|
||||
github.com/WinterYukky/gorm-extra-clause-plugin v0.3.0
|
||||
github.com/coocood/freecache v1.2.4
|
||||
github.com/fatih/color v1.18.0
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
github.com/go-chi/chi/v5 v5.2.0
|
||||
github.com/go-chi/cors v1.2.1
|
||||
|
@ -15,12 +16,11 @@ require (
|
|||
github.com/gotd/contrib v0.21.0
|
||||
github.com/gotd/td v0.117.0
|
||||
github.com/iyear/connectproxy v0.1.1
|
||||
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213
|
||||
github.com/jedib0t/go-pretty/v6 v6.6.5
|
||||
github.com/manifoldco/promptui v0.9.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/ogen-go/ogen v1.8.1
|
||||
github.com/redis/go-redis/v9 v9.7.0
|
||||
github.com/schollz/progressbar/v3 v3.18.0
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.19.0
|
||||
|
@ -44,7 +44,6 @@ require (
|
|||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/dlclark/regexp2 v1.11.4 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/fatih/color v1.18.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/glebarez/go-sqlite v1.22.0 // indirect
|
||||
|
@ -55,9 +54,9 @@ require (
|
|||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/magiconair/properties v1.8.9 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.16 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.24 // indirect
|
||||
github.com/mfridman/interpolate v0.0.2 // indirect
|
||||
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
|
|
11
go.sum
11
go.sum
|
@ -21,8 +21,6 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
|
|||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
|
||||
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
|
||||
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
|
||||
|
@ -137,12 +135,12 @@ github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
|
|||
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jedib0t/go-pretty/v6 v6.6.5 h1:9PgMJOVBedpgYLI56jQRJYqngxYAAzfEUua+3NgSqAo=
|
||||
github.com/jedib0t/go-pretty/v6 v6.6.5/go.mod h1:Uq/HrbhuFty5WSVNfjpQQe47x16RwVGXIveNGEyGtHs=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcMpsXCp3lJ03zYT1PkRd3kQGPn9GVg=
|
||||
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
|
@ -170,8 +168,6 @@ github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6B
|
|||
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
|
||||
github.com/microsoft/go-mssqldb v1.8.0 h1:7cyZ/AT7ycDsEoWPIXibd+aVKFtteUNhDGf3aobP+tw=
|
||||
github.com/microsoft/go-mssqldb v1.8.0/go.mod h1:6znkekS3T2vp0waiMhen4GPU1BiAsrP+iXHcE7a7rFo=
|
||||
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
|
||||
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
|
@ -202,6 +198,7 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa
|
|||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
|
@ -215,8 +212,6 @@ github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsF
|
|||
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
|
||||
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
|
||||
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
|
||||
github.com/schollz/progressbar/v3 v3.18.0 h1:uXdoHABRFmNIjUfte/Ex7WtuyVslrw2wVPQmCN62HpA=
|
||||
github.com/schollz/progressbar/v3 v3.18.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec=
|
||||
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
|
||||
|
|
|
@ -59,7 +59,7 @@ type DBPool struct {
|
|||
type DBConfig struct {
|
||||
DataSource string `config:"data-source" description:"Database connection string" required:"true"`
|
||||
PrepareStmt bool `config:"prepare-stmt" description:"Use prepared statements" default:"true"`
|
||||
LogLevel string `config:"log-level" description:"Database logging level" default:"info"`
|
||||
LogLevel string `config:"log-level" description:"Database logging level" default:"error"`
|
||||
Pool DBPool `config:"pool"`
|
||||
}
|
||||
|
||||
|
|
4
internal/database/migrations/20250121211747_index.sql
Normal file
4
internal/database/migrations/20250121211747_index.sql
Normal file
|
@ -0,0 +1,4 @@
|
|||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
CREATE INDEX IF NOT EXISTS idx_files_user_channel_type_id ON teldrive.files (user_id, channel_id, type, id);
|
||||
-- +goose StatementEnd
|
Loading…
Add table
Reference in a new issue