From d11c2686b992e02c489e861fd044aedcac11d783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Tue, 30 Jul 2024 13:36:48 +0200 Subject: [PATCH] Move mailbox management to a module (#2164) --- app/api/views/mailbox.py | 84 ++----- app/dashboard/views/mailbox.py | 176 ++++----------- app/mailbox_utils.py | 183 +++++++++++++++ app/models.py | 10 + .../versions/2024_073011_1c14339aae90_.py | 42 ++++ .../emails/transactional/verify-mailbox.html | 9 +- .../transactional/verify-mailbox.txt.jinja2 | 4 + tests/api/test_mailbox.py | 2 +- tests/test_mailbox_utils.py | 213 ++++++++++++++++++ 9 files changed, 523 insertions(+), 200 deletions(-) create mode 100644 app/mailbox_utils.py create mode 100644 migrations/versions/2024_073011_1c14339aae90_.py create mode 100644 tests/test_mailbox_utils.py diff --git a/app/api/views/mailbox.py b/app/api/views/mailbox.py index 1c2c0f92..1e86b5b6 100644 --- a/app/api/views/mailbox.py +++ b/app/api/views/mailbox.py @@ -1,22 +1,18 @@ from smtplib import SMTPRecipientsRefused -import arrow from flask import g from flask import jsonify from flask import request +from app import mailbox_utils from app.api.base import api_bp, require_api_auth -from app.config import JOB_DELETE_MAILBOX -from app.dashboard.views.mailbox import send_verification_email from app.dashboard.views.mailbox_detail import verify_mailbox_change from app.db import Session from app.email_utils import ( mailbox_already_used, email_can_be_used_as_mailbox, ) -from app.email_validation import is_valid_email -from app.log import LOG -from app.models import Mailbox, Job +from app.models import Mailbox from app.utils import sanitize_email @@ -44,31 +40,15 @@ def create_mailbox(): user = g.user mailbox_email = sanitize_email(request.get_json().get("email")) - if not user.is_premium(): - return jsonify(error="Only premium plan can add additional mailbox"), 400 + try: + new_mailbox = mailbox_utils.create_mailbox(user, mailbox_email) + except mailbox_utils.MailboxError as e: + return jsonify(error=e.msg), 400 - if not is_valid_email(mailbox_email): - return jsonify(error=f"{mailbox_email} invalid"), 400 - elif mailbox_already_used(mailbox_email, user): - return jsonify(error=f"{mailbox_email} already used"), 400 - elif not email_can_be_used_as_mailbox(mailbox_email): - return ( - jsonify( - error=f"{mailbox_email} cannot be used. Please note a mailbox cannot " - f"be a disposable email address" - ), - 400, - ) - else: - new_mailbox = Mailbox.create(email=mailbox_email, user_id=user.id) - Session.commit() - - send_verification_email(user, new_mailbox) - - return ( - jsonify(mailbox_to_dict(new_mailbox)), - 201, - ) + return ( + jsonify(mailbox_to_dict(new_mailbox)), + 201, + ) @api_bp.route("/mailboxes/", methods=["DELETE"]) @@ -86,47 +66,17 @@ def delete_mailbox(mailbox_id): """ user = g.user - mailbox = Mailbox.get(mailbox_id) - - if not mailbox or mailbox.user_id != user.id: - return jsonify(error="Forbidden"), 403 - - if mailbox.id == user.default_mailbox_id: - return jsonify(error="You cannot delete the default mailbox"), 400 - data = request.get_json() or {} transfer_mailbox_id = data.get("transfer_aliases_to") if transfer_mailbox_id and int(transfer_mailbox_id) >= 0: - transfer_mailbox = Mailbox.get(transfer_mailbox_id) + transfer_mailbox_id = int(transfer_mailbox_id) + else: + transfer_mailbox_id = None - if not transfer_mailbox or transfer_mailbox.user_id != user.id: - return ( - jsonify(error="You must transfer the aliases to a mailbox you own."), - 403, - ) - - if transfer_mailbox_id == mailbox_id: - return ( - jsonify( - error="You can not transfer the aliases to the mailbox you want to delete." - ), - 400, - ) - - if not transfer_mailbox.verified: - return jsonify(error="Your new mailbox is not verified"), 400 - - # Schedule delete account job - LOG.w("schedule delete mailbox job for %s", mailbox) - Job.create( - name=JOB_DELETE_MAILBOX, - payload={ - "mailbox_id": mailbox.id, - "transfer_mailbox_id": transfer_mailbox_id, - }, - run_at=arrow.now(), - commit=True, - ) + try: + mailbox_utils.delete_mailbox(user, mailbox_id, transfer_mailbox_id) + except mailbox_utils.MailboxError as e: + return jsonify(error=e.msg), 400 return jsonify(deleted=True), 200 diff --git a/app/dashboard/views/mailbox.py b/app/dashboard/views/mailbox.py index ee113c2e..bf4daf4b 100644 --- a/app/dashboard/views/mailbox.py +++ b/app/dashboard/views/mailbox.py @@ -2,7 +2,6 @@ import base64 import binascii import json -import arrow from flask import render_template, request, redirect, url_for, flash from flask_login import login_required, current_user from flask_wtf import FlaskForm @@ -10,19 +9,12 @@ from itsdangerous import TimestampSigner from wtforms import validators, IntegerField from wtforms.fields.html5 import EmailField -from app import parallel_limiter -from app.config import MAILBOX_SECRET, URL, JOB_DELETE_MAILBOX +from app import parallel_limiter, mailbox_utils +from app.config import MAILBOX_SECRET from app.dashboard.base import dashboard_bp from app.db import Session -from app.email_utils import ( - email_can_be_used_as_mailbox, - mailbox_already_used, - render, - send_email, -) -from app.email_validation import is_valid_email from app.log import LOG -from app.models import Mailbox, Job +from app.models import Mailbox from app.utils import CSRFValidationForm @@ -58,120 +50,59 @@ def mailbox_route(): if not delete_mailbox_form.validate(): flash("Invalid request", "warning") return redirect(request.url) - mailbox = Mailbox.get(delete_mailbox_form.mailbox_id.data) - - if not mailbox or mailbox.user_id != current_user.id: - flash("Invalid mailbox. Refresh the page", "warning") + try: + mailbox = mailbox_utils.delete_mailbox( + current_user, + delete_mailbox_form.mailbox_id.data, + delete_mailbox_form.transfer_mailbox_id.data, + ) + except mailbox_utils.MailboxError as e: + flash(e.msg, "warning") return redirect(url_for("dashboard.mailbox_route")) - - if mailbox.id == current_user.default_mailbox_id: - flash("You cannot delete default mailbox", "error") - return redirect(url_for("dashboard.mailbox_route")) - - transfer_mailbox_id = delete_mailbox_form.transfer_mailbox_id.data - if transfer_mailbox_id and transfer_mailbox_id > 0: - transfer_mailbox = Mailbox.get(transfer_mailbox_id) - - if not transfer_mailbox or transfer_mailbox.user_id != current_user.id: - flash( - "You must transfer the aliases to a mailbox you own.", "error" - ) - return redirect(url_for("dashboard.mailbox_route")) - - if transfer_mailbox.id == mailbox.id: - flash( - "You can not transfer the aliases to the mailbox you want to delete.", - "error", - ) - return redirect(url_for("dashboard.mailbox_route")) - - if not transfer_mailbox.verified: - flash("Your new mailbox is not verified", "error") - return redirect(url_for("dashboard.mailbox_route")) - - # Schedule delete account job - LOG.w( - f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}" - ) - Job.create( - name=JOB_DELETE_MAILBOX, - payload={ - "mailbox_id": mailbox.id, - "transfer_mailbox_id": transfer_mailbox_id - if transfer_mailbox_id > 0 - else None, - }, - run_at=arrow.now(), - commit=True, - ) - flash( f"Mailbox {mailbox.email} scheduled for deletion." f"You will receive a confirmation email when the deletion is finished", "success", ) - return redirect(url_for("dashboard.mailbox_route")) + if request.form.get("form-name") == "set-default": if not csrf_form.validate(): flash("Invalid request", "warning") return redirect(request.url) - mailbox_id = request.form.get("mailbox_id") - mailbox = Mailbox.get(mailbox_id) - - if not mailbox or mailbox.user_id != current_user.id: - flash("Unknown error. Refresh the page", "warning") + try: + mailbox_id = request.form.get("mailbox_id") + mailbox = mailbox_utils.set_default_mailbox(current_user, mailbox_id) + except mailbox_utils.MailboxError as e: + flash(e.msg, "warning") return redirect(url_for("dashboard.mailbox_route")) - if mailbox.id == current_user.default_mailbox_id: - flash("This mailbox is already default one", "error") - return redirect(url_for("dashboard.mailbox_route")) - - if not mailbox.verified: - flash("Cannot set unverified mailbox as default", "error") - return redirect(url_for("dashboard.mailbox_route")) - - current_user.default_mailbox_id = mailbox.id - Session.commit() flash(f"Mailbox {mailbox.email} is set as Default Mailbox", "success") return redirect(url_for("dashboard.mailbox_route")) elif request.form.get("form-name") == "create": - if not current_user.is_premium(): - flash("Only premium plan can add additional mailbox", "warning") + if not new_mailbox_form.validate(): + flash("Invalid request", "warning") + return redirect(request.url) + mailbox_email = new_mailbox_form.email.data.lower().strip().replace(" ", "") + try: + mailbox = mailbox_utils.create_mailbox(current_user, mailbox_email) + except mailbox_utils.MailboxError as e: + flash(e.msg, "warning") return redirect(url_for("dashboard.mailbox_route")) - if new_mailbox_form.validate(): - mailbox_email = ( - new_mailbox_form.email.data.lower().strip().replace(" ", "") + flash( + f"You are going to receive an email to confirm {mailbox.email}.", + "success", + ) + + return redirect( + url_for( + "dashboard.mailbox_detail_route", + mailbox_id=mailbox.id, ) - - if not is_valid_email(mailbox_email): - flash(f"{mailbox_email} invalid", "error") - elif mailbox_already_used(mailbox_email, current_user): - flash(f"{mailbox_email} already used", "error") - elif not email_can_be_used_as_mailbox(mailbox_email): - flash(f"You cannot use {mailbox_email}.", "error") - else: - new_mailbox = Mailbox.create( - email=mailbox_email, user_id=current_user.id - ) - Session.commit() - - send_verification_email(current_user, new_mailbox) - - flash( - f"You are going to receive an email to confirm {mailbox_email}.", - "success", - ) - - return redirect( - url_for( - "dashboard.mailbox_detail_route", - mailbox_id=new_mailbox.id, - ) - ) + ) return render_template( "dashboard/mailbox.html", @@ -182,34 +113,19 @@ def mailbox_route(): ) -def send_verification_email(user, mailbox): - s = TimestampSigner(MAILBOX_SECRET) - encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8") - b64_data = base64.urlsafe_b64encode(encoded_data) - mailbox_id_signed = s.sign(b64_data).decode() - verification_url = ( - URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}" - ) - send_email( - mailbox.email, - f"Please confirm your mailbox {mailbox.email}", - render( - "transactional/verify-mailbox.txt.jinja2", - user=user, - link=verification_url, - mailbox_email=mailbox.email, - ), - render( - "transactional/verify-mailbox.html", - user=user, - link=verification_url, - mailbox_email=mailbox.email, - ), - ) - - @dashboard_bp.route("/mailbox_verify") def mailbox_verify(): + mailbox_id = request.args.get("mailbox_id") + code = request.args.get("code") + if not code: + # Old way + return verify_with_signed_secret(mailbox_id) + mailbox = mailbox_utils.verify_mailbox_code(mailbox_id, code) + LOG.d("Mailbox %s is verified", mailbox) + return render_template("dashboard/mailbox_validation.html", mailbox=mailbox) + + +def verify_with_signed_secret(request: str): s = TimestampSigner(MAILBOX_SECRET) mailbox_verify_request = request.args.get("mailbox_id") try: diff --git a/app/mailbox_utils.py b/app/mailbox_utils.py new file mode 100644 index 00000000..1d1e6bdc --- /dev/null +++ b/app/mailbox_utils.py @@ -0,0 +1,183 @@ +import secrets +import random +from typing import Optional +import arrow + +from app import config +from app.config import JOB_DELETE_MAILBOX +from app.db import Session +from app.email_utils import ( + mailbox_already_used, + email_can_be_used_as_mailbox, + send_email, + render, +) +from app.email_validation import is_valid_email +from app.log import LOG +from app.models import User, Mailbox, Job, MailboxActivation + + +class MailboxError(Exception): + def __init__(self, msg: str): + self.msg = msg + + +def create_mailbox( + user: User, + email: str, + use_digit_codes: bool = False, + send_verification_link: bool = True, +) -> Mailbox: + if not user.is_premium(): + raise MailboxError("Only premium plan can add additional mailbox") + if not is_valid_email(email): + raise MailboxError("Invalid email") + elif mailbox_already_used(email, user): + raise MailboxError("Email already used") + elif not email_can_be_used_as_mailbox(email): + raise MailboxError("Invalid email") + new_mailbox = Mailbox.create(email=email, user_id=user.id, commit=True) + + send_verification_email( + user, + new_mailbox, + use_digit_code=use_digit_codes, + send_link=send_verification_link, + ) + return new_mailbox + + +def delete_mailbox( + user: User, mailbox_id: int, transfer_mailbox_id: Optional[int] +) -> Mailbox: + mailbox = Mailbox.get(mailbox_id) + + if not mailbox or mailbox.user_id != user.id: + raise MailboxError("Invalid mailbox") + + if mailbox.id == user.default_mailbox_id: + raise MailboxError("Cannot delete your default mailbox") + + if transfer_mailbox_id and transfer_mailbox_id > 0: + transfer_mailbox = Mailbox.get(transfer_mailbox_id) + + if not transfer_mailbox or transfer_mailbox.user_id != user.id: + raise MailboxError("You must transfer the aliases to a mailbox you own") + + if transfer_mailbox.id == mailbox.id: + raise MailboxError( + "You can not transfer the aliases to the mailbox you want to delete" + ) + + if not transfer_mailbox.verified: + MailboxError("Your new mailbox is not verified") + + # Schedule delete account job + LOG.w( + f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}" + ) + Job.create( + name=JOB_DELETE_MAILBOX, + payload={ + "mailbox_id": mailbox.id, + "transfer_mailbox_id": transfer_mailbox_id + if transfer_mailbox_id and transfer_mailbox_id > 0 + else None, + }, + run_at=arrow.now(), + commit=True, + ) + return mailbox + + +def set_default_mailbox(user: User, mailbox_id: int) -> Mailbox: + mailbox = Mailbox.get(mailbox_id) + + if not mailbox or mailbox.user_id != user.id: + raise MailboxError("Invalid mailbox") + + if not mailbox.verified: + raise MailboxError("This is mailbox is not verified") + + if mailbox.id == user.default_mailbox_id: + return mailbox + + user.default_mailbox_id = mailbox.id + Session.commit() + return mailbox + + +def clear_activation_codes_for_mailbox(mailbox: Mailbox): + Session.query(MailboxActivation).filter( + MailboxActivation.mailbox_id == mailbox.id + ).delete() + Session.commit() + + +def verify_mailbox_code(mailbox_id: int, code: str) -> Mailbox: + mailbox = Mailbox.get(mailbox_id) + if not mailbox: + raise MailboxError("Invalid mailbox") + if mailbox.verified: + clear_activation_codes_for_mailbox(mailbox) + return mailbox + activation = MailboxActivation.get_by(mailbox_id=mailbox_id).first() + if not activation: + raise MailboxError("Invalid code") + if activation.tries > 3: + clear_activation_codes_for_mailbox(mailbox) + raise MailboxError("Invalid activation code. Please request another code.") + if activation.created_at < arrow.now().shift(minutes=-15): + clear_activation_codes_for_mailbox(mailbox) + raise MailboxError("Invalid activation code. Please request another code.") + if code != activation.code: + activation.tries = activation.tries + 1 + Session.commit() + raise MailboxError("Invalid activation code") + mailbox.verified = True + clear_activation_codes_for_mailbox(mailbox) + return mailbox + + +def send_verification_email( + user: User, mailbox: Mailbox, use_digit_code: bool = False, send_link: bool = True +): + clear_activation_codes_for_mailbox(mailbox) + if use_digit_code: + code = "{:06d}".format(random.randint(1, 999999)) + else: + code = secrets.token_urlsafe(16) + activation = MailboxActivation.create( + mailbox_id=mailbox.id, + code=code, + tries=0, + ) + Session.commit() + + if send_link: + verification_url = ( + config.URL + + "/dashboard/mailbox_verify" + + f"?mailbox_id={mailbox.id}&code={code}" + ) + else: + verification_url = None + + send_email( + mailbox.email, + f"Please confirm your mailbox {mailbox.email}", + render( + "transactional/verify-mailbox.txt.jinja2", + user=user, + code=activation.code, + link=verification_url, + mailbox_email=mailbox.email, + ), + render( + "transactional/verify-mailbox.html", + user=user, + code=activation.code, + link=verification_url, + mailbox_email=mailbox.email, + ), + ) diff --git a/app/models.py b/app/models.py index e5297aa7..96794d8c 100644 --- a/app/models.py +++ b/app/models.py @@ -2804,6 +2804,16 @@ class Mailbox(Base, ModelMixin): return f"" +class MailboxActivation(Base, ModelMixin): + __tablename__ = "mailbox_activation" + + mailbox_id = sa.Column( + sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False, index=True + ) + code = sa.Column(sa.String(32), nullable=False, index=True) + tries = sa.Column(sa.Integer, default=0, nullable=False) + + class AccountActivation(Base, ModelMixin): """contains code to activate the user account when they sign up on mobile""" diff --git a/migrations/versions/2024_073011_1c14339aae90_.py b/migrations/versions/2024_073011_1c14339aae90_.py new file mode 100644 index 00000000..b8488423 --- /dev/null +++ b/migrations/versions/2024_073011_1c14339aae90_.py @@ -0,0 +1,42 @@ +"""empty message + +Revision ID: 1c14339aae90 +Revises: 56d08955fcab +Create Date: 2024-07-30 11:46:32.460221 + +""" +import sqlalchemy_utils +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1c14339aae90' +down_revision = '56d08955fcab' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('mailbox_activation', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False), + sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True), + sa.Column('mailbox_id', sa.Integer(), nullable=False), + sa.Column('code', sa.String(length=32), nullable=False), + sa.Column('tries', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['mailbox_id'], ['mailbox.id'], ondelete='cascade'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_mailbox_activation_code'), 'mailbox_activation', ['code'], unique=False) + op.create_index(op.f('ix_mailbox_activation_mailbox_id'), 'mailbox_activation', ['mailbox_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_mailbox_activation_mailbox_id'), table_name='mailbox_activation') + op.drop_index(op.f('ix_mailbox_activation_code'), table_name='mailbox_activation') + op.drop_table('mailbox_activation') + # ### end Alembic commands ### diff --git a/templates/emails/transactional/verify-mailbox.html b/templates/emails/transactional/verify-mailbox.html index b0dadd6a..1c36aad1 100644 --- a/templates/emails/transactional/verify-mailbox.html +++ b/templates/emails/transactional/verify-mailbox.html @@ -4,8 +4,13 @@ {{ render_text("Hi") }} {{ render_text("You have added "+ mailbox_email +" as an additional mailbox.") }} - {{ render_text("To confirm, please click on the button below.") }} - {{ render_button("Confirm mailbox", link) }} + {% if link %} + + {{ render_text("To confirm, please click on the button below.") }} + {{ render_button("Confirm mailbox", link) }} + {% else %} + {{ render_text("Please enter "+code+" as your verification code") }} + {% endif %} {{ render_text("This email will only be valid for the next 15 minutes.") }} {{ render_text('Thanks,
diff --git a/templates/emails/transactional/verify-mailbox.txt.jinja2 b/templates/emails/transactional/verify-mailbox.txt.jinja2 index 588edeee..049e30a4 100644 --- a/templates/emails/transactional/verify-mailbox.txt.jinja2 +++ b/templates/emails/transactional/verify-mailbox.txt.jinja2 @@ -5,9 +5,13 @@ Hi You have added {{mailbox_email}} as an additional mailbox. +{% if link %} To confirm, please click on this link: {{link}} +{% else %} +Please enter {{ code }} as your verification code for this mailbox +{% endif %} This link will only be valid during the next 15 minutes. {% endblock %} diff --git a/tests/api/test_mailbox.py b/tests/api/test_mailbox.py index 8a28d1ff..813eb3cd 100644 --- a/tests/api/test_mailbox.py +++ b/tests/api/test_mailbox.py @@ -28,7 +28,7 @@ def test_create_mailbox(flask_client): ) assert r.status_code == 400 - assert r.json == {"error": "gmail.com invalid"} + assert r.json == {"error": "Invalid email"} def test_create_mailbox_fail_for_free_user(flask_client): diff --git a/tests/test_mailbox_utils.py b/tests/test_mailbox_utils.py new file mode 100644 index 00000000..8b50ddb6 --- /dev/null +++ b/tests/test_mailbox_utils.py @@ -0,0 +1,213 @@ +from typing import Optional + +import pytest + +from app import mailbox_utils, config +from app.db import Session +from app.mail_sender import mail_sender +from app.models import Mailbox, MailboxActivation, User, Job +from tests.utils import create_new_user, random_email + + +user: Optional[User] = None + + +def setup_module(): + global user + config.SKIP_MX_LOOKUP_ON_CHECK = True + user = create_new_user() + user.trial_end = None + user.lifetime = True + Session.commit() + + +def teardown_module(): + config.SKIP_MX_LOOKUP_ON_CHECK = False + + +def test_free_user_cannot_add_mailbox(): + user.lifetime = False + email = random_email() + try: + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.create_mailbox(user, email) + finally: + user.lifetime = True + + +def test_invalid_email(): + user.lifetime = True + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.create_mailbox(user, "invalid") + + +def test_already_used(): + user.lifetime = True + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.create_mailbox(user, user.email) + + +@mail_sender.store_emails_test_decorator +def test_create_mailbox(): + email = random_email() + mailbox_utils.create_mailbox(user, email) + mailbox = Mailbox.get_by(email=email) + assert mailbox is not None + assert not mailbox.verified + activation = MailboxActivation.get_by(mailbox_id=mailbox.id) + assert activation is not None + assert activation.tries == 0 + assert len(activation.code) > 6 + + assert 1 == len(mail_sender.get_stored_emails()) + mail_sent = mail_sender.get_stored_emails()[0] + mail_contents = str(mail_sent.msg) + assert mail_contents.find(config.URL) > 0 + assert mail_contents.find(activation.code) > 0 + assert mail_sent.envelope_to == email + + +@mail_sender.store_emails_test_decorator +def test_create_mailbox_with_digits(): + email = random_email() + mailbox_utils.create_mailbox( + user, email, use_digit_codes=True, send_verification_link=False + ) + mailbox = Mailbox.get_by(email=email) + assert mailbox is not None + assert not mailbox.verified + activation = MailboxActivation.get_by(mailbox_id=mailbox.id) + assert activation is not None + assert activation.tries == 0 + assert len(activation.code) == 6 + + assert 1 == len(mail_sender.get_stored_emails()) + mail_sent = mail_sender.get_stored_emails()[0] + mail_contents = str(mail_sent.msg) + assert mail_contents.find(activation.code) > 0 + assert mail_contents.find(config.URL) == -1 + assert mail_sent.envelope_to == email + + +@mail_sender.store_emails_test_decorator +def test_send_verification_email(): + email = random_email() + mailbox_utils.create_mailbox( + user, email, use_digit_codes=True, send_verification_link=False + ) + mailbox = Mailbox.get_by(email=email) + activation = MailboxActivation.get_by(mailbox_id=mailbox.id) + old_code = activation.code + mailbox_utils.send_verification_email(user, mailbox) + activation = MailboxActivation.get_by(mailbox_id=mailbox.id) + assert activation.code != old_code + + +def test_delete_other_user_mailbox(): + other = create_new_user() + mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True) + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None) + + +def test_delete_default_mailbox(): + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.delete_mailbox( + user, user.default_mailbox_id, transfer_mailbox_id=None + ) + + +def test_transfer_to_same_mailbox(): + email = random_email() + mailbox = mailbox_utils.create_mailbox( + user, email, use_digit_codes=True, send_verification_link=False + ) + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=mailbox.id) + + +def test_transfer_to_other_users_mailbox(): + email = random_email() + mailbox = mailbox_utils.create_mailbox( + user, email, use_digit_codes=True, send_verification_link=False + ) + other = create_new_user() + other_mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True) + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.delete_mailbox( + user, mailbox.id, transfer_mailbox_id=other_mailbox.id + ) + + +def test_delete_with_no_transfer(): + email = random_email() + mailbox = mailbox_utils.create_mailbox( + user, email, use_digit_codes=True, send_verification_link=False + ) + mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None) + job = Session.query(Job).order_by(Job.id.desc()).first() + assert job is not None + assert job.name == config.JOB_DELETE_MAILBOX + assert job.payload["mailbox_id"] == mailbox.id + assert job.payload["transfer_mailbox_id"] is None + + +def test_delete_with_transfer(): + mailbox = mailbox_utils.create_mailbox( + user, random_email(), use_digit_codes=True, send_verification_link=False + ) + transfer_mailbox = mailbox_utils.create_mailbox( + user, random_email(), use_digit_codes=True, send_verification_link=False + ) + mailbox_utils.delete_mailbox( + user, mailbox.id, transfer_mailbox_id=transfer_mailbox.id + ) + job = Session.query(Job).order_by(Job.id.desc()).first() + assert job is not None + assert job.name == config.JOB_DELETE_MAILBOX + assert job.payload["mailbox_id"] == mailbox.id + assert job.payload["transfer_mailbox_id"] == transfer_mailbox.id + mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None) + job = Session.query(Job).order_by(Job.id.desc()).first() + assert job is not None + assert job.name == config.JOB_DELETE_MAILBOX + assert job.payload["mailbox_id"] == mailbox.id + assert job.payload["transfer_mailbox_id"] is None + + +def test_set_default_mailbox(): + other = create_new_user() + mailbox = mailbox_utils.create_mailbox( + other, + random_email(), + use_digit_codes=True, + send_verification_link=False, + ) + mailbox.verified = True + Session.commit() + mailbox_utils.set_default_mailbox(other, mailbox.id) + other = User.get(other.id) + assert other.default_mailbox_id == mailbox.id + + +def test_cannot_set_unverified(): + mailbox = mailbox_utils.create_mailbox( + user, + random_email(), + use_digit_codes=True, + send_verification_link=False, + ) + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.set_default_mailbox(user, mailbox.id) + + +def test_cannot_default_other_user_mailbox(): + other = create_new_user() + mailbox = mailbox_utils.create_mailbox( + other, + random_email(), + use_digit_codes=True, + send_verification_link=False, + ) + with pytest.raises(mailbox_utils.MailboxError): + mailbox_utils.set_default_mailbox(user, mailbox.id)