mirror of
https://github.com/tgdrive/teldrive.git
synced 2025-09-10 00:15:27 +08:00
refactor: improve cron jobs
This commit is contained in:
parent
8a38aaddac
commit
abcf81b204
5 changed files with 28 additions and 20 deletions
|
@ -29,7 +29,6 @@ type AsyncReader struct {
|
||||||
err error
|
err error
|
||||||
cur *buffer
|
cur *buffer
|
||||||
exited chan struct{}
|
exited chan struct{}
|
||||||
size int
|
|
||||||
closed bool
|
closed bool
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
@ -54,7 +53,6 @@ func (a *AsyncReader) init(rd io.ReadCloser, buffers int) {
|
||||||
a.exited = make(chan struct{})
|
a.exited = make(chan struct{})
|
||||||
a.buffers = buffers
|
a.buffers = buffers
|
||||||
a.cur = nil
|
a.cur = nil
|
||||||
a.size = softStartInitial
|
|
||||||
|
|
||||||
for i := 0; i < buffers; i++ {
|
for i := 0; i < buffers; i++ {
|
||||||
a.token <- struct{}{}
|
a.token <- struct{}{}
|
||||||
|
@ -67,10 +65,6 @@ func (a *AsyncReader) init(rd io.ReadCloser, buffers int) {
|
||||||
select {
|
select {
|
||||||
case <-a.token:
|
case <-a.token:
|
||||||
b := a.getBuffer()
|
b := a.getBuffer()
|
||||||
if a.size < BufferSize {
|
|
||||||
b.buf = b.buf[:a.size]
|
|
||||||
a.size <<= 1
|
|
||||||
}
|
|
||||||
err := b.read(a.in)
|
err := b.read(a.in)
|
||||||
a.ready <- b
|
a.ready <- b
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -93,7 +87,7 @@ func (a *AsyncReader) putBuffer(b *buffer) {
|
||||||
|
|
||||||
func (a *AsyncReader) getBuffer() *buffer {
|
func (a *AsyncReader) getBuffer() *buffer {
|
||||||
bufferPoolOnce.Do(func() {
|
bufferPoolOnce.Do(func() {
|
||||||
bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, true)
|
bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, false)
|
||||||
})
|
})
|
||||||
return &buffer{
|
return &buffer{
|
||||||
buf: bufferPool.Get(),
|
buf: bufferPool.Get(),
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/divyam234/teldrive/pkg/schemas"
|
"github.com/divyam234/teldrive/pkg/schemas"
|
||||||
"github.com/divyam234/teldrive/pkg/services"
|
"github.com/divyam234/teldrive/pkg/services"
|
||||||
"github.com/go-co-op/gocron"
|
"github.com/go-co-op/gocron"
|
||||||
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"gorm.io/datatypes"
|
"gorm.io/datatypes"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
@ -89,11 +90,17 @@ func (c *CronService) CleanFiles(ctx context.Context) {
|
||||||
err := services.DeleteTGMessages(ctx, &c.cnf.TG, row.Session, row.ChannelId, row.UserId, ids)
|
err := services.DeleteTGMessages(ctx, &c.cnf.TG, row.Session, row.ChannelId, row.UserId, ids)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorw("failed to clean files", err)
|
c.logger.Errorw("failed to delete messages", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.db.Where("id = any($1)", fileIds).Delete(&models.File{})
|
items := pgtype.Array[string]{
|
||||||
|
Elements: fileIds,
|
||||||
|
Valid: true,
|
||||||
|
Dims: []pgtype.ArrayDimension{{Length: int32(len(fileIds)), LowerBound: 1}},
|
||||||
|
}
|
||||||
|
|
||||||
|
c.db.Where("id = any($1)", items).Delete(&models.File{})
|
||||||
|
|
||||||
c.logger.Infow("cleaned files", "user", row.UserId, "channel", row.ChannelId)
|
c.logger.Infow("cleaned files", "user", row.UserId, "channel", row.ChannelId)
|
||||||
}
|
}
|
||||||
|
@ -114,16 +121,20 @@ func (c *CronService) CleanUploads(ctx context.Context) {
|
||||||
|
|
||||||
for _, result := range upResults {
|
for _, result := range upResults {
|
||||||
|
|
||||||
if result.Session == "" && len(result.Parts) > 0 {
|
if result.Session != "" && len(result.Parts) > 0 {
|
||||||
c.db.Where("part_id = any($1)", result.Parts).Delete(&models.Upload{})
|
err := services.DeleteTGMessages(ctx, &c.cnf.TG, result.Session, result.ChannelId, result.UserId, result.Parts)
|
||||||
return
|
if err != nil {
|
||||||
|
c.logger.Errorw("failed to delete messages", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err := services.DeleteTGMessages(ctx, &c.cnf.TG, result.Session, result.ChannelId, result.UserId, result.Parts)
|
items := pgtype.Array[int]{
|
||||||
if err != nil {
|
Elements: result.Parts,
|
||||||
c.logger.Errorw("failed to delete messages", err)
|
Valid: true,
|
||||||
return
|
Dims: []pgtype.ArrayDimension{{Length: int32(len(result.Parts)), LowerBound: 1}},
|
||||||
}
|
}
|
||||||
c.db.Where("part_id = any($1)", result.Parts).Delete(&models.Upload{})
|
c.db.Where("part_id = any(?)", items).Where("channel_id = ?", result.ChannelId).
|
||||||
|
Where("user_id = ?", result.UserId).Delete(&models.Upload{}).Delete(&models.Upload{})
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ type TgSession struct {
|
||||||
type Session struct {
|
type Session struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
UserName string `json:"userName"`
|
UserName string `json:"userName"`
|
||||||
|
UserId int64 `json:"userId"`
|
||||||
IsPremium bool `json:"isPremium"`
|
IsPremium bool `json:"isPremium"`
|
||||||
Hash string `json:"hash"`
|
Hash string `json:"hash"`
|
||||||
Expires string `json:"expires"`
|
Expires string `json:"expires"`
|
||||||
|
|
|
@ -168,8 +168,11 @@ func (as *AuthService) GetSession(c *gin.Context) *schemas.Session {
|
||||||
|
|
||||||
newExpires := now.Add(as.cnf.JWT.SessionTime)
|
newExpires := now.Add(as.cnf.JWT.SessionTime)
|
||||||
|
|
||||||
|
userId, _ := strconv.ParseInt(jwePayload.Subject, 10, 64)
|
||||||
|
|
||||||
session := &schemas.Session{Name: jwePayload.Name,
|
session := &schemas.Session{Name: jwePayload.Name,
|
||||||
UserName: jwePayload.UserName,
|
UserName: jwePayload.UserName,
|
||||||
|
UserId: userId,
|
||||||
Hash: jwePayload.Hash,
|
Hash: jwePayload.Hash,
|
||||||
Expires: newExpires.Format(time.RFC3339)}
|
Expires: newExpires.Format(time.RFC3339)}
|
||||||
|
|
||||||
|
|
|
@ -356,13 +356,12 @@ func DeleteTGMessages(ctx context.Context, cnf *config.TGConfig, session string,
|
||||||
start := i * batchSize
|
start := i * batchSize
|
||||||
end := min((i+1)*batchSize, len(ids))
|
end := min((i+1)*batchSize, len(ids))
|
||||||
batchIds := ids[start:end]
|
batchIds := ids[start:end]
|
||||||
go func() error {
|
g.Go(func() error {
|
||||||
messageDeleteRequest := tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: batchIds}
|
messageDeleteRequest := tg.ChannelsDeleteMessagesRequest{Channel: channel, ID: batchIds}
|
||||||
_, err = client.API().ChannelsDeleteMessages(ctx, &messageDeleteRequest)
|
_, err = client.API().ChannelsDeleteMessages(ctx, &messageDeleteRequest)
|
||||||
return err
|
return err
|
||||||
}()
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return g.Wait()
|
return g.Wait()
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Add table
Reference in a new issue