bazarr/custom_libs/subliminal_patch/providers/subsource.py

569 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
import os
import time
import io
import datetime
from typing import Set
from typing import Optional, TYPE_CHECKING
from babelfish import language_converters
from zipfile import ZipFile, is_zipfile
from requests import Session, Response
from guessit import guessit
from subzero.language import Language
from subliminal import Episode, Movie
from subliminal.cache import region
from subliminal.exceptions import AuthenticationError, ConfigurationError
from subliminal_patch.exceptions import APIThrottled, ForbiddenError, TooManyRequests
from .mixins import ProviderRetryMixin
from subliminal_patch.subtitle import Subtitle
from subliminal_patch.providers import Provider, utils
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
if TYPE_CHECKING:
from subliminal_patch import Video
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cache lifetimes in seconds, passed as expiration_time to the cached provider methods in this file.
TITLES_EXPIRATION_TIME = datetime.timedelta(hours=6).total_seconds()
QUERIES_EXPIRATION_TIME = datetime.timedelta(hours=1).total_seconds()
ARCHIVES_EXPIRATION_TIME = datetime.timedelta(minutes=15).total_seconds()
# Retry policy handed to self.retry() for the HTTP requests made in this file.
retry_amount = 3
# NOTE(review): presumably a delay in seconds between attempts - confirm against ProviderRetryMixin.retry.
retry_timeout = 5
# The converter must be registered before it is looked up on the next line.
language_converters.register('subsource = subliminal_patch.converters.subsource:SubsourceConverter')
# Keys of the converter's to_subsource mapping; the provider unpacks each one into Language(*lang).
supported_languages = list(language_converters['subsource'].to_subsource.keys())
class SubsourceSubtitle(Subtitle):
    """Subtitle entry returned by the Subsource provider."""
    provider_name = 'subsource'
    hash_verifiable = False
    hearing_impaired_verifiable = True

    def __init__(self, language, forced, hearing_impaired, page_link, subtitles_id, release_names, uploader,
                 season=None, episode=None, asked_for_episode=None, is_pack=False):
        """
        Build a subtitle from the values extracted from one API result.

        :param language: Base language of the subtitle; rebuilt below with the HI/forced flags applied.
        :param forced: Whether the subtitle only covers forced/foreign parts.
        :param hearing_impaired: Whether the subtitle is for the hearing impaired.
        :param page_link: URL of the subtitle page.
        :param subtitles_id: Provider-side identifier, exposed through :attr:`id`.
        :param release_names: Release names this subtitle was made for. A list is expected; a single
            string is treated as one release name and ``None`` as no release names.
        :param uploader: Display name of the uploader.
        :param season: Season number guessed from the release names (episodes only).
        :param episode: Episode number guessed from the release names (episodes only).
        :param asked_for_episode: Episode number that was searched for.
        :param is_pack: True when the subtitle covers a whole season.
        """
        super().__init__(language)
        # Joining a bare string would interleave ", " between its characters and None would raise,
        # so normalise to a list first.
        if release_names is None:
            release_names = []
        elif isinstance(release_names, str):
            release_names = [release_names]
        else:
            release_names = list(release_names)
        language = Language.rebuild(language, hi=hearing_impaired, forced=forced)
        self.releases = release_names
        self.release_info = ', '.join(release_names)
        self.language = language
        self.forced = forced
        self.hearing_impaired = hearing_impaired
        self.subtitles_id = subtitles_id
        self.page_link = page_link
        self.download_link = None
        self.uploader = uploader
        self.matches = None
        self.season = season
        self.episode = episode
        self.asked_for_episode = asked_for_episode
        self.is_pack = is_pack

    @property
    def id(self) -> int:
        """Return the provider-side subtitle identifier."""
        return self.subtitles_id

    def get_matches(self, video: Video) -> Set[str]:
        """
        Compute the set of attributes on which this subtitle matches *video*.

        Matches derived from the release names are collected through ``utils.update_matches``.
        On top of that, series/IMDb matches (episodes) or title/IMDb matches (movies) are
        always added, and a season pack additionally counts as an episode match.

        :param video: The movie or episode the subtitle is compared against.
        :return: The set of match names, also stored on ``self.matches``.
        """
        matches = set()
        utils.update_matches(matches, video, self.release_info)
        if isinstance(video, Episode):
            # NOTE(review): added unconditionally - presumably because the title was already
            # resolved through the IMDb ID before this subtitle was listed; confirm.
            matches.add('series')
            matches.add('series_imdb_id')
            # A season pack contains the requested episode.
            if self.is_pack:
                matches.add('episode')
        else:
            matches.add('title')
            matches.add('imdb_id')
        self.matches = matches
        return matches
class SubsourceProvider(ProviderRetryMixin, Provider, ProviderSubtitleArchiveMixin):
    """Provider that talks to the Subsource API."""
    server_hostname = 'api.subsource.net'
    video_types = (Episode, Movie)
    # Start from the plain languages, then add the forced variants, then the hearing-impaired
    # variants of everything collected so far (order matters: HI is applied to forced ones too).
    languages = {Language(*lang) for lang in supported_languages}
    languages |= {Language.rebuild(lang, forced=True) for lang in languages}
    languages |= {Language.rebuild(lang, hi=True) for lang in languages}
def __init__(self, api_key=None):
    """
    Create a provider bound to an API key.

    :param api_key: Subsource API key; must be non-empty.
    :raises ConfigurationError: If no API key is given.
    """
    if not api_key:
        raise ConfigurationError('Api_key must be specified')
    self.api_key = api_key
    self.video = None
    self._started = None
    # The session's header mapping is replaced wholesale, so only the User-Agent is set here.
    user_agent = os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")
    http_session = Session()
    http_session.headers = {'User-Agent': user_agent}
    self.session = http_session
def initialize(self):
    """Record the wall-clock time at which the provider was initialized."""
    started_at = time.time()
    self._started = started_at
def terminate(self):
    """Close the HTTP session held by this provider."""
    http_session = self.session
    http_session.close()
def _server_url(self) -> str:
    """Return the base URL of API v1 on the configured host, including the trailing slash."""
    return 'https://{}/api/v1/'.format(self.server_hostname)
@region.cache_on_arguments(expiration_time=TITLES_EXPIRATION_TIME)
def search_titles(self, title: str, imdb_id: str, season: int = None) -> Optional[int]:
    """
    Look up the provider-side ID of a movie or show.

    The search is done by IMDb ID when one is given, otherwise by lower-cased title. Each
    returned entry is accepted only if one of the titles of ``self.video`` is contained in the
    entry's title (or alternate title) and, when ``self.video.year`` is set, the entry's release
    year equals it.

    NOTE(review): the outcome also depends on ``self.video``, which is not one of the arguments
    of this cached method - presumably the cache key is built from the call arguments only; confirm.

    :param title: The name of the title to search for.
    :type title: str
    :param imdb_id: The IMDb ID of the title to search for; takes precedence over ``title``.
    :type imdb_id: str
    :param season: (Optional) Season number, sent along with the search when truthy.
    :type season: int, optional
    :return: The ``movieId`` of the first accepted entry, otherwise None.
    :rtype: Optional[int]
    """
    parameters = {'api_key': self.api_key}
    if imdb_id:
        parameters.update(searchType='imdb', imdb=imdb_id)
        logger.debug(f'Searching using this imdb ID: {imdb_id}')
    else:
        parameters.update(searchType='text', q=title.lower())
        logger.debug(f'Searching using this title: {title}')
    if season:
        parameters['season'] = season

    results = self.retry(
        lambda: self.session.get(self._server_url() + 'movies/search', params=parameters, timeout=30),
        amount=retry_amount,
        retry_timeout=retry_timeout
    )
    self._status_raiser(results)

    # A payload without 'data' (or with a null one) simply means "no results".
    candidates = results.json().get('data') or []

    # Lower-cased titles of the video being searched: series name plus alternatives for an
    # episode, the title alone for anything else.
    video = self.video
    if isinstance(video, Episode):
        known_titles = [video.series, *(video.alternative_series or [])]
    else:
        known_titles = [video.title]
    alternative_titles = {t.lower() for t in known_titles if t}
    logger.debug(f"alternative titles: {alternative_titles}")

    title_id = None
    for result in candidates:
        if 'title' not in result or 'releaseYear' not in result:
            continue
        names = (result['title'], result.get('alternateTitle'))
        sub_titles = {name.lower() for name in names if isinstance(name, str) and name}
        logger.debug(f"Subsource titles: {sub_titles}")

        # Substring match: one of our titles must appear inside one of the entry's titles.
        if not any(wanted in offered for wanted in alternative_titles for offered in sub_titles):
            continue

        if video.year:
            try:
                release_year = int(result['releaseYear'])
            except (TypeError, ValueError):
                # The year cannot be verified (null or non-numeric): skip rather than crash.
                continue
            if release_year != video.year:
                continue

        title_id = result['movieId']
        break

    if title_id:
        logger.debug(f'Found this title ID: {title_id}')
    else:
        logger.debug(f'No match found for {title}')
    return title_id
