Enabled debug logging and disabled lazy bots for streaming

This commit is contained in:
divyam234 2023-12-17 18:35:36 +05:30
parent 095bb1a823
commit f0f19e1898
22 changed files with 322 additions and 259 deletions

3
.gitignore vendored
View file

@ -27,4 +27,5 @@ sslcerts
*.env.example
*.env.local
*.env.staging
*.db
*.db
logs

View file

@ -129,14 +129,16 @@ In addition to the mandatory variables, you can also set the following optional
- `COOKIE_SAME_SITE` : Only needed when frontend is on other domain (Default true).
- `LAZY_STREAM_BOTS` : If set to true start Bot session and close immediately when stream or download request is over otherwise run bots forever till server stops (Default false).
- `BG_BOTS_LIMIT` : If LAZY_STREAM_BOTS is set to false it start atmost BG_BOTS_LIMIT no of bots in background to prevent connection recreation on every request (Default 5).
- `UPLOAD_RETENTION` : No of days to keep incomplete uploads parts in channel afterwards these parts are deleted (Default 15).
- `ENCRYPTION_KEY` : Password for Encryption.
- `DEV` : DEV mode to enable debug logging(Default false).
- `LOG_SQL` : Log sql queries (Default false).
> [!WARNING]
> Keep your Password safe once generated teldrive uses same encryption as of rclone internally
so you don't need to enable crypt in rclone.**Teldrive generates random salt for each file part and saves in database so its more secure than rclone crypt whereas in rclone same salt value is used for all files which can be compromised easily**. Enabling crypt in rclone makes UI reduntant so encrypting files in teldrive internally is better way to encrypt files and more secure encryption than rclone.To encrypt files see more about teldrive rclone config.

View file

