feat(file): Optimize file download progress tracking (#7186)

This commit is contained in:
zhengkunwang 2024-11-26 22:04:31 +08:00 committed by GitHub
parent ff0606b896
commit 14c4d3c862
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 70 additions and 334 deletions

View file

@ -751,11 +751,7 @@ func (b *BaseApi) Ws(c *gin.Context) {
func (b *BaseApi) Keys(c *gin.Context) {
res := &response.FileProcessKeys{}
keys, err := global.CACHE.PrefixScanKey("file-wget-")
if err != nil {
helper.InternalServer(c, err)
return
}
keys := global.CACHE.PrefixScanKey("file-wget-")
res.Keys = keys
helper.SuccessWithData(c, res)
}

View file

@ -46,9 +46,6 @@ func Run() {
if _, err := global.Cron.AddJob(fmt.Sprintf("%v %v * * *", mathRand.Intn(60), mathRand.Intn(3)), job.NewAppStoreJob()); err != nil {
global.LOG.Errorf("can not add appstore corn job: %s", err.Error())
}
if _, err := global.Cron.AddJob("@daily", job.NewCacheJob()); err != nil {
global.LOG.Errorf("can not add cache corn job: %s", err.Error())
}
var cronJobs []model.Cronjob
if err := global.DB.Where("status = ?", constant.StatusEnable).Find(&cronJobs).Error; err != nil {

View file

@ -1,27 +0,0 @@
package job
import (
"time"
"github.com/1Panel-dev/1Panel/agent/global"
)
// Cache is a cron job that periodically runs badger value-log garbage
// collection on the global cache database.
type Cache struct{}
// NewCacheJob returns a new cache value-log GC job.
func NewCacheJob() *Cache {
return &Cache{}
}
// Run triggers badger value-log GC every 5 minutes. RunValueLogGC reclaims
// at most one log file per call, so on success it immediately retries (via
// the goto) until GC reports there is nothing left to collect.
func (c *Cache) Run() {
global.LOG.Info("run cache gc start ...")
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
// NOTE(review): ticker.C is never closed, so this range never ends and
// the trailing "end" log below is unreachable.
for range ticker.C {
again:
err := global.CacheDb.RunValueLogGC(0.7)
if err == nil {
goto again
}
}
global.LOG.Info("run cache gc end ...")
}

View file

@ -2,8 +2,7 @@ package global
import (
"github.com/1Panel-dev/1Panel/agent/configs"
"github.com/1Panel-dev/1Panel/agent/init/cache/badger_db"
"github.com/dgraph-io/badger/v4"
badger_db "github.com/1Panel-dev/1Panel/agent/init/cache/db"
"github.com/go-playground/validator/v10"
"github.com/nicksnyder/go-i18n/v2/i18n"
"github.com/robfig/cron/v3"
@ -21,7 +20,6 @@ var (
CONF configs.ServerConfig
VALID *validator.Validate
CACHE *badger_db.Cache
CacheDb *badger.DB
Viper *viper.Viper
Cron *cron.Cron

View file

@ -7,7 +7,6 @@ require (
github.com/aws/aws-sdk-go v1.55.0
github.com/compose-spec/compose-go/v2 v2.1.4
github.com/creack/pty v1.1.21
github.com/dgraph-io/badger/v4 v4.2.0
github.com/docker/compose/v2 v2.29.0
github.com/docker/docker v27.1.0+incompatible
github.com/docker/go-connections v0.5.0
@ -176,6 +175,7 @@ require (
github.com/nwaples/rardecode/v2 v2.0.0-beta.2 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect

View file

@ -601,6 +601,8 @@ github.com/opencontainers/selinux v1.11.0/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M5
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A=
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=

View file

@ -1,79 +0,0 @@
package badger_db

import (
	"fmt"
	"time"

	"github.com/dgraph-io/badger/v4"
)

// Cache is a thin wrapper around a badger key/value store used for
// transient state such as file-download progress.
type Cache struct {
	db *badger.DB
}

// NewCacheDB wraps an already-opened badger database.
func NewCacheDB(db *badger.DB) *Cache {
	return &Cache{
		db: db,
	}
}

// Set stores value under key with no expiry. The value is rendered with
// fmt.Sprintf("%v", ...), so callers get back its string form from Get.
func (c *Cache) Set(key string, value interface{}) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		v := []byte(fmt.Sprintf("%v", value))
		return txn.Set([]byte(key), v)
	})
	return err
}

// Del removes key; deleting a missing key is not an error in badger.
func (c *Cache) Del(key string) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		return txn.Delete([]byte(key))
	})
	return err
}

// Clean drops every key in the database.
func (c *Cache) Clean() error {
	return c.db.DropAll()
}

// Get returns a copy of the value stored under key, or badger.ErrKeyNotFound
// when the key is absent. The copy is required because val is only valid
// inside the Value callback.
func (c *Cache) Get(key string) ([]byte, error) {
	var result []byte
	err := c.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(key))
		if err != nil {
			return err
		}
		err = item.Value(func(val []byte) error {
			result = append([]byte{}, val...)
			return nil
		})
		return err
	})
	return result, err
}

