mirror of
https://github.com/tgdrive/teldrive.git
synced 2025-02-24 23:13:53 +08:00
remove goroutines from linear reader
This commit is contained in:
parent
2533234de8
commit
61113620cd
1 changed files with 47 additions and 81 deletions
|
@ -15,11 +15,17 @@ type linearReader struct {
|
|||
ctx context.Context
|
||||
parts []types.Part
|
||||
pos int
|
||||
reader io.ReadCloser
|
||||
client *telegram.Client
|
||||
next func() []byte
|
||||
buffer []byte
|
||||
bytesread int64
|
||||
chunkSize int64
|
||||
sync.Mutex
|
||||
i int64
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (*linearReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part) (io.ReadCloser, error) {
|
||||
|
@ -31,47 +37,37 @@ func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types
|
|||
chunkSize: int64(1024 * 1024),
|
||||
}
|
||||
|
||||
res, err := r.nextPart()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.reader = res
|
||||
r.next = r.partStream()
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *linearReader) Read(p []byte) (n int, err error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
n, err = r.reader.Read(p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if r.i >= int64(len(r.buffer)) {
|
||||
r.buffer = r.next()
|
||||
if len(r.buffer) == 0 && r.pos == len(r.parts)-1 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
r.i = 0
|
||||
}
|
||||
|
||||
n = copy(p, r.buffer[r.i:])
|
||||
|
||||
r.i += int64(n)
|
||||
|
||||
r.bytesread += int64(n)
|
||||
|
||||
if r.bytesread == r.parts[r.pos].Length && r.pos < len(r.parts)-1 {
|
||||
r.pos++
|
||||
r.reader, err = r.nextPart()
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r.next = r.partStream()
|
||||
r.bytesread = 0
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (r *linearReader) Close() (err error) {
|
||||
if r.reader != nil {
|
||||
err = r.reader.Close()
|
||||
r.reader = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
|
||||
|
||||
req := &tg.UploadGetFileRequest{
|
||||
|
@ -94,31 +90,9 @@ func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *linearReader) nextPart() (io.ReadCloser, error) {
|
||||
stream := r.tgRangeStream()
|
||||
ir, iw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
defer iw.Close()
|
||||
|
||||
for {
|
||||
|
||||
data, more := <-stream
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
_, err := iw.Write(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ir, nil
|
||||
}
|
||||
|
||||
func (r *linearReader) tgRangeStream() chan []byte {
|
||||
func (r *linearReader) partStream() func() []byte {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
start := r.parts[r.pos].Start
|
||||
end := r.parts[r.pos].End
|
||||
|
@ -132,41 +106,33 @@ func (r *linearReader) tgRangeStream() chan []byte {
|
|||
|
||||
currentPart := 1
|
||||
|
||||
stream := make(chan []byte)
|
||||
readData := func() []byte {
|
||||
|
||||
go func() {
|
||||
if currentPart > partCount {
|
||||
return make([]byte, 0)
|
||||
}
|
||||
|
||||
defer close(stream)
|
||||
res, _ := r.chunk(offset, r.chunkSize)
|
||||
|
||||
for {
|
||||
if len(res) == 0 {
|
||||
return res
|
||||
} else if partCount == 1 {
|
||||
res = res[firstPartCut:lastPartCut]
|
||||
|
||||
res, _ := r.chunk(offset, r.chunkSize)
|
||||
} else if currentPart == 1 {
|
||||
res = res[firstPartCut:]
|
||||
|
||||
if len(res) == 0 {
|
||||
return
|
||||
} else if partCount == 1 {
|
||||
res = res[firstPartCut:lastPartCut]
|
||||
|
||||
} else if currentPart == 1 {
|
||||
res = res[firstPartCut:]
|
||||
|
||||
} else if currentPart == partCount {
|
||||
res = res[:lastPartCut]
|
||||
|
||||
}
|
||||
|
||||
stream <- res
|
||||
|
||||
currentPart++
|
||||
|
||||
offset += r.chunkSize
|
||||
|
||||
if currentPart > partCount {
|
||||
return
|
||||
}
|
||||
} else if currentPart == partCount {
|
||||
res = res[:lastPartCut]
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
return stream
|
||||
currentPart++
|
||||
|
||||
offset += r.chunkSize
|
||||
|
||||
return res
|
||||
|
||||
}
|
||||
return readData
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue