2022-01-24 12:07:52 +08:00
|
|
|
import logging
|
2023-03-23 09:36:58 +08:00
|
|
|
from codecs import open as codecs_open
|
|
|
|
from typing import Dict, ItemsView, Optional, Union
|
2020-07-31 17:39:25 +08:00
|
|
|
from urllib.request import urlopen
|
|
|
|
|
2023-03-23 09:36:58 +08:00
|
|
|
from .exceptions import TldImproperlyConfigured, TldIOError
|
2020-07-31 17:39:25 +08:00
|
|
|
from .helpers import project_dir
|
|
|
|
|
2022-01-24 12:07:52 +08:00
|
|
|
__author__ = "Artur Barseghyan"
|
2023-03-23 09:36:58 +08:00
|
|
|
__copyright__ = "2013-2023 Artur Barseghyan"
|
2022-01-24 12:07:52 +08:00
|
|
|
__license__ = "MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later"
|
|
|
|
__all__ = (
|
|
|
|
"BaseTLDSourceParser",
|
|
|
|
"Registry",
|
|
|
|
)
|
|
|
|
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class Registry(type):
|
|
|
|
|
|
|
|
REGISTRY: Dict[str, "BaseTLDSourceParser"] = {}
|
|
|
|
|
|
|
|
def __new__(mcs, name, bases, attrs):
|
|
|
|
new_cls = type.__new__(mcs, name, bases, attrs)
|
|
|
|
# Here the name of the class is used as key but it could be any class
|
|
|
|
# parameter.
|
|
|
|
if getattr(new_cls, "_uid", None):
|
|
|
|
mcs.REGISTRY[new_cls._uid] = new_cls
|
|
|
|
return new_cls
|
|
|
|
|
|
|
|
@property
|
|
|
|
def _uid(cls) -> str:
|
|
|
|
return getattr(cls, "uid", cls.__name__)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def reset(mcs) -> None:
|
|
|
|
mcs.REGISTRY = {}
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def get(
|
|
|
|
mcs, key: str, default: "BaseTLDSourceParser" = None
|
|
|
|
) -> Union["BaseTLDSourceParser", None]:
|
|
|
|
return mcs.REGISTRY.get(key, default)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def items(mcs) -> ItemsView[str, "BaseTLDSourceParser"]:
|
|
|
|
return mcs.REGISTRY.items()
|
|
|
|
|
|
|
|
# @classmethod
|
|
|
|
# def get_registry(mcs) -> Dict[str, Type]:
|
|
|
|
# return dict(mcs.REGISTRY)
|
|
|
|
#
|
|
|
|
# @classmethod
|
|
|
|
# def pop(mcs, uid) -> None:
|
|
|
|
# mcs.REGISTRY.pop(uid)
|
2020-07-31 17:39:25 +08:00
|
|
|
|
|
|
|
|
|
|
|
class BaseTLDSourceParser(metaclass=Registry):
|
|
|
|
"""Base TLD source parser."""
|
|
|
|
|
|
|
|
uid: Optional[str] = None
|
|
|
|
source_url: str
|
|
|
|
local_path: str
|
2022-01-24 12:07:52 +08:00
|
|
|
include_private: bool = True
|
2020-07-31 17:39:25 +08:00
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def validate(cls):
|
|
|
|
"""Constructor."""
|
|
|
|
if not cls.uid:
|
|
|
|
raise TldImproperlyConfigured(
|
|
|
|
"The `uid` property of the TLD source parser shall be defined."
|
|
|
|
)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def get_tld_names(cls, fail_silently: bool = False, retry_count: int = 0):
|
|
|
|
"""Get tld names.
|
|
|
|
|
|
|
|
:param fail_silently:
|
|
|
|
:param retry_count:
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
cls.validate()
|
|
|
|
raise NotImplementedError(
|
|
|
|
"Your TLD source parser shall implement `get_tld_names` method."
|
|
|
|
)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def update_tld_names(cls, fail_silently: bool = False) -> bool:
|
|
|
|
"""Update the local copy of the TLD file.
|
|
|
|
|
|
|
|
:param fail_silently:
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
remote_file = urlopen(cls.source_url)
|
2022-01-24 12:07:52 +08:00
|
|
|
local_file_abs_path = project_dir(cls.local_path)
|
2023-03-23 09:36:58 +08:00
|
|
|
local_file = codecs_open(local_file_abs_path, "wb", encoding="utf8")
|
2022-01-24 12:07:52 +08:00
|
|
|
local_file.write(remote_file.read().decode("utf8"))
|
2020-07-31 17:39:25 +08:00
|
|
|
local_file.close()
|
|
|
|
remote_file.close()
|
2023-03-23 09:36:58 +08:00
|
|
|
LOGGER.info(
|
2022-01-24 12:07:52 +08:00
|
|
|
f"Fetched '{cls.source_url}' as '{local_file_abs_path}'"
|
|
|
|
)
|
2020-07-31 17:39:25 +08:00
|
|
|
except Exception as err:
|
2023-03-23 09:36:58 +08:00
|
|
|
LOGGER.error(
|
2022-01-24 12:07:52 +08:00
|
|
|
f"Failed fetching '{cls.source_url}'. Reason: {str(err)}"
|
|
|
|
)
|
2020-07-31 17:39:25 +08:00
|
|
|
if fail_silently:
|
|
|
|
return False
|
|
|
|
raise TldIOError(err)
|
|
|
|
|
|
|
|
return True
|