mirror of
https://github.com/horacio9a/cam4-anonymous.git
synced 2024-12-27 01:55:22 +08:00
Replace for Streamlink 6.7.0 /streamlink_cli/utils/progress.py
This commit is contained in:
parent
07e853dc5b
commit
3949415576
1 changed files with 297 additions and 0 deletions
297
progress_670.py
Normal file
297
progress_670.py
Normal file
|
@ -0,0 +1,297 @@
|
|||
import os
|
||||
from collections import deque
|
||||
from math import floor
|
||||
from pathlib import PurePath
|
||||
from shutil import get_terminal_size
|
||||
from string import Formatter as StringFormatter
|
||||
from threading import Event, RLock, Thread
|
||||
from time import time
|
||||
from typing import Callable, Deque, Dict, Iterable, List, Optional, TextIO, Tuple, Union
|
||||
|
||||
from streamlink.compat import is_win32
|
||||
|
||||
|
||||
_stringformatter = StringFormatter()
|
||||
_TFormat = Iterable[Iterable[Tuple[str, Optional[str], Optional[str], Optional[str]]]]
|
||||
|
||||
|
||||
class ProgressFormatter:
|
||||
# Store formats as a tuple of lists of parsed format strings,
|
||||
# so when iterating, we don't have to parse over and over again.
|
||||
# Reserve at least 15 characters for the path, so it can be truncated with enough useful information.
|
||||
FORMATS: _TFormat = tuple(list(_stringformatter.parse(fmt)) for fmt in (
|
||||
"[download] => {path} {written} ({elapsed} @ {speed})",
|
||||
"[download] => {written} ({elapsed} @ {speed})",
|
||||
"[download] => {written} ({elapsed} @ {speed})",
|
||||
"[download] => {written} ({elapsed})",
|
||||
"[download] => {written}",
|
||||
))
|
||||
FORMATS_NOSPEED: _TFormat = tuple(list(_stringformatter.parse(fmt)) for fmt in (
|
||||
"[download] => {path} {written} ({elapsed})",
|
||||
"[download] => {written} ({elapsed})",
|
||||
"[download] => {written} ({elapsed})",
|
||||
"[download] => {written}",
|
||||
))
|
||||
|
||||
# Use U+2026 (HORIZONTAL ELLIPSIS) to be able to distinguish between "." and ".." when truncating relative paths
|
||||
ELLIPSIS: str = "…"
|
||||
|
||||
# widths generated from
|
||||
# https://www.unicode.org/Public/4.0-Update/EastAsianWidth-4.0.0.txt
|
||||
# See https://github.com/streamlink/streamlink/pull/2032
|
||||
WIDTHS: Iterable[Tuple[int, int]] = (
|
||||
(13, 1),
|
||||
(15, 0),
|
||||
(126, 1),
|
||||
(159, 0),
|
||||
(687, 1),
|
||||
(710, 0),
|
||||
(711, 1),
|
||||
(727, 0),
|
||||
(733, 1),
|
||||
(879, 0),
|
||||
(1154, 1),
|
||||
(1161, 0),
|
||||
(4347, 1),
|
||||
(4447, 2),
|
||||
(7467, 1),
|
||||
(7521, 0),
|
||||
(8369, 1),
|
||||
(8426, 0),
|
||||
(9000, 1),
|
||||
(9002, 2),
|
||||
(11021, 1),
|
||||
(12350, 2),
|
||||
(12351, 1),
|
||||
(12438, 2),
|
||||
(12442, 0),
|
||||
(19893, 2),
|
||||
(19967, 1),
|
||||
(55203, 2),
|
||||
(63743, 1),
|
||||
(64106, 2),
|
||||
(65039, 1),
|
||||
(65059, 0),
|
||||
(65131, 2),
|
||||
(65279, 1),
|
||||
(65376, 2),
|
||||
(65500, 1),
|
||||
(65510, 2),
|
||||
(120831, 1),
|
||||
(262141, 2),
|
||||
(1114109, 1),
|
||||
)
|
||||
|
||||
# On Windows, we need one less space, or we overflow the line for some reason.
|
||||
gap = 1 if is_win32 else 0
|
||||
|
||||
@classmethod
|
||||
def term_width(cls):
|
||||
return get_terminal_size().columns - cls.gap
|
||||
|
||||
@classmethod
|
||||
def _get_width(cls, ordinal: int) -> int:
|
||||
"""Return the width of a specific unicode character when it would be displayed."""
|
||||
return next((width for unicode, width in cls.WIDTHS if ordinal <= unicode), 1)
|
||||
|
||||
@classmethod
|
||||
def width(cls, value: str):
|
||||
"""Return the overall width of a string when it would be displayed."""
|
||||
return sum(map(cls._get_width, map(ord, value)))
|
||||
|
||||
@classmethod
|
||||
def cut(cls, value: str, max_width: int) -> str:
|
||||
"""Cut off the beginning of a string until its display width fits into the output size."""
|
||||
current = value
|
||||
for i in range(len(value)): # pragma: no branch
|
||||
current = value[i:]
|
||||
if cls.width(current) <= max_width:
|
||||
break
|
||||
return current
|
||||
|
||||
@classmethod
|
||||
def format(cls, formats: _TFormat, params: Dict[str, Union[str, Callable[[int], str]]]) -> str:
|
||||
term_width = cls.term_width()
|
||||
static: List[str] = []
|
||||
variable: List[Tuple[int, Callable[[int], str], int]] = []
|
||||
|
||||
for fmt in formats:
|
||||
static.clear()
|
||||
variable.clear()
|
||||
length = 0
|
||||
# Get literal texts, static segments and variable segments from the parsed format
|
||||
# and calculate the overall length of the literal texts and static segments after substituting them.
|
||||
for literal_text, field_name, format_spec, _conversion in fmt:
|
||||
static.append(literal_text)
|
||||
length += len(literal_text)
|
||||
if field_name is None:
|
||||
continue
|
||||
if field_name not in params:
|
||||
break
|
||||
value_or_callable = params[field_name]
|
||||
if not callable(value_or_callable):
|
||||
static.append(value_or_callable)
|
||||
length += len(value_or_callable)
|
||||
else:
|
||||
variable.append((len(static), value_or_callable, int(format_spec or 0)))
|
||||
static.append("")
|
||||
else:
|
||||
# No variable segments? Just check if the resulting string fits into the size constraints.
|
||||
if not variable:
|
||||
if length > term_width:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
# Get the available space for each variable segment (share space equally and round down).
|
||||
max_width = int((term_width - length) / len(variable))
|
||||
# If at least one variable segment doesn't fit, continue with the next format.
|
||||
if max_width < 1 or any(max_width < min_width for _, __, min_width in variable):
|
||||
continue
|
||||
# All variable segments fit, so finally format them, but continue with the next format if there's an error.
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
for idx, fn, _ in variable:
|
||||
static[idx] = fn(max_width)
|
||||
except Exception:
|
||||
continue
|
||||
break
|
||||
|
||||
return "".join(static)
|
||||
|
||||
@staticmethod
|
||||
def _round(num: float, n: int = 2) -> float:
|
||||
return floor(num * 10 ** n) / 10 ** n
|
||||
|
||||
@classmethod
|
||||
def format_filesize(cls, size: float, suffix: str = "") -> str:
|
||||
if size < 1024:
|
||||
return f"{size:.0f} bytes{suffix}"
|
||||
if size < 2**20:
|
||||
return f"{cls._round(size / 2**10, 2):.2f} KB{suffix}"
|
||||
if size < 2**30:
|
||||
return f"{cls._round(size / 2**20, 2):.2f} MB{suffix}"
|
||||
if size < 2**40:
|
||||
return f"{cls._round(size / 2**30, 2):.2f} GB{suffix}"
|
||||
|
||||
return f"{cls._round(size / 2**40, 2):.2f} TB{suffix}"
|
||||
|
||||
@classmethod
|
||||
def format_time(cls, elapsed: float) -> str:
|
||||
elapsed = max(elapsed, 0)
|
||||
|
||||
if elapsed < 60:
|
||||
return f"{int(elapsed % 60):1d}s"
|
||||
if elapsed < 3600:
|
||||
return f"{int(elapsed % 3600 / 60):1d}m{int(elapsed % 60):02d}s"
|
||||
|
||||
return f"{int(elapsed / 3600)}h{int(elapsed % 3600 / 60):02d}m{int(elapsed % 60):02d}s"
|
||||
|
||||
@classmethod
|
||||
def format_path(cls, path: PurePath, max_width: int) -> str:
|
||||
# Quick check if the path fits
|
||||
string = str(path)
|
||||
width = cls.width(string)
|
||||
if width <= max_width:
|
||||
return string
|
||||
|
||||
# Since the path doesn't fit, we always need to add an ellipsis.
|
||||
# On Windows, we also need to add the "drive" part (which is an empty string on PurePosixPath)
|
||||
max_width -= cls.width(path.drive) + cls.width(cls.ELLIPSIS)
|
||||
|
||||
# Ignore the path's first part, aka the "anchor" (drive + root)
|
||||
parts = os.path.sep.join(path.parts[1:] if path.drive else path.parts)
|
||||
truncated = cls.cut(parts, max_width)
|
||||
|
||||
return f"{path.drive}{cls.ELLIPSIS}{truncated}"
|
||||
|
||||
|
||||
class Progress(Thread):
|
||||
def __init__(
|
||||
self,
|
||||
stream: TextIO,
|
||||
path: PurePath,
|
||||
interval: float = 0.25,
|
||||
history: int = 20,
|
||||
threshold: int = 2,
|
||||
):
|
||||
"""
|
||||
:param stream: The output stream
|
||||
:param path: The path that's being written
|
||||
:param interval: Time in seconds between updates
|
||||
:param history: Number of seconds of how long download speed history is kept
|
||||
:param threshold: Number of seconds until download speed is shown
|
||||
"""
|
||||
|
||||
super().__init__(daemon=True)
|
||||
self._wait = Event()
|
||||
self._lock = RLock()
|
||||
|
||||
self.formatter = ProgressFormatter()
|
||||
|
||||
self.stream: TextIO = stream
|
||||
self.path: PurePath = path
|
||||
self.interval: float = interval
|
||||
self.history: Deque[Tuple[float, int]] = deque(maxlen=int(history / interval))
|
||||
self.threshold: int = int(threshold / interval)
|
||||
|
||||
self.started: float = 0.0
|
||||
self.overall: int = 0
|
||||
self.written: int = 0
|
||||
|
||||
def close(self):
|
||||
self._wait.set()
|
||||
|
||||
def write(self, chunk: bytes):
|
||||
size = len(chunk)
|
||||
with self._lock:
|
||||
self.overall += size
|
||||
self.written += size
|
||||
|
||||
def run(self):
|
||||
self.started = time()
|
||||
try:
|
||||
while not self._wait.wait(self.interval): # pragma: no cover
|
||||
self.update()
|
||||
finally:
|
||||
self.print_end()
|
||||
|
||||
def update(self):
|
||||
with self._lock:
|
||||
now = time()
|
||||
formatter = self.formatter
|
||||
history = self.history
|
||||
|
||||
history.append((now, self.written))
|
||||
self.written = 0
|
||||
|
||||
has_history = len(history) >= self.threshold
|
||||
if not has_history:
|
||||
formats = formatter.FORMATS_NOSPEED
|
||||
speed = ""
|
||||
else:
|
||||
formats = formatter.FORMATS
|
||||
speed = formatter.format_filesize(sum(size for _, size in history) / (now - history[0][0]), "/s")
|
||||
|
||||
params = dict(
|
||||
written=formatter.format_filesize(self.overall),
|
||||
elapsed=formatter.format_time(now - self.started),
|
||||
speed=speed,
|
||||
path=lambda max_width: formatter.format_path(self.path, max_width),
|
||||
)
|
||||
|
||||
status = formatter.format(formats, params)
|
||||
|
||||
self.print_inplace(status)
|
||||
|
||||
def print_inplace(self, msg: str):
|
||||
"""Clears the previous line and prints a new one."""
|
||||
term_width = self.formatter.term_width()
|
||||
spacing = term_width - self.formatter.width(msg)
|
||||
|
||||
self.stream.write(f"\r{msg}{' ' * max(0, spacing)}")
|
||||
self.stream.flush()
|
||||
|
||||
def print_end(self):
|
||||
self.stream.write("\n")
|
||||
self.stream.flush()
|
Loading…
Reference in a new issue