@region.cache_on_arguments(expiration_time=QUERIES_EXPIRATION_TIME)
def query(self, languages: Set[Language], video) -> list:
    """
    Query the subtitles available for a video.

    The title is first resolved to a provider-side ID through :meth:`search_titles`, using the
    series IMDb ID for an episode or the IMDb ID for a movie; a video without the relevant IMDb
    ID yields no result. Only ONE language is searched per call: the first element obtained from
    ``languages``.

    Entries are dropped when they are forced but the language is not, when the language asks
    for hearing-impaired subtitles and the entry is not one, or (episodes only) when the
    season/episode guessed from the release names does not fit the requested episode. An entry
    with the right season but no episode number is kept and flagged as a season pack.

    :param languages: A set of `Language` objects; only one of them is searched.
    :param video: A `Movie` or `Episode` describing the video subtitles are searched for.
    :return: A list of `SubsourceSubtitle` objects, possibly empty.
    """
    self.video = video
    is_episode = isinstance(video, Episode)

    # Nothing to search for: return before issuing any network request.
    if not languages:
        return []
    language = list(languages)[0]

    title = video.series if is_episode else video.title
    if is_episode and video.series_imdb_id:
        title_id = self.search_titles(title, video.series_imdb_id, season=video.season)
    elif isinstance(video, Movie) and video.imdb_id:
        title_id = self.search_titles(title, video.imdb_id)
    else:
        title_id = None
    if not title_id:
        logger.debug('No title id found for this video')
        return []

    language_name = language_converters['subsource'].convert(language.alpha3, language.country, language.script)
    logger.debug(f'Searching for this language: {language}')

    parameters = (
        ('api_key', self.api_key),
        ('language', language_name.lower()),
        ('limit', 100),
        ('movieId', title_id),
    )
    if is_episode:
        parameters += (('seasonNumber', video.season), ('episodeNumber', video.episode))

    # The request is the same for movies and episodes; only the parameters differ.
    res = self.retry(
        lambda: self.session.get(self._server_url() + 'subtitles', params=parameters, timeout=30),
        amount=retry_amount,
        retry_timeout=retry_timeout
    )
    self._status_raiser(res)

    result = res.json()
    if 'success' in result and not result['success']:
        return []
    # A missing or null 'data' field is treated as an empty result set.
    items = result.get('data') or []
    logger.debug(f"Query returned {len(items)} subtitles")

    subtitles = []
    for item in items:
        is_forced = self._is_forced(item)
        if is_forced and not language.forced:
            continue
        is_hi = self._is_hi(item)
        if language.hi and not is_hi:
            continue

        episode_kwargs = {}
        if is_episode:
            season, episode = self._get_season_episode_from_release_info(item['releaseInfo'])
            # Wrong season, or an explicit episode number that is not the requested one.
            if season != video.season or (episode and episode != video.episode):
                continue
            episode_kwargs = {
                'season': season,
                'episode': episode,
                'asked_for_episode': video.episode,
                # No episode number in the release names: the entry covers the whole season.
                'is_pack': not episode,
            }

        # NOTE(review): the first element returned by reverse() is fed to fromalpha3b - confirm
        # the converter yields alpha3b codes.
        language_code = language_converters['subsource'].reverse(item['language'].capitalize())[0]
        subtitles.append(SubsourceSubtitle(
            language=Language.fromalpha3b(language_code),
            forced=is_forced,
            hearing_impaired=is_hi,
            page_link=f"https://subsource.net{item['link']}",
            subtitles_id=item['subtitleId'],
            release_names=item['releaseInfo'],
            uploader=self._get_uploader_name(item),
            **episode_kwargs,
        ))
    return subtitles