// SetWithTTL stores value under key and lets badger expire it after duration.
func (c *Cache) SetWithTTL(key string, value interface{}, duration time.Duration) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		v := []byte(fmt.Sprintf("%v", value))
		e := badger.NewEntry([]byte(key), v).WithTTL(duration)
		return txn.SetEntry(e)
	})
	return err
}

// PrefixScanKey returns every key that starts with prefixStr.
//
// BUG FIX: the original had `return nil` inside the iteration loop, which
// exited the View transaction after appending only the FIRST matching key,
// so at most one key was ever returned. The loop now visits every match.
func (c *Cache) PrefixScanKey(prefixStr string) ([]string, error) {
	var res []string
	err := c.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		prefix := []byte(prefixStr)
		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			res = append(res, string(it.Item().Key()))
		}
		return nil
	})
	return res, err
}

View file

@ -1,56 +1,10 @@
package cache
import (
"time"
"github.com/1Panel-dev/1Panel/agent/global"
"github.com/1Panel-dev/1Panel/agent/init/cache/badger_db"
"github.com/dgraph-io/badger/v4"
cachedb "github.com/1Panel-dev/1Panel/agent/init/cache/db"
)
func Init() {
c := global.CONF.System.Cache
options := badger.Options{
Dir: c,
ValueDir: c,
ValueLogFileSize: 64 << 20,
ValueLogMaxEntries: 10 << 20,
VLogPercentile: 0.1,
MemTableSize: 32 << 20,
BaseTableSize: 2 << 20,
BaseLevelSize: 10 << 20,
TableSizeMultiplier: 2,
LevelSizeMultiplier: 10,
MaxLevels: 7,
NumGoroutines: 4,
MetricsEnabled: true,
NumCompactors: 2,
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 15,
NumMemtables: 1,
BloomFalsePositive: 0.01,
BlockSize: 2 * 1024,
SyncWrites: false,
NumVersionsToKeep: 1,
CompactL0OnClose: false,
VerifyValueChecksum: false,
BlockCacheSize: 32 << 20,
IndexCacheSize: 0,
ZSTDCompressionLevel: 1,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
NamespaceOffset: -1,
}
cache, err := badger.Open(options)
if err != nil {
panic(err)
}
_ = cache.DropAll()
global.CacheDb = cache
global.CACHE = badger_db.NewCacheDB(cache)
global.LOG.Info("init cache successfully")
global.CACHE = cachedb.NewCacheDB()
}

52
agent/init/cache/db/db.go vendored Normal file
View file

@ -0,0 +1,52 @@
package badger_db

