From 39494155764bbeba5be3bacd6a7300554cc4cb6f Mon Sep 17 00:00:00 2001 From: horacio9a Date: Mon, 18 Mar 2024 10:37:05 +0100 Subject: [PATCH] Replace for Streamlink 6.7.0 /streamlink_cli/utils/progress.py --- progress_670.py | 297 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 progress_670.py diff --git a/progress_670.py b/progress_670.py new file mode 100644 index 0000000..c6e09ed --- /dev/null +++ b/progress_670.py @@ -0,0 +1,297 @@ +import os +from collections import deque +from math import floor +from pathlib import PurePath +from shutil import get_terminal_size +from string import Formatter as StringFormatter +from threading import Event, RLock, Thread +from time import time +from typing import Callable, Deque, Dict, Iterable, List, Optional, TextIO, Tuple, Union + +from streamlink.compat import is_win32 + + +_stringformatter = StringFormatter() +_TFormat = Iterable[Iterable[Tuple[str, Optional[str], Optional[str], Optional[str]]]] + + +class ProgressFormatter: + # Store formats as a tuple of lists of parsed format strings, + # so when iterating, we don't have to parse over and over again. + # Reserve at least 15 characters for the path, so it can be truncated with enough useful information. + FORMATS: _TFormat = tuple(list(_stringformatter.parse(fmt)) for fmt in ( + "[download] => {path} {written} ({elapsed} @ {speed})", + "[download] => {written} ({elapsed} @ {speed})", + "[download] => {written} ({elapsed} @ {speed})", + "[download] => {written} ({elapsed})", + "[download] => {written}", + )) + FORMATS_NOSPEED: _TFormat = tuple(list(_stringformatter.parse(fmt)) for fmt in ( + "[download] => {path} {written} ({elapsed})", + "[download] => {written} ({elapsed})", + "[download] => {written} ({elapsed})", + "[download] => {written}", + )) + + # Use U+2026 (HORIZONTAL ELLIPSIS) to be able to distinguish between "." and ".." when truncating relative paths + ELLIPSIS: str = "…" + + # widths generated from + # https://www.unicode.org/Public/4.0-Update/EastAsianWidth-4.0.0.txt + # See https://github.com/streamlink/streamlink/pull/2032 + WIDTHS: Iterable[Tuple[int, int]] = ( + (13, 1), + (15, 0), + (126, 1), + (159, 0), + (687, 1), + (710, 0), + (711, 1), + (727, 0), + (733, 1), + (879, 0), + (1154, 1), + (1161, 0), + (4347, 1), + (4447, 2), + (7467, 1), + (7521, 0), + (8369, 1), + (8426, 0), + (9000, 1), + (9002, 2), + (11021, 1), + (12350, 2), + (12351, 1), + (12438, 2), + (12442, 0), + (19893, 2), + (19967, 1), + (55203, 2), + (63743, 1), + (64106, 2), + (65039, 1), + (65059, 0), + (65131, 2), + (65279, 1), + (65376, 2), + (65500, 1), + (65510, 2), + (120831, 1), + (262141, 2), + (1114109, 1), + ) + + # On Windows, we need one less space, or we overflow the line for some reason. + gap = 1 if is_win32 else 0 + + @classmethod + def term_width(cls): + return get_terminal_size().columns - cls.gap + + @classmethod + def _get_width(cls, ordinal: int) -> int: + """Return the width of a specific unicode character when it would be displayed.""" + return next((width for unicode, width in cls.WIDTHS if ordinal <= unicode), 1) + + @classmethod + def width(cls, value: str): + """Return the overall width of a string when it would be displayed.""" + return sum(map(cls._get_width, map(ord, value))) + + @classmethod + def cut(cls, value: str, max_width: int) -> str: + """Cut off the beginning of a string until its display width fits into the output size.""" + current = value + for i in range(len(value)): # pragma: no branch + current = value[i:] + if cls.width(current) <= max_width: + break + return current + + @classmethod + def format(cls, formats: _TFormat, params: Dict[str, Union[str, Callable[[int], str]]]) -> str: + term_width = cls.term_width() + static: List[str] = [] + variable: List[Tuple[int, Callable[[int], str], int]] = [] + + for fmt in formats: + static.clear() + variable.clear() + length = 0 + # Get literal texts, static segments and variable segments from the parsed format + # and calculate the overall length of the literal texts and static segments after substituting them. + for literal_text, field_name, format_spec, _conversion in fmt: + static.append(literal_text) + length += len(literal_text) + if field_name is None: + continue + if field_name not in params: + break + value_or_callable = params[field_name] + if not callable(value_or_callable): + static.append(value_or_callable) + length += len(value_or_callable) + else: + variable.append((len(static), value_or_callable, int(format_spec or 0))) + static.append("") + else: + # No variable segments? Just check if the resulting string fits into the size constraints. + if not variable: + if length > term_width: + continue + else: + break + + # Get the available space for each variable segment (share space equally and round down). + max_width = int((term_width - length) / len(variable)) + # If at least one variable segment doesn't fit, continue with the next format. + if max_width < 1 or any(max_width < min_width for _, __, min_width in variable): + continue + # All variable segments fit, so finally format them, but continue with the next format if there's an error. + # noinspection PyBroadException + try: + for idx, fn, _ in variable: + static[idx] = fn(max_width) + except Exception: + continue + break + + return "".join(static) + + @staticmethod + def _round(num: float, n: int = 2) -> float: + return floor(num * 10 ** n) / 10 ** n + + @classmethod + def format_filesize(cls, size: float, suffix: str = "") -> str: + if size < 1024: + return f"{size:.0f} bytes{suffix}" + if size < 2**20: + return f"{cls._round(size / 2**10, 2):.2f} KB{suffix}" + if size < 2**30: + return f"{cls._round(size / 2**20, 2):.2f} MB{suffix}" + if size < 2**40: + return f"{cls._round(size / 2**30, 2):.2f} GB{suffix}" + + return f"{cls._round(size / 2**40, 2):.2f} TB{suffix}" + + @classmethod + def format_time(cls, elapsed: float) -> str: + elapsed = max(elapsed, 0) + + if elapsed < 60: + return f"{int(elapsed % 60):1d}s" + if elapsed < 3600: + return f"{int(elapsed % 3600 / 60):1d}m{int(elapsed % 60):02d}s" + + return f"{int(elapsed / 3600)}h{int(elapsed % 3600 / 60):02d}m{int(elapsed % 60):02d}s" + + @classmethod + def format_path(cls, path: PurePath, max_width: int) -> str: + # Quick check if the path fits + string = str(path) + width = cls.width(string) + if width <= max_width: + return string + + # Since the path doesn't fit, we always need to add an ellipsis. + # On Windows, we also need to add the "drive" part (which is an empty string on PurePosixPath) + max_width -= cls.width(path.drive) + cls.width(cls.ELLIPSIS) + + # Ignore the path's first part, aka the "anchor" (drive + root) + parts = os.path.sep.join(path.parts[1:] if path.drive else path.parts) + truncated = cls.cut(parts, max_width) + + return f"{path.drive}{cls.ELLIPSIS}{truncated}" + + +class Progress(Thread): + def __init__( + self, + stream: TextIO, + path: PurePath, + interval: float = 0.25, + history: int = 20, + threshold: int = 2, + ): + """ + :param stream: The output stream + :param path: The path that's being written + :param interval: Time in seconds between updates + :param history: Number of seconds of how long download speed history is kept + :param threshold: Number of seconds until download speed is shown + """ + + super().__init__(daemon=True) + self._wait = Event() + self._lock = RLock() + + self.formatter = ProgressFormatter() + + self.stream: TextIO = stream + self.path: PurePath = path + self.interval: float = interval + self.history: Deque[Tuple[float, int]] = deque(maxlen=int(history / interval)) + self.threshold: int = int(threshold / interval) + + self.started: float = 0.0 + self.overall: int = 0 + self.written: int = 0 + + def close(self): + self._wait.set() + + def write(self, chunk: bytes): + size = len(chunk) + with self._lock: + self.overall += size + self.written += size + + def run(self): + self.started = time() + try: + while not self._wait.wait(self.interval): # pragma: no cover + self.update() + finally: + self.print_end() + + def update(self): + with self._lock: + now = time() + formatter = self.formatter + history = self.history + + history.append((now, self.written)) + self.written = 0 + + has_history = len(history) >= self.threshold + if not has_history: + formats = formatter.FORMATS_NOSPEED + speed = "" + else: + formats = formatter.FORMATS + speed = formatter.format_filesize(sum(size for _, size in history) / (now - history[0][0]), "/s") + + params = dict( + written=formatter.format_filesize(self.overall), + elapsed=formatter.format_time(now - self.started), + speed=speed, + path=lambda max_width: formatter.format_path(self.path, max_width), + ) + + status = formatter.format(formats, params) + + self.print_inplace(status) + + def print_inplace(self, msg: str): + """Clears the previous line and prints a new one.""" + term_width = self.formatter.term_width() + spacing = term_width - self.formatter.width(msg) + + self.stream.write(f"\r{msg}{' ' * max(0, spacing)}") + self.stream.flush() + + def print_end(self): + self.stream.write("\n") + self.stream.flush()