Multi domain check (#2372)

* Accept partner and sl domains

* fix

* Fix html

* properly calculate mx_domains

* Improve custom domain admin template

* reformat
This commit is contained in:
Adrià Casajús 2025-01-31 17:36:35 +01:00 committed by GitHub
parent bb6300944d
commit a609b3b312
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 414 additions and 261 deletions

View file

@ -1,21 +1,25 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional, List from typing import Optional, List
import arrow import arrow
import sqlalchemy import sqlalchemy
from flask_admin import BaseView
from flask_admin.form import SecureForm
from flask_admin.model.template import EndpointLinkRowAction
from markupsafe import Markup
from app import models, s3, config
from flask import redirect, url_for, request, flash, Response from flask import redirect, url_for, request, flash, Response
from flask_admin import BaseView
from flask_admin import expose, AdminIndexView from flask_admin import expose, AdminIndexView
from flask_admin.actions import action from flask_admin.actions import action
from flask_admin.contrib import sqla from flask_admin.contrib import sqla
from flask_admin.form import SecureForm
from flask_admin.model.template import EndpointLinkRowAction
from flask_login import current_user from flask_login import current_user
from markupsafe import Markup
from app.custom_domain_validation import CustomDomainValidation, DomainValidationResult from app import models, s3, config
from app.custom_domain_validation import (
CustomDomainValidation,
DomainValidationResult,
ExpectedValidationRecords,
)
from app.db import Session from app.db import Session
from app.dns_utils import get_network_dns_client from app.dns_utils import get_network_dns_client
from app.events.event_dispatcher import EventDispatcher from app.events.event_dispatcher import EventDispatcher
@ -929,13 +933,13 @@ class EmailSearchAdmin(BaseView):
class CustomDomainWithValidationData: class CustomDomainWithValidationData:
def __init__(self, domain: CustomDomain): def __init__(self, domain: CustomDomain):
self.domain: CustomDomain = domain self.domain: CustomDomain = domain
self.ownership_expected: Optional[str] = None self.ownership_expected: Optional[ExpectedValidationRecords] = None
self.ownership_validation: Optional[DomainValidationResult] = None self.ownership_validation: Optional[DomainValidationResult] = None
self.mx_expected: Optional[str] = None self.mx_expected: Optional[dict[int, ExpectedValidationRecords]] = None
self.mx_validation: Optional[DomainValidationResult] = None self.mx_validation: Optional[DomainValidationResult] = None
self.spf_expected: Optional[str] = None self.spf_expected: Optional[ExpectedValidationRecords] = None
self.spf_validation: Optional[DomainValidationResult] = None self.spf_validation: Optional[DomainValidationResult] = None
self.dkim_expected: {str: str} = {} self.dkim_expected: {str: ExpectedValidationRecords} = {}
self.dkim_validation: {str: str} = {} self.dkim_validation: {str: str} = {}
@ -990,7 +994,6 @@ class CustomDomainSearchResult:
custom_domain custom_domain
) )
out.domains.append(validation_data) out.domains.append(validation_data)
print(validation_data.dkim_expected, validation_data.dkim_validation)
return out return out
@ -1020,7 +1023,6 @@ class CustomDomainSearchAdmin(BaseView):
if cd is not None: if cd is not None:
user = cd.user user = cd.user
search = CustomDomainSearchResult.from_user(user) search = CustomDomainSearchResult.from_user(user)
print("NEW", search.domains)
return self.render( return self.render(
"admin/custom_domain_search.html", "admin/custom_domain_search.html",

View file

@ -5,9 +5,7 @@ from app import config
from app.constants import DMARC_RECORD from app.constants import DMARC_RECORD
from app.db import Session from app.db import Session
from app.dns_utils import ( from app.dns_utils import (
MxRecord,
DNSClient, DNSClient,
is_mx_equivalent,
get_network_dns_client, get_network_dns_client,
) )
from app.models import CustomDomain from app.models import CustomDomain
@ -21,6 +19,39 @@ class DomainValidationResult:
errors: [str] errors: [str]
@dataclass
class ExpectedValidationRecords:
recommended: str
allowed: list[str]
def is_mx_equivalent(
mx_domains: dict[int, list[str]],
expected_mx_domains: dict[int, ExpectedValidationRecords],
) -> bool:
"""
Compare mx_domains with ref_mx_domains to see if they are equivalent.
mx_domains and ref_mx_domains are list of (priority, domain)
The priority order is taken into account but not the priority number.
For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)]
"""
expected_prios = []
for prio in expected_mx_domains:
expected_prios.append(prio)
if len(expected_prios) != len(mx_domains):
return False
for prio_position, prio_value in enumerate(sorted(mx_domains.keys())):
for domain in mx_domains[prio_value]:
if domain not in expected_mx_domains[expected_prios[prio_position]].allowed:
return False
return True
class CustomDomainValidation: class CustomDomainValidation:
def __init__( def __init__(
self, self,
@ -37,59 +68,88 @@ class CustomDomainValidation:
or config.PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES or config.PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES
) )
def get_ownership_verification_record(self, domain: CustomDomain) -> str: def get_ownership_verification_record(
prefix = "sl" self, domain: CustomDomain
) -> ExpectedValidationRecords:
prefixes = ["sl"]
if ( if (
domain.partner_id is not None domain.partner_id is not None
and domain.partner_id in self._partner_domain_validation_prefixes and domain.partner_id in self._partner_domain_validation_prefixes
): ):
prefix = self._partner_domain_validation_prefixes[domain.partner_id] prefixes.insert(
0, self._partner_domain_validation_prefixes[domain.partner_id]
)
if not domain.ownership_txt_token: if not domain.ownership_txt_token:
domain.ownership_txt_token = random_string(30) domain.ownership_txt_token = random_string(30)
Session.commit() Session.commit()
return f"{prefix}-verification={domain.ownership_txt_token}" valid = [
f"{prefix}-verification={domain.ownership_txt_token}" for prefix in prefixes
]
return ExpectedValidationRecords(recommended=valid[0], allowed=valid)
def get_expected_mx_records(self, domain: CustomDomain) -> list[MxRecord]: def get_expected_mx_records(
records = [] self, domain: CustomDomain
) -> dict[int, ExpectedValidationRecords]:
records = {}
if domain.partner_id is not None and domain.partner_id in self._partner_domains: if domain.partner_id is not None and domain.partner_id in self._partner_domains:
domain = self._partner_domains[domain.partner_id] domain = self._partner_domains[domain.partner_id]
records.append(MxRecord(10, f"mx1.{domain}.")) records[10] = [f"mx1.{domain}."]
records.append(MxRecord(20, f"mx2.{domain}.")) records[20] = [f"mx2.{domain}."]
else:
# Default ones # Default ones
for priority, domain in config.EMAIL_SERVERS_WITH_PRIORITY: for priority, domain in config.EMAIL_SERVERS_WITH_PRIORITY:
records.append(MxRecord(priority, domain)) if priority not in records:
records[priority] = []
records[priority].append(domain)
return records return {
priority: ExpectedValidationRecords(
recommended=records[priority][0], allowed=records[priority]
)
for priority in records
}
def get_expected_spf_domain(self, domain: CustomDomain) -> str: def get_expected_spf_domain(
self, domain: CustomDomain
) -> ExpectedValidationRecords:
records = []
if domain.partner_id is not None and domain.partner_id in self._partner_domains: if domain.partner_id is not None and domain.partner_id in self._partner_domains:
return self._partner_domains[domain.partner_id] records.append(self._partner_domains[domain.partner_id])
else: else:
return config.EMAIL_DOMAIN records.append(config.EMAIL_DOMAIN)
return ExpectedValidationRecords(recommended=records[0], allowed=records)
def get_expected_spf_record(self, domain: CustomDomain) -> str: def get_expected_spf_record(self, domain: CustomDomain) -> str:
spf_domain = self.get_expected_spf_domain(domain) spf_domain = self.get_expected_spf_domain(domain)
return f"v=spf1 include:{spf_domain} ~all" return f"v=spf1 include:{spf_domain.recommended} ~all"
def get_dkim_records(self, domain: CustomDomain) -> {str: str}: def get_dkim_records(
self, domain: CustomDomain
) -> {str: ExpectedValidationRecords}:
""" """
Get a list of dkim records to set up. Depending on the custom_domain, whether if it's from a partner or not, Get a list of dkim records to set up. Depending on the custom_domain, whether if it's from a partner or not,
it will return the default ones or the partner ones. it will return the default ones or the partner ones.
""" """
# By default use the default domain # By default use the default domain
dkim_domain = self.dkim_domain dkim_domains = [self.dkim_domain]
if domain.partner_id is not None: if domain.partner_id is not None:
# Domain is from a partner. Retrieve the partner config and use that domain if exists # Domain is from a partner. Retrieve the partner config and use that domain as preferred if it exists
dkim_domain = self._partner_domains.get(domain.partner_id, dkim_domain) partner_domain = self._partner_domains.get(domain.partner_id, None)
if partner_domain is not None:
dkim_domains.insert(0, partner_domain)
return { output = {}
f"{key}._domainkey": f"{key}._domainkey.{dkim_domain}" for key in ("dkim", "dkim02", "dkim03"):
for key in ("dkim", "dkim02", "dkim03") records = [
} f"{key}._domainkey.{dkim_domain}" for dkim_domain in dkim_domains
]
output[f"{key}._domainkey"] = ExpectedValidationRecords(
recommended=records[0], allowed=records
)
return output
def validate_dkim_records(self, custom_domain: CustomDomain) -> dict[str, str]: def validate_dkim_records(self, custom_domain: CustomDomain) -> dict[str, str]:
""" """
@ -102,7 +162,7 @@ class CustomDomainValidation:
for prefix, expected_record in expected_records.items(): for prefix, expected_record in expected_records.items():
custom_record = f"{prefix}.{custom_domain.domain}" custom_record = f"{prefix}.{custom_domain.domain}"
dkim_record = self._dns_client.get_cname_record(custom_record) dkim_record = self._dns_client.get_cname_record(custom_record)
if dkim_record == expected_record: if dkim_record in expected_record.allowed:
correct_records[prefix] = custom_record correct_records[prefix] = custom_record
else: else:
invalid_records[custom_record] = dkim_record or "empty" invalid_records[custom_record] = dkim_record or "empty"
@ -138,11 +198,15 @@ class CustomDomainValidation:
Check if the custom_domain has added the ownership verification records Check if the custom_domain has added the ownership verification records
""" """
txt_records = self._dns_client.get_txt_record(custom_domain.domain) txt_records = self._dns_client.get_txt_record(custom_domain.domain)
expected_verification_record = self.get_ownership_verification_record( expected_verification_records = self.get_ownership_verification_record(
custom_domain custom_domain
) )
found = False
if expected_verification_record in txt_records: for verification_record in expected_verification_records.allowed:
if verification_record in txt_records:
found = True
break
if found:
custom_domain.ownership_verified = True custom_domain.ownership_verified = True
emit_user_audit_log( emit_user_audit_log(
user=custom_domain.user, user=custom_domain.user,
@ -161,10 +225,11 @@ class CustomDomainValidation:
expected_mx_records = self.get_expected_mx_records(custom_domain) expected_mx_records = self.get_expected_mx_records(custom_domain)
if not is_mx_equivalent(mx_domains, expected_mx_records): if not is_mx_equivalent(mx_domains, expected_mx_records):
return DomainValidationResult( errors = []
success=False, for prio in mx_domains:
errors=[f"{record.priority} {record.domain}" for record in mx_domains], for mx_domain in mx_domains[prio]:
) errors.append(f"{prio} {mx_domain}")
return DomainValidationResult(success=False, errors=errors)
else: else:
custom_domain.verified = True custom_domain.verified = True
emit_user_audit_log( emit_user_audit_log(
@ -180,7 +245,7 @@ class CustomDomainValidation:
) -> DomainValidationResult: ) -> DomainValidationResult:
spf_domains = self._dns_client.get_spf_domain(custom_domain.domain) spf_domains = self._dns_client.get_spf_domain(custom_domain.domain)
expected_spf_domain = self.get_expected_spf_domain(custom_domain) expected_spf_domain = self.get_expected_spf_domain(custom_domain)
if expected_spf_domain in spf_domains: if len(set(expected_spf_domain.allowed).intersection(set(spf_domains))) > 0:
custom_domain.spf_verified = True custom_domain.spf_verified = True
emit_user_audit_log( emit_user_audit_log(
user=custom_domain.user, user=custom_domain.user,
@ -221,8 +286,8 @@ class CustomDomainValidation:
self, txt_records: List[str], custom_domain: CustomDomain self, txt_records: List[str], custom_domain: CustomDomain
) -> List[str]: ) -> List[str]:
final_records = [] final_records = []
verification_record = self.get_ownership_verification_record(custom_domain) verification_records = self.get_ownership_verification_record(custom_domain)
for record in txt_records: for record in txt_records:
if record != verification_record: if record not in verification_records.allowed:
final_records.append(record) final_records.append(record)
return final_records return final_records

View file

@ -5,8 +5,8 @@ from flask_login import login_required, current_user
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, validators, IntegerField from wtforms import StringField, validators, IntegerField
from app.constants import DMARC_RECORD
from app.config import EMAIL_SERVERS_WITH_PRIORITY, EMAIL_DOMAIN from app.config import EMAIL_SERVERS_WITH_PRIORITY, EMAIL_DOMAIN
from app.constants import DMARC_RECORD
from app.custom_domain_utils import delete_custom_domain, set_custom_domain_mailboxes from app.custom_domain_utils import delete_custom_domain, set_custom_domain_mailboxes
from app.custom_domain_validation import CustomDomainValidation from app.custom_domain_validation import CustomDomainValidation
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
@ -137,7 +137,7 @@ def domain_detail_dns(custom_domain_id):
return render_template( return render_template(
"dashboard/domain_detail/dns.html", "dashboard/domain_detail/dns.html",
EMAIL_SERVERS_WITH_PRIORITY=EMAIL_SERVERS_WITH_PRIORITY, EMAIL_SERVERS_WITH_PRIORITY=EMAIL_SERVERS_WITH_PRIORITY,
ownership_record=domain_validator.get_ownership_verification_record( ownership_records=domain_validator.get_ownership_verification_record(
custom_domain custom_domain
), ),
expected_mx_records=domain_validator.get_expected_mx_records(custom_domain), expected_mx_records=domain_validator.get_expected_mx_records(custom_domain),

View file

@ -272,7 +272,6 @@ def mailbox_confirm_email_change_route():
code = request.args.get("code") code = request.args.get("code")
if code: if code:
print("HAS OCO", code)
try: try:
mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code) mailbox = mailbox_utils.verify_mailbox_code(current_user, mailbox_id, code)
flash("Successfully changed mailbox email", "success") flash("Successfully changed mailbox email", "success")
@ -280,7 +279,6 @@ def mailbox_confirm_email_change_route():
url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id) url_for("dashboard.mailbox_detail_route", mailbox_id=mailbox.id)
) )
except mailbox_utils.MailboxError as e: except mailbox_utils.MailboxError as e:
print(e)
flash(f"Cannot verify mailbox: {e.msg}", "error") flash(f"Cannot verify mailbox: {e.msg}", "error")
return redirect(url_for("dashboard.mailbox_route")) return redirect(url_for("dashboard.mailbox_route"))
else: else:

View file

@ -1,5 +1,4 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional from typing import List, Optional
import dns.resolver import dns.resolver
@ -9,42 +8,13 @@ from app.config import NAMESERVERS
_include_spf = "include:" _include_spf = "include:"
@dataclass
class MxRecord:
priority: int
domain: str
def is_mx_equivalent(
mx_domains: List[MxRecord], ref_mx_domains: List[MxRecord]
) -> bool:
"""
Compare mx_domains with ref_mx_domains to see if they are equivalent.
mx_domains and ref_mx_domains are list of (priority, domain)
The priority order is taken into account but not the priority number.
For example, [(1, domain1), (2, domain2)] is equivalent to [(10, domain1), (20, domain2)]
"""
mx_domains = sorted(mx_domains, key=lambda x: x.priority)
ref_mx_domains = sorted(ref_mx_domains, key=lambda x: x.priority)
if len(mx_domains) < len(ref_mx_domains):
return False
for actual, expected in zip(mx_domains, ref_mx_domains):
if actual.domain != expected.domain:
return False
return True
class DNSClient(ABC): class DNSClient(ABC):
@abstractmethod @abstractmethod
def get_cname_record(self, hostname: str) -> Optional[str]: def get_cname_record(self, hostname: str) -> Optional[str]:
pass pass
@abstractmethod @abstractmethod
def get_mx_domains(self, hostname: str) -> List[MxRecord]: def get_mx_domains(self, hostname: str) -> dict[int, list[str]]:
pass pass
def get_spf_domain(self, hostname: str) -> List[str]: def get_spf_domain(self, hostname: str) -> List[str]:
@ -88,21 +58,24 @@ class NetworkDNSClient(DNSClient):
except Exception: except Exception:
return None return None
def get_mx_domains(self, hostname: str) -> List[MxRecord]: def get_mx_domains(self, hostname: str) -> dict[int, list[str]]:
""" """
return list of (priority, domain name) sorted by priority (lowest priority first) return list of (priority, domain name) sorted by priority (lowest priority first)
domain name ends with a "." at the end. domain name ends with a "." at the end.
""" """
ret = {}
try: try:
answers = self._resolver.resolve(hostname, "MX", search=True) answers = self._resolver.resolve(hostname, "MX", search=True)
ret = []
for a in answers: for a in answers:
record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.' record = a.to_text() # for ex '20 alt2.aspmx.l.google.com.'
parts = record.split(" ") parts = record.split(" ")
ret.append(MxRecord(priority=int(parts[0]), domain=parts[1])) prio = int(parts[0])
return sorted(ret, key=lambda x: x.priority) if prio not in ret:
ret[prio] = []
ret[prio].append(parts[1])
except Exception: except Exception:
return [] pass
return ret
def get_txt_record(self, hostname: str) -> List[str]: def get_txt_record(self, hostname: str) -> List[str]:
try: try:
@ -119,14 +92,14 @@ class NetworkDNSClient(DNSClient):
class InMemoryDNSClient(DNSClient): class InMemoryDNSClient(DNSClient):
def __init__(self): def __init__(self):
self.cname_records: dict[str, Optional[str]] = {} self.cname_records: dict[str, Optional[str]] = {}
self.mx_records: dict[str, List[MxRecord]] = {} self.mx_records: dict[int, dict[int, list[str]]] = {}
self.spf_records: dict[str, List[str]] = {} self.spf_records: dict[str, List[str]] = {}
self.txt_records: dict[str, List[str]] = {} self.txt_records: dict[str, List[str]] = {}
def set_cname_record(self, hostname: str, cname: str): def set_cname_record(self, hostname: str, cname: str):
self.cname_records[hostname] = cname self.cname_records[hostname] = cname
def set_mx_records(self, hostname: str, mx_list: List[MxRecord]): def set_mx_records(self, hostname: str, mx_list: dict[int, list[str]]):
self.mx_records[hostname] = mx_list self.mx_records[hostname] = mx_list
def set_txt_record(self, hostname: str, txt_list: List[str]): def set_txt_record(self, hostname: str, txt_list: List[str]):
@ -135,9 +108,8 @@ class InMemoryDNSClient(DNSClient):
def get_cname_record(self, hostname: str) -> Optional[str]: def get_cname_record(self, hostname: str) -> Optional[str]:
return self.cname_records.get(hostname) return self.cname_records.get(hostname)
def get_mx_domains(self, hostname: str) -> List[MxRecord]: def get_mx_domains(self, hostname: str) -> dict[int, list[str]]:
mx_list = self.mx_records.get(hostname, []) return self.mx_records.get(hostname, {})
return sorted(mx_list, key=lambda x: x.priority)
def get_txt_record(self, hostname: str) -> List[str]: def get_txt_record(self, hostname: str) -> List[str]:
return self.txt_records.get(hostname, []) return self.txt_records.get(hostname, [])
@ -147,5 +119,5 @@ def get_network_dns_client() -> NetworkDNSClient:
return NetworkDNSClient(NAMESERVERS) return NetworkDNSClient(NAMESERVERS)
def get_mx_domains(hostname: str) -> List[MxRecord]: def get_mx_domains(hostname: str) -> dict[int, list[str]]:
return get_network_dns_client().get_mx_domains(hostname) return get_network_dns_client().get_mx_domains(hostname)

View file

@ -657,7 +657,11 @@ def get_mx_domain_list(domain) -> [str]:
""" """
priority_domains = get_mx_domains(domain) priority_domains = get_mx_domains(domain)
return [d.domain[:-1] for d in priority_domains] mx_domains = []
for prio in priority_domains:
for domain in priority_domains[prio]:
mx_domains.append(domain[:-1])
return mx_domains
def personal_email_already_used(email_address: str) -> bool: def personal_email_already_used(email_address: str) -> bool:

View file

@ -14,9 +14,9 @@ from sqlalchemy.sql import Insert, text
from app import s3, config from app import s3, config
from app.alias_utils import nb_email_log_for_mailbox from app.alias_utils import nb_email_log_for_mailbox
from app.api.views.apple import verify_receipt from app.api.views.apple import verify_receipt
from app.custom_domain_validation import CustomDomainValidation from app.custom_domain_validation import CustomDomainValidation, is_mx_equivalent
from app.db import Session from app.db import Session
from app.dns_utils import get_mx_domains, is_mx_equivalent from app.dns_utils import get_mx_domains
from app.email_utils import ( from app.email_utils import (
send_email, send_email,
send_trial_end_soon_email, send_trial_end_soon_email,

View file

@ -1,7 +1,28 @@
{% extends 'admin/master.html' %} {% extends 'admin/master.html' %}
{% block head_css %}
{{ super() }}
<style>
.card-shadow {
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15);
border-radius: 8px;
}
.domain-title {
background-color: #007bff;
color: white;
padding: 10px;
border-radius: 8px 8px 0 0;
}
.status-icon {
font-size: 1.2em;
}
</style>
{% endblock %}
{% macro show_user(user) -%} {% macro show_user(user) -%}
<h4>User <a href="/admin/email_search?email={{ user.email }}">{{ user.email }}</a> with ID {{ user.id }}.</h4> <h4>
User <a href="/admin/email_search?email={{ user.email }}">{{ user.email }}</a> with ID {{ user.id }}.
</h4>
<table class="table"> <table class="table">
<thead> <thead>
<tr> <tr>
@ -37,48 +58,96 @@
</tbody> </tbody>
</table> </table>
{%- endmacro %} {%- endmacro %}
{% macro show_verification(title, expected, errors) -%} {% macro show_verification(title, expected, errors) -%}
{% if not expected %} {% if not expected %}
<h4 class="mb-3">{{ title }} <span class="text-success">Verified</span></h4>
<li class="list-group-item d-flex justify-content-between align-items-center">
<h5>{{ title }}</h5>
<span class="text-success status-icon"><i class="fa fa-check-circle"></i></span>
</li>
{% else %} {% else %}
<h4 class="mb-3">{{ title }}</h4> <li class="list-group-item">
<p>Expected</p> <h5>{{ title }}</h5>
<p>{{expected}}</p> <p>
<p>Current response</p> <strong>Expected:</strong> {{ expected.recommended }}
<ul class="list-group"> </p>
{% for error in errors %} <p>
<li class="list-group-item">{{ error }}</li> <strong>Allowed:</strong>
{% endfor %} <ul>
{% for expected_record in expected.allowed %}<li>{{ expected_record }}</li>{% endfor %}
</ul> </ul>
</p>
<p>
<strong>Current response:</strong>
</p>
{% for error in errors %}
<ul class="list-group">
<li class="list-group-item">{{ error }}</li>
</ul>
{% endfor %}
</li>
{% endif %} {% endif %}
{%- endmacro %} {%- endmacro %}
{% macro show_mx_verification(title, expected, errors) -%}
{% if not expected %}
<li class="list-group-item d-flex justify-content-between align-items-center">
<h5>{{ title }}</h5>
<span class="text-success status-icon"><i class="fa fa-check-circle"></i></span>
</li>
{% else %}
<li class="list-group-item">
<h5>{{ title }}</h5>
<ul>
<li class="list-group-item">
{% for prio in expected %}
<p>
<strong>Priority {{ prio }}:</strong> {{ expected[prio].recommended }}
</p>
<p>
<strong>Allowed:</strong>
<ul>
{% for expected_record in expected[prio].allowed %}<li>{{ expected_record }}</li>{% endfor %}
</ul>
</p>
<p>
<strong>Current response:</strong>
</p>
{% for error in errors %}
{% macro show_domain(domain_with_data) -%}
<h3>Domain {{ domain_with_data.domain.domain }}</h3>
{% set domain = domain_with_data.domain %}
<ul class="list-group"> <ul class="list-group">
<li class="list-group-item"> <li class="list-group-item">{{ error }}</li>
{{ show_verification("Ownership", domain_with_data.ownership_expected, domain_with_data.ownership_validation.errors) }} </ul>
</li> {% endfor %}
<li class="list-group-item">
{{ show_verification("MX", domain_with_data.mx_expected, domain_with_data.mx_validation.errors) }}
</li>
<li class="list-group-item">
{{ show_verification("SPF", domain_with_data.spf_expected, domain_with_data.spf_validation.errors) }}
</li>
{% for dkim_domain in domain_with_data.dkim_expected %}
<li class="list-group-item">
{{ show_verification("DKIM {}.{}".format(dkim_domain, domain.domain), domain_with_data.dkim_expected[dkim_domain], [domain_with_data.dkim_validation.get(dkim_domain+"."+domain.domain,'')]) }}
</li> </li>
{% endfor %} {% endfor %}
</ul> </ul>
</li>
{% endif %}
{%- endmacro %} {%- endmacro %}
{% macro show_domain(domain_with_data) -%}
<div class="col-md-3 mb-4">
<div class="card card-shadow">
<div class="domain-title text-center">
<h4>Domain {{ domain_with_data.domain.domain }}</h4>
</div>
<div class="card-body">
{% set domain = domain_with_data.domain %}
<ul class="list-group">
{{ show_verification("Ownership", domain_with_data.ownership_expected, domain_with_data.ownership_validation.errors) }}
{{ show_mx_verification("MX", domain_with_data.mx_expected, domain_with_data.mx_validation.errors) }}
{{ show_verification("SPF", domain_with_data.spf_expected, domain_with_data.spf_validation.errors) }}
{% for dkim_domain in domain_with_data.dkim_expected %}
{{ show_verification("DKIM {}.{}".format(dkim_domain, domain.domain) , domain_with_data.dkim_expected[dkim_domain], [domain_with_data.dkim_validation.get(dkim_domain+"."+domain.domain,'')]) }}
{% endfor %}
</ul>
</div>
</div>
</div>
{%- endmacro %}
{% block body %} {% block body %}
<div class="border border-dark border-2 mt-1 mb-2 p-3"> <div class="border border-dark border-2 mt-1 mb-2 p-3">
@ -94,25 +163,19 @@
</form> </form>
</div> </div>
{% if data.no_match and query %} {% if data.no_match and query %}
<div class="border border-dark border-2 mt-1 mb-2 p-3 alert alert-warning" <div class="border border-dark border-2 mt-1 mb-2 p-3 alert alert-warning"
role="alert">No user, alias or mailbox found for {{ query }}</div> role="alert">No user, alias or mailbox found for {{ query }}</div>
{% endif %} {% endif %}
{% if data.user %} {% if data.user %}
<div class="border border-dark border-2 mt-1 mb-2 p-3"> <div class="border border-dark border-2 mt-1 mb-2 p-3">
<h3 class="mb-3">Found User {{ data.user.email }}</h3> <h3 class="mb-3">Found User {{ data.user.email }}</h3>
{{ show_user(data.user) }} {{ show_user(data.user) }}
</div> </div>
{% endif %} {% endif %}
<div class="d-flex"> <div class="row mt-4">
{% for domain_with_data in data.domains %}{{ show_domain(domain_with_data) }}{% endfor %}
{% for domain_with_data in data.domains %}
<div class="card m-2 border-dark" style="width: 30rem;">
<div class="card-body">
{{ show_domain(domain_with_data) }}
</div> </div>
</div> </div>
{% endfor %}
</div>
{% endblock %} {% endblock %}

View file

@ -38,7 +38,7 @@
Value: <em data-toggle="tooltip" Value: <em data-toggle="tooltip"
title="Click to copy" title="Click to copy"
class="clipboard" class="clipboard"
data-clipboard-text="{{ ownership_record }}">{{ ownership_record }}</em> data-clipboard-text="{{ ownership_records.recommended }}">{{ ownership_records.recommended }}</em>
</div> </div>
<form method="post" action="#ownership-form"> <form method="post" action="#ownership-form">
{{ csrf_form.csrf_token }} {{ csrf_form.csrf_token }}
@ -91,7 +91,7 @@
<br /> <br />
Some domain registrars (Namecheap, CloudFlare, etc) might also use <em>@</em> for the root domain. Some domain registrars (Namecheap, CloudFlare, etc) might also use <em>@</em> for the root domain.
</div> </div>
{% for record in expected_mx_records %} {% for prio in expected_mx_records %}
<div class="mb-3 p-3 dns-record"> <div class="mb-3 p-3 dns-record">
Record: MX Record: MX
@ -99,12 +99,12 @@
Domain: {{ custom_domain.domain }} or Domain: {{ custom_domain.domain }} or
<b>@</b> <b>@</b>
<br /> <br />
Priority: {{ record.priority }} Priority: {{ prio }}
<br /> <br />
Target: <em data-toggle="tooltip" Target: <em data-toggle="tooltip"
title="Click to copy" title="Click to copy"
class="clipboard" class="clipboard"
data-clipboard-text="{{ record.domain }}">{{ record.domain }}</em> data-clipboard-text="{{ expected_mx_records[prio].recommended }}">{{ expected_mx_records[prio].recommended }}</em>
</div> </div>
{% endfor %} {% endfor %}
<form method="post" action="#mx-form"> <form method="post" action="#mx-form">
@ -251,8 +251,8 @@
<em data-toggle="tooltip" <em data-toggle="tooltip"
title="Click to copy" title="Click to copy"
class="clipboard" class="clipboard"
data-clipboard-text="{{ dkim_cname_value }}." data-clipboard-text="{{ dkim_cname_value.recommended }}."
style="overflow-wrap: break-word">{{ dkim_cname_value }}.</em> style="overflow-wrap: break-word">{{ dkim_cname_value.recommended }}.</em>
</div> </div>
{% endfor %} {% endfor %}
<div class="alert alert-info"> <div class="alert alert-info">

View file

@ -4,8 +4,8 @@ from app import config
from app.constants import DMARC_RECORD from app.constants import DMARC_RECORD
from app.custom_domain_validation import CustomDomainValidation from app.custom_domain_validation import CustomDomainValidation
from app.db import Session from app.db import Session
from app.dns_utils import InMemoryDNSClient
from app.models import CustomDomain, User from app.models import CustomDomain, User
from app.dns_utils import InMemoryDNSClient, MxRecord
from app.proton.utils import get_proton_partner from app.proton.utils import get_proton_partner
from app.utils import random_string from app.utils import random_string
from tests.utils import create_new_user, random_domain from tests.utils import create_new_user, random_domain
@ -33,9 +33,12 @@ def test_custom_domain_validation_get_dkim_records():
records = validator.get_dkim_records(custom_domain) records = validator.get_dkim_records(custom_domain)
assert len(records) == 3 assert len(records) == 3
assert records["dkim02._domainkey"] == f"dkim02._domainkey.{domain}" assert records["dkim02._domainkey"].recommended == f"dkim02._domainkey.{domain}"
assert records["dkim03._domainkey"] == f"dkim03._domainkey.{domain}" assert records["dkim02._domainkey"].allowed == [f"dkim02._domainkey.{domain}"]
assert records["dkim._domainkey"] == f"dkim._domainkey.{domain}" assert records["dkim03._domainkey"].recommended == f"dkim03._domainkey.{domain}"
assert records["dkim03._domainkey"].allowed == [f"dkim03._domainkey.{domain}"]
assert records["dkim._domainkey"].recommended == f"dkim._domainkey.{domain}"
assert records["dkim._domainkey"].allowed == [f"dkim._domainkey.{domain}"]
def test_custom_domain_validation_get_dkim_records_for_partner(): def test_custom_domain_validation_get_dkim_records_for_partner():
@ -53,9 +56,25 @@ def test_custom_domain_validation_get_dkim_records_for_partner():
records = validator.get_dkim_records(custom_domain) records = validator.get_dkim_records(custom_domain)
assert len(records) == 3 assert len(records) == 3
assert records["dkim02._domainkey"] == f"dkim02._domainkey.{dkim_domain}" assert (
assert records["dkim03._domainkey"] == f"dkim03._domainkey.{dkim_domain}" records["dkim02._domainkey"].recommended == f"dkim02._domainkey.{dkim_domain}"
assert records["dkim._domainkey"] == f"dkim._domainkey.{dkim_domain}" )
assert records["dkim02._domainkey"].allowed == [
f"dkim02._domainkey.{dkim_domain}",
f"dkim02._domainkey.{domain}",
]
assert (
records["dkim03._domainkey"].recommended == f"dkim03._domainkey.{dkim_domain}"
)
assert records["dkim03._domainkey"].allowed == [
f"dkim03._domainkey.{dkim_domain}",
f"dkim03._domainkey.{domain}",
]
assert records["dkim._domainkey"].recommended == f"dkim._domainkey.{dkim_domain}"
assert records["dkim._domainkey"].allowed == [
f"dkim._domainkey.{dkim_domain}",
f"dkim._domainkey.{domain}",
]
# get_expected_mx_records # get_expected_mx_records
@ -75,8 +94,8 @@ def test_custom_domain_validation_get_expected_mx_records_regular_domain():
assert len(records) == len(config.EMAIL_SERVERS_WITH_PRIORITY) assert len(records) == len(config.EMAIL_SERVERS_WITH_PRIORITY)
for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)): for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)):
config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i] config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i]
assert records[i].priority == config_record[0] assert records[config_record[0]].recommended == config_record[1]
assert records[i].domain == config_record[1] assert records[config_record[0]].allowed == [config_record[1]]
def test_custom_domain_validation_get_expected_mx_records_domain_from_partner(): def test_custom_domain_validation_get_expected_mx_records_domain_from_partner():
@ -89,14 +108,15 @@ def test_custom_domain_validation_get_expected_mx_records_domain_from_partner():
dkim_domain = random_domain() dkim_domain = random_domain()
validator = CustomDomainValidation(dkim_domain) validator = CustomDomainValidation(dkim_domain)
records = validator.get_expected_mx_records(custom_domain) expected_records = validator.get_expected_mx_records(custom_domain)
# As the domain is a partner_domain but there is no custom config for partner, default records # As the domain is a partner_domain but there is no custom config for partner, default records
# should be used # should be used
assert len(records) == len(config.EMAIL_SERVERS_WITH_PRIORITY) assert len(expected_records) == len(config.EMAIL_SERVERS_WITH_PRIORITY)
for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)): for i in range(len(config.EMAIL_SERVERS_WITH_PRIORITY)):
config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i] config_record = config.EMAIL_SERVERS_WITH_PRIORITY[i]
assert records[i].priority == config_record[0] expected = expected_records[config_record[0]]
assert records[i].domain == config_record[1] assert expected.recommended == config_record[1]
assert expected.allowed == [config_record[1]]
def test_custom_domain_validation_get_expected_mx_records_domain_from_partner_with_custom_config(): def test_custom_domain_validation_get_expected_mx_records_domain_from_partner_with_custom_config():
@ -112,15 +132,21 @@ def test_custom_domain_validation_get_expected_mx_records_domain_from_partner_wi
validator = CustomDomainValidation( validator = CustomDomainValidation(
dkim_domain, partner_domains={partner_id: expected_mx_domain} dkim_domain, partner_domains={partner_id: expected_mx_domain}
) )
records = validator.get_expected_mx_records(custom_domain) expected_records = validator.get_expected_mx_records(custom_domain)
# As the domain is a partner_domain and there is a custom config for partner, partner records # As the domain is a partner_domain and there is a custom config for partner, partner records
# should be used # should be used
assert len(records) == 2 assert len(expected_records) == 2
sl_domains = config.EMAIL_SERVERS_WITH_PRIORITY
assert records[0].priority == 10 assert expected_records[10].recommended == f"mx1.{expected_mx_domain}."
assert records[0].domain == f"mx1.{expected_mx_domain}." expected = [f"mx1.{expected_mx_domain}."]
assert records[1].priority == 20 expected.extend([sl_dom[1] for sl_dom in sl_domains if sl_dom[0] == 10])
assert records[1].domain == f"mx2.{expected_mx_domain}." assert expected_records[10].allowed == expected
assert expected_records[20].recommended == f"mx2.{expected_mx_domain}."
expected = [f"mx2.{expected_mx_domain}."]
expected.extend([sl_dom[1] for sl_dom in sl_domains if sl_dom[0] == 20])
assert expected_records[20].allowed == expected
# get_expected_spf_records # get_expected_spf_records
@ -309,7 +335,7 @@ def test_custom_domain_validation_validate_ownership_success():
domain = create_custom_domain(random_domain()) domain = create_custom_domain(random_domain())
dns_client.set_txt_record( dns_client.set_txt_record(
domain.domain, [validator.get_ownership_verification_record(domain)] domain.domain, validator.get_ownership_verification_record(domain).allowed
) )
res = validator.validate_domain_ownership(domain) res = validator.validate_domain_ownership(domain)
@ -336,7 +362,7 @@ def test_custom_domain_validation_validate_ownership_from_partner_success():
Session.commit() Session.commit()
dns_client.set_txt_record( dns_client.set_txt_record(
domain.domain, [validator.get_ownership_verification_record(domain)] domain.domain, validator.get_ownership_verification_record(domain).allowed
) )
res = validator.validate_domain_ownership(domain) res = validator.validate_domain_ownership(domain)
@ -370,7 +396,7 @@ def test_custom_domain_validation_validate_mx_records_wrong_records_failure():
wrong_record_1 = random_string() wrong_record_1 = random_string()
wrong_record_2 = random_string() wrong_record_2 = random_string()
wrong_records = [MxRecord(10, wrong_record_1), MxRecord(20, wrong_record_2)] wrong_records = {10: [wrong_record_1], 20: [wrong_record_2]}
dns_client.set_mx_records(domain.domain, wrong_records) dns_client.set_mx_records(domain.domain, wrong_records)
res = validator.validate_mx_records(domain) res = validator.validate_mx_records(domain)
@ -387,7 +413,12 @@ def test_custom_domain_validation_validate_mx_records_success():
domain = create_custom_domain(random_domain()) domain = create_custom_domain(random_domain())
dns_client.set_mx_records(domain.domain, validator.get_expected_mx_records(domain)) mx_records_by_prio = validator.get_expected_mx_records(domain)
dns_records = {
priority: mx_records_by_prio[priority].allowed
for priority in mx_records_by_prio
}
dns_client.set_mx_records(domain.domain, dns_records)
res = validator.validate_mx_records(domain) res = validator.validate_mx_records(domain)
assert res.success is True assert res.success is True
@ -485,10 +516,13 @@ def test_custom_domain_validation_validate_spf_cleans_verification_record():
domain.partner_id = proton_partner_id domain.partner_id = proton_partner_id
Session.commit() Session.commit()
ownership_records = validator.get_ownership_verification_record(domain)
for ownership_record in ownership_records.allowed:
wrong_record = random_string() wrong_record = random_string()
dns_client.set_txt_record( dns_client.set_txt_record(
hostname=domain.domain, hostname=domain.domain,
txt_list=[wrong_record, validator.get_ownership_verification_record(domain)], txt_list=[wrong_record, ownership_record],
) )
res = validator.validate_spf_records(domain) res = validator.validate_spf_records(domain)

