# Mirror of https://github.com/morpheus65535/bazarr.git
# (synced 2026-01-18 15:02:58 +08:00)
# 319 lines, 10 KiB, Python
# -*- coding: utf-8 -*-
|
|
"""Subtis provider for subliminal."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import struct
|
|
from json import JSONDecodeError
|
|
from typing import TYPE_CHECKING
|
|
from urllib.parse import quote
|
|
|
|
from guessit import guessit
|
|
from requests import Session
|
|
from requests.exceptions import HTTPError, RequestException, Timeout
|
|
from subliminal.video import Episode, Movie
|
|
from subliminal_patch.providers import Provider
|
|
from subliminal_patch.subtitle import Subtitle, guess_matches
|
|
from subzero.language import Language
|
|
|
|
if TYPE_CHECKING:
|
|
from subliminal.video import Video as SubtitleVideo
|
|
|
|
# Provider release version; it is also embedded in the User-Agent string below.
__version__ = "0.9.2"

# Module-level logger shared by the subtitle and provider classes.
logger = logging.getLogger(__name__)

# Timeouts (in seconds) passed to the HTTP session: the shorter one for the
# API lookup requests, the longer one for fetching the subtitle file itself.
REQUEST_TIMEOUT_SECONDS = 10
DOWNLOAD_TIMEOUT_SECONDS = 30

# Root of the subt.is API; every endpoint URL is built on top of it.
API_BASE_URL = "https://api.subt.is/v1"

# Identifies this provider (and its version) to the API on every request.
USER_AGENT = f"Bazarr/Subtis/{__version__}"
|
|
|
|
|
|
class SubtisSubtitle(Subtitle):
    """A single subtitle result returned by the Subtis provider.

    Holds the download link and the title reported by the subt.is API, plus
    a flag telling whether the result came from an exact lookup or from the
    looser "alternative" lookup.
    """

    provider_name: str = "subtis"
    hash_verifiable: bool = False

    def __init__(
        self,
        language: Language,
        video: Movie,
        page_link: str,
        title: str,
        download_url: str,
        is_synced: bool = True,
    ) -> None:
        """Store the result metadata and build the display release info.

        Args:
            language: Language of the subtitle.
            video: Video the subtitle was searched for.
            page_link: URL that produced this result; also used as ``id``.
            title: Title reported by the API (coerced to ``str`` and stripped).
            download_url: URL the subtitle content is downloaded from.
            is_synced: ``False`` marks the result as a fuzzy match.
        """
        super().__init__(language, hearing_impaired=False, page_link=page_link)

        self.video = video
        self.download_url = download_url
        self.is_synced = is_synced
        self._title = str(title).strip()

        # Non-synced results are tagged so they are recognisable in the UI.
        suffix = " [fuzzy match]" if not is_synced else ""
        self.release_info = f"{self._title}{suffix}"

    @property
    def id(self) -> str:
        """Identifier of this subtitle: the link it was found at."""
        return self.page_link

    def get_matches(self, video: SubtitleVideo) -> set[str]:
        """Return the match names shared between *video* and the API title.

        Only movies are compared; any other video type yields an empty set.
        """
        found: set[str] = set()
        if not isinstance(video, Movie):
            return found

        # Parse the API title as a movie release name and compare the guess
        # with the video's own properties.
        guessed = guessit(self._title, {"type": "movie"})
        found |= guess_matches(video, guessed)
        return found
|
|
|
|
|
|
class SubtisProvider(Provider):
    """Subtis subtitle provider for Spanish language subtitles.

    Searches the subt.is API for subtitles using a cascade of increasingly
    broad matching strategies (hash -> bytes -> filename -> alternative) and
    returns the first hit. Currently supports movies only.
    """

    languages: set[Language] = {Language.fromalpha2("es")}
    video_types: tuple[type[Movie], ...] = (Movie,)
    provider_name: str = "subtis"
    version: str = __version__

    def __init__(self) -> None:
        # The HTTP session is created lazily in initialize().
        self.session: Session | None = None

    def initialize(self) -> None:
        """Create the HTTP session and set the default request headers."""
        self.session = Session()
        self.session.headers.update(
            {
                "User-Agent": USER_AGENT,
                "Accept": "application/json",
            }
        )

    def terminate(self) -> None:
        """Close the HTTP session, if one is open."""
        if self.session is not None:
            self.session.close()
            self.session = None

    def _encode_filename(self, filename: str) -> str:
        """Percent-encode *filename* for use as a single URL path segment."""
        # safe="" so that "/" is escaped too and cannot split the path.
        return quote(filename, safe="")

    def _build_hash_url(self, video_hash: str) -> str:
        """Return the lookup-by-file-hash endpoint URL."""
        return f"{API_BASE_URL}/subtitle/find/file/hash/{video_hash}"

    def _build_bytes_url(self, file_size: int) -> str:
        """Return the lookup-by-file-size endpoint URL."""
        return f"{API_BASE_URL}/subtitle/find/file/bytes/{file_size}"

    def _build_filename_url(self, filename: str) -> str:
        """Return the lookup-by-file-name endpoint URL."""
        encoded = self._encode_filename(filename)
        return f"{API_BASE_URL}/subtitle/find/file/name/{encoded}"

    def _build_alternative_url(self, filename: str) -> str:
        """Return the "alternative" lookup endpoint URL for *filename*."""
        encoded = self._encode_filename(filename)
        return f"{API_BASE_URL}/subtitle/file/alternative/{encoded}"

    def _compute_video_hash(self, file_path: str) -> str | None:
        """Compute an OpenSubtitles-style hash for a video file.

        Hash is: size + checksum(first 64KB) + checksum(last 64KB), truncated
        to 64 bits and rendered as 16 lowercase hex digits. Each checksum is
        the sum of the chunk read as little-endian unsigned 64-bit words;
        chunks whose length is not a multiple of 8 are zero-padded.

        Returns:
            The hex digest, or None when the file is empty or unreadable.
        """

        def _checksum(data: bytes) -> int:
            if not data:
                return 0
            # Zero-pad to a whole number of 8-byte words.
            padding = -len(data) % 8
            if padding:
                data += b"\0" * padding
            return sum(word for (word,) in struct.iter_unpack("<Q", data))

        try:
            file_size = os.path.getsize(file_path)
            if file_size <= 0:
                return None

            chunk_size = min(65536, file_size)
            # Read both chunks through a single handle instead of reopening
            # the file for each one.
            with open(file_path, "rb") as handle:
                head_sum = _checksum(handle.read(chunk_size))
                handle.seek(max(file_size - chunk_size, 0))
                tail_sum = _checksum(handle.read(chunk_size))
        except OSError as error:
            logger.warning("Unable to compute hash for %s: %s", file_path, error)
            return None

        # Python ints do not overflow, so wrap to 64 bits once at the end.
        file_hash = (file_size + head_sum + tail_sum) & 0xFFFFFFFFFFFFFFFF
        return f"{file_hash:016x}"

    def _parse_api_response(
        self,
        response_data: dict[str, object],
    ) -> tuple[str, str] | None:
        """Extract subtitle link and title from API response.

        Expects response in format:
            {"subtitle": {"subtitle_link": "..."}, "title": {"title_name": "..."}}

        Returns:
            Tuple of (subtitle_link, title_name), or None if required fields
            are missing. Uses "Unknown" as fallback when title_name is
            missing, null or blank.
        """
        if not isinstance(response_data, dict):
            return None

        subtitle_data = response_data.get("subtitle")
        if not isinstance(subtitle_data, dict):
            return None

        subtitle_link = subtitle_data.get("subtitle_link")
        if not isinstance(subtitle_link, str) or not subtitle_link:
            return None

        title_data = response_data.get("title", {})
        title_name = (
            title_data.get("title_name") if isinstance(title_data, dict) else None
        )
        # A JSON null would otherwise be stringified to the literal "None".
        if title_name is None or (
            isinstance(title_name, str) and not title_name.strip()
        ):
            title_name = "Unknown"

        return subtitle_link, str(title_name)

    def _fetch_subtitle(
        self,
        url: str,
        filename: str,
    ) -> tuple[str, str] | None:
        """Fetch and parse subtitle from URL.

        Network, HTTP and decoding failures are logged and reported as "not
        found"; a 404 response is treated as a normal miss and not logged.

        Args:
            url: API endpoint to query.
            filename: Video file name, used only in log messages.

        Returns:
            Tuple of (subtitle_link, title_name) or None if not found.
        """
        if self.session is None:
            logger.warning("Session not initialized")
            return None

        try:
            response = self.session.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
        except Timeout:
            logger.warning("Request timed out for %s", filename)
            return None
        except HTTPError as error:
            status_code = getattr(error.response, "status_code", None)
            if status_code != 404:
                logger.warning("HTTP %s for %s", status_code, filename)
            return None
        except RequestException as error:
            logger.warning("Network error for %s: %s", filename, error)
            return None

        # Decode separately from the request: depending on the requests
        # version the decode error is a RequestException subclass, a
        # json.JSONDecodeError or a simplejson one. ValueError is the common
        # base of all of them, so it is reported correctly in every case.
        try:
            data = response.json()
        except ValueError as error:
            logger.warning("Invalid JSON response for %s: %s", filename, error)
            return None

        return self._parse_api_response(data)

    def query(self, language: Language, video: Movie | Episode) -> list[SubtisSubtitle]:
        """Search the API for a subtitle matching *video*.

        Lookups are tried in order (hash, size in bytes, file name, then the
        "alternative" endpoint) and the first one that yields a result wins.
        Results from the "alternative" endpoint are flagged as not synced.

        Returns:
            A list with the single subtitle found, or an empty list.
        """
        if not video.name:
            logger.warning("Missing video name")
            return []

        filename = os.path.basename(video.name)

        # The hash can only be computed when the file is reachable locally.
        video_hash: str | None = None
        if os.path.exists(video.name):
            video_hash = self._compute_video_hash(video.name)

        # Each step is (url, is_synced, method label used for logging).
        cascade_steps: list[tuple[str, bool, str]] = []
        if video_hash:
            cascade_steps.append((self._build_hash_url(video_hash), True, "hash"))
        if video.size:
            cascade_steps.append((self._build_bytes_url(video.size), True, "bytes"))
        cascade_steps.append((self._build_filename_url(filename), True, "name"))
        cascade_steps.append(
            (self._build_alternative_url(filename), False, "alternative")
        )

        for url, is_synced, method in cascade_steps:
            parsed = self._fetch_subtitle(url, filename)
            if not parsed:
                continue

            subtitle_link, title_name = parsed
            logger.debug(
                "Found subtitle via cascade search (%s) for %s",
                method,
                filename,
            )
            return [
                SubtisSubtitle(
                    language=language,
                    video=video,
                    page_link=url,
                    title=title_name,
                    download_url=subtitle_link,
                    is_synced=is_synced,
                )
            ]

        logger.info("No subtitle found for %s", filename)
        return []

    def list_subtitles(
        self,
        video: Movie | Episode,
        languages: set[Language],
    ) -> list[SubtisSubtitle]:
        """Return the subtitles found for *video* in each requested language.

        Episodes are not supported and always yield an empty list.
        """
        if isinstance(video, Episode):
            logger.debug("TV show support not yet implemented")
            return []

        subtitles: list[SubtisSubtitle] = []
        for language in languages:
            subtitles.extend(self.query(language, video))
        return subtitles

    def download_subtitle(self, subtitle: SubtisSubtitle) -> None:
        """Download subtitle content from the API.

        Fetches the subtitle file from subtitle.download_url and stores
        the content in subtitle.content. Network errors are logged and leave
        subtitle.content untouched.
        """
        if self.session is None:
            logger.warning("Session not initialized")
            return

        if not subtitle.download_url:
            logger.warning("No download URL available")
            return

        try:
            response = self.session.get(
                subtitle.download_url,
                timeout=DOWNLOAD_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except Timeout:
            logger.warning(
                "Download timed out from %s",
                subtitle.download_url,
            )
            return
        except HTTPError as error:
            logger.warning(
                "HTTP error %s downloading from %s",
                getattr(error.response, "status_code", None),
                subtitle.download_url,
            )
            return
        except RequestException as error:
            logger.warning(
                "Network error downloading from %s: %s",
                subtitle.download_url,
                error,
            )
            return

        if not response.content:
            logger.warning("Empty subtitle content")
            return

        subtitle.content = response.content
|