mirror of
https://github.com/simple-login/app.git
synced 2024-09-20 06:55:59 +08:00
Create generate_reply_email() and refactor
This commit is contained in:
parent
5781bebfd0
commit
75ba1669e0
|
@ -16,7 +16,7 @@ from app.api.serializer import (
|
|||
)
|
||||
from app.config import EMAIL_DOMAIN
|
||||
from app.dashboard.views.alias_log import get_alias_log
|
||||
from app.email_utils import parseaddr_unicode, is_valid_email
|
||||
from app.email_utils import parseaddr_unicode, is_valid_email, generate_reply_email
|
||||
from app.extensions import db
|
||||
from app.log import LOG
|
||||
from app.models import Alias, Contact, Mailbox, AliasMailbox
|
||||
|
@ -393,14 +393,6 @@ def create_contact_route(alias_id):
|
|||
if not contact_addr:
|
||||
return jsonify(error="Contact cannot be empty"), 400
|
||||
|
||||
# generate a reply_email, make sure it is unique
|
||||
# not use while to avoid infinite loop
|
||||
reply_email = f"ra+{random_string(25)}@{EMAIL_DOMAIN}"
|
||||
for _ in range(1000):
|
||||
reply_email = f"ra+{random_string(25)}@{EMAIL_DOMAIN}"
|
||||
if not Contact.get_by(reply_email=reply_email):
|
||||
break
|
||||
|
||||
contact_name, contact_email = parseaddr_unicode(contact_addr)
|
||||
if not is_valid_email(contact_email):
|
||||
return jsonify(error=f"invalid contact email {contact_email}"), 400
|
||||
|
@ -414,7 +406,7 @@ def create_contact_route(alias_id):
|
|||
alias_id=alias.id,
|
||||
website_email=contact_email,
|
||||
name=contact_name,
|
||||
reply_email=reply_email,
|
||||
reply_email=generate_reply_email(),
|
||||
)
|
||||
|
||||
LOG.d("create reverse-alias for %s %s", contact_addr, alias)
|
||||
|
|
|
@ -11,7 +11,7 @@ from wtforms import StringField, validators, ValidationError
|
|||
|
||||
from app.config import EMAIL_DOMAIN, PAGE_LIMIT
|
||||
from app.dashboard.base import dashboard_bp
|
||||
from app.email_utils import parseaddr_unicode, is_valid_email
|
||||
from app.email_utils import parseaddr_unicode, is_valid_email, generate_reply_email
|
||||
from app.extensions import db
|
||||
from app.log import LOG
|
||||
from app.models import Alias, Contact, EmailLog
|
||||
|
@ -166,14 +166,6 @@ def alias_contact_manager(alias_id):
|
|||
if new_contact_form.validate():
|
||||
contact_addr = new_contact_form.email.data.strip()
|
||||
|
||||
# generate a reply_email, make sure it is unique
|
||||
# not use while to avoid infinite loop
|
||||
reply_email = f"ra+{random_string(25)}@{EMAIL_DOMAIN}"
|
||||
for _ in range(1000):
|
||||
reply_email = f"ra+{random_string(25)}@{EMAIL_DOMAIN}"
|
||||
if not Contact.get_by(reply_email=reply_email):
|
||||
break
|
||||
|
||||
try:
|
||||
contact_name, contact_email = parseaddr_unicode(contact_addr)
|
||||
except Exception:
|
||||
|
@ -211,7 +203,7 @@ def alias_contact_manager(alias_id):
|
|||
alias_id=alias.id,
|
||||
website_email=contact_email,
|
||||
name=contact_name,
|
||||
reply_email=reply_email,
|
||||
reply_email=generate_reply_email(),
|
||||
)
|
||||
|
||||
LOG.d("create reverse-alias for %s", contact_addr)
|
||||
|
|
|
@ -7,6 +7,7 @@ from email.mime.multipart import MIMEMultipart
|
|||
from email.mime.text import MIMEText
|
||||
from email.utils import make_msgid, formatdate, parseaddr
|
||||
from smtplib import SMTP
|
||||
from uuid import uuid4
|
||||
|
||||
import arrow
|
||||
import dkim
|
||||
|
@ -31,11 +32,13 @@ from app.config import (
|
|||
SENDER,
|
||||
URL,
|
||||
LANDING_PAGE_URL,
|
||||
EMAIL_DOMAIN,
|
||||
)
|
||||
from app.dns_utils import get_mx_domains
|
||||
from app.extensions import db
|
||||
from app.log import LOG
|
||||
from app.models import Mailbox, User, SentAlert, CustomDomain, SLDomain
|
||||
from app.models import Mailbox, User, SentAlert, CustomDomain, SLDomain, Contact
|
||||
from app.utils import random_string
|
||||
|
||||
|
||||
def render(template_name, **kwargs) -> str:
|
||||
|
@ -713,3 +716,18 @@ def add_header(msg: Message, text_header, html_header) -> Message:
|
|||
|
||||
LOG.d("No header added for %s", msg.get_content_type())
|
||||
return msg
|
||||
|
||||
|
||||
def generate_reply_email() -> str:
|
||||
"""
|
||||
generate a reply_email (aka reverse-alias), make sure it isn't used by any contact
|
||||
"""
|
||||
# not use while to avoid infinite loop
|
||||
for _ in range(1000):
|
||||
reply_email = f"ra+{random_string(25)}@{EMAIL_DOMAIN}"
|
||||
if not Contact.get_by(reply_email=reply_email):
|
||||
return reply_email
|
||||
|
||||
# use UUID as fallback
|
||||
reply_email = f"ra+{uuid4()}@{EMAIL_DOMAIN}"
|
||||
return reply_email
|
||||
|
|
|
@ -1185,7 +1185,7 @@ class Contact(db.Model, ModelMixin):
|
|||
# when user clicks on "reply", they will reply to this address.
|
||||
# This address allows to hide user personal email
|
||||
# this reply email is created every time a website sends an email to user
|
||||
# it has the prefix "reply+" to distinguish with other email
|
||||
# it has the prefix "reply+" or "ra+" to distinguish with other email
|
||||
reply_email = db.Column(db.String(512), nullable=False)
|
||||
|
||||
# whether a contact is created via CC
|
||||
|
|
|
@ -102,6 +102,7 @@ from app.email_utils import (
|
|||
should_add_dkim_signature,
|
||||
add_header,
|
||||
get_header_unicode,
|
||||
generate_reply_email,
|
||||
)
|
||||
from app.extensions import db
|
||||
from app.greylisting import greylisting_needed
|
||||
|
@ -375,19 +376,6 @@ def replace_str_in_msg(msg: Message, fr: str, to: str):
|
|||
return msg
|
||||
|
||||
|
||||
def generate_reply_email():
|
||||
# generate a reply_email, make sure it is unique
|
||||
# not use while loop to avoid infinite loop
|
||||
reply_email = f"reply+{random_string(30)}@{EMAIL_DOMAIN}"
|
||||
for _ in range(1000):
|
||||
if not Contact.get_by(reply_email=reply_email):
|
||||
# found!
|
||||
break
|
||||
reply_email = f"reply+{random_string(30)}@{EMAIL_DOMAIN}"
|
||||
|
||||
return reply_email
|
||||
|
||||
|
||||
def should_append_alias(msg: Message, address: str):
|
||||
"""whether an alias should be appended to TO header in message"""
|
||||
|
||||
|
@ -923,7 +911,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
|
|||
+ _MIME_HEADERS,
|
||||
)
|
||||
|
||||
# replace "ra+string@simplelogin.co" by the contact email in the email body
|
||||
# replace the reverse-alias (i.e. "ra+string@simplelogin.co") by the contact email in the email body
|
||||
# as this is usually included when replying
|
||||
if user.replace_reverse_alias:
|
||||
if msg.is_multipart():
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import email
|
||||
from email.message import EmailMessage
|
||||
|
||||
from app.config import MAX_ALERT_24H
|
||||
from app.config import MAX_ALERT_24H, EMAIL_DOMAIN
|
||||
from app.email_utils import (
|
||||
get_email_domain_part,
|
||||
can_create_directory_for_address,
|
||||
|
@ -16,6 +16,7 @@ from app.email_utils import (
|
|||
is_valid_email,
|
||||
add_header,
|
||||
to_bytes,
|
||||
generate_reply_email,
|
||||
)
|
||||
from app.extensions import db
|
||||
from app.models import User, CustomDomain
|
||||
|
@ -386,3 +387,9 @@ def test_to_bytes():
|
|||
|
||||
msg = email.message_from_string("éèà€")
|
||||
assert to_bytes(msg).decode() == "\néèà€"
|
||||
|
||||
|
||||
def test_generate_reply_email(flask_client):
|
||||
reply_email = generate_reply_email()
|
||||
assert reply_email.startswith("ra+")
|
||||
assert reply_email.endswith(EMAIL_DOMAIN)
|
||||
|
|
Loading…
Reference in a new issue