import (
	"strings"
	"time"

	"github.com/patrickmn/go-cache"
)

// Cache is an in-memory key/value store backed by patrickmn/go-cache. It
// replaces the previous badger-backed cache and tracks transient state
// such as file-download progress ("file-wget-*" keys).
type Cache struct {
	db *cache.Cache
}

// NewCacheDB returns a Cache whose expired entries are purged every
// 10 minutes; the 5-minute default TTL only applies to entries stored
// with cache.DefaultExpiration.
func NewCacheDB() *Cache {
	db := cache.New(5*time.Minute, 10*time.Minute)
	return &Cache{
		db: db,
	}
}

// Set stores value under key without expiry, matching the behavior of the
// badger-backed Set it replaces. Using NoExpiration (instead of the
// 5-minute DefaultExpiration) keeps progress entries for long-running
// downloads alive; callers clean up explicitly via SetWithTTL or Del.
func (c *Cache) Set(key string, value interface{}) {
	c.db.Set(key, value, cache.NoExpiration)
}

// SetWithTTL stores value under key and expires it after d.
func (c *Cache) SetWithTTL(key string, value interface{}, d time.Duration) {
	c.db.Set(key, value, d)
}

// Del removes key; deleting a missing key is a no-op.
func (c *Cache) Del(key string) {
	c.db.Delete(key)
}

// Clean removes every entry. The error return is kept for compatibility
// with the previous badger-backed interface and is always nil.
func (c *Cache) Clean() error {
	// Flush actually drops all items; the original body was a no-op even
	// though the badger version it replaced performed DropAll.
	c.db.Flush()
	return nil
}

// Get returns the string stored under key, or "" when the key is absent
// or the stored value is not a string. The checked assertion avoids the
// panic the original unchecked obj.(string) caused for non-string values.
func (c *Cache) Get(key string) string {
	obj, exist := c.db.Get(key)
	if !exist {
		return ""
	}
	s, ok := obj.(string)
	if !ok {
		return ""
	}
	return s
}

// PrefixScanKey returns every key that starts with prefixStr. Iteration
// order over Items() is unspecified, so the result order is arbitrary.
func (c *Cache) PrefixScanKey(prefixStr string) []string {
	var res []string
	for key := range c.db.Items() {
		if strings.HasPrefix(key, prefixStr) {
			res = append(res, key)
		}
	}
	return res
}

View file

@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"github.com/1Panel-dev/1Panel/agent/init/business"
"github.com/1Panel-dev/1Panel/agent/init/cache"
"net"
"net/http"
"os"
@ -30,6 +31,7 @@ func Start() {
i18n.Init()
log.Init()
db.Init()
cache.Init()
migration.Init()
app.Init()
validator.Init()

View file

@ -248,13 +248,9 @@ func (w *WriteCounter) SaveProcess() {
}
by, _ := json.Marshal(process)
if percentValue < 100 {
if err := global.CACHE.Set(w.Key, string(by)); err != nil {
global.LOG.Errorf("save cache error, err %s", err.Error())
}
global.CACHE.Set(w.Key, string(by))
} else {
if err := global.CACHE.SetWithTTL(w.Key, string(by), time.Second*time.Duration(10)); err != nil {
global.LOG.Errorf("save cache error, err %s", err.Error())
}
global.CACHE.SetWithTTL(w.Key, string(by), time.Second*time.Duration(10))
}
}
@ -293,20 +289,14 @@ func (f FileOp) DownloadFileWithProcess(url, dst, key string, ignoreCertificate
out.Close()
resp.Body.Close()
value, err := global.CACHE.Get(counter.Key)
if err != nil {
global.LOG.Errorf("get cache error,err %s", err.Error())
return
}
value := global.CACHE.Get(counter.Key)
process := &Process{}
_ = json.Unmarshal(value, process)
_ = json.Unmarshal([]byte(value), process)
process.Percent = 100
process.Name = counter.Name
process.Total = process.Written
by, _ := json.Marshal(process)
if err := global.CACHE.SetWithTTL(counter.Key, string(by), time.Second*time.Duration(10)); err != nil {
global.LOG.Errorf("save cache error, err %s", err.Error())
}
global.CACHE.Set(counter.Key, string(by))
}()
return nil
}