View file

@ -1,9 +1,8 @@
from app.custom_domain_validation import is_mx_equivalent, ExpectedValidationRecords
from app.dns_utils import ( from app.dns_utils import (
get_mx_domains, get_mx_domains,
get_network_dns_client, get_network_dns_client,
is_mx_equivalent,
InMemoryDNSClient, InMemoryDNSClient,
MxRecord,
) )
from tests.utils import random_domain from tests.utils import random_domain
@ -17,9 +16,9 @@ def test_get_mx_domains():
assert len(r) > 0 assert len(r) > 0
for x in r: for prio in r:
assert x.priority > 0 assert prio > 0
assert x.domain assert len(r[prio]) > 0
def test_get_spf_domain(): def test_get_spf_domain():
@ -33,33 +32,49 @@ def test_get_txt_record():
def test_is_mx_equivalent(): def test_is_mx_equivalent():
assert is_mx_equivalent([], []) assert is_mx_equivalent({}, {})
assert is_mx_equivalent( assert is_mx_equivalent(
mx_domains=[MxRecord(1, "domain")], ref_mx_domains=[MxRecord(1, "domain")] mx_domains={1: ["domain"]},
expected_mx_domains={
1: ExpectedValidationRecords(recommended="nop", allowed=["domain"])
},
) )
assert is_mx_equivalent( assert is_mx_equivalent(
mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], mx_domains={10: ["domain10"], 20: ["domain20"]},
ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], expected_mx_domains={
10: ExpectedValidationRecords(recommended="nop", allowed=["domain10"]),
20: ExpectedValidationRecords(recommended="nop", allowed=["domain20"]),
},
) )
assert is_mx_equivalent( assert is_mx_equivalent(
mx_domains=[MxRecord(5, "domain1"), MxRecord(10, "domain2")], mx_domains={5: ["domain1"], 10: ["domain2"]},
ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], expected_mx_domains={
10: ExpectedValidationRecords(recommended="nop", allowed=["domain1"]),
20: ExpectedValidationRecords(recommended="nop", allowed=["domain2"]),
},
) )
assert is_mx_equivalent(
mx_domains=[ assert not is_mx_equivalent(
MxRecord(5, "domain1"), mx_domains={10: ["domain10", "domain11"], 20: ["domain20"]},
MxRecord(10, "domain2"), expected_mx_domains={
MxRecord(20, "domain3"), 10: ExpectedValidationRecords(recommended="nop", allowed=["domain10"]),
], 20: ExpectedValidationRecords(recommended="nop", allowed=["domain20"]),
ref_mx_domains=[MxRecord(10, "domain1"), MxRecord(20, "domain2")], },
) )
assert not is_mx_equivalent( assert not is_mx_equivalent(
mx_domains=[MxRecord(5, "domain1"), MxRecord(10, "domain2")], mx_domains={5: ["domain1"], 10: ["domain2"], 20: ["domain3"]},
ref_mx_domains=[ expected_mx_domains={
MxRecord(10, "domain1"), 10: ExpectedValidationRecords(recommended="nop", allowed=["domain1"]),
MxRecord(20, "domain2"), 20: ExpectedValidationRecords(recommended="nop", allowed=["domain2"]),
MxRecord(20, "domain3"), },
], )
assert not is_mx_equivalent(
mx_domains={10: ["domain1"]},
expected_mx_domains={
10: ExpectedValidationRecords(recommended="nop", allowed=["domain1"]),
20: ExpectedValidationRecords(recommended="nop", allowed=["domain2"]),
},
) )