diff --git a/cmd/run.go b/cmd/run.go index 18cd0d0..7251049 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -52,6 +52,7 @@ func NewRun() *cobra.Command { runCmd.Flags().StringP("config", "c", "", "Config file path (default $HOME/.teldrive/config.toml)") runCmd.Flags().IntVarP(&config.Server.Port, "server-port", "p", 8080, "Server port") duration.DurationVar(runCmd.Flags(), &config.Server.GracefulShutdown, "server-graceful-shutdown", 15*time.Second, "Server graceful shutdown timeout") + runCmd.Flags().BoolVar(&config.Server.EnablePprof, "server-enable-pprof", false, "Enable Pprof Profiling") runCmd.Flags().IntVarP(&config.Log.Level, "log-level", "", -1, "Logging level") runCmd.Flags().StringVar(&config.Log.File, "log-file", "", "Logging file path") @@ -219,6 +220,10 @@ func initApp(lc fx.Lifecycle, cfg *config.Config, c *controller.Controller) *gin r := gin.New() + if cfg.Server.EnablePprof { + pprof.Register(r) + } + r.Use(gin.Recovery()) r.Use(ginzap.GinzapWithConfig(logging.DefaultLogger().Desugar(), &ginzap.Config{ @@ -240,7 +245,6 @@ func initApp(lc fx.Lifecycle, cfg *config.Config, c *controller.Controller) *gin }) r = api.InitRouter(r, c, cfg) - pprof.Register(r) srv := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Server.Port), Handler: r, diff --git a/internal/config/config.go b/internal/config/config.go index 205fd22..a4946c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ type Config struct { type ServerConfig struct { Port int GracefulShutdown time.Duration + EnablePprof bool } type TGConfig struct { diff --git a/internal/reader/tg_reader.go b/internal/reader/tg_reader.go index 3d6569c..b19814e 100644 --- a/internal/reader/tg_reader.go +++ b/internal/reader/tg_reader.go @@ -123,6 +123,16 @@ func newTGReader( func (r *tgReader) Close() error { close(r.done) + close(r.bufferChan) + r.closed = true + for b := range r.bufferChan { + if b != nil { + b = nil + } + } + if r.cur != nil { + r.cur = nil + } close(r.err) return nil } @@ -172,8 +182,7 @@ func (r *tgReader) fillBufferConcurrently() error { bufferMap := make(map[int]*buffer) defer func() { - close(r.bufferChan) - r.closed = true + for i := range bufferMap { delete(bufferMap, i) } @@ -227,7 +236,11 @@ func (r *tgReader) fillBufferConcurrently() error { } else { for i := range r.concurrency { if r.currentPart+i+1 <= r.totalParts { - r.bufferChan <- bufferMap[i] + select { + case <-r.done: + return nil + case r.bufferChan <- bufferMap[i]: + } } } r.currentPart += r.concurrency @@ -252,8 +265,6 @@ func (r *tgReader) fillBufferConcurrently() error { func (r *tgReader) fillBufferSequentially() error { - defer close(r.bufferChan) - fetchChunk := func(ctx context.Context) (*buffer, error) { chunk, err := r.chunkSrc.Chunk(ctx, r.offset, r.chunkSize) if err != nil { @@ -283,6 +294,9 @@ func (r *tgReader) fillBufferSequentially() error { r.err <- err return nil } + if r.closed { + return nil + } r.bufferChan <- buf r.currentPart++ r.offset += r.chunkSize