View file

@ -134,13 +134,12 @@ func ProcessData(c *Client, inputMsg []byte) {
func getDownloadProcess(progress DownloadProgress) (res []byte, err error) {
var result []files.Process
for _, k := range progress.Keys {
value, err := global.CACHE.Get(k)
if err != nil {
global.LOG.Errorf("get cache error,err %s", err.Error())
return nil, err
value := global.CACHE.Get(k)
if value == "" {
return nil, fmt.Errorf("get cache error,err value is nil")
}
downloadProcess := &files.Process{}
_ = json.Unmarshal(value, downloadProcess)
_ = json.Unmarshal([]byte(value), downloadProcess)
result = append(result, *downloadProcess)
}
res, err = json.Marshal(result)

View file

@ -2,9 +2,7 @@ package global
import (
"github.com/1Panel-dev/1Panel/core/configs"
"github.com/1Panel-dev/1Panel/core/init/cache/badger_db"
"github.com/1Panel-dev/1Panel/core/init/session/psession"
"github.com/dgraph-io/badger/v4"
"github.com/go-playground/validator/v10"
"github.com/nicksnyder/go-i18n/v2/i18n"
"github.com/robfig/cron/v3"
@ -20,8 +18,6 @@ var (
CONF configs.ServerConfig
VALID *validator.Validate
SESSION *psession.PSession
CACHE *badger_db.Cache
CacheDb *badger.DB
Viper *viper.Viper
I18n *i18n.Localizer

View file

@ -1,79 +0,0 @@
package badger_db

import (
	"fmt"
	"time"

	"github.com/dgraph-io/badger/v4"
)

// Cache is a thin wrapper around a badger key/value store (core copy of
// the agent's badger_db wrapper).
type Cache struct {
	db *badger.DB
}

// NewCacheDB wraps an already-opened badger database.
func NewCacheDB(db *badger.DB) *Cache {
	return &Cache{
		db: db,
	}
}

// Set stores value under key with no expiry. The value is rendered with
// fmt.Sprintf("%v", ...), so callers get back its string form from Get.
func (c *Cache) Set(key string, value interface{}) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		v := []byte(fmt.Sprintf("%v", value))
		return txn.Set([]byte(key), v)
	})
	return err
}

// Del removes key; deleting a missing key is not an error in badger.
func (c *Cache) Del(key string) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		return txn.Delete([]byte(key))
	})
	return err
}

// Clean drops every key in the database.
func (c *Cache) Clean() error {
	return c.db.DropAll()
}

// Get returns a copy of the value stored under key, or badger.ErrKeyNotFound
// when the key is absent. The copy is required because val is only valid
// inside the Value callback.
func (c *Cache) Get(key string) ([]byte, error) {
	var result []byte
	err := c.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(key))
		if err != nil {
			return err
		}
		err = item.Value(func(val []byte) error {
			result = append([]byte{}, val...)
			return nil
		})
		return err
	})
	return result, err
}

// SetWithTTL stores value under key and lets badger expire it after duration.
func (c *Cache) SetWithTTL(key string, value interface{}, duration time.Duration) error {
	err := c.db.Update(func(txn *badger.Txn) error {
		v := []byte(fmt.Sprintf("%v", value))
		e := badger.NewEntry([]byte(key), v).WithTTL(duration)
		return txn.SetEntry(e)
	})
	return err
}

