2022-01-05 01:06:08 +08:00
|
|
|
from email.message import EmailMessage
|
|
|
|
|
2022-03-18 22:44:07 +08:00
|
|
|
from aiosmtpd.smtp import Envelope
|
|
|
|
|
|
|
|
import email_handler
|
|
|
|
from app.email import headers, status
|
2022-03-22 01:33:18 +08:00
|
|
|
from app.models import (
|
|
|
|
User,
|
|
|
|
Alias,
|
|
|
|
AuthorizedAddress,
|
|
|
|
IgnoredEmail,
|
|
|
|
EmailLog,
|
|
|
|
Notification,
|
|
|
|
)
|
2022-01-05 01:06:08 +08:00
|
|
|
from email_handler import (
|
|
|
|
get_mailbox_from_mail_from,
|
|
|
|
should_ignore,
|
|
|
|
is_automatic_out_of_office,
|
|
|
|
)
|
2022-03-21 19:03:11 +08:00
|
|
|
from tests.utils import load_eml_file, create_random_user
|
2020-09-28 23:41:16 +08:00
|
|
|
|
|
|
|
|
|
|
|
def test_get_mailbox_from_mail_from(flask_client):
|
|
|
|
user = User.create(
|
|
|
|
email="a@b.c",
|
|
|
|
password="password",
|
|
|
|
name="Test User",
|
|
|
|
activated=True,
|
|
|
|
commit=True,
|
|
|
|
)
|
|
|
|
alias = Alias.create(
|
|
|
|
user_id=user.id,
|
|
|
|
email="first@d1.test",
|
|
|
|
mailbox_id=user.default_mailbox_id,
|
|
|
|
commit=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
mb = get_mailbox_from_mail_from("a@b.c", alias)
|
|
|
|
assert mb.email == "a@b.c"
|
|
|
|
|
|
|
|
mb = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
|
|
|
|
assert mb is None
|
|
|
|
|
|
|
|
# authorized address
|
|
|
|
AuthorizedAddress.create(
|
|
|
|
user_id=user.id,
|
|
|
|
mailbox_id=user.default_mailbox_id,
|
|
|
|
email="unauthorized@gmail.com",
|
|
|
|
commit=True,
|
|
|
|
)
|
|
|
|
mb = get_mailbox_from_mail_from("unauthorized@gmail.com", alias)
|
|
|
|
assert mb.email == "a@b.c"
|
2021-06-22 23:52:24 +08:00
|
|
|
|
|
|
|
|
|
|
|
def test_should_ignore(flask_client):
|
|
|
|
assert should_ignore("mail_from", []) is False
|
|
|
|
|
|
|
|
assert not should_ignore("mail_from", ["rcpt_to"])
|
|
|
|
IgnoredEmail.create(mail_from="mail_from", rcpt_to="rcpt_to", commit=True)
|
|
|
|
assert should_ignore("mail_from", ["rcpt_to"])
|
2022-01-05 01:06:08 +08:00
|
|
|
|
|
|
|
|
|
|
|
def test_is_automatic_out_of_office():
|
|
|
|
msg = EmailMessage()
|
|
|
|
assert not is_automatic_out_of_office(msg)
|
|
|
|
|
2022-01-05 22:21:54 +08:00
|
|
|
msg[headers.AUTO_SUBMITTED] = "auto-replied"
|
2022-01-05 01:06:08 +08:00
|
|
|
assert is_automatic_out_of_office(msg)
|
|
|
|
|
2022-01-05 22:21:54 +08:00
|
|
|
del msg[headers.AUTO_SUBMITTED]
|
2022-01-05 01:06:08 +08:00
|
|
|
assert not is_automatic_out_of_office(msg)
|
|
|
|
|
2022-01-05 22:21:54 +08:00
|
|
|
msg[headers.AUTO_SUBMITTED] = "auto-generated"
|
2022-01-05 01:06:08 +08:00
|
|
|
assert is_automatic_out_of_office(msg)
|
2022-03-18 02:03:36 +08:00
|
|
|
|
2022-03-18 04:36:25 +08:00
|
|
|
|
2022-03-21 19:31:25 +08:00
|
|
|
def test_dmarc_quarantine(flask_client):
|
2022-03-18 22:44:07 +08:00
|
|
|
user = create_random_user()
|
2022-03-21 19:03:11 +08:00
|
|
|
alias = Alias.create_new_random(user)
|
2022-03-21 19:31:25 +08:00
|
|
|
msg = load_eml_file("dmarc_quarantine.eml", {"alias_email": alias.email})
|
2022-03-18 22:44:07 +08:00
|
|
|
envelope = Envelope()
|
|
|
|
envelope.mail_from = msg["from"]
|
|
|
|
envelope.rcpt_tos = [msg["to"]]
|
|
|
|
result = email_handler.handle(envelope, msg)
|
2022-03-23 00:44:08 +08:00
|
|
|
assert result == status.E215
|
2022-03-18 22:44:07 +08:00
|
|
|
email_logs = (
|
|
|
|
EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
|
|
|
|
.order_by(EmailLog.id.desc())
|
|
|
|
.all()
|
|
|
|
)
|
|
|
|
assert len(email_logs) == 1
|
|
|
|
email_log = email_logs[0]
|
|
|
|
assert email_log.blocked
|
|
|
|
assert email_log.refused_email_id
|
2022-03-22 01:33:18 +08:00
|
|
|
notifications = Notification.filter_by(user_id=user.id).all()
|
|
|
|
assert len(notifications) == 1
|
|
|
|
assert f"{alias.email} has a new mail in quarantine" == notifications[0].title
|
2022-03-22 00:38:41 +08:00
|
|
|
|
|
|
|
|
2022-03-26 01:12:33 +08:00
|
|
|
# todo: re-enable test when softfail is quarantined
|
|
|
|
# def test_gmail_dmarc_softfail(flask_client):
|
|
|
|
# user = create_random_user()
|
|
|
|
# alias = Alias.create_new_random(user)
|
|
|
|
# msg = load_eml_file("dmarc_gmail_softfail.eml", {"alias_email": alias.email})
|
|
|
|
# envelope = Envelope()
|
|
|
|
# envelope.mail_from = msg["from"]
|
|
|
|
# envelope.rcpt_tos = [msg["to"]]
|
|
|
|
# result = email_handler.handle(envelope, msg)
|
|
|
|
# assert result == status.E215
|
|
|
|
# email_logs = (
|
|
|
|
# EmailLog.filter_by(user_id=user.id, alias_id=alias.id)
|
|
|
|
# .order_by(EmailLog.id.desc())
|
|
|
|
# .all()
|
|
|
|
# )
|
|
|
|
# assert len(email_logs) == 1
|
|
|
|
# email_log = email_logs[0]
|
|
|
|
# assert email_log.blocked
|
|
|
|
# assert email_log.refused_email_id
|