Move mailbox management to a module (#2164)

This commit is contained in:
Adrià Casajús 2024-07-30 13:36:48 +02:00 committed by GitHub
parent 10cfc21fe9
commit d11c2686b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 523 additions and 200 deletions

View file

@ -1,22 +1,18 @@
from smtplib import SMTPRecipientsRefused
import arrow
from flask import g
from flask import jsonify
from flask import request
from app import mailbox_utils
from app.api.base import api_bp, require_api_auth
from app.config import JOB_DELETE_MAILBOX
from app.dashboard.views.mailbox import send_verification_email
from app.dashboard.views.mailbox_detail import verify_mailbox_change
from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.models import Mailbox
from app.utils import sanitize_email
@ -44,31 +40,15 @@ def create_mailbox():
user = g.user
mailbox_email = sanitize_email(request.get_json().get("email"))
if not user.is_premium():
return jsonify(error="Only premium plan can add additional mailbox"), 400
try:
new_mailbox = mailbox_utils.create_mailbox(user, mailbox_email)
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
if not is_valid_email(mailbox_email):
return jsonify(error=f"{mailbox_email} invalid"), 400
elif mailbox_already_used(mailbox_email, user):
return jsonify(error=f"{mailbox_email} already used"), 400
elif not email_can_be_used_as_mailbox(mailbox_email):
return (
jsonify(
error=f"{mailbox_email} cannot be used. Please note a mailbox cannot "
f"be a disposable email address"
),
400,
)
else:
new_mailbox = Mailbox.create(email=mailbox_email, user_id=user.id)
Session.commit()
send_verification_email(user, new_mailbox)
return (
jsonify(mailbox_to_dict(new_mailbox)),
201,
)
return (
jsonify(mailbox_to_dict(new_mailbox)),
201,
)
@api_bp.route("/mailboxes/<int:mailbox_id>", methods=["DELETE"])
@ -86,47 +66,17 @@ def delete_mailbox(mailbox_id):
"""
user = g.user
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
return jsonify(error="Forbidden"), 403
if mailbox.id == user.default_mailbox_id:
return jsonify(error="You cannot delete the default mailbox"), 400
data = request.get_json() or {}
transfer_mailbox_id = data.get("transfer_aliases_to")
if transfer_mailbox_id and int(transfer_mailbox_id) >= 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
transfer_mailbox_id = int(transfer_mailbox_id)
else:
transfer_mailbox_id = None
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
return (
jsonify(error="You must transfer the aliases to a mailbox you own."),
403,
)
if transfer_mailbox_id == mailbox_id:
return (
jsonify(
error="You can not transfer the aliases to the mailbox you want to delete."
),
400,
)
if not transfer_mailbox.verified:
return jsonify(error="Your new mailbox is not verified"), 400
# Schedule delete account job
LOG.w("schedule delete mailbox job for %s", mailbox)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id,
},
run_at=arrow.now(),
commit=True,
)
try:
mailbox_utils.delete_mailbox(user, mailbox_id, transfer_mailbox_id)
except mailbox_utils.MailboxError as e:
return jsonify(error=e.msg), 400
return jsonify(deleted=True), 200

View file

@ -2,7 +2,6 @@ import base64
import binascii
import json
import arrow
from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from flask_wtf import FlaskForm
@ -10,19 +9,12 @@ from itsdangerous import TimestampSigner
from wtforms import validators, IntegerField
from wtforms.fields.html5 import EmailField
from app import parallel_limiter
from app.config import MAILBOX_SECRET, URL, JOB_DELETE_MAILBOX
from app import parallel_limiter, mailbox_utils
from app.config import MAILBOX_SECRET
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.email_utils import (
email_can_be_used_as_mailbox,
mailbox_already_used,
render,
send_email,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import Mailbox, Job
from app.models import Mailbox
from app.utils import CSRFValidationForm
@ -58,120 +50,59 @@ def mailbox_route():
if not delete_mailbox_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox = Mailbox.get(delete_mailbox_form.mailbox_id.data)
if not mailbox or mailbox.user_id != current_user.id:
flash("Invalid mailbox. Refresh the page", "warning")
try:
mailbox = mailbox_utils.delete_mailbox(
current_user,
delete_mailbox_form.mailbox_id.data,
delete_mailbox_form.transfer_mailbox_id.data,
)
except mailbox_utils.MailboxError as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if mailbox.id == current_user.default_mailbox_id:
flash("You cannot delete default mailbox", "error")
return redirect(url_for("dashboard.mailbox_route"))
transfer_mailbox_id = delete_mailbox_form.transfer_mailbox_id.data
if transfer_mailbox_id and transfer_mailbox_id > 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
if not transfer_mailbox or transfer_mailbox.user_id != current_user.id:
flash(
"You must transfer the aliases to a mailbox you own.", "error"
)
return redirect(url_for("dashboard.mailbox_route"))
if transfer_mailbox.id == mailbox.id:
flash(
"You can not transfer the aliases to the mailbox you want to delete.",
"error",
)
return redirect(url_for("dashboard.mailbox_route"))
if not transfer_mailbox.verified:
flash("Your new mailbox is not verified", "error")
return redirect(url_for("dashboard.mailbox_route"))
# Schedule delete account job
LOG.w(
f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id
if transfer_mailbox_id > 0
else None,
},
run_at=arrow.now(),
commit=True,
)
flash(
f"Mailbox {mailbox.email} scheduled for deletion."
f"You will receive a confirmation email when the deletion is finished",
"success",
)
return redirect(url_for("dashboard.mailbox_route"))
if request.form.get("form-name") == "set-default":
if not csrf_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox_id = request.form.get("mailbox_id")
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != current_user.id:
flash("Unknown error. Refresh the page", "warning")
try:
mailbox_id = request.form.get("mailbox_id")
mailbox = mailbox_utils.set_default_mailbox(current_user, mailbox_id)
except mailbox_utils.MailboxError as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if mailbox.id == current_user.default_mailbox_id:
flash("This mailbox is already default one", "error")
return redirect(url_for("dashboard.mailbox_route"))
if not mailbox.verified:
flash("Cannot set unverified mailbox as default", "error")
return redirect(url_for("dashboard.mailbox_route"))
current_user.default_mailbox_id = mailbox.id
Session.commit()
flash(f"Mailbox {mailbox.email} is set as Default Mailbox", "success")
return redirect(url_for("dashboard.mailbox_route"))
elif request.form.get("form-name") == "create":
if not current_user.is_premium():
flash("Only premium plan can add additional mailbox", "warning")
if not new_mailbox_form.validate():
flash("Invalid request", "warning")
return redirect(request.url)
mailbox_email = new_mailbox_form.email.data.lower().strip().replace(" ", "")
try:
mailbox = mailbox_utils.create_mailbox(current_user, mailbox_email)
except mailbox_utils.MailboxError as e:
flash(e.msg, "warning")
return redirect(url_for("dashboard.mailbox_route"))
if new_mailbox_form.validate():
mailbox_email = (
new_mailbox_form.email.data.lower().strip().replace(" ", "")
flash(
f"You are going to receive an email to confirm {mailbox.email}.",
"success",
)
return redirect(
url_for(
"dashboard.mailbox_detail_route",
mailbox_id=mailbox.id,
)
if not is_valid_email(mailbox_email):
flash(f"{mailbox_email} invalid", "error")
elif mailbox_already_used(mailbox_email, current_user):
flash(f"{mailbox_email} already used", "error")
elif not email_can_be_used_as_mailbox(mailbox_email):
flash(f"You cannot use {mailbox_email}.", "error")
else:
new_mailbox = Mailbox.create(
email=mailbox_email, user_id=current_user.id
)
Session.commit()
send_verification_email(current_user, new_mailbox)
flash(
f"You are going to receive an email to confirm {mailbox_email}.",
"success",
)
return redirect(
url_for(
"dashboard.mailbox_detail_route",
mailbox_id=new_mailbox.id,
)
)
)
return render_template(
"dashboard/mailbox.html",
@ -182,34 +113,19 @@ def mailbox_route():
)
def send_verification_email(user, mailbox):
s = TimestampSigner(MAILBOX_SECRET)
encoded_data = json.dumps([mailbox.id, mailbox.email]).encode("utf-8")
b64_data = base64.urlsafe_b64encode(encoded_data)
mailbox_id_signed = s.sign(b64_data).decode()
verification_url = (
URL + "/dashboard/mailbox_verify" + f"?mailbox_id={mailbox_id_signed}"
)
send_email(
mailbox.email,
f"Please confirm your mailbox {mailbox.email}",
render(
"transactional/verify-mailbox.txt.jinja2",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
),
render(
"transactional/verify-mailbox.html",
user=user,
link=verification_url,
mailbox_email=mailbox.email,
),
)
@dashboard_bp.route("/mailbox_verify")
def mailbox_verify():
mailbox_id = request.args.get("mailbox_id")
code = request.args.get("code")
if not code:
# Old way
return verify_with_signed_secret(mailbox_id)
mailbox = mailbox_utils.verify_mailbox_code(mailbox_id, code)
LOG.d("Mailbox %s is verified", mailbox)
return render_template("dashboard/mailbox_validation.html", mailbox=mailbox)
def verify_with_signed_secret(request: str):
s = TimestampSigner(MAILBOX_SECRET)
mailbox_verify_request = request.args.get("mailbox_id")
try:

183
app/mailbox_utils.py Normal file
View file

@ -0,0 +1,183 @@
import secrets
import random
from typing import Optional
import arrow
from app import config
from app.config import JOB_DELETE_MAILBOX
from app.db import Session
from app.email_utils import (
mailbox_already_used,
email_can_be_used_as_mailbox,
send_email,
render,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import User, Mailbox, Job, MailboxActivation
class MailboxError(Exception):
def __init__(self, msg: str):
self.msg = msg
def create_mailbox(
user: User,
email: str,
use_digit_codes: bool = False,
send_verification_link: bool = True,
) -> Mailbox:
if not user.is_premium():
raise MailboxError("Only premium plan can add additional mailbox")
if not is_valid_email(email):
raise MailboxError("Invalid email")
elif mailbox_already_used(email, user):
raise MailboxError("Email already used")
elif not email_can_be_used_as_mailbox(email):
raise MailboxError("Invalid email")
new_mailbox = Mailbox.create(email=email, user_id=user.id, commit=True)
send_verification_email(
user,
new_mailbox,
use_digit_code=use_digit_codes,
send_link=send_verification_link,
)
return new_mailbox
def delete_mailbox(
user: User, mailbox_id: int, transfer_mailbox_id: Optional[int]
) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
raise MailboxError("Invalid mailbox")
if mailbox.id == user.default_mailbox_id:
raise MailboxError("Cannot delete your default mailbox")
if transfer_mailbox_id and transfer_mailbox_id > 0:
transfer_mailbox = Mailbox.get(transfer_mailbox_id)
if not transfer_mailbox or transfer_mailbox.user_id != user.id:
raise MailboxError("You must transfer the aliases to a mailbox you own")
if transfer_mailbox.id == mailbox.id:
raise MailboxError(
"You can not transfer the aliases to the mailbox you want to delete"
)
if not transfer_mailbox.verified:
MailboxError("Your new mailbox is not verified")
# Schedule delete account job
LOG.w(
f"schedule delete mailbox job for {mailbox.id} with transfer to mailbox {transfer_mailbox_id}"
)
Job.create(
name=JOB_DELETE_MAILBOX,
payload={
"mailbox_id": mailbox.id,
"transfer_mailbox_id": transfer_mailbox_id
if transfer_mailbox_id and transfer_mailbox_id > 0
else None,
},
run_at=arrow.now(),
commit=True,
)
return mailbox
def set_default_mailbox(user: User, mailbox_id: int) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox or mailbox.user_id != user.id:
raise MailboxError("Invalid mailbox")
if not mailbox.verified:
raise MailboxError("This is mailbox is not verified")
if mailbox.id == user.default_mailbox_id:
return mailbox
user.default_mailbox_id = mailbox.id
Session.commit()
return mailbox
def clear_activation_codes_for_mailbox(mailbox: Mailbox):
Session.query(MailboxActivation).filter(
MailboxActivation.mailbox_id == mailbox.id
).delete()
Session.commit()
def verify_mailbox_code(mailbox_id: int, code: str) -> Mailbox:
mailbox = Mailbox.get(mailbox_id)
if not mailbox:
raise MailboxError("Invalid mailbox")
if mailbox.verified:
clear_activation_codes_for_mailbox(mailbox)
return mailbox
activation = MailboxActivation.get_by(mailbox_id=mailbox_id).first()
if not activation:
raise MailboxError("Invalid code")
if activation.tries > 3:
clear_activation_codes_for_mailbox(mailbox)
raise MailboxError("Invalid activation code. Please request another code.")
if activation.created_at < arrow.now().shift(minutes=-15):
clear_activation_codes_for_mailbox(mailbox)
raise MailboxError("Invalid activation code. Please request another code.")
if code != activation.code:
activation.tries = activation.tries + 1
Session.commit()
raise MailboxError("Invalid activation code")
mailbox.verified = True
clear_activation_codes_for_mailbox(mailbox)
return mailbox
def send_verification_email(
user: User, mailbox: Mailbox, use_digit_code: bool = False, send_link: bool = True
):
clear_activation_codes_for_mailbox(mailbox)
if use_digit_code:
code = "{:06d}".format(random.randint(1, 999999))
else:
code = secrets.token_urlsafe(16)
activation = MailboxActivation.create(
mailbox_id=mailbox.id,
code=code,
tries=0,
)
Session.commit()
if send_link:
verification_url = (
config.URL
+ "/dashboard/mailbox_verify"
+ f"?mailbox_id={mailbox.id}&code={code}"
)
else:
verification_url = None
send_email(
mailbox.email,
f"Please confirm your mailbox {mailbox.email}",
render(
"transactional/verify-mailbox.txt.jinja2",
user=user,
code=activation.code,
link=verification_url,
mailbox_email=mailbox.email,
),
render(
"transactional/verify-mailbox.html",
user=user,
code=activation.code,
link=verification_url,
mailbox_email=mailbox.email,
),
)

View file

@ -2804,6 +2804,16 @@ class Mailbox(Base, ModelMixin):
return f"<Mailbox {self.id} {self.email}>"
class MailboxActivation(Base, ModelMixin):
__tablename__ = "mailbox_activation"
mailbox_id = sa.Column(
sa.ForeignKey(Mailbox.id, ondelete="cascade"), nullable=False, index=True
)
code = sa.Column(sa.String(32), nullable=False, index=True)
tries = sa.Column(sa.Integer, default=0, nullable=False)
class AccountActivation(Base, ModelMixin):
"""contains code to activate the user account when they sign up on mobile"""

View file

@ -0,0 +1,42 @@
"""empty message
Revision ID: 1c14339aae90
Revises: 56d08955fcab
Create Date: 2024-07-30 11:46:32.460221
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1c14339aae90'
down_revision = '56d08955fcab'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('mailbox_activation',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False),
sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
sa.Column('mailbox_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=32), nullable=False),
sa.Column('tries', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['mailbox_id'], ['mailbox.id'], ondelete='cascade'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_mailbox_activation_code'), 'mailbox_activation', ['code'], unique=False)
op.create_index(op.f('ix_mailbox_activation_mailbox_id'), 'mailbox_activation', ['mailbox_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_mailbox_activation_mailbox_id'), table_name='mailbox_activation')
op.drop_index(op.f('ix_mailbox_activation_code'), table_name='mailbox_activation')
op.drop_table('mailbox_activation')
# ### end Alembic commands ###

View file

@ -4,8 +4,13 @@
{{ render_text("Hi") }}
{{ render_text("You have added <b>"+ mailbox_email +"</b> as an additional mailbox.") }}
{{ render_text("To confirm, please click on the button below.") }}
{{ render_button("Confirm mailbox", link) }}
{% if link %}
{{ render_text("To confirm, please click on the button below.") }}
{{ render_button("Confirm mailbox", link) }}
{% else %}
{{ render_text("Please enter <b>"+code+"</b> as your verification code") }}
{% endif %}
{{ render_text("This email will only be valid for the next 15 minutes.") }}
{{ render_text('Thanks,
<br />

View file

@ -5,9 +5,13 @@ Hi
You have added {{mailbox_email}} as an additional mailbox.
{% if link %}
To confirm, please click on this link:
{{link}}
{% else %}
Please enter {{ code }} as your verification code for this mailbox
{% endif %}
This link will only be valid during the next 15 minutes.
{% endblock %}

View file

@ -28,7 +28,7 @@ def test_create_mailbox(flask_client):
)
assert r.status_code == 400
assert r.json == {"error": "gmail.com invalid"}
assert r.json == {"error": "Invalid email"}
def test_create_mailbox_fail_for_free_user(flask_client):

213
tests/test_mailbox_utils.py Normal file
View file

@ -0,0 +1,213 @@
from typing import Optional
import pytest
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.models import Mailbox, MailboxActivation, User, Job
from tests.utils import create_new_user, random_email
user: Optional[User] = None
def setup_module():
global user
config.SKIP_MX_LOOKUP_ON_CHECK = True
user = create_new_user()
user.trial_end = None
user.lifetime = True
Session.commit()
def teardown_module():
config.SKIP_MX_LOOKUP_ON_CHECK = False
def test_free_user_cannot_add_mailbox():
user.lifetime = False
email = random_email()
try:
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, email)
finally:
user.lifetime = True
def test_invalid_email():
user.lifetime = True
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, "invalid")
def test_already_used():
user.lifetime = True
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.create_mailbox(user, user.email)
@mail_sender.store_emails_test_decorator
def test_create_mailbox():
email = random_email()
mailbox_utils.create_mailbox(user, email)
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert not mailbox.verified
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is not None
assert activation.tries == 0
assert len(activation.code) > 6
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(config.URL) > 0
assert mail_contents.find(activation.code) > 0
assert mail_sent.envelope_to == email
@mail_sender.store_emails_test_decorator
def test_create_mailbox_with_digits():
email = random_email()
mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_verification_link=False
)
mailbox = Mailbox.get_by(email=email)
assert mailbox is not None
assert not mailbox.verified
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation is not None
assert activation.tries == 0
assert len(activation.code) == 6
assert 1 == len(mail_sender.get_stored_emails())
mail_sent = mail_sender.get_stored_emails()[0]
mail_contents = str(mail_sent.msg)
assert mail_contents.find(activation.code) > 0
assert mail_contents.find(config.URL) == -1
assert mail_sent.envelope_to == email
@mail_sender.store_emails_test_decorator
def test_send_verification_email():
email = random_email()
mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_verification_link=False
)
mailbox = Mailbox.get_by(email=email)
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
old_code = activation.code
mailbox_utils.send_verification_email(user, mailbox)
activation = MailboxActivation.get_by(mailbox_id=mailbox.id)
assert activation.code != old_code
def test_delete_other_user_mailbox():
other = create_new_user()
mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
def test_delete_default_mailbox():
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(
user, user.default_mailbox_id, transfer_mailbox_id=None
)
def test_transfer_to_same_mailbox():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_verification_link=False
)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=mailbox.id)
def test_transfer_to_other_users_mailbox():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_verification_link=False
)
other = create_new_user()
other_mailbox = Mailbox.create(user_id=other.id, email=random_email(), commit=True)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=other_mailbox.id
)
def test_delete_with_no_transfer():
email = random_email()
mailbox = mailbox_utils.create_mailbox(
user, email, use_digit_codes=True, send_verification_link=False
)
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] is None
def test_delete_with_transfer():
mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_verification_link=False
)
transfer_mailbox = mailbox_utils.create_mailbox(
user, random_email(), use_digit_codes=True, send_verification_link=False
)
mailbox_utils.delete_mailbox(
user, mailbox.id, transfer_mailbox_id=transfer_mailbox.id
)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] == transfer_mailbox.id
mailbox_utils.delete_mailbox(user, mailbox.id, transfer_mailbox_id=None)
job = Session.query(Job).order_by(Job.id.desc()).first()
assert job is not None
assert job.name == config.JOB_DELETE_MAILBOX
assert job.payload["mailbox_id"] == mailbox.id
assert job.payload["transfer_mailbox_id"] is None
def test_set_default_mailbox():
other = create_new_user()
mailbox = mailbox_utils.create_mailbox(
other,
random_email(),
use_digit_codes=True,
send_verification_link=False,
)
mailbox.verified = True
Session.commit()
mailbox_utils.set_default_mailbox(other, mailbox.id)
other = User.get(other.id)
assert other.default_mailbox_id == mailbox.id
def test_cannot_set_unverified():
mailbox = mailbox_utils.create_mailbox(
user,
random_email(),
use_digit_codes=True,
send_verification_link=False,
)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.set_default_mailbox(user, mailbox.id)
def test_cannot_default_other_user_mailbox():
other = create_new_user()
mailbox = mailbox_utils.create_mailbox(
other,
random_email(),
use_digit_codes=True,
send_verification_link=False,
)
with pytest.raises(mailbox_utils.MailboxError):
mailbox_utils.set_default_mailbox(user, mailbox.id)