@ -1,21 +1,27 @@
package api
import (
"time"
"github.com/divyam234/teldrive/pkg/controller"
"github.com/divyam234/teldrive/pkg/middleware"
"github.com/divyam234/teldrive/ui"
ginzap "github.com/gin-contrib/zap"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
func InitRouter() *gin.Engine {
func InitRouter(logger *zap.Logger) *gin.Engine {
r := gin.Default()
r := gin.New()
r.Use(gin.Recovery())
r.Use(ginzap.Ginzap(logger, time.RFC3339, true))
r.Use(ginzap.RecoveryWithZap(logger, true))
r.Use(middleware.Cors())
c := controller.NewController()
c := controller.NewController(logger)
api := r.Group("/api")
{

View file

@ -3,43 +3,39 @@ package main
import (
"fmt"
"mime"
"path/filepath"
"github.com/divyam234/teldrive/api"
"github.com/divyam234/teldrive/internal/cron"
"github.com/divyam234/teldrive/internal/logger"
"github.com/divyam234/teldrive/internal/utils"
"github.com/divyam234/teldrive/pkg/database"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/cache"
"github.com/gin-gonic/gin"
)
func main() {
gin.SetMode(gin.ReleaseMode)
config.InitConfig()
cnf.InitConfig()
if config.GetConfig().Dev {
gin.SetMode(gin.DebugMode)
} else {
gin.SetMode(gin.ReleaseMode)
}
logger.InitLogger()
log := logger.InitLogger()
database.InitDB()
cache.InitCache()
cron.StartCronJobs()
cron.StartCronJobs(log)
mime.AddExtensionType(".js", "application/javascript")
r := api.InitRouter()
r := api.InitRouter(log)
r.Run(fmt.Sprintf(":%d", config.GetConfig().Port))
config := cnf.GetConfig()
certDir := filepath.Join(config.ExecDir, "sslcerts")
ok, _ := utils.PathExists(certDir)
if ok && config.Https {
r.RunTLS(fmt.Sprintf(":%d", config.Port), filepath.Join(certDir, "cert.pem"), filepath.Join(certDir, "key.pem"))
} else {
r.Run(fmt.Sprintf(":%d", config.Port))
}
}

View file

@ -29,10 +29,11 @@ type Config struct {
TgClientLangPack string `envconfig:"TG_CLIENT_LANG_PACK" default:"webk"`
RunMigrations bool `envconfig:"RUN_MIGRATIONS" default:"true"`
Port int `envconfig:"PORT" default:"8080"`
LazyStreamBots bool `envconfig:"LAZY_STREAM_BOTS" default:"false"`
BgBotsLimit int `envconfig:"BG_BOTS_LIMIT" default:"5"`
UploadRetention int `envconfig:"UPLOAD_RETENTION" default:"15"`
DisableStreamBots bool `envconfig:"DISABLE_STREAM_BOTS" default:"false"`
Dev bool `envconfig:"DEV" default:"false"`
LogSql bool `envconfig:"LOG_SQL" default:"false"`
EncryptionKey string `envconfig:"ENCRYPTION_KEY"`
ExecDir string
}

View file

@ -7,6 +7,7 @@ services:
container_name: teldrive
volumes:
- ./teldrive.db:/app/teldrive.db:rw
- ./logs:/app/logs:rw
env_file: teldrive.env
ports:
- 8080:8080

3
go.mod
View file

@ -5,6 +5,7 @@ go 1.21
require (
github.com/coocood/freecache v1.2.4
github.com/divyam234/cors v1.4.2
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1
github.com/go-co-op/gocron v1.37.0
github.com/go-jose/go-jose/v3 v3.0.1
@ -18,6 +19,7 @@ require (
go.etcd.io/bbolt v1.3.8
go.uber.org/zap v1.26.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.5
)
@ -29,6 +31,7 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sethvargo/go-retry v0.2.4 // indirect
)
require (

4
go.sum
View file

@ -58,6 +58,8 @@ github.com/gin-contrib/secure v0.0.1 h1:DMMx3xXDY+MLA9kzIPHksyzC5/V5J6014c/WAmdS
github.com/gin-contrib/secure v0.0.1/go.mod h1:6kseOBFrSR3Is/kM1jDhCg/WsXAMvKJkuPvG9dGph/c=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-contrib/zap v0.2.0 h1:HLvt3rZXyC8XC+s2lHzMFow3UDqiEbfrBWJyHHS6L8A=
github.com/gin-contrib/zap v0.2.0/go.mod h1:eqfbe9ZmI+GgTZF6nRiC2ZwDeM4DK1Viwc8OxTCphh0=
github.com/gin-gonic/contrib v0.0.0-20221130124618-7e01895a63f2 h1:dyuNlYlG1faymw39NdJddnzJICy6587tiGSVioWhYoE=
github.com/gin-gonic/contrib v0.0.0-20221130124618-7e01895a63f2/go.mod h1:iqneQ2Df3omzIVTkIfn7c1acsVnMGiSLn4XF5Blh3Yg=
github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do=
@ -331,6 +333,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View file

@ -14,6 +14,7 @@ import (
"github.com/divyam234/teldrive/pkg/services"
"github.com/go-co-op/gocron"
"github.com/gotd/td/tg"
"go.uber.org/zap"
)
type Files []File
@ -60,7 +61,7 @@ type UploadResult struct {
ChannelId int64
}
func deleteTGMessages(ctx context.Context, result Result) error {
func deleteTGMessages(ctx context.Context, logger *zap.Logger, result Result) error {
db := database.DB
@ -82,7 +83,7 @@ func deleteTGMessages(ctx context.Context, result Result) error {
}
err = tgc.RunWithAuth(ctx, client, "", func(ctx context.Context) error {
err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error {
channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10))
@ -102,10 +103,10 @@ func deleteTGMessages(ctx context.Context, result Result) error {
db.Where("id = any($1)", fileIds).Delete(&models.File{})
}
return nil
return err
}
func cleanUploadsMessages(ctx context.Context, result UploadResult) error {
func cleanUploadsMessages(ctx context.Context, logger *zap.Logger, result UploadResult) error {
db := database.DB
@ -115,7 +116,7 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error {
return err
}
err = tgc.RunWithAuth(ctx, client, "", func(ctx context.Context) error {
err = tgc.RunWithAuth(ctx, logger, client, "", func(ctx context.Context) error {
channel, err := services.GetChannelById(ctx, client, result.ChannelId, strconv.FormatInt(result.UserId, 10))
@ -139,10 +140,10 @@ func cleanUploadsMessages(ctx context.Context, result UploadResult) error {
db.Where("part_id = any($1)", parts).Delete(&models.Upload{})
}
return nil
return err
}
func filesDeleteJob() {
func filesDeleteJob(logger *zap.Logger) {
db := database.DB
ctx, cancel := context.WithCancel(context.Background())
@ -161,11 +162,16 @@ func filesDeleteJob() {
}
for _, row := range results {
deleteTGMessages(ctx, row)
err := deleteTGMessages(ctx, logger, row)
if err != nil {
logger.Error("failed to clean pending files", zap.Int64("user", row.UserId), zap.Error(err))
}
logger.Info("cleaned pending files", zap.Int64("user", row.UserId),
zap.Int64("channel", row.ChannelId))
}
}
func uploadCleanJob() {
func uploadCleanJob(logger *zap.Logger) {
db := database.DB
ctx, cancel := context.WithCancel(context.Background())
c := config.GetConfig()
@ -183,22 +189,29 @@ func uploadCleanJob() {
return
}
for _, row := range upResults {
cleanUploadsMessages(ctx, row)
err := cleanUploadsMessages(ctx, logger, row)
if err != nil {
logger.Error("failed to clean orpahan file parts", zap.Int64("user", row.UserId), zap.Error(err))
}
logger.Info("cleaned orpahan file parts", zap.Int64("user", row.UserId),
zap.Int64("channel", row.ChannelId))
}
}
func folderSizeUpdate() {
func folderSizeUpdate(logger *zap.Logger) {
database.DB.Exec("call teldrive.update_size();")
logger.Info("updates folder sizes")
}
func StartCronJobs() {
func StartCronJobs(logger *zap.Logger) {
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(1).Hour().Do(filesDeleteJob)
scheduler.Every(1).Hour().Do(filesDeleteJob, logger)
scheduler.Every(12).Hour().Do(uploadCleanJob)
scheduler.Every(12).Hour().Do(uploadCleanJob, logger)
scheduler.Every(2).Hour().Do(folderSizeUpdate)
scheduler.Every(2).Hour().Do(folderSizeUpdate, logger)
scheduler.StartAsync()
}

View file

@ -2,18 +2,51 @@ package logger
import (
"os"
"time"
"github.com/divyam234/teldrive/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
var Logger *zap.Logger
func InitLogger() *zap.Logger {
customTimeEncoder := func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("02/01/2006 03:04 PM"))
}
var (
consoleConfig zapcore.EncoderConfig
logLevel zapcore.Level
)
func InitLogger() {
config := zap.NewProductionEncoderConfig()
config.EncodeTime = zapcore.ISO8601TimeEncoder
consoleEncoder := zapcore.NewConsoleEncoder(config)
defaultLogLevel := zapcore.InfoLevel
core := zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), defaultLogLevel)
Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
if config.GetConfig().Dev {
consoleConfig = zap.NewDevelopmentEncoderConfig()
logLevel = zap.DebugLevel
} else {
consoleConfig = zap.NewProductionEncoderConfig()
logLevel = zap.InfoLevel
}
consoleConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
consoleConfig.EncodeTime = customTimeEncoder
consoleEncoder := zapcore.NewConsoleEncoder(consoleConfig)
fileEncoderConfig := zap.NewProductionEncoderConfig()
fileEncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
fileEncoder := zapcore.NewJSONEncoder(fileEncoderConfig)
fileWriter := zapcore.AddSync(&lumberjack.Logger{
Filename: "logs/teldrive.log",
MaxSize: 10,
MaxBackups: 3,
MaxAge: 7,
Compress: true,
})
core := zapcore.NewTee(
zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), logLevel),
zapcore.NewCore(fileEncoder, fileWriter, zapcore.DebugLevel),
)
return zap.New(core, zap.AddStacktrace(zapcore.FatalLevel))
}

View file

@ -3,13 +3,12 @@ package tgc
import (
"context"
"github.com/divyam234/teldrive/internal/logger"
"github.com/gotd/td/telegram"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func RunWithAuth(ctx context.Context, client *telegram.Client, token string, f func(ctx context.Context) error) error {
func RunWithAuth(ctx context.Context, logger *zap.Logger, client *telegram.Client, token string, f func(ctx context.Context) error) error {
return client.Run(ctx, func(ctx context.Context) error {
status, err := client.Auth().Status(ctx)
if err != nil {
@ -20,18 +19,18 @@ func RunWithAuth(ctx context.Context, client *telegram.Client, token string, f f
if !status.Authorized {
return errors.Errorf("not authorized. please login first")
}
logger.Logger.Info("User Session",
logger.Debug("User Session",
zap.Int64("id", status.User.ID),
zap.String("username", status.User.Username))
} else {
if !status.Authorized {
logger.Logger.Info("creating bot session")
logger.Debug("creating bot session")
_, err := client.Auth().Bot(ctx, token)
if err != nil {
return err
}
status, _ = client.Auth().Status(ctx)
logger.Logger.Info("Bot Session",
logger.Debug("Bot Session",
zap.Int64("id", status.User.ID),
zap.String("username", status.User.Username))
}

View file

@ -5,7 +5,7 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/divyam234/teldrive/internal/recovery"
"github.com/divyam234/teldrive/internal/retry"
@ -18,7 +18,8 @@ import (
"golang.org/x/time/rate"
)
func deviceConfig(appConfig *cnf.Config) telegram.DeviceConfig {
func deviceConfig() telegram.DeviceConfig {
appConfig := config.GetConfig()
config := telegram.DeviceConfig{
DeviceModel: appConfig.TgClientDeviceModel,
SystemVersion: appConfig.TgClientSystemVersion,
@ -42,8 +43,6 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St
_clock := tdclock.System
config := cnf.GetConfig()
noUpdates := true
if handler != nil {
@ -54,7 +53,7 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St
ReconnectionBackoff: func() backoff.BackOff {
return Backoff(_clock)
},
Device: deviceConfig(config),
Device: deviceConfig(),
SessionStorage: storage,
RetryInterval: time.Second,
MaxRetries: 10,
@ -65,7 +64,7 @@ func New(ctx context.Context, handler telegram.UpdateHandler, storage session.St
UpdateHandler: handler,
}
return telegram.NewClient(config.AppId, config.AppHash, opts)
return telegram.NewClient(config.GetConfig().AppId, config.GetConfig().AppHash, opts)
}
func NoLogin(ctx context.Context, handler telegram.UpdateHandler, storage session.Storage) *telegram.Client {
@ -90,17 +89,15 @@ func UserLogin(ctx context.Context, sessionStr string) (*telegram.Client, error)
return nil, err
}
middlewares, _ := NewDefaultMiddlewares(ctx)
config := cnf.GetConfig()
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.Rate)), config.RateBurst))
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.GetConfig().Rate)), config.GetConfig().RateBurst))
return New(ctx, nil, storage, middlewares...), nil
}
func BotLogin(ctx context.Context, token string) (*telegram.Client, error) {
config := cnf.GetConfig()
storage := kv.NewSession(database.KV, kv.Key("botsession", token))
middlewares, _ := NewDefaultMiddlewares(ctx)
if config.RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.Rate)), config.RateBurst))
if config.GetConfig().RateLimit {
middlewares = append(middlewares, ratelimit.New(rate.Every(time.Millisecond*time.Duration(config.GetConfig().Rate)), config.GetConfig().RateBurst))
}
return New(ctx, nil, storage, middlewares...), nil

View file

@ -8,13 +8,13 @@ import (
"github.com/gotd/td/telegram"
)
type BotWorkers struct {
type UploadWorker struct {
mu sync.Mutex
bots map[int64][]string
currIdx map[int64]int
}
func (w *BotWorkers) Set(bots []string, channelId int64) {
func (w *UploadWorker) Set(bots []string, channelId int64) {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.bots[channelId]
@ -26,30 +26,28 @@ func (w *BotWorkers) Set(bots []string, channelId int64) {
}
}
func (w *BotWorkers) Next(channelId int64) string {
func (w *UploadWorker) Next(channelId int64) (string, int) {
w.mu.Lock()
defer w.mu.Unlock()
index := w.currIdx[channelId]
w.currIdx[channelId] = (index + 1) % len(w.bots[channelId])
return w.bots[channelId][index]
return w.bots[channelId][index], index
}
var Workers = &BotWorkers{}
type Client struct {
Tg *telegram.Client
Stop bg.StopFunc
Status string
}
type streamWorkers struct {
type StreamWorker struct {
mu sync.Mutex
bots map[int64][]string
clients map[int64][]*Client
currIdx map[int64]int
}
func (w *streamWorkers) Set(bots []string, channelId int64) {
func (w *StreamWorker) Set(bots []string, channelId int64) {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.bots[channelId]
@ -67,7 +65,7 @@ func (w *streamWorkers) Set(bots []string, channelId int64) {
}
func (w *streamWorkers) Next(channelId int64) (*Client, int, error) {
func (w *StreamWorker) Next(channelId int64) (*Client, int, error) {
w.mu.Lock()
defer w.mu.Unlock()
index := w.currIdx[channelId]
@ -84,7 +82,7 @@ func (w *streamWorkers) Next(channelId int64) (*Client, int, error) {
return nextClient, index, nil
}
func (w *streamWorkers) UserWorker(client *telegram.Client) (*Client, error) {
func (w *StreamWorker) UserWorker(client *telegram.Client) (*Client, error) {
w.mu.Lock()
defer w.mu.Unlock()
@ -108,5 +106,3 @@ func (w *streamWorkers) UserWorker(client *telegram.Client) (*Client, error) {
}
return nextClient, nil
}
var StreamWorkers = &streamWorkers{}

View file

@ -3,6 +3,7 @@ package controller
import (
"github.com/divyam234/teldrive/pkg/database"
"github.com/divyam234/teldrive/pkg/services"
"go.uber.org/zap"
)
type Controller struct {
@ -12,11 +13,11 @@ type Controller struct {
AuthService *services.AuthService
}
func NewController() *Controller {
func NewController(logger *zap.Logger) *Controller {
return &Controller{
FileService: services.NewFileService(database.DB),
UserService: services.NewUserService(database.DB),
UploadService: services.NewUploadService(database.DB),
AuthService: services.NewAuthService(database.DB),
FileService: services.NewFileService(database.DB, logger),
UserService: services.NewUserService(database.DB, logger),
UploadService: services.NewUploadService(database.DB, logger),
AuthService: services.NewAuthService(database.DB, logger),
}
}

View file

@ -30,7 +30,7 @@ func (fc *Controller) UpdateFile(c *gin.Context) {
func (fc *Controller) GetFileByID(c *gin.Context) {
res, err := fc.FileService.GetFileByID(c)
if err != nil {
httputil.NewError(c, http.StatusNotFound, err)
httputil.NewError(c, http.StatusNotFound, err.Error)
return
}

View file

@ -7,7 +7,7 @@ import (
"path/filepath"
"time"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/kv"
"github.com/pressly/goose/v3"
"go.etcd.io/bbolt"
@ -26,20 +26,24 @@ func InitDB() {
var err error
logLevel := logger.Silent
if config.GetConfig().LogSql {
logLevel = logger.Info
}
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
SlowThreshold: time.Second,
LogLevel: logger.Silent,
IgnoreRecordNotFoundError: true,
ParameterizedQueries: true,
Colorful: false,
SlowThreshold: time.Second,
LogLevel: logLevel,
ParameterizedQueries: true,
Colorful: true,
},
)
config := cnf.GetConfig()
DB, err = gorm.Open(postgres.Open(config.DatabaseUrl), &gorm.Config{
DB, err = gorm.Open(postgres.Open(config.GetConfig().DatabaseUrl), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix: "teldrive.",
SingularTable: false,
@ -63,12 +67,12 @@ func InitDB() {
sqlDB.SetConnMaxIdleTime(10 * time.Minute)
go func() {
if config.RunMigrations {
if config.GetConfig().RunMigrations {
migrate()
}
}()
boltDB, err := bbolt.Open(filepath.Join(config.ExecDir, "teldrive.db"), 0666, &bbolt.Options{
boltDB, err := bbolt.Open(filepath.Join(config.GetConfig().ExecDir, "teldrive.db"), 0666, &bbolt.Options{
Timeout: time.Second,
NoGrowSync: false,
})

View file

@ -1,7 +1,8 @@
package schemas
type UploadQuery struct {
Filename string `form:"fileName" binding:"required"`
PartName string `form:"partName" binding:"required"`
FileName string `form:"fileName" binding:"required"`
PartNo int `form:"partNo" binding:"required"`
ChannelID int64 `form:"channelId"`
Encrypted bool `form:"encrypted"`

View file

@ -16,7 +16,7 @@ import (
"strconv"
"time"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/auth"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/internal/utils"
@ -31,6 +31,7 @@ import (
"github.com/gotd/td/telegram/auth/qrlogin"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"go.uber.org/zap"
"gorm.io/gorm"
)
@ -38,13 +39,15 @@ type AuthService struct {
Db *gorm.DB
SessionMaxAge int
SessionCookieName string
log *zap.Logger
}
func NewAuthService(db *gorm.DB) *AuthService {
func NewAuthService(db *gorm.DB, logger *zap.Logger) *AuthService {
return &AuthService{
Db: db,
SessionMaxAge: 30 * 24 * 60 * 60,
SessionCookieName: "user-session"}
SessionCookieName: "user-session",
log: logger.Named("auth")}
}
func ip4toInt(IPv4Address net.IP) int64 {
@ -88,22 +91,21 @@ func generateTgSession(dcID int, authKey []byte, port int) string {
func setCookie(c *gin.Context, key string, value string, age int) {
config := cnf.GetConfig()
if config.CookieSameSite {
if config.GetConfig().CookieSameSite {
c.SetSameSite(2)
} else {
c.SetSameSite(4)
}
c.SetCookie(key, value, age, "/", c.Request.Host, config.Https, true)
c.SetCookie(key, value, age, "/", "", config.GetConfig().Https, true)
}
func checkUserIsAllowed(userName string) bool {
config := cnf.GetConfig()
found := false
if len(config.AllowedUsers) > 0 {
for _, user := range config.AllowedUsers {
allowedUsers := config.GetConfig().AllowedUsers
if len(allowedUsers) > 0 {
for _, user := range allowedUsers {
if user == userName {
found = true
break
@ -234,7 +236,7 @@ func (as *AuthService) Logout(c *gin.Context) (*schemas.Message, *types.AppError
jwtUser := val.(*types.JWTClaims)
client, _ := tgc.UserLogin(c, jwtUser.TgSession)
tgc.RunWithAuth(c, client, "", func(ctx context.Context) error {
tgc.RunWithAuth(c, as.log, client, "", func(ctx context.Context) error {
_, err := client.API().AuthLogOut(c)
return err
})

View file

@ -21,6 +21,7 @@ import (
"github.com/gotd/td/tg"
"github.com/pkg/errors"
"github.com/thoas/go-funk"
"go.uber.org/zap"
)
type buffer struct {
@ -99,10 +100,10 @@ func getUserAuth(c *gin.Context) (int64, string) {
return userId, jwtUser.TgSession
}
func getBotInfo(ctx context.Context, token string) (*types.BotInfo, error) {
func getBotInfo(ctx context.Context, logger *zap.Logger, token string) (*types.BotInfo, error) {
client, _ := tgc.BotLogin(ctx, token)
var user *tg.User
err := tgc.RunWithAuth(ctx, client, token, func(ctx context.Context) error {
err := tgc.RunWithAuth(ctx, logger, client, token, func(ctx context.Context) error {
user, _ = client.Self(ctx)
return nil
})

View file

@ -11,7 +11,7 @@ import (
"strconv"
"strings"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/cache"
"github.com/divyam234/teldrive/internal/http_range"
"github.com/divyam234/teldrive/internal/md5"
@ -22,6 +22,7 @@ import (
"github.com/divyam234/teldrive/pkg/models"
"github.com/divyam234/teldrive/pkg/schemas"
"github.com/gotd/td/tg"
"go.uber.org/zap"
"github.com/divyam234/teldrive/pkg/types"
@ -32,19 +33,41 @@ import (
"gorm.io/gorm/clause"
)
const (
updateFileContext = "file update"
getFileByIDContext = "getting file by ID"
listFilesContext = "listing files"
getPathIDContext = "getting path ID"
makeDirectoryContext = "making directory"
copyFileContext = "copying file"
moveFilesContext = "moving files"
deleteFilesContext = "deleting files"
moveDirectoryContext = "moving directory"
bindJSONContext = "binding JSON"
bindQueryContext = "binding query"
)
type FileService struct {
Db *gorm.DB
Db *gorm.DB
log *zap.Logger
worker *tgc.StreamWorker
}
func NewFileService(db *gorm.DB) *FileService {
return &FileService{Db: db}
func NewFileService(db *gorm.DB, logger *zap.Logger) *FileService {
return &FileService{Db: db, log: logger.Named("files"),
worker: &tgc.StreamWorker{}}
}
func (fs *FileService) logAndReturn(context string, err error, errCode int) *types.AppError {
fs.log.Error(context, zap.Error(err))
return &types.AppError{Error: err, Code: errCode}
}
func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppError) {
userId, _ := getUserAuth(c)
var fileIn schemas.CreateFile
if err := c.ShouldBindJSON(&fileIn); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(bindJSONContext, err, http.StatusBadRequest)
}
var fileDB models.File
@ -54,7 +77,7 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
if fileIn.Path != "" {
var parent models.File
if err := fs.Db.Where("type = ? AND path = ?", "folder", fileIn.Path).First(&parent).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusNotFound}
return nil, fs.logAndReturn(bindJSONContext, err, http.StatusInternalServerError)
}
fileDB.ParentID = parent.ID
}
@ -76,7 +99,7 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
if fileIn.ChannelID == 0 {
channelId, err = GetDefaultChannel(c, userId)
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn("default channel", err, http.StatusInternalServerError)
}
}
fileDB.ChannelID = utils.Int64Pointer(channelId)
@ -102,10 +125,9 @@ func (fs *FileService) CreateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
if err := fs.Db.Create(&fileDB).Error; err != nil {
pgErr := err.(*pgconn.PgError)
if pgErr.Code == "23505" {
return nil, &types.AppError{Error: errors.New("file exists"), Code: http.StatusInternalServerError}
return nil, fs.logAndReturn("file exists", err, http.StatusInternalServerError)
}
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn("file create", err, http.StatusInternalServerError)
}
res := mapper.ToFileOut(fileDB)
@ -127,16 +149,16 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
if fileUpdate.Type == "folder" && fileUpdate.Name != "" {
if err := fs.Db.Raw("select * from teldrive.update_folder(?, ?)", fileID, fileUpdate.Name).Scan(&files).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(updateFileContext, err, http.StatusInternalServerError)
}
} else {
if err := fs.Db.Model(&files).Clauses(clause.Returning{}).Where("id = ?", fileID).Updates(fileUpdate).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(updateFileContext, err, http.StatusInternalServerError)
}
}
if len(files) == 0 {
return nil, &types.AppError{Error: errors.New("file not updated"), Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(updateFileContext, errors.New("update failed"), http.StatusInternalServerError)
}
file := mapper.ToFileOut(files[0])
@ -149,8 +171,7 @@ func (fs *FileService) UpdateFile(c *gin.Context) (*schemas.FileOut, *types.AppE
}
func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, error) {
func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, *types.AppError) {
fileID := c.Param("fileID")
var file []models.File
@ -158,14 +179,14 @@ func (fs *FileService) GetFileByID(c *gin.Context) (*schemas.FileOutFull, error)
fs.Db.Model(&models.File{}).Where("id = ?", fileID).Find(&file)
if len(file) == 0 {
return nil, errors.New("file not found")
err := errors.New("file not found")
return nil, fs.logAndReturn(getFileByIDContext, err, http.StatusNotFound)
}
return mapper.ToFileOutFull(file[0]), nil
}
func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.AppError) {
userId, _ := getUserAuth(c)
var (
@ -181,15 +202,15 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.
fileQuery.UserID = userId
if err := c.ShouldBindQuery(&pagingParams); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest)
}
if err := c.ShouldBindQuery(&sortingParams); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest)
}
if err := c.ShouldBindQuery(&fileQuery); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest)
}
var (
@ -199,7 +220,7 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.
if fileQuery.Path != "" {
pathId, err = fs.getPathId(fileQuery.Path)
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusNotFound}
return nil, fs.logAndReturn(listFilesContext, err, http.StatusNotFound)
}
}
@ -219,7 +240,7 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.
err := mapstructure.Decode(fileQuery, &filterQuery)
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(listFilesContext, err, http.StatusBadRequest)
}
delete(filterQuery, "op")
@ -243,7 +264,6 @@ func (fs *FileService) ListFiles(c *gin.Context) (*schemas.FileResponse, *types.
setOrderFilter(query, &pagingParams, &sortingParams)
query.Order(getOrder(sortingParams))
}
var results []schemas.FileOut
@ -276,33 +296,29 @@ func (fs *FileService) getPathId(path string) (string, error) {
}
func (fs *FileService) MakeDirectory(c *gin.Context) (*schemas.FileOut, *types.AppError) {
var payload schemas.MkDir
var files []models.File
if err := c.ShouldBindJSON(&payload); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(makeDirectoryContext, err, http.StatusBadRequest)
}
userId, _ := getUserAuth(c)
if err := fs.Db.Raw("select * from teldrive.create_directories(?, ?)", userId, payload.Path).
Scan(&files).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(makeDirectoryContext, err, http.StatusInternalServerError)
}
file := mapper.ToFileOut(files[0])
return &file, nil
}
func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppError) {
var payload schemas.Copy
if err := c.ShouldBindJSON(&payload); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(copyFileContext, err, http.StatusBadRequest)
}
userId, session := getUserAuth(c)
@ -317,7 +333,7 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr
newIds := models.Parts{}
err := tgc.RunWithAuth(c, client, "", func(ctx context.Context) error {
err := tgc.RunWithAuth(c, fs.log, client, "", func(ctx context.Context) error {
user := strconv.FormatInt(userId, 10)
messages, err := getTGMessages(c, client, file.Parts, file.ChannelID, user)
if err != nil {
@ -365,13 +381,13 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr
})
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(copyFileContext, err, http.StatusBadRequest)
}
var destRes []models.File
if err := fs.Db.Raw("select * from teldrive.create_directories(?, ?)", userId, payload.Destination).Scan(&destRes).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(copyFileContext, err, http.StatusInternalServerError)
}
dest := destRes[0]
@ -390,72 +406,65 @@ func (fs *FileService) CopyFile(c *gin.Context) (*schemas.FileOut, *types.AppErr
dbFile.ChannelID = &file.ChannelID
if err := fs.Db.Create(&dbFile).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(copyFileContext, err, http.StatusInternalServerError)
}
out := mapper.ToFileOut(dbFile)
return &out, nil
}
func (fs *FileService) MoveFiles(c *gin.Context) (*schemas.Message, *types.AppError) {
var payload schemas.FileOperation
if err := c.ShouldBindJSON(&payload); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(moveFilesContext, err, http.StatusBadRequest)
}
var destination models.File
if err := fs.Db.Model(&models.File{}).Select("id").Where("path = ?", payload.Destination).First(&destination).Error; errors.Is(err, gorm.ErrRecordNotFound) {
return nil, &types.AppError{Error: err, Code: http.StatusNotFound}
return nil, fs.logAndReturn(moveFilesContext, err, http.StatusNotFound)
}
if err := fs.Db.Model(&models.File{}).Where("id IN ?", payload.Files).UpdateColumn("parent_id", destination.ID).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(moveFilesContext, err, http.StatusInternalServerError)
}
return &schemas.Message{Message: "files moved"}, nil
}
func (fs *FileService) DeleteFiles(c *gin.Context) (*schemas.Message, *types.AppError) {
var payload schemas.FileOperation
if err := c.ShouldBindJSON(&payload); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(deleteFilesContext, err, http.StatusBadRequest)
}
if err := fs.Db.Exec("call teldrive.delete_files($1)", payload.Files).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(deleteFilesContext, err, http.StatusInternalServerError)
}
return &schemas.Message{Message: "files deleted"}, nil
}
func (fs *FileService) MoveDirectory(c *gin.Context) (*schemas.Message, *types.AppError) {
var payload schemas.DirMove
if err := c.ShouldBindJSON(&payload); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, fs.logAndReturn(moveDirectoryContext, err, http.StatusBadRequest)
}
userId, _ := getUserAuth(c)
if err := fs.Db.Exec("select * from teldrive.move_directory(? , ? , ?)", payload.Source, payload.Destination, userId).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, fs.logAndReturn(moveDirectoryContext, err, http.StatusInternalServerError)
}
return &schemas.Message{Message: "directory moved"}, nil
}
func (fs *FileService) GetFileStream(c *gin.Context) {
w := c.Writer
r := c.Request
@ -464,7 +473,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
authHash := c.Query("hash")
if authHash == "" {
http.Error(w, "misssing hash param", http.StatusBadRequest)
http.Error(w, "missing hash param", http.StatusBadRequest)
return
}
@ -480,11 +489,12 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
key := fmt.Sprintf("files:%s", fileID)
err = cache.GetCache().Get(key, file)
var appErr *types.AppError
if err != nil {
file, err = fs.GetFileByID(c)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
file, appErr = fs.GetFileByID(c)
if appErr != nil {
http.Error(w, appErr.Error.Error(), http.StatusBadRequest)
return
}
cache.GetCache().Set(key, file, 0)
@ -518,6 +528,7 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
start = ranges[0].Start
end = ranges[0].End
c.Header("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, file.Size))
w.WriteHeader(http.StatusPartialContent)
}
@ -546,86 +557,68 @@ func (fs *FileService) GetFileStream(c *gin.Context) {
tokens, err := getBotsToken(c, session.UserId, file.ChannelID)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
fs.log.Error("failed to get bots", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
config := cnf.GetConfig()
var (
token, channelUser string
lr io.ReadCloser
channelUser string
lr io.ReadCloser
)
if config.LazyStreamBots {
tgc.Workers.Set(tokens, file.ChannelID)
token = tgc.Workers.Next(file.ChannelID)
client, _ := tgc.BotLogin(c, token)
channelUser = strings.Split(token, ":")[0]
if r.Method != "HEAD" {
tgc.RunWithAuth(c, client, token, func(ctx context.Context) error {
parts, err := getParts(c, client, file, channelUser)
if err != nil {
return err
}
parts = rangedParts(parts, start, end)
if file.Encrypted {
lr, _ = reader.NewDecryptedReader(c, client, parts, contentLength)
} else {
lr, _ = reader.NewLinearReader(c, client, parts, contentLength)
}
io.CopyN(w, lr, contentLength)
return nil
})
var client *tgc.Client
if config.GetConfig().DisableStreamBots || len(tokens) == 0 {
tgClient, _ := tgc.UserLogin(c, session.Session)
client, err = fs.worker.UserWorker(tgClient)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
channelUser = strconv.FormatInt(session.UserId, 10)
fs.log.Debug("requesting file", zap.String("name", file.Name),
zap.String("user", channelUser), zap.Int64("start", start),
zap.Int64("end", end), zap.Int64("fileSize", file.Size))
} else {
var index int
limit := min(len(tokens), config.GetConfig().BgBotsLimit)
var client *tgc.Client
fs.worker.Set(tokens[:limit], file.ChannelID)
if config.DisableStreamBots || len(tokens) == 0 {
tgClient, _ := tgc.UserLogin(c, session.Session)
client, err = tgc.StreamWorkers.UserWorker(tgClient)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
channelUser = strconv.FormatInt(session.UserId, 10)
} else {
var index int
limit := min(len(tokens), config.BgBotsLimit)
tgc.StreamWorkers.Set(tokens[:limit], file.ChannelID)
client, index, err = tgc.StreamWorkers.Next(file.ChannelID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
channelUser = strings.Split(tokens[index], ":")[0]
client, index, err = fs.worker.Next(file.ChannelID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if r.Method != "HEAD" {
parts, err := getParts(c, client.Tg, file, channelUser)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
parts = rangedParts(parts, start, end)
if file.Encrypted {
lr, _ = reader.NewDecryptedReader(c, client.Tg, parts, contentLength)
} else {
lr, _ = reader.NewLinearReader(c, client.Tg, parts, contentLength)
}
io.CopyN(w, lr, contentLength)
}
channelUser = strings.Split(tokens[index], ":")[0]
fs.log.Debug("requesting file", zap.String("name", file.Name),
zap.String("bot", channelUser), zap.Int("botNo", index), zap.Int64("start", start),
zap.Int64("end", end), zap.Int64("fileSize", file.Size))
}
if r.Method != "HEAD" {
parts, err := getParts(c, client.Tg, file, channelUser)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
parts = rangedParts(parts, start, end)
if file.Encrypted {
lr, _ = reader.NewDecryptedReader(c, client.Tg, parts, contentLength)
} else {
lr, _ = reader.NewLinearReader(c, client.Tg, parts, contentLength)
}
if _, err := io.CopyN(w, lr, contentLength); err != nil {
fs.log.Debug("closed file stream", zap.Error(err))
}
}
}
func setOrderFilter(query *gorm.DB, pagingParams *schemas.PaginationQuery, sortingParams *schemas.SortingQuery) *gorm.DB {

View file

@ -5,17 +5,17 @@ import (
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"errors"
"net/http"
"strconv"
"strings"
"time"
cnf "github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/config"
"github.com/divyam234/teldrive/internal/crypt"
"github.com/divyam234/teldrive/internal/tgc"
"github.com/divyam234/teldrive/pkg/mapper"
"github.com/divyam234/teldrive/pkg/schemas"
"go.uber.org/zap"
"github.com/divyam234/teldrive/pkg/types"
@ -31,11 +31,14 @@ import (
const saltLength = 32
type UploadService struct {
Db *gorm.DB
Db *gorm.DB
log *zap.Logger
worker *tgc.UploadWorker
}
func NewUploadService(db *gorm.DB) *UploadService {
return &UploadService{Db: db}
func NewUploadService(db *gorm.DB, logger *zap.Logger) *UploadService {
return &UploadService{Db: db, log: logger.Named("uploads"),
worker: &tgc.UploadWorker{}}
}
func generateRandomSalt() (string, error) {
@ -52,14 +55,18 @@ func generateRandomSalt() (string, error) {
return hashedSalt, nil
}
func (us *UploadService) logAndReturn(context string, err error, errCode int) *types.AppError {
us.log.Error(context, zap.Error(err))
return &types.AppError{Error: err, Code: errCode}
}
func (us *UploadService) GetUploadFileById(c *gin.Context) (*schemas.UploadOut, *types.AppError) {
uploadId := c.Param("id")
parts := []schemas.UploadPartOut{}
config := cnf.GetConfig()
if err := us.Db.Model(&models.Upload{}).Order("part_no").Where("upload_id = ?", uploadId).
Where("created_at >= ?", time.Now().UTC().AddDate(0, 0, -config.UploadRetention)).
Where("created_at >= ?", time.Now().UTC().AddDate(0, 0, -config.GetConfig().UploadRetention)).
Find(&parts).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, us.logAndReturn("get upload", err, http.StatusInternalServerError)
}
return &schemas.UploadOut{Parts: parts}, nil
@ -68,7 +75,7 @@ func (us *UploadService) GetUploadFileById(c *gin.Context) (*schemas.UploadOut,
func (us *UploadService) DeleteUploadFile(c *gin.Context) (*schemas.Message, *types.AppError) {
uploadId := c.Param("id")
if err := us.Db.Where("upload_id = ?", uploadId).Delete(&models.Upload{}).Error; err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, us.logAndReturn("delete upload", err, http.StatusInternalServerError)
}
return &schemas.Message{Message: "upload deleted"}, nil
@ -104,13 +111,13 @@ func (us *UploadService) CreateUploadPart(c *gin.Context) (*schemas.UploadPartOu
}
func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *types.AppError) {
var (
uploadQuery schemas.UploadQuery
channelId int64
err error
client *telegram.Client
token string
index int
channelUser string
out *schemas.UploadPartOut
)
@ -118,7 +125,7 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
uploadQuery.PartNo = 1
if err := c.ShouldBindQuery(&uploadQuery); err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusBadRequest}
return nil, us.logAndReturn("UploadFile", err, http.StatusBadRequest)
}
userId, session := getUserAuth(c)
@ -129,12 +136,10 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
fileSize := c.Request.ContentLength
fileName := uploadQuery.Filename
if uploadQuery.ChannelID == 0 {
channelId, err = GetDefaultChannel(c, userId)
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError)
}
} else {
channelId = uploadQuery.ChannelID
@ -143,20 +148,25 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
tokens, err := getBotsToken(c, userId, channelId)
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError)
}
if len(tokens) == 0 {
client, _ = tgc.UserLogin(c, session)
channelUser = strconv.FormatInt(userId, 10)
} else {
tgc.Workers.Set(tokens, channelId)
token = tgc.Workers.Next(channelId)
us.worker.Set(tokens, channelId)
token, index = us.worker.Next(channelId)
client, _ = tgc.BotLogin(c, token)
channelUser = strings.Split(token, ":")[0]
}
err = tgc.RunWithAuth(c, client, token, func(ctx context.Context) error {
us.log.Debug("uploading file", zap.String("fileName", uploadQuery.FileName),
zap.String("partName", uploadQuery.PartName),
zap.String("bot", channelUser), zap.Int("botNo", index),
zap.Int("chunkNo", uploadQuery.PartNo), zap.Int64("partSize", fileSize))
err = tgc.RunWithAuth(c, us.log, client, token, func(ctx context.Context) error {
channel, err := GetChannelById(ctx, client, channelId, channelUser)
@ -164,15 +174,13 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
return err
}
config := cnf.GetConfig()
var salt string
if uploadQuery.Encrypted {
//gen random Salt
salt, _ = generateRandomSalt()
cipher, _ := crypt.NewCipher(config.EncryptionKey, salt)
cipher, _ := crypt.NewCipher(config.GetConfig().EncryptionKey, salt)
fileSize = crypt.EncryptedSize(fileSize)
fileStream, _ = cipher.EncryptData(fileStream)
}
@ -181,13 +189,13 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
u := uploader.NewUploader(api).WithThreads(16).WithPartSize(512 * 1024)
upload, err := u.Upload(c, uploader.NewUpload(fileName, fileStream, fileSize))
upload, err := u.Upload(c, uploader.NewUpload(uploadQuery.PartName, fileStream, fileSize))
if err != nil {
return err
}
document := message.UploadedDocument(upload).Filename(fileName).ForceFile(true)
document := message.UploadedDocument(upload).Filename(uploadQuery.PartName).ForceFile(true)
sender := message.NewSender(client.API())
@ -210,15 +218,10 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
message = channelMsg.Message.(*tg.Message)
break
}
}
if message.ID == 0 {
return errors.New("failed to upload part")
}
partUpload := &models.Upload{
Name: fileName,
Name: uploadQuery.PartName,
UploadId: uploadId,
PartId: message.ID,
ChannelID: channelId,
@ -239,8 +242,12 @@ func (us *UploadService) UploadFile(c *gin.Context) (*schemas.UploadPartOut, *ty
})
if err != nil {
return nil, &types.AppError{Error: err, Code: http.StatusInternalServerError}
return nil, us.logAndReturn("uploadFile", err, http.StatusInternalServerError)
}
us.log.Debug("upload finished", zap.String("fileName", uploadQuery.FileName),
zap.String("partName", uploadQuery.PartName),
zap.Int("chunkNo", uploadQuery.PartNo))
return out, nil
}

View file

@ -18,6 +18,7 @@ import (
"github.com/gotd/td/telegram/query"
"github.com/gotd/td/tg"
"github.com/thoas/go-funk"
"go.uber.org/zap"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
@ -25,11 +26,12 @@ import (
)
type UserService struct {
Db *gorm.DB
Db *gorm.DB
log *zap.Logger
}
func NewUserService(db *gorm.DB) *UserService {
return &UserService{Db: db}
func NewUserService(db *gorm.DB, logger *zap.Logger) *UserService {
return &UserService{Db: db, log: logger.Named("users")}
}
func (us *UserService) GetProfilePhoto(c *gin.Context) {
@ -42,7 +44,7 @@ func (us *UserService) GetProfilePhoto(c *gin.Context) {
return
}
err = tgc.RunWithAuth(c, client, "", func(ctx context.Context) error {
err = tgc.RunWithAuth(c, us.log, client, "", func(ctx context.Context) error {
self, err := client.Self(c)
if err != nil {
return err
@ -210,7 +212,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI
var wg sync.WaitGroup
err := tgc.RunWithAuth(c, client, "", func(ctx context.Context) error {
err := tgc.RunWithAuth(c, us.log, client, "", func(ctx context.Context) error {
channel, err := GetChannelById(ctx, client, channelId, strconv.FormatInt(userId, 10))
if err != nil {
@ -229,7 +231,7 @@ func (us *UserService) addBots(c context.Context, client *telegram.Client, userI
waitChan <- struct{}{}
wg.Add(1)
go func(t string) {
info, err := getBotInfo(c, t)
info, err := getBotInfo(c, us.log, t)
if err != nil {
return
}