@staticmethod
def _is_hi(item: dict) -> bool:
    """
    Tell whether a subtitle item is meant for the hearing impaired.

    The ``hearingImpaired`` flag wins when it is truthy. Otherwise the ``commentary`` text is
    inspected case-insensitively: an explicit "HI removed"/"non-SDH" style mention means
    non-HI; then ``sdh`` or ``closed caption`` anywhere, or ``hi``/``cc`` as a standalone
    token, means HI.

    ``hi`` and ``cc`` are matched as whole tokens (delimited by anything that is not a letter
    or digit, or by the start/end of the text) so that ordinary words such as "his", "this"
    or "Mitsubishi" do not flag the subtitle as HI.

    :param item: A dictionary describing the subtitle item (flag and commentary).
    :return: True if the item is considered hearing-impaired, otherwise False.
    :rtype: bool
    """
    if item.get('hearingImpaired'):
        return True

    commentary = item.get('commentary')
    if not isinstance(commentary, str):
        return False
    commentary = commentary.lower()

    # Explicit mentions that HI content was removed or is absent take precedence.
    non_hi_tags = ('hi remove', 'non hi', 'nonhi', 'non-hi', 'non-sdh', 'non sdh', 'nonsdh', 'sdh remove')
    if any(tag in commentary for tag in non_hi_tags):
        return False

    # Markers distinctive enough to be matched anywhere in the text.
    hi_phrases = ('sdh', '𝓢𝓓𝓗', 'closed caption')
    if any(tag in commentary for tag in hi_phrases):
        return True

    # Short markers only count as standalone tokens: split on every non-alphanumeric character.
    tokens = ''.join(ch if ch.isalnum() else ' ' for ch in commentary).split()
    return 'hi' in tokens or 'cc' in tokens
@staticmethod
def _is_forced(item: dict) -> bool:
    """
    Tell whether a subtitle item is a "forced" subtitle.

    An item is forced when its ``foreignParts`` value is truthy, or when its ``commentary``
    is a string containing "forced" or "foreign" (case-insensitive).

    :param item: Dictionary representing the subtitle item to be checked.
    :type item: dict
    :return: True if the item is considered forced, otherwise False.
    :rtype: bool
    """
    if item.get('foreignParts'):
        return True
    commentary = item.get('commentary')
    if not isinstance(commentary, str):
        # No usable commentary: treat as a regular subtitle.
        return False
    lowered = commentary.lower()
    return 'forced' in lowered or 'foreign' in lowered
@staticmethod
def _get_uploader_name(item: dict) -> str:
    """
    Return the display name of the uploader of a subtitle item.

    The uploader is the contributor whose ``id`` equals the item's ``uploaderId``. Missing or
    null fields (``uploaderId``, ``contributors``, ``id``, ``displayname``) are tolerated and
    lead to an empty string instead of an exception.

    :param item: Dictionary containing the contributors and the uploader ID.
    :type item: dict
    :return: The display name of the uploader if found; otherwise, an empty string.
    :rtype: str
    """
    uploader_id = item.get('uploaderId')
    if uploader_id is None:
        return ''
    for contributor in item.get('contributors') or []:
        if isinstance(contributor, dict) and contributor.get('id') == uploader_id:
            return contributor.get('displayname') or ''
    return ''
@staticmethod
def _status_raiser(response: Response):
    """
    Translate an HTTP status code into the matching provider exception.

    A 200 response passes silently. 400, 401, 403 and 429 raise dedicated exceptions; every
    other status is delegated to the response's own ``raise_for_status``.

    :param response: A `Response` object from an HTTP request.
    :type response: Response
    :raises APIThrottled: If the status code is 400 (invalid request parameters).
    :raises AuthenticationError: If the status code is 401.
    :raises ForbiddenError: If the status code is 403.
    :raises TooManyRequests: If the status code is 429.
    :raises HTTPError: Possibly, from ``raise_for_status``, for any other non-200 status.
    """
    code = response.status_code
    if code == 200:
        return
    if code == 400:
        raise APIThrottled("Invalid request parameters")
    if code == 401:
        raise AuthenticationError("Authentication required")
    if code == 403:
        raise ForbiddenError("Access denied")
    if code == 429:
        raise TooManyRequests("Rate limit exceeded")
    response.raise_for_status()
@staticmethod
def _get_season_episode_from_release_info(releases_info: list) -> tuple:
    """
    Guess the season and episode numbers from a list of release names.

    Each release name is parsed with `guessit` (episode type, season and episode properties
    only). The first truthy season and the first truthy episode found are kept, and parsing
    stops as soon as both are known. Anything that is not a list yields ``(None, None)``.

    :param releases_info: A list of release-name strings.
    :type releases_info: list
    :return: A ``(season, episode)`` tuple; either element is None when it could not be guessed.
    :rtype: tuple
    """
    season = episode = None
    if not isinstance(releases_info, list):
        return season, episode

    for release_name in releases_info:
        guessed = guessit(release_name, {"type": "episode", "includes": ["season", "episode"]})
        # Only fill in what is still unknown, and only with truthy guesses.
        if not season and guessed.get('season'):
            season = guessed['season']
        if not episode and guessed.get('episode'):
            episode = guessed['episode']
        if season and episode:
            break
    return season, episode
def list_subtitles(self, video: Video, languages: Set[Language]) -> list:
    """
    List the subtitles available for a video.

    Thin wrapper that hands both arguments over to :meth:`query` and returns its result
    unchanged.

    :param video: The video object for which subtitles need to be listed.
    :type video: Video
    :param languages: The set of languages passed on to the query.
    :type languages: Set[Language]
    :return: The list of subtitles returned by the query.
    :rtype: list
    """
    found = self.query(languages, video)
    return found
def download_subtitle(self, subtitle: SubsourceSubtitle) -> SubsourceSubtitle:
    """
    Download the archive of a subtitle and extract its content.

    The download URL is built from the subtitle ID, the archive is fetched through
    :meth:`_get_subtitles_archive`, the response status is checked, and the subtitle is
    extracted when the payload is a valid zip file. When nothing was fetched or the payload
    is not a zip archive, ``subtitle.content`` is set to None.

    :param subtitle: The subtitle object to download.
    :type subtitle: SubsourceSubtitle
    :return: The same subtitle object, after attempting to fill its content.
    :rtype: SubsourceSubtitle
    :raises APIThrottled, AuthenticationError, ForbiddenError, TooManyRequests, HTTPError:
        Propagated from ``_status_raiser`` for a non-200 response.
    """
    logger.debug('Downloading subtitle %r', subtitle)
    download_link = self._server_url() + f"subtitles/{subtitle.id}/download"
    r = self._get_subtitles_archive(download_link)

    # The missing-response check has to come first: inspecting the status of a missing
    # response would raise AttributeError before this branch could ever run.
    if r is None:
        logger.error(f'Could not download subtitle from {download_link}')
        subtitle.content = None
        return subtitle
    self._status_raiser(r)

    archive_stream = io.BytesIO(r.content)
    if not is_zipfile(archive_stream):
        logger.error(f'Could not unzip subtitle from {download_link}')
        subtitle.content = None
        return subtitle

    # Close the archive once the subtitle has been extracted.
    with ZipFile(archive_stream) as archive:
        subtitle.content = self.get_subtitle_from_archive(subtitle, archive)
    return subtitle
@region.cache_on_arguments(expiration_time=ARCHIVES_EXPIRATION_TIME)
def _get_subtitles_archive(self, download_link: str) -> Response:
    """
    Fetch a subtitle archive, with retries.

    The GET request carries the API key as a query parameter and a 30-second timeout; it is
    run through ``self.retry`` with the module-wide retry settings. The decorator caches the
    result for ``ARCHIVES_EXPIRATION_TIME`` seconds.

    :param download_link: The URL for the subtitles archive to download.
    :type download_link: str
    :return: Whatever ``self.retry`` returns for the request (the HTTP response).
    :rtype: Response
    """
    def fetch_archive():
        query_params = {'api_key': self.api_key}
        return self.session.get(download_link, params=query_params, timeout=30)

    return self.retry(fetch_archive, amount=retry_amount, retry_timeout=retry_timeout)