app/app/greylisting.py
2020-04-04 18:07:05 +02:00

100 lines
2.8 KiB
Python

import arrow
from app.alias_utils import try_auto_create
from app.config import (
MAX_ACTIVITY_DURING_MINUTE_PER_ALIAS,
MAX_ACTIVITY_DURING_MINUTE_PER_MAILBOX,
)
from app.extensions import db
from app.log import LOG
from app.models import Alias, EmailLog, Contact
def greylisting_needed_for_alias(alias: Alias) -> bool:
    """Tell whether ``alias`` has seen too much activity in the last minute.

    Counts the ``EmailLog`` rows created during the past minute whose contact
    belongs to ``alias`` and compares that number against
    ``MAX_ACTIVITY_DURING_MINUTE_PER_ALIAS``.

    Args:
        alias: the alias whose recent activity is inspected.

    Returns:
        True when the count is strictly greater than the per-alias limit,
        False otherwise.
    """
    one_minute_ago = arrow.now().shift(minutes=-1)

    # Email logs are tied to an alias through their contact.
    recent_logs = db.session.query(EmailLog).join(
        Contact, EmailLog.contact_id == Contact.id
    )
    recent_logs = recent_logs.filter(
        Contact.alias_id == alias.id, EmailLog.created_at > one_minute_ago
    )
    nb_activity = recent_logs.group_by(EmailLog.id).count()

    if nb_activity <= MAX_ACTIVITY_DURING_MINUTE_PER_ALIAS:
        return False

    LOG.d("Too much forward on alias %s. Nb Activity %s", alias, nb_activity)
    return True
def greylisting_needed_for_mailbox(alias: Alias) -> bool:
    """Tell whether the mailbox behind ``alias`` has seen too much activity.

    Counts the ``EmailLog`` rows created during the past minute for every
    alias that shares ``alias.mailbox_id`` and compares that number against
    ``MAX_ACTIVITY_DURING_MINUTE_PER_MAILBOX``.

    Args:
        alias: an alias whose mailbox is inspected.

    Returns:
        True when the count is strictly greater than the per-mailbox limit,
        False otherwise.
    """
    one_minute_ago = arrow.now().shift(minutes=-1)

    # EmailLog -> Contact -> Alias: the mailbox is only reachable via the alias.
    recent_logs = (
        db.session.query(EmailLog)
        .join(Contact, EmailLog.contact_id == Contact.id)
        .join(Alias, Contact.alias_id == Alias.id)
    )
    recent_logs = recent_logs.filter(
        Alias.mailbox_id == alias.mailbox_id, EmailLog.created_at > one_minute_ago
    )
    nb_activity = recent_logs.group_by(EmailLog.id).count()

    if nb_activity <= MAX_ACTIVITY_DURING_MINUTE_PER_MAILBOX:
        return False

    LOG.d(
        "Too much forward on mailbox %s, alias %s. Nb Activity %s",
        alias.mailbox,
        alias,
        nb_activity,
    )
    return True
def greylisting_needed_forward_phase(alias_address: str) -> bool:
    """Decide whether a message sent to ``alias_address`` must be greylisted.

    When the alias already exists, both the per-alias and the per-mailbox
    checks are applied. Otherwise ``try_auto_create`` is asked for an alias
    and, if it yields one, only the per-mailbox check is applied.

    Args:
        alias_address: the recipient address to look up.

    Returns:
        True if one of the applicable checks asks for greylisting, False
        otherwise (including when no alias exists or can be created).
    """
    known_alias = Alias.get_by(email=alias_address)
    if known_alias:
        if greylisting_needed_for_alias(known_alias):
            return True
        return greylisting_needed_for_mailbox(known_alias)

    LOG.d(
        "alias %s not exist. Try to see if it can be created on the fly",
        alias_address,
    )
    created_alias = try_auto_create(alias_address)
    if not created_alias:
        return False

    # Only the mailbox-level limit is checked for an alias obtained here.
    return greylisting_needed_for_mailbox(created_alias)
def greylisting_needed_reply_phase(reply_email: str) -> bool:
    """Decide whether a message sent to ``reply_email`` must be greylisted.

    Looks up the contact whose ``reply_email`` matches and applies the
    per-alias check, then the per-mailbox check, to that contact's alias.

    Args:
        reply_email: the reply address the message was sent to.

    Returns:
        False when no contact matches; otherwise True if either check asks
        for greylisting.
    """
    contact = Contact.get_by(reply_email=reply_email)
    if contact:
        target_alias = contact.alias
        if greylisting_needed_for_alias(target_alias):
            return True
        return greylisting_needed_for_mailbox(target_alias)
    return False
def greylisting_needed(mail_from: str, rcpt_tos: [str]) -> bool:
    """Tell whether any recipient of an incoming message triggers greylisting.

    Each recipient is lower-cased, then routed to the reply-phase check when
    it starts with ``reply+`` or ``ra+`` and to the forward-phase check
    otherwise. The first recipient that asks for greylisting ends the scan.

    Args:
        mail_from: envelope sender. Not used by this function; kept so the
            signature stays unchanged for callers.
        rcpt_tos: envelope recipient addresses.

    Returns:
        True as soon as one recipient requires greylisting, False if none
        does (including when ``rcpt_tos`` is empty).
    """
    for rcpt_to in rcpt_tos:
        # Normalise the case BEFORE testing the prefix: the lookups below are
        # done on the lower-cased address, so a recipient such as
        # "Reply+..." must be classified as a reply address too instead of
        # falling through to the forward phase.
        address = rcpt_to.lower()

        if address.startswith(("reply+", "ra+")):
            # Reply phase
            if greylisting_needed_reply_phase(address):
                return True
        elif greylisting_needed_forward_phase(address):
            # Forward phase: address is alias@SL
            return True

    return False