mirror of
https://github.com/morpheus65535/bazarr.git
synced 2025-12-28 20:58:32 +08:00
569 lines
22 KiB
Python
569 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
import logging
|
||
import os
|
||
import time
|
||
import io
|
||
import datetime
|
||
|
||
from typing import Set
|
||
from typing import Optional, TYPE_CHECKING
|
||
|
||
from babelfish import language_converters
|
||
from zipfile import ZipFile, is_zipfile
|
||
from requests import Session, Response
|
||
from guessit import guessit
|
||
|
||
from subzero.language import Language
|
||
from subliminal import Episode, Movie
|
||
from subliminal.cache import region
|
||
from subliminal.exceptions import AuthenticationError, ConfigurationError
|
||
from subliminal_patch.exceptions import APIThrottled, ForbiddenError, TooManyRequests
|
||
from .mixins import ProviderRetryMixin
|
||
from subliminal_patch.subtitle import Subtitle
|
||
from subliminal_patch.providers import Provider, utils
|
||
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
|
||
|
||
if TYPE_CHECKING:
|
||
from subliminal_patch import Video
|
||
|
||
logger = logging.getLogger(__name__)

# Cache lifetimes, in seconds, passed as ``expiration_time`` to ``region.cache_on_arguments``
# for title searches, subtitle queries and downloaded archives respectively.
TITLES_EXPIRATION_TIME = datetime.timedelta(hours=6).total_seconds()
QUERIES_EXPIRATION_TIME = datetime.timedelta(hours=1).total_seconds()
ARCHIVES_EXPIRATION_TIME = datetime.timedelta(minutes=15).total_seconds()

# Passed as ``amount`` / ``retry_timeout`` to every ``self.retry(...)`` call made by the provider.
retry_amount = 3
retry_timeout = 5

# Register the Subsource converter with babelfish, then read back the languages it can map.
language_converters.register('subsource = subliminal_patch.converters.subsource:SubsourceConverter')
supported_languages = list(language_converters['subsource'].to_subsource.keys())
|
||
|
||
|
||
class SubsourceSubtitle(Subtitle):
    """A single subtitle entry returned by the Subsource provider."""
    provider_name = 'subsource'
    hash_verifiable = False
    hearing_impaired_verifiable = True

    def __init__(self, language, forced, hearing_impaired, page_link, subtitles_id, release_names, uploader,
                 season=None, episode=None, asked_for_episode=None, is_pack=False):
        """
        Store the metadata describing one Subsource subtitle.

        :param language: Base language of the subtitle; it is rebuilt below with the
            ``hearing_impaired`` and ``forced`` flags applied.
        :param forced: Whether the subtitle is a forced one.
        :param hearing_impaired: Whether the subtitle is for the hearing impaired.
        :param page_link: Link to the subtitle page.
        :param subtitles_id: Identifier exposed through the ``id`` property.
        :param release_names: Iterable of release names; joined with ``', '`` into ``release_info``.
        :param uploader: Name of the uploader.
        :param season: Season number of the subtitle, if any.
        :param episode: Episode number of the subtitle, if any.
        :param asked_for_episode: Episode number that was requested, if any.
        :param is_pack: Whether the subtitle is a season pack.
        """
        super().__init__(language)
        flagged_language = Language.rebuild(language, hi=hearing_impaired, forced=forced)

        # release information
        self.releases = release_names
        self.release_info = ', '.join(release_names)

        # language and flags
        self.language = flagged_language
        self.forced = forced
        self.hearing_impaired = hearing_impaired

        # identification and origin
        self.subtitles_id = subtitles_id
        self.page_link = page_link
        self.download_link = None
        self.uploader = uploader
        self.matches = None

        # episode details
        self.season = season
        self.episode = episode
        self.asked_for_episode = asked_for_episode
        self.is_pack = is_pack

    @property
    def id(self) -> int:
        """Identifier of the subtitle (the ``subtitles_id`` given at construction)."""
        return self.subtitles_id

    def get_matches(self, video: Video) -> Set[str]:
        """
        Compute the set of attributes on which this subtitle matches the given video.

        The release information is first run through ``utils.update_matches``. Episodes
        then always match on ``series`` and ``series_imdb_id`` (plus ``episode`` for a
        season pack); any other video matches on ``title`` and ``imdb_id``. The result is
        also stored on ``self.matches``.

        :param video: The video (movie or series episode) to compare against.
        :return: The set of matched attribute names.
        """
        found = set()
        utils.update_matches(found, video, self.release_info)

        if not isinstance(video, Episode):
            # movie: title and imdb
            found.add('title')
            found.add('imdb_id')
        else:
            # series: series name and imdb
            found.add('series')
            found.add('series_imdb_id')
            if self.is_pack:
                # season pack
                found.add('episode')

        self.matches = found
        return found
|
||
|
||
|
||
class SubsourceProvider(ProviderRetryMixin, Provider, ProviderSubtitleArchiveMixin):
    """Subsource Provider"""
    server_hostname = 'api.subsource.net'

    # every supported language, plus its forced variant, plus the hearing-impaired variant of all of those
    languages = {Language(*lang) for lang in supported_languages}
    languages.update({Language.rebuild(lang, forced=True) for lang in languages})
    languages.update({Language.rebuild(lang, hi=True) for lang in languages})

    video_types = (Episode, Movie)

    def __init__(self, api_key=None):
        """
        Create the provider and its HTTP session.

        :param api_key: API key sent as the ``api_key`` parameter of every request.
        :raises ConfigurationError: If ``api_key`` is missing or empty.
        """
        if not api_key:
            raise ConfigurationError('Api_key must be specified')

        self.session = Session()
        self.session.headers = {'User-Agent': os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")}
        self.api_key = api_key
        self.video = None
        self._started = None

    def initialize(self):
        """Record the time at which the provider was started."""
        self._started = time.time()

    def terminate(self):
        """Close the HTTP session."""
        self.session.close()

    def _server_url(self) -> str:
        """Return the base URL of the API, built from ``server_hostname``."""
        return f'https://{self.server_hostname}/api/v1/'

    @staticmethod
    def _get_alternative_titles(video) -> Set[str]:
        """
        Return the lower-cased titles a video is known by.

        For an episode this is the series name plus its alternative series names; for any
        other video it is the title. Empty names are dropped.

        :param video: The video to read the titles from, or None.
        :return: A set of lower-cased titles (empty when ``video`` is None).
        """
        if video is None:
            return set()
        if isinstance(video, Episode):
            names = [video.series, *(video.alternative_series or [])]
        else:
            names = [video.title]
        return {name.lower() for name in names if name}

    @region.cache_on_arguments(expiration_time=TITLES_EXPIRATION_TIME)
    def search_titles(self, title: str, imdb_id: str, season: int = None) -> Optional[int]:
        """
        Search the provider for the ID of a movie or show.

        The search is done by IMDb ID when one is given and by lower-cased title otherwise;
        the season is added to the request when provided. A result is accepted when one of
        the titles of ``self.video`` is contained in the result's title or alternate title
        and, if ``self.video`` has a year, the result's release year equals it. Results with
        a missing or non-numeric release year are skipped when a year is required, and a
        response without ``data`` is treated as "no match".

        ``self.video`` is read here but is not part of the cache key, so it must have been
        set (as ``query`` does) before calling this method.

        :param title: The name of the title to search for.
        :type title: str
        :param imdb_id: The IMDb ID of the title to search for.
        :type imdb_id: str
        :param season: (Optional) The season number when searching for a show. Defaults to None.
        :type season: int, optional
        :return: The ID of the movie or show if found, otherwise None.
        :rtype: Optional[int]
        """
        if imdb_id:
            parameters = {
                'api_key': self.api_key,
                'searchType': 'imdb',
                'imdb': imdb_id,
            }
            logger.debug(f'Searching using this imdb ID: {imdb_id}')
        else:
            parameters = {
                'api_key': self.api_key,
                'searchType': 'text',
                'q': title.lower(),
            }
            logger.debug(f'Searching using this title: {title}')

        if season:
            parameters['season'] = season

        results = self.retry(
            lambda: self.session.get(self._server_url() + 'movies/search', params=parameters, timeout=30),
            amount=retry_amount,
            retry_timeout=retry_timeout
        )

        self._status_raiser(results)

        # deserialize results; a payload without "data" simply yields no candidates
        candidates = results.json().get('data') or []

        alternative_titles = self._get_alternative_titles(self.video)
        logger.debug(f"alternative titles: {alternative_titles}")
        wanted_year = getattr(self.video, 'year', None)

        title_id = None
        for result in candidates:
            if 'title' not in result or 'releaseYear' not in result:
                continue

            sub_titles = {
                name.lower()
                for name in (result['title'], result.get('alternateTitle'))
                if isinstance(name, str) and name
            }
            logger.debug(f"Subsource titles: {sub_titles}")

            # one of our titles must be contained in one of the provider's titles
            if not any(wanted in offered for wanted in alternative_titles for offered in sub_titles):
                continue

            if wanted_year:
                try:
                    release_year = int(result['releaseYear'])
                except (TypeError, ValueError):
                    # null or malformed year: cannot confirm the match, try the next result
                    continue
                if release_year != wanted_year:
                    continue

            movie_id = result.get('movieId')
            if movie_id:
                title_id = movie_id
                break

        if title_id:
            logger.debug(f'Found this title ID: {title_id}')
        else:
            logger.debug(f'No match found for {title}')

        return title_id

    @region.cache_on_arguments(expiration_time=QUERIES_EXPIRATION_TIME)
    def query(self, languages: Set[Language], video) -> list:
        """
        Query subtitles for the given video.

        The title ID is first resolved through ``search_titles`` using the IMDb ID of the
        series (episodes) or of the movie; videos without an IMDb ID yield no result. Only
        one language of ``languages`` is searched. Forced subtitles are dropped unless a
        forced language was requested, and non hearing-impaired subtitles are dropped when a
        hearing-impaired language was requested. For episodes, a subtitle is kept only when
        the season guessed from its release info equals the video's season and the guessed
        episode is either absent (season pack) or equal to the video's episode.

        :param languages: A set of `Language` objects; a single one of them is used.
        :param video: A `Video` object (either a `Movie` or `Episode`) to search subtitles for.
        :return: A list of `SubsourceSubtitle` objects matching the criteria (possibly empty).
        """
        self.video = video
        is_episode = isinstance(video, Episode)
        title = video.series if is_episode else video.title

        if is_episode and video.series_imdb_id:
            title_id = self.search_titles(title, video.series_imdb_id, season=video.season)
        elif isinstance(video, Movie) and video.imdb_id:
            title_id = self.search_titles(title, video.imdb_id)
        else:
            # NOTE(review): the text search supported by search_titles is never used here;
            # confirm that skipping videos without an IMDb ID is intended.
            title_id = None

        if not title_id:
            logger.debug('No title id found for this video')
            return []

        # we make sure to get only one language to search for
        if not languages:
            return []
        language = next(iter(languages))

        language_name = language_converters['subsource'].convert(language.alpha3, language.country, language.script)

        logger.debug(f'Searching for this language: {language}')

        parameters = (
            ('api_key', self.api_key),
            ('language', language_name.lower()),
            ('limit', 100),
            ('movieId', title_id),
        )
        if is_episode:
            parameters += (('seasonNumber', video.season), ('episodeNumber', video.episode))

        # query the server
        res = self.retry(
            lambda: self.session.get(self._server_url() + 'subtitles', params=parameters, timeout=30),
            amount=retry_amount,
            retry_timeout=retry_timeout
        )

        self._status_raiser(res)

        result = res.json()

        if 'success' in result and not result['success']:
            return []

        items = result.get('data') or []
        logger.debug(f"Query returned {len(items)} subtitles")

        subtitles = []
        for item in items:
            is_forced = self._is_forced(item)
            if is_forced and not language.forced:
                continue

            is_hi = self._is_hi(item)
            if not is_hi and language.hi:
                continue

            episode_details = {}
            if is_episode:
                season, episode = self._get_season_episode_from_release_info(item['releaseInfo'])
                if not (season == video.season and (not episode or episode == video.episode)):
                    continue
                episode_details = {
                    'season': season,
                    'episode': episode,
                    'asked_for_episode': video.episode,
                    # no episode number in the release info means the subtitle covers the season
                    'is_pack': not episode,
                }

            item_language = language_converters['subsource'].reverse(item['language'].capitalize())[0]
            subtitles.append(SubsourceSubtitle(
                language=Language.fromalpha3b(item_language),
                forced=is_forced,
                hearing_impaired=is_hi,
                page_link=f"https://subsource.net{item['link']}",
                subtitles_id=item['subtitleId'],
                release_names=item['releaseInfo'],
                uploader=self._get_uploader_name(item),
                **episode_details,
            ))

        return subtitles

    @staticmethod
    def _is_hi(item: dict) -> bool:
        """
        Tell whether a subtitle item is for the hearing impaired.

        A truthy ``hearingImpaired`` value wins. Otherwise the ``commentary`` string (if any)
        is inspected case-insensitively: an explicit "non HI" / "HI removed" style mention
        means False; a standalone ``hi`` or ``cc`` word, or the substrings ``sdh``,
        ``𝓢𝓓𝓗`` or ``closed caption``, mean True. ``hi`` and ``cc`` are matched as whole
        words (delimited by any non-alphanumeric character or the string boundaries) so that
        words such as "high" or "hindi" are not mistaken for an HI tag.

        :param item: A dictionary describing the subtitle item (flags and commentary).
        :return: True if the item is considered hearing-impaired, otherwise False.
        :rtype: bool
        """
        if item.get('hearingImpaired'):
            return True

        commentary = item.get('commentary')
        if not isinstance(commentary, str):
            return False
        text = commentary.lower()

        # Comments include specific mention of removed or non HI
        non_hi_tags = ('hi remove', 'non hi', 'nonhi', 'non-hi', 'non-sdh', 'non sdh', 'nonsdh', 'sdh remove')
        if any(tag in text for tag in non_hi_tags):
            return False

        # split on every non-alphanumeric character so "_hi_", ".hi.", "(hi)" and "hi" all yield "hi"
        words = set(''.join(char if char.isalnum() else ' ' for char in text).split())
        if words & {'hi', 'cc'}:
            return True

        # nothing else matching means we consider it as non-HI
        return any(tag in text for tag in ('sdh', '𝓢𝓓𝓗', 'closed caption'))

    @staticmethod
    def _is_forced(item: dict) -> bool:
        """
        Tell whether a subtitle item is a forced subtitle.

        The item is forced when its ``foreignParts`` value is truthy or when its
        ``commentary`` string contains ``forced`` or ``foreign`` (case-insensitive).

        :param item: Dictionary describing the subtitle item; may include the keys
            `'foreignParts'` and `'commentary'`.
        :type item: dict
        :return: True if the subtitle item is determined to be forced, otherwise False.
        :rtype: bool
        """
        if item.get('foreignParts'):
            return True

        commentary = item.get('commentary')
        if not isinstance(commentary, str):
            return False

        # Comments include specific mention of forced subtitles
        text = commentary.lower()
        return any(tag in text for tag in ('forced', 'foreign'))

    @staticmethod
    def _get_uploader_name(item: dict) -> str:
        """
        Return the display name of the uploader of a subtitle item.

        The uploader is the contributor whose ``id`` equals the item's ``uploaderId``. An
        empty string is returned when there is no such contributor, or when the
        ``uploaderId``, the ``contributors`` list or the display name is missing.

        :param item: Dictionary containing contributor details and uploader ID.
        :type item: dict
        :return: The display name of the uploader if found; otherwise, an empty string.
        :rtype: str
        """
        uploader_id = item.get('uploaderId')
        if uploader_id is None:
            return ''

        for contributor in item.get('contributors') or []:
            if contributor.get('id') == uploader_id:
                return contributor.get('displayname') or ''
        return ''

    @staticmethod
    def _status_raiser(response: Response):
        """
        Raise an exception matching the HTTP status code of a response.

        Nothing happens for a 200 response. Any status code that is neither 200 nor one of
        those listed below is handed to ``response.raise_for_status()``.

        :param response: A `Response` object from an HTTP request.
        :type response: Response
        :raises APIThrottled: If the status code is 400 (invalid request parameters).
        :raises AuthenticationError: If the status code is 401 (authentication required).
        :raises ForbiddenError: If the status code is 403 (access denied).
        :raises TooManyRequests: If the status code is 429 (rate limit exceeded).
        :raises HTTPError: Whatever ``raise_for_status`` raises for the remaining codes.
        """
        if response.status_code == 400:
            raise APIThrottled("Invalid request parameters")
        elif response.status_code == 401:
            raise AuthenticationError("Authentication required")
        elif response.status_code == 403:
            raise ForbiddenError("Access denied")
        elif response.status_code == 429:
            raise TooManyRequests("Rate limit exceeded")
        elif response.status_code != 200:
            response.raise_for_status()

    @staticmethod
    def _get_season_episode_from_release_info(releases_info: list) -> tuple:
        """
        Extract season and episode numbers from a list of release names.

        Each release name is parsed with `guessit`; the first truthy season and the first
        truthy episode found are kept, and parsing stops once both are known. Anything that
        is not a list yields ``(None, None)``.

        :param releases_info: A list of release name strings.
        :type releases_info: list
        :return: A ``(season, episode)`` tuple; either value is None when it was not found.
        :rtype: tuple
        """
        season = None
        episode = None
        if not isinstance(releases_info, list):
            return season, episode

        for release_info in releases_info:
            if season and episode:
                break

            guessed = guessit(release_info, {"type": "episode", "includes": ["season", "episode"]})

            if not season and guessed.get('season'):
                season = guessed['season']
            if not episode and guessed.get('episode'):
                episode = guessed['episode']
        return season, episode

    def list_subtitles(self, video: Video, languages: Set[Language]) -> list:
        """
        List the subtitles available for a video in the requested languages.

        This simply delegates to ``query``.

        :param video: The video object for which subtitles need to be listed.
        :type video: Video
        :param languages: The set of requested languages.
        :type languages: Set[Language]
        :return: The list of subtitles returned by ``query``.
        :rtype: list
        """
        return self.query(languages, video)

    def download_subtitle(self, subtitle: SubsourceSubtitle) -> SubsourceSubtitle:
        """
        Download the content of a subtitle from the provider's API server.

        The archive is fetched from ``subtitles/<id>/download``; when it is a valid zip file
        the subtitle is extracted from it with ``get_subtitle_from_archive``. When no
        response is obtained or the payload is not a zip file, an error is logged and the
        subtitle content is set to None.

        :param subtitle: The subtitle object to download.
        :type subtitle: SubsourceSubtitle
        :return: The same subtitle object, after attempting to download its content.
        :rtype: SubsourceSubtitle
        :raises APIThrottled, AuthenticationError, ForbiddenError, TooManyRequests, HTTPError:
            As raised by ``_status_raiser`` for a non-200 response.
        """
        logger.debug('Downloading subtitle %r', subtitle)
        download_link = self._server_url() + f"subtitles/{subtitle.id}/download"

        r = self._get_subtitles_archive(download_link)

        # The missing-response case must be handled before looking at the status code,
        # otherwise _status_raiser fails on the absent ``status_code`` attribute.
        # NOTE(review): presumably ``retry`` can give back None when it gives up — confirm.
        if r is None:
            logger.error(f'Could not download subtitle from {download_link}')
            subtitle.content = None
            return subtitle

        self._status_raiser(r)

        archive_stream = io.BytesIO(r.content)
        if not is_zipfile(archive_stream):
            logger.error(f'Could not unzip subtitle from {download_link}')
            subtitle.content = None
            return subtitle

        # the context manager closes the archive once the subtitle has been read
        with ZipFile(archive_stream) as archive:
            subtitle.content = self.get_subtitle_from_archive(subtitle, archive)

        return subtitle

    @region.cache_on_arguments(expiration_time=ARCHIVES_EXPIRATION_TIME)
    def _get_subtitles_archive(self, download_link: str) -> Response:
        """
        Fetch a subtitle archive from the given download link.

        The request is issued through ``self.retry`` and the result is cached for
        ``ARCHIVES_EXPIRATION_TIME``.

        :param download_link: The URL of the subtitles archive to download.
        :type download_link: str
        :return: The HTTP response object containing the subtitle archive.
        :rtype: Response
        """
        return self.retry(
            lambda: self.session.get(download_link, params={'api_key': self.api_key}, timeout=30),
            amount=retry_amount,
            retry_timeout=retry_timeout
        )
|