Move dmarc management to its own file

This commit is contained in:
Adrià Casajús 2022-04-08 11:28:14 +02:00
parent b128d64563
commit 68e58c0876
No known key found for this signature in database
GPG key ID: F0033226A5AFC9B9
4 changed files with 177 additions and 177 deletions

View file

@ -56,8 +56,3 @@ class MailSentFromReverseAlias(SLException):
"""raised when receiving an email sent from a reverse alias""" """raised when receiving an email sent from a reverse alias"""
pass pass
class DmarcSoftFail(SLException):
pass

157
app/handler/dmarc.py Normal file
View file

@ -0,0 +1,157 @@
import uuid
from io import BytesIO
from typing import Optional
from aiosmtpd.handlers import Message
from aiosmtpd.smtp import Envelope
from app import s3
from app.config import (
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
ALERT_DMARC_FAILED_REPLY_PHASE,
)
from app.email import headers, status
from app.email_utils import (
get_header_unicode,
send_email_with_rate_control,
render,
add_or_replace_header,
to_bytes,
add_header,
)
from app.handler.spamd_result import SpamdResult, Phase, DmarcCheckResult
from app.log import LOG
from app.models import Alias, Contact, Notification, EmailLog, RefusedEmail
def apply_dmarc_policy_for_forward_phase(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.forward)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
from_header = get_header_unicode(msg[headers.FROM])
if spam_result.dmarc == DmarcCheckResult.soft_fail:
LOG.w(
f"dmarc forward: soft_fail from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
changed_msg = add_header(
msg,
f"""This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.""",
f"""
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
</p>
""",
)
# Change the payload inline
msg.set_payload(changed_msg.get_payload())
return None
if spam_result.dmarc in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
):
LOG.w(
f"dmarc forward: put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
email_log = quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
message=Notification.render(
"notification/message-quarantine.html", alias=alias
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return status.E215
return None
def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
return EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
def apply_dmarc_policy_for_reply_phase(
alias_from: Alias, contact_recipient: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.reply)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
if spam_result.dmarc not in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
DmarcCheckResult.soft_fail,
):
return None
LOG.w(
f"dmarc reply: Put email from {alias_from.email} to {contact_recipient} into quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
send_email_with_rate_control(
alias_from.user,
ALERT_DMARC_FAILED_REPLY_PHASE,
alias_from.user.email,
f"Attempt to send an email to your contact {contact_recipient.email} from {envelope.mail_from}",
render(
"transactional/spoof-reply.txt",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
render(
"transactional/spoof-reply.html",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
)
return status.E215

View file

@ -87,15 +87,14 @@ from app.config import (
OLD_UNSUBSCRIBER, OLD_UNSUBSCRIBER,
ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS, ALERT_FROM_ADDRESS_IS_REVERSE_ALIAS,
ALERT_TO_NOREPLY, ALERT_TO_NOREPLY,
DMARC_CHECK_ENABLED,
ALERT_QUARANTINE_DMARC,
ALERT_DMARC_FAILED_REPLY_PHASE,
) )
from app.db import Session from app.db import Session
from app.handler.dmarc import (
apply_dmarc_policy_for_reply_phase,
apply_dmarc_policy_for_forward_phase,
)
from app.handler.spamd_result import ( from app.handler.spamd_result import (
SpamdResult, SpamdResult,
Phase,
DmarcCheckResult,
SPFCheckResult, SPFCheckResult,
) )
from app.email import status, headers from app.email import status, headers
@ -144,7 +143,6 @@ from app.errors import (
VERPForward, VERPForward,
VERPReply, VERPReply,
CannotCreateContactForReverseAlias, CannotCreateContactForReverseAlias,
DmarcSoftFail,
) )
from app.log import LOG, set_message_id from app.log import LOG, set_message_id
from app.models import ( from app.models import (
@ -546,137 +544,6 @@ def handle_email_sent_to_ourself(alias, from_addr: str, msg: Message, user):
) )
def apply_dmarc_policy_for_forward_phase(
alias: Alias, contact: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.forward)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
from_header = get_header_unicode(msg[headers.FROM])
if spam_result.dmarc == DmarcCheckResult.soft_fail:
LOG.w(
f"dmarc forward: soft_fail from contact {contact.email} to alias {alias.email}."
f"mail_from:{envelope.mail_from}, from_header: {from_header}"
)
raise DmarcSoftFail
if spam_result.dmarc in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
):
LOG.w(
f"dmarc forward: put email from {contact} to {alias} to quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
email_log = quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg)
Notification.create(
user_id=alias.user_id,
title=f"{alias.email} has a new mail in quarantine",
message=Notification.render(
"notification/message-quarantine.html", alias=alias
),
commit=True,
)
user = alias.user
send_email_with_rate_control(
user,
ALERT_QUARANTINE_DMARC,
user.email,
f"An email sent to {alias.email} has been quarantined",
render(
"transactional/message-quarantine-dmarc.txt.jinja2",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
render(
"transactional/message-quarantine-dmarc.html",
from_header=from_header,
alias=alias,
refused_email_url=email_log.get_dashboard_url(),
),
max_nb_alert=10,
ignore_smtp_error=True,
)
return status.E215
return None
def quarantine_dmarc_failed_forward_email(alias, contact, envelope, msg) -> EmailLog:
add_or_replace_header(msg, headers.SL_DIRECTION, "Forward")
msg[headers.SL_ENVELOPE_TO] = alias.email
msg[headers.SL_ENVELOPE_FROM] = envelope.mail_from
add_or_replace_header(msg, "From", contact.new_addr())
# replace CC & To emails by reverse-alias for all emails that are not alias
try:
replace_header_when_forward(msg, alias, "Cc")
replace_header_when_forward(msg, alias, "To")
except CannotCreateContactForReverseAlias:
Session.commit()
raise
random_name = str(uuid.uuid4())
s3_report_path = f"refused-emails/full-{random_name}.eml"
s3.upload_email_from_bytesio(
s3_report_path, BytesIO(to_bytes(msg)), f"full-{random_name}"
)
refused_email = RefusedEmail.create(
full_report_path=s3_report_path, user_id=alias.user_id, flush=True
)
return EmailLog.create(
user_id=alias.user_id,
mailbox_id=alias.mailbox_id,
contact_id=contact.id,
alias_id=alias.id,
message_id=str(msg[headers.MESSAGE_ID]),
refused_email_id=refused_email.id,
is_spam=True,
blocked=True,
commit=True,
)
def apply_dmarc_policy_for_reply_phase(
alias_from: Alias, contact_recipient: Contact, envelope: Envelope, msg: Message
) -> Optional[str]:
spam_result = SpamdResult.extract_from_headers(msg, Phase.reply)
if not DMARC_CHECK_ENABLED or not spam_result:
return None
if spam_result.dmarc not in (
DmarcCheckResult.quarantine,
DmarcCheckResult.reject,
DmarcCheckResult.soft_fail,
):
return None
LOG.w(
f"dmarc reply: Put email from {alias_from.email} to {contact_recipient} into quarantine. {spam_result.event_data()}, "
f"mail_from:{envelope.mail_from}, from_header: {msg[headers.FROM]}"
)
send_email_with_rate_control(
alias_from.user,
ALERT_DMARC_FAILED_REPLY_PHASE,
alias_from.user.email,
f"Attempt to send an email to your contact {contact_recipient.email} from {envelope.mail_from}",
render(
"transactional/spoof-reply.txt",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
render(
"transactional/spoof-reply.html",
contact=contact_recipient,
alias=alias_from,
sender=envelope.mail_from,
),
)
return status.E215
def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str]]: def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str]]:
"""return an array of SMTP status (is_success, smtp_status) """return an array of SMTP status (is_success, smtp_status)
is_success indicates whether an email has been delivered and is_success indicates whether an email has been delivered and
@ -758,22 +625,11 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
return [(True, res_status)] return [(True, res_status)]
# Check if we need to reject or quarantine based on dmarc # Check if we need to reject or quarantine based on dmarc
try: dmarc_delivery_status = apply_dmarc_policy_for_forward_phase(
dmarc_delivery_status = apply_dmarc_policy_for_forward_phase( alias, contact, envelope, msg
alias, contact, envelope, msg )
) if dmarc_delivery_status is not None:
if dmarc_delivery_status is not None: return [(False, dmarc_delivery_status)]
return [(False, dmarc_delivery_status)]
except DmarcSoftFail:
msg = add_header(
msg,
f"""This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.""",
f"""
<p style="color:red">
This email failed anti-phishing checks when it was received by SimpleLogin, be careful with its content.
</p>
""",
)
ret = [] ret = []
mailboxes = alias.mailboxes mailboxes = alias.mailboxes

View file

@ -104,25 +104,17 @@ def test_dmarc_forward_quarantine(flask_client):
assert f"{alias.email} has a new mail in quarantine" == notifications[0].title assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
# todo: re-enable test when softfail is quarantined def test_gmail_dmarc_softfail(flask_client):
# def test_gmail_dmarc_softfail(flask_client): user = create_random_user()
# user = create_random_user() alias = Alias.create_new_random(user)
# alias = Alias.create_new_random(user) msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
# msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email}) envelope = Envelope()
# envelope = Envelope() envelope.mail_from = msg["from"]
# envelope.mail_from = msg["from"] envelope.rcpt_tos = [msg["to"]]
# envelope.rcpt_tos = [msg["to"]] result = email_handler.handle(envelope, msg)
# result = email_handler.handle(envelope, msg) assert result == status.E200
# assert result == status.E215 payload = msg.get_payload()
# email_logs = ( assert payload.find("failed anti-phishing checks") > -1
# EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
# .order_by(EmailLog.id.desc())
# .all()
# )
# assert len(email_logs) == 1
# email_log = email_logs[0]
# assert email_log.blocked
# assert email_log.refused_email_id
def test_prevent_5xx_from_spf(flask_client): def test_prevent_5xx_from_spf(flask_client):