mirror of
https://github.com/simple-login/app.git
synced 2025-10-06 05:17:41 +08:00
Proper check of mx domains (#2382)
* Proper check of mx domains * Moved proton servers to config * Added function * Make sure we have a global dns * Typo
This commit is contained in:
parent
ffb070cd19
commit
e17ae4210b
4 changed files with 77 additions and 14 deletions
|
@ -62,6 +62,17 @@ def get_env_dict(env_var: str) -> dict[str, str]:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def get_env_csv(env_var: str, default: Optional[str]) -> list[str]:
|
||||||
|
"""
|
||||||
|
Get an env variable and convert it into a list of strings separated by,
|
||||||
|
Syntax is: val1,val2
|
||||||
|
"""
|
||||||
|
value = os.getenv(env_var, default)
|
||||||
|
if not value:
|
||||||
|
return []
|
||||||
|
return [field.strip() for field in value.split(",") if field.strip()]
|
||||||
|
|
||||||
|
|
||||||
config_file = os.environ.get("CONFIG")
|
config_file = os.environ.get("CONFIG")
|
||||||
if config_file:
|
if config_file:
|
||||||
config_file = get_abs_path(config_file)
|
config_file = get_abs_path(config_file)
|
||||||
|
@ -171,6 +182,14 @@ FIRST_ALIAS_DOMAIN = os.environ.get("FIRST_ALIAS_DOMAIN") or EMAIL_DOMAIN
|
||||||
# e.g. [(10, "mx1.hostname."), (10, "mx2.hostname.")]
|
# e.g. [(10, "mx1.hostname."), (10, "mx2.hostname.")]
|
||||||
EMAIL_SERVERS_WITH_PRIORITY = sl_getenv("EMAIL_SERVERS_WITH_PRIORITY")
|
EMAIL_SERVERS_WITH_PRIORITY = sl_getenv("EMAIL_SERVERS_WITH_PRIORITY")
|
||||||
|
|
||||||
|
PROTON_MX_SERVERS = get_env_csv(
|
||||||
|
"PROTON_MX_SERVERS", "mail.protonmail.ch., mailsec.protonmail.ch."
|
||||||
|
)
|
||||||
|
|
||||||
|
PROTON_EMAIL_DOMAINS = get_env_csv(
|
||||||
|
"PROTON_EMAIL_DOMAINS", "proton.me, protonmail.com, protonmail.ch, proton.ch, pm.me"
|
||||||
|
)
|
||||||
|
|
||||||
# disable the alias suffix, i.e. the ".random_word" part
|
# disable the alias suffix, i.e. the ".random_word" part
|
||||||
DISABLE_ALIAS_SUFFIX = "DISABLE_ALIAS_SUFFIX" in os.environ
|
DISABLE_ALIAS_SUFFIX = "DISABLE_ALIAS_SUFFIX" in os.environ
|
||||||
|
|
||||||
|
|
|
@ -115,9 +115,20 @@ class InMemoryDNSClient(DNSClient):
|
||||||
return self.txt_records.get(hostname, [])
|
return self.txt_records.get(hostname, [])
|
||||||
|
|
||||||
|
|
||||||
def get_network_dns_client() -> NetworkDNSClient:
|
global_dns_client: Optional[DNSClient] = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_network_dns_client() -> DNSClient:
|
||||||
|
global global_dns_client
|
||||||
|
if global_dns_client is not None:
|
||||||
|
return global_dns_client
|
||||||
return NetworkDNSClient(NAMESERVERS)
|
return NetworkDNSClient(NAMESERVERS)
|
||||||
|
|
||||||
|
|
||||||
|
def set_global_dns_client(dns_client: Optional[DNSClient]):
|
||||||
|
global global_dns_client
|
||||||
|
global_dns_client = dns_client
|
||||||
|
|
||||||
|
|
||||||
def get_mx_domains(hostname: str) -> dict[int, list[str]]:
|
def get_mx_domains(hostname: str) -> dict[int, list[str]]:
|
||||||
return get_network_dns_client().get_mx_domains(hostname)
|
return get_network_dns_client().get_mx_domains(hostname)
|
||||||
|
|
|
@ -2838,24 +2838,20 @@ class Mailbox(Base, ModelMixin):
|
||||||
return len(alias_ids)
|
return len(alias_ids)
|
||||||
|
|
||||||
def is_proton(self) -> bool:
|
def is_proton(self) -> bool:
|
||||||
if (
|
for proton_email_domain in config.PROTON_EMAIL_DOMAINS:
|
||||||
self.email.endswith("@proton.me")
|
if self.email.endswith(f"@{proton_email_domain}"):
|
||||||
or self.email.endswith("@protonmail.com")
|
return True
|
||||||
or self.email.endswith("@protonmail.ch")
|
|
||||||
or self.email.endswith("@proton.ch")
|
|
||||||
or self.email.endswith("@pm.me")
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
|
|
||||||
from app.email_utils import get_email_local_part
|
from app.email_utils import get_email_local_part
|
||||||
|
|
||||||
mx_domains = get_mx_domains(get_email_local_part(self.email))
|
mx_domains = get_mx_domains(get_email_local_part(self.email))
|
||||||
|
|
||||||
|
proton_mx_domains = config.PROTON_MX_SERVERS
|
||||||
# Proton is the first domain
|
# Proton is the first domain
|
||||||
if mx_domains and mx_domains[0].domain in (
|
for prio in mx_domains:
|
||||||
"mail.protonmail.ch.",
|
for mx_domain in mx_domains[prio]:
|
||||||
"mailsec.protonmail.ch.",
|
if mx_domain in proton_mx_domains:
|
||||||
):
|
return True
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
37
tests/models/test_mailbox.py
Normal file
37
tests/models/test_mailbox.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
from app import config
|
||||||
|
from app.dns_utils import set_global_dns_client, InMemoryDNSClient
|
||||||
|
from app.email_utils import get_email_local_part
|
||||||
|
from app.models import Mailbox
|
||||||
|
from tests.utils import create_new_user, random_email
|
||||||
|
|
||||||
|
dns_client = InMemoryDNSClient()
|
||||||
|
|
||||||
|
|
||||||
|
def setup_module():
|
||||||
|
set_global_dns_client(dns_client)
|
||||||
|
|
||||||
|
|
||||||
|
def teardown_module():
|
||||||
|
set_global_dns_client(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_proton_with_email_domain():
|
||||||
|
user = create_new_user()
|
||||||
|
mailbox = Mailbox.create(
|
||||||
|
user_id=user.id, email=f"test@{config.PROTON_EMAIL_DOMAINS[0]}"
|
||||||
|
)
|
||||||
|
assert mailbox.is_proton()
|
||||||
|
mailbox = Mailbox.create(user_id=user.id, email="a@b.c")
|
||||||
|
assert not mailbox.is_proton()
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_proton_with_mx_domain():
|
||||||
|
email = random_email()
|
||||||
|
dns_client.set_mx_records(
|
||||||
|
get_email_local_part(email), {10: config.PROTON_MX_SERVERS}
|
||||||
|
)
|
||||||
|
user = create_new_user()
|
||||||
|
mailbox = Mailbox.create(user_id=user.id, email=email)
|
||||||
|
assert mailbox.is_proton()
|
||||||
|
dns_client.set_mx_records(get_email_local_part(email), {10: ["nowhere.net"]})
|
||||||
|
assert not mailbox.is_proton()
|
Loading…
Add table
Reference in a new issue