support ALIAS_DOMAINS

- use verify_prefix_suffix() in /api/alias/custom/new
-
This commit is contained in:
Son NK 2020-01-22 10:22:59 +01:00
parent d7ed0d77bd
commit bdf75951f1
9 changed files with 62 additions and 64 deletions

View file

@ -3,7 +3,7 @@ from flask_cors import cross_origin
from sqlalchemy import desc from sqlalchemy import desc
from app.api.base import api_bp, verify_api_key from app.api.base import api_bp, verify_api_key
from app.config import EMAIL_DOMAIN from app.config import ALIAS_DOMAINS
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import AliasUsedOn, GenEmail, User from app.models import AliasUsedOn, GenEmail, User
@ -67,9 +67,11 @@ def options():
else: else:
ret["custom"]["suggestion"] = "" ret["custom"]["suggestion"] = ""
ret["custom"]["suffixes"] = []
# maybe better to make sure the suffix is never used before # maybe better to make sure the suffix is never used before
# but this is ok as there's a check when creating a new custom alias # but this is ok as there's a check when creating a new custom alias
ret["custom"]["suffixes"] = [f".{random_word()}@{EMAIL_DOMAIN}"] for domain in ALIAS_DOMAINS:
ret["custom"]["suffixes"].append(f".{random_word()}@{domain}")
for custom_domain in user.verified_custom_domains(): for custom_domain in user.verified_custom_domains():
ret["custom"]["suffixes"].append("@" + custom_domain.domain) ret["custom"]["suffixes"].append("@" + custom_domain.domain)
@ -144,7 +146,9 @@ def options_v2():
# maybe better to make sure the suffix is never used before # maybe better to make sure the suffix is never used before
# but this is ok as there's a check when creating a new custom alias # but this is ok as there's a check when creating a new custom alias
ret["suffixes"] = [f".{random_word()}@{EMAIL_DOMAIN}"] # todo: take into account DISABLE_ALIAS_SUFFIX
for domain in ALIAS_DOMAINS:
ret["suffixes"].append(f".{random_word()}@{domain}")
for custom_domain in user.verified_custom_domains(): for custom_domain in user.verified_custom_domains():
ret["suffixes"].append("@" + custom_domain.domain) ret["suffixes"].append("@" + custom_domain.domain)

View file

@ -3,7 +3,8 @@ from flask import jsonify, request
from flask_cors import cross_origin from flask_cors import cross_origin
from app.api.base import api_bp, verify_api_key from app.api.base import api_bp, verify_api_key
from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN from app.config import MAX_NB_EMAIL_FREE_PLAN
from app.dashboard.views.custom_alias import verify_prefix_suffix
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import GenEmail, AliasUsedOn from app.models import GenEmail, AliasUsedOn
@ -43,35 +44,12 @@ def new_custom_alias():
if not data: if not data:
return jsonify(error="request body cannot be empty"), 400 return jsonify(error="request body cannot be empty"), 400
alias_prefix = data.get("alias_prefix", "") alias_prefix = data.get("alias_prefix", "").strip()
alias_suffix = data.get("alias_suffix", "") alias_suffix = data.get("alias_suffix", "").strip()
# make sure alias_prefix is not empty
alias_prefix = alias_prefix.strip()
alias_prefix = convert_to_id(alias_prefix) alias_prefix = convert_to_id(alias_prefix)
if not alias_prefix: # should be checked on frontend
LOG.d("user %s submits an empty alias with the prefix %s", user, alias_prefix)
return jsonify(error="alias prefix cannot be empty"), 400
# make sure alias_suffix is either .random_letters@simplelogin.co or @my-domain.com if not verify_prefix_suffix(user, alias_prefix, alias_suffix, user_custom_domains):
alias_suffix = alias_suffix.strip() return jsonify(error="wrong alias prefix or suffix"), 400
if alias_suffix.startswith("@"):
custom_domain = alias_suffix[1:]
if custom_domain not in user_custom_domains:
LOG.d("user %s submits a wrong custom domain %s ", user, custom_domain)
return jsonify(error="error"), 400
else:
if not alias_suffix.startswith("."):
LOG.d("user %s submits a wrong alias suffix %s", user, alias_suffix)
return jsonify(error="error"), 400
if not alias_suffix.endswith(EMAIL_DOMAIN):
LOG.d("user %s submits a wrong alias suffix %s", user, alias_suffix)
return jsonify(error="error"), 400
random_letters = alias_suffix[1 : alias_suffix.find("@")]
if len(random_letters) < 5:
LOG.d("user %s submits a wrong alias suffix %s", user, alias_suffix)
return jsonify(error="error"), 400
full_alias = alias_prefix + alias_suffix full_alias = alias_prefix + alias_suffix
if GenEmail.get_by(email=full_alias): if GenEmail.get_by(email=full_alias):