// PrefixScanKey returns every key that starts with prefixStr.
//
// BUG FIX: the original had `return nil` inside the iteration loop, which
// exited the View transaction after appending only the FIRST matching key,
// so at most one key was ever returned. The loop now visits every match.
func (c *Cache) PrefixScanKey(prefixStr string) ([]string, error) {
	var res []string
	err := c.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		prefix := []byte(prefixStr)
		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			res = append(res, string(it.Item().Key()))
		}
		return nil
	})
	return res, err
}

View file

@ -1,60 +0,0 @@
package cache
import (
"os"
"path"
"time"
"github.com/1Panel-dev/1Panel/core/global"
"github.com/1Panel-dev/1Panel/core/init/cache/badger_db"
"github.com/dgraph-io/badger/v4"
)
// Init opens (recreating from scratch) the on-disk badger cache under
// <BaseDir>/1panel/cache and publishes it via global.CacheDb / global.CACHE.
// It panics on open failure, so it must only run during process startup.
func Init() {
c := path.Join(global.CONF.System.BaseDir, "1panel/cache")
// The cache is disposable: wipe any previous contents before opening.
_ = os.RemoveAll(c)
_ = os.Mkdir(c, 0755)
// Explicit badger options rather than badger.DefaultOptions; sizes are
// deliberately small (64MB value log, 32MB memtable/block cache) to keep
// the cache footprint modest.
options := badger.Options{
Dir: c,
ValueDir: c,
ValueLogFileSize: 64 << 20,
ValueLogMaxEntries: 10 << 20,
VLogPercentile: 0.1,
MemTableSize: 32 << 20,
BaseTableSize: 2 << 20,
BaseLevelSize: 10 << 20,
TableSizeMultiplier: 2,
LevelSizeMultiplier: 10,
MaxLevels: 7,
NumGoroutines: 4,
MetricsEnabled: true,
NumCompactors: 2,
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 15,
NumMemtables: 1,
BloomFalsePositive: 0.01,
BlockSize: 2 * 1024,
SyncWrites: false,
NumVersionsToKeep: 1,
CompactL0OnClose: false,
VerifyValueChecksum: false,
BlockCacheSize: 32 << 20,
IndexCacheSize: 0,
ZSTDCompressionLevel: 1,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
NamespaceOffset: -1,
}
cache, err := badger.Open(options)
if err != nil {
panic(err)
}
// Belt-and-braces: the directory was already removed above, but DropAll
// guarantees an empty store even if RemoveAll silently failed.
_ = cache.DropAll()
global.CacheDb = cache
global.CACHE = badger_db.NewCacheDB(cache)
global.LOG.Info("init cache successfully")
}

View file

@ -42,8 +42,6 @@ func Routers() *gin.Engine {
Router = gin.Default()
Router.Use(i18n.UseI18n())
//Router.Use(sessions.Sessions("test", global.SESSION.Store))
swaggerRouter := Router.Group("1panel")
docs.SwaggerInfo.BasePath = "/api/v2"
swaggerRouter.Use(middleware.JwtAuth()).Use(middleware.SessionAuth()).GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler))

View file

@ -11,7 +11,6 @@ import (
"github.com/1Panel-dev/1Panel/core/global"
"github.com/1Panel-dev/1Panel/core/i18n"
"github.com/1Panel-dev/1Panel/core/init/cache"
"github.com/1Panel-dev/1Panel/core/init/cron"
"github.com/1Panel-dev/1Panel/core/init/db"
"github.com/1Panel-dev/1Panel/core/init/hook"
@ -34,7 +33,6 @@ func Start() {
migration.Init()
validator.Init()
gob.Register(psession.SessionUser{})
cache.Init()
cron.Init()
session.Init()
gin.SetMode("debug")

1
go.mod
View file

@ -8,7 +8,6 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.55.5
github.com/creack/pty v1.1.9
github.com/dgraph-io/badger/v4 v4.2.0
github.com/fsnotify/fsnotify v1.7.0
github.com/gin-contrib/gzip v1.0.1
github.com/gin-gonic/gin v1.10.0