mirror of
https://github.com/tgdrive/teldrive.git
synced 2025-09-06 14:37:49 +08:00
fix: memory leak in streams
This commit is contained in:
parent
470aeb053a
commit
04755c76e6
3 changed files with 25 additions and 6 deletions
|
@ -52,6 +52,7 @@ func NewRun() *cobra.Command {
|
|||
runCmd.Flags().StringP("config", "c", "", "Config file path (default $HOME/.teldrive/config.toml)")
|
||||
runCmd.Flags().IntVarP(&config.Server.Port, "server-port", "p", 8080, "Server port")
|
||||
duration.DurationVar(runCmd.Flags(), &config.Server.GracefulShutdown, "server-graceful-shutdown", 15*time.Second, "Server graceful shutdown timeout")
|
||||
runCmd.Flags().BoolVar(&config.Server.EnablePprof, "server-enable-pprof", false, "Enable Pprof Profiling")
|
||||
|
||||
runCmd.Flags().IntVarP(&config.Log.Level, "log-level", "", -1, "Logging level")
|
||||
runCmd.Flags().StringVar(&config.Log.File, "log-file", "", "Logging file path")
|
||||
|
@ -219,6 +220,10 @@ func initApp(lc fx.Lifecycle, cfg *config.Config, c *controller.Controller) *gin
|
|||
|
||||
r := gin.New()
|
||||
|
||||
if cfg.Server.EnablePprof {
|
||||
pprof.Register(r)
|
||||
}
|
||||
|
||||
r.Use(gin.Recovery())
|
||||
|
||||
r.Use(ginzap.GinzapWithConfig(logging.DefaultLogger().Desugar(), &ginzap.Config{
|
||||
|
@ -240,7 +245,6 @@ func initApp(lc fx.Lifecycle, cfg *config.Config, c *controller.Controller) *gin
|
|||
})
|
||||
|
||||
r = api.InitRouter(r, c, cfg)
|
||||
pprof.Register(r)
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", cfg.Server.Port),
|
||||
Handler: r,
|
||||
|
|
|
@ -15,6 +15,7 @@ type Config struct {
|
|||
type ServerConfig struct {
|
||||
Port int
|
||||
GracefulShutdown time.Duration
|
||||
EnablePprof bool
|
||||
}
|
||||
|
||||
type TGConfig struct {
|
||||
|
|
|
@ -123,6 +123,16 @@ func newTGReader(
|
|||
|
||||
func (r *tgReader) Close() error {
|
||||
close(r.done)
|
||||
close(r.bufferChan)
|
||||
r.closed = true
|
||||
for b := range r.bufferChan {
|
||||
if b != nil {
|
||||
b = nil
|
||||
}
|
||||
}
|
||||
if r.cur != nil {
|
||||
r.cur = nil
|
||||
}
|
||||
close(r.err)
|
||||
return nil
|
||||
}
|
||||
|
@ -172,8 +182,7 @@ func (r *tgReader) fillBufferConcurrently() error {
|
|||
bufferMap := make(map[int]*buffer)
|
||||
|
||||
defer func() {
|
||||
close(r.bufferChan)
|
||||
r.closed = true
|
||||
|
||||
for i := range bufferMap {
|
||||
delete(bufferMap, i)
|
||||
}
|
||||
|
@ -227,7 +236,11 @@ func (r *tgReader) fillBufferConcurrently() error {
|
|||
} else {
|
||||
for i := range r.concurrency {
|
||||
if r.currentPart+i+1 <= r.totalParts {
|
||||
r.bufferChan <- bufferMap[i]
|
||||
select {
|
||||
case <-r.done:
|
||||
return nil
|
||||
case r.bufferChan <- bufferMap[i]:
|
||||
}
|
||||
}
|
||||
}
|
||||
r.currentPart += r.concurrency
|
||||
|
@ -252,8 +265,6 @@ func (r *tgReader) fillBufferConcurrently() error {
|
|||
|
||||
func (r *tgReader) fillBufferSequentially() error {
|
||||
|
||||
defer close(r.bufferChan)
|
||||
|
||||
fetchChunk := func(ctx context.Context) (*buffer, error) {
|
||||
chunk, err := r.chunkSrc.Chunk(ctx, r.offset, r.chunkSize)
|
||||
if err != nil {
|
||||
|
@ -283,6 +294,9 @@ func (r *tgReader) fillBufferSequentially() error {
|
|||
r.err <- err
|
||||
return nil
|
||||
}
|
||||
if r.closed {
|
||||
return nil
|
||||
}
|
||||
r.bufferChan <- buf
|
||||
r.currentPart++
|
||||
r.offset += r.chunkSize
|
||||
|
|
Loading…
Add table
Reference in a new issue