View file

@ -5,7 +5,8 @@ from wtforms import StringField, validators
from app import email_utils from app import email_utils
from app.auth.base import auth_bp from app.auth.base import auth_bp
from app.config import URL, EMAIL_DOMAIN from app.config import URL
from app.email_utils import email_belongs_to_alias_domains
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import User, ActivationCode from app.models import User, ActivationCode
@ -31,8 +32,7 @@ def register():
if form.validate_on_submit(): if form.validate_on_submit():
email = form.email.data email = form.email.data
if email_belongs_to_alias_domains(email):
if email.endswith(EMAIL_DOMAIN):
flash( flash(
"You cannot use alias as your personal inbox. Nice try though 😉", "You cannot use alias as your personal inbox. Nice try though 😉",
"error", "error",

View file

@ -1,13 +1,16 @@
from flask import render_template, redirect, url_for, flash, request, session from flask import render_template, redirect, url_for, flash, request, session
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_wtf import FlaskForm
from wtforms import StringField, validators, SelectField
from app.config import EMAIL_DOMAIN, HIGHLIGHT_GEN_EMAIL_ID, DISABLE_ALIAS_SUFFIX from app.config import (
HIGHLIGHT_GEN_EMAIL_ID,
DISABLE_ALIAS_SUFFIX,
ALIAS_DOMAINS,
)
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.email_utils import email_belongs_to_alias_domains
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import GenEmail, DeletedAlias, CustomDomain from app.models import GenEmail
from app.utils import convert_to_id, random_word, word_exist from app.utils import convert_to_id, random_word, word_exist
@ -29,9 +32,10 @@ def custom_alias():
suffixes.append("@" + alias_domain) suffixes.append("@" + alias_domain)
# then default domain # then default domain
suffixes.append( for domain in ALIAS_DOMAINS:
("" if DISABLE_ALIAS_SUFFIX else ".") + random_word() + "@" + EMAIL_DOMAIN suffixes.append(
) ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain
)
if request.method == "POST": if request.method == "POST":
alias_prefix = request.form.get("prefix") alias_prefix = request.form.get("prefix")
@ -73,9 +77,12 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix, user_custom_domains)
alias_suffix = alias_suffix.strip() alias_suffix = alias_suffix.strip()
if alias_suffix.startswith("@"): if alias_suffix.startswith("@"):
alias_domain = alias_suffix[1:] alias_domain = alias_suffix[1:]
# alias_domain can be either custom_domain or if DISABLE_ALIAS_SUFFIX, EMAIL_DOMAIN # alias_domain can be either custom_domain or if DISABLE_ALIAS_SUFFIX, one of the default ALIAS_DOMAINS
if DISABLE_ALIAS_SUFFIX: if DISABLE_ALIAS_SUFFIX:
if alias_domain not in user_custom_domains and alias_domain != EMAIL_DOMAIN: if (
alias_domain not in user_custom_domains
and alias_domain not in ALIAS_DOMAINS
):
LOG.error("wrong alias suffix %s, user %s", alias_suffix, user) LOG.error("wrong alias suffix %s, user %s", alias_suffix, user)
return False return False
else: else:
@ -86,9 +93,11 @@ def verify_prefix_suffix(user, alias_prefix, alias_suffix, user_custom_domains)
if not alias_suffix.startswith("."): if not alias_suffix.startswith("."):
LOG.error("User %s submits a wrong alias suffix %s", user, alias_suffix) LOG.error("User %s submits a wrong alias suffix %s", user, alias_suffix)
return False return False
if not alias_suffix.endswith(EMAIL_DOMAIN):
full_alias = alias_prefix + alias_suffix
if not email_belongs_to_alias_domains(full_alias):
LOG.error( LOG.error(
"Alias suffix should end with default alias domain %s", "Alias suffix should end with one of the alias domains %s",
user, user,
alias_suffix, alias_suffix,
) )

View file

@ -9,8 +9,9 @@ from flask_wtf.file import FileField
from wtforms import StringField, validators from wtforms import StringField, validators
from app import s3, email_utils from app import s3, email_utils
from app.config import URL, EMAIL_DOMAIN from app.config import URL
from app.dashboard.base import dashboard_bp from app.dashboard.base import dashboard_bp
from app.email_utils import email_belongs_to_alias_domains
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
from app.models import ( from app.models import (
@ -92,7 +93,7 @@ def setting():
or DeletedAlias.get_by(email=new_email) or DeletedAlias.get_by(email=new_email)
): ):
flash(f"Email {new_email} already used", "error") flash(f"Email {new_email} already used", "error")
elif new_email.endswith(EMAIL_DOMAIN): elif email_belongs_to_alias_domains(new_email):
flash( flash(
"You cannot use alias as your personal inbox. Nice try though 😉", "You cannot use alias as your personal inbox. Nice try though 😉",
"error", "error",

View file

@ -38,7 +38,7 @@ from smtplib import SMTP
from aiosmtpd.controller import Controller from aiosmtpd.controller import Controller
from app.config import EMAIL_DOMAIN, POSTFIX_SERVER, URL from app.config import EMAIL_DOMAIN, POSTFIX_SERVER, URL, ALIAS_DOMAINS
from app.email_utils import ( from app.email_utils import (
get_email_name, get_email_name,
get_email_part, get_email_part,
@ -49,6 +49,7 @@ from app.email_utils import (
delete_header, delete_header,
send_cannot_create_directory_alias, send_cannot_create_directory_alias,
send_cannot_create_domain_alias, send_cannot_create_domain_alias,
email_belongs_to_alias_domains,
) )
from app.extensions import db from app.extensions import db
from app.log import LOG from app.log import LOG
@ -120,7 +121,7 @@ class MailHandler:
on_the_fly = False on_the_fly = False
# check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
if alias.endswith(EMAIL_DOMAIN): if email_belongs_to_alias_domains(alias):
if "/" in alias or "+" in alias or "#" in alias: if "/" in alias or "+" in alias or "#" in alias:
if "/" in alias: if "/" in alias:
sep = "/" sep = "/"
@ -284,10 +285,10 @@ class MailHandler:
forward_email = ForwardEmail.get_by(reply_email=reply_email) forward_email = ForwardEmail.get_by(reply_email=reply_email)
alias: str = forward_email.gen_email.email alias: str = forward_email.gen_email.email
# alias must end with EMAIL_DOMAIN or custom-domain
alias_domain = alias[alias.find("@") + 1 :] alias_domain = alias[alias.find("@") + 1 :]
if alias_domain != EMAIL_DOMAIN:
# alias must end with one of the ALIAS_DOMAINS or custom-domain
if not email_belongs_to_alias_domains(alias):
if not CustomDomain.get_by(domain=alias_domain): if not CustomDomain.get_by(domain=alias_domain):
return "550 alias unknown by SimpleLogin" return "550 alias unknown by SimpleLogin"
@ -338,9 +339,9 @@ class MailHandler:
envelope.rcpt_options, envelope.rcpt_options,
) )
if alias_domain == EMAIL_DOMAIN: if alias_domain in ALIAS_DOMAINS:
add_dkim_signature(msg, EMAIL_DOMAIN) add_dkim_signature(msg, alias_domain)
# add DKIM-Signature for non-custom-domain alias # add DKIM-Signature for custom-domain alias
else: else:
custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain) custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
if custom_domain.dkim_verified: if custom_domain.dkim_verified:

View file

@ -27,7 +27,8 @@ def test_different_scenarios(flask_client):
assert r.status_code == 200 assert r.status_code == 200
assert r.json["can_create_custom"] assert r.json["can_create_custom"]
assert len(r.json["existing"]) == 1 assert len(r.json["existing"]) == 1
assert r.json["custom"]["suffixes"] assert len(r.json["custom"]["suffixes"]) == 3
assert r.json["custom"]["suggestion"] == "" # no hostname => no suggestion assert r.json["custom"]["suggestion"] == "" # no hostname => no suggestion
# <<< with hostname >>> # <<< with hostname >>>

View file

@ -1,8 +1,9 @@
from flask import url_for from flask import url_for
from app.config import EMAIL_DOMAIN from app.config import EMAIL_DOMAIN, MAX_NB_EMAIL_FREE_PLAN
from app.extensions import db from app.extensions import db
from app.models import User, ApiKey, GenEmail from app.models import User, ApiKey, GenEmail
from app.utils import random_word
def test_success(flask_client): def test_success(flask_client):
@ -15,14 +16,16 @@ def test_success(flask_client):
api_key = ApiKey.create(user.id, "for test") api_key = ApiKey.create(user.id, "for test")
db.session.commit() db.session.commit()
word = random_word()
r = flask_client.post( r = flask_client.post(
url_for("api.new_custom_alias", hostname="www.test.com"), url_for("api.new_custom_alias", hostname="www.test.com"),
headers={"Authentication": api_key.code}, headers={"Authentication": api_key.code},
json={"alias_prefix": "prefix", "alias_suffix": f".abcdef@{EMAIL_DOMAIN}"}, json={"alias_prefix": "prefix", "alias_suffix": f".{word}@{EMAIL_DOMAIN}"},
) )
assert r.status_code == 201 assert r.status_code == 201
assert r.json["alias"] == f"prefix.abcdef@{EMAIL_DOMAIN}" assert r.json["alias"] == f"prefix.{word}@{EMAIL_DOMAIN}"
def test_out_of_quota(flask_client): def test_out_of_quota(flask_client):
@ -35,15 +38,15 @@ def test_out_of_quota(flask_client):
api_key = ApiKey.create(user.id, "for test") api_key = ApiKey.create(user.id, "for test")
db.session.commit() db.session.commit()
# create 3 custom alias to run out of quota # create MAX_NB_EMAIL_FREE_PLAN custom alias to run out of quota
GenEmail.create_new(user.id, prefix="test") for _ in range(MAX_NB_EMAIL_FREE_PLAN):
GenEmail.create_new(user.id, prefix="test") GenEmail.create_new(user.id, prefix="test")
GenEmail.create_new(user.id, prefix="test")
word = random_word()
r = flask_client.post( r = flask_client.post(
url_for("api.new_custom_alias", hostname="www.test.com"), url_for("api.new_custom_alias", hostname="www.test.com"),
headers={"Authentication": api_key.code}, headers={"Authentication": api_key.code},
json={"alias_prefix": "prefix", "alias_suffix": f".abcdef@{EMAIL_DOMAIN}"}, json={"alias_prefix": "prefix", "alias_suffix": f".{word}@{EMAIL_DOMAIN}"},
) )
assert r.status_code == 400 assert r.status_code == 400

View file

@ -5,6 +5,7 @@ URL=http://localhost
# Only print email content, not sending it # Only print email content, not sending it
NOT_SEND_EMAIL=true NOT_SEND_EMAIL=true
EMAIL_DOMAIN=sl.local EMAIL_DOMAIN=sl.local
OTHER_ALIAS_DOMAINS=["d1.test", "d2.test"]
SUPPORT_EMAIL=support@sl.local SUPPORT_EMAIL=support@sl.local
ADMIN_EMAIL=to_fill ADMIN_EMAIL=to_fill
# Max number emails user can generate for free plan # Max number emails user can generate for free plan