refactor: stream reader

This commit is contained in:
divyam234 2024-03-10 20:10:09 +05:30
parent 886e1149fa
commit acbf8b71d4
3 changed files with 65 additions and 50 deletions

View file

@ -15,8 +15,8 @@ type decrpytedReader struct {
pos int
client *telegram.Client
reader io.ReadCloser
bytesread int64
contentLength int64
limit int64
err error
encryptionKey string
}
@ -24,14 +24,14 @@ func NewDecryptedReader(
ctx context.Context,
client *telegram.Client,
parts []types.Part,
contentLength int64,
limit int64,
encryptionKey string) (io.ReadCloser, error) {
r := &decrpytedReader{
ctx: ctx,
parts: parts,
client: client,
contentLength: contentLength,
limit: limit,
encryptionKey: encryptionKey,
}
res, err := r.nextPart()
@ -48,25 +48,31 @@ func NewDecryptedReader(
func (r *decrpytedReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.limit <= 0 {
return 0, io.EOF
}
n, err = r.reader.Read(p)
if err == io.EOF || n == 0 {
if err == nil {
r.limit -= int64(n)
}
if err == io.EOF {
if r.limit > 0 {
err = nil
}
r.pos++
if r.pos < len(r.parts) {
r.reader, err = r.nextPart()
if err != nil {
return 0, err
}
r.reader, err = newTGReader(r.ctx, r.client, r.parts[r.pos])
}
}
r.bytesread += int64(n)
if r.bytesread == r.contentLength {
return n, io.EOF
}
return n, nil
r.err = err
return
}
func (r *decrpytedReader) Close() (err error) {
@ -93,7 +99,7 @@ func (r *decrpytedReader) nextPart() (io.ReadCloser, error) {
end = min(r.parts[r.pos].Size-1, underlyingOffset+underlyingLimit-1)
}
return NewTGReader(r.ctx, r.client, types.Part{
return newTGReader(r.ctx, r.client, types.Part{
Start: underlyingOffset,
End: end,
Location: r.parts[r.pos].Location,

View file

@ -9,29 +9,29 @@ import (
)
type linearReader struct {
ctx context.Context
parts []types.Part
pos int
client *telegram.Client
reader io.ReadCloser
bytesread int64
contentLength int64
ctx context.Context
parts []types.Part
pos int
client *telegram.Client
reader io.ReadCloser
limit int64
err error
}
func NewLinearReader(ctx context.Context,
client *telegram.Client,
parts []types.Part,
contentLength int64,
limit int64,
) (reader io.ReadCloser, err error) {
r := &linearReader{
ctx: ctx,
parts: parts,
client: client,
contentLength: contentLength,
ctx: ctx,
parts: parts,
client: client,
limit: limit,
}
reader, err = NewTGReader(r.ctx, r.client, r.parts[r.pos])
reader, err = newTGReader(r.ctx, r.client, r.parts[r.pos])
if err != nil {
return nil, err
@ -43,24 +43,31 @@ func NewLinearReader(ctx context.Context,
func (r *linearReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.limit <= 0 {
return 0, io.EOF
}
n, err = r.reader.Read(p)
if err == io.EOF || n == 0 {
if err == nil {
r.limit -= int64(n)
}
if err == io.EOF {
if r.limit > 0 {
err = nil
}
r.pos++
if r.pos < len(r.parts) {
r.reader, err = NewTGReader(r.ctx, r.client, r.parts[r.pos])
if err != nil {
return 0, err
}
r.reader, err = newTGReader(r.ctx, r.client, r.parts[r.pos])
}
}
r.bytesread += int64(n)
if r.bytesread == r.contentLength {
return n, io.EOF
}
return n, nil
r.err = err
return
}
func (r *linearReader) Close() (err error) {

View file

@ -18,7 +18,7 @@ type tgReader struct {
end int64
next func() ([]byte, error)
buffer []byte
bytesread int64
limit int64
chunkSize int64
i int64
}
@ -33,7 +33,7 @@ func calculateChunkSize(start, end int64) int64 {
return chunkSize
}
func NewTGReader(
func newTGReader(
ctx context.Context,
client *telegram.Client,
part types.Part,
@ -47,6 +47,7 @@ func NewTGReader(
start: part.Start,
end: part.End,
chunkSize: calculateChunkSize(part.Start, part.End),
limit: part.End - part.Start + 1,
}
r.next = r.partStream()
return r, nil
@ -54,6 +55,10 @@ func NewTGReader(
func (r *tgReader) Read(p []byte) (n int, err error) {
if r.limit <= 0 {
return 0, io.EOF
}
if r.i >= int64(len(r.buffer)) {
r.buffer, err = r.next()
if err != nil {
@ -71,12 +76,9 @@ func (r *tgReader) Read(p []byte) (n int, err error) {
}
n = copy(p, r.buffer[r.i:])
r.i += int64(n)
r.bytesread += int64(n)
r.limit -= int64(n)
if r.bytesread == r.end-r.start+1 {
return n, io.EOF
}
return n, nil
return
}
func (*tgReader) Close() error {