feat: alias restore check quota (#2418)

* feat: alias restore check quota

* Update app/dashboard/views/alias_trash.py

Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>

* Update app/dashboard/views/alias_trash.py

Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>

---------

Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>
This commit is contained in:
Carlos Quintana 2025-03-18 12:05:09 +01:00 committed by GitHub
parent a70baad478
commit 84364a1684
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 127 additions and 12 deletions

View file

@ -5,6 +5,7 @@ from app import config, rate_limiter
from app.alias_audit_log_utils import emit_alias_audit_log, AliasAuditLogAction
from app.config import ALIAS_TRASH_DAYS
from app.db import Session
from app.errors import CannotCreateAliasQuotaExceeded
from app.events.event_dispatcher import EventDispatcher
from app.events.generated.event_pb2 import EventContent, AliasDeleted, AliasCreated
from app.log import LOG
@ -140,7 +141,11 @@ def __perform_alias_restore(user: User, alias: Alias) -> None:
),
)
Session.commit()
return
def check_user_can_restore_num_aliases(user: User, num_aliases_to_restore: int):
if not user.can_create_num_aliases(num_aliases_to_restore):
raise CannotCreateAliasQuotaExceeded()
def restore_alias(user: User, alias_id: int) -> None | Alias:
@ -152,6 +157,8 @@ def restore_alias(user: User, alias_id: int) -> None | Alias:
alias = Alias.get_by(id=alias_id, user_id=user.id)
if alias is None:
return None
check_user_can_restore_num_aliases(user, 1)
__perform_alias_restore(user, alias)
newrelic.agent.record_custom_event("RestoreAlias", {"mode": "single"})
newrelic.agent.record_custom_metric("AliasRestored", 1)
@ -164,12 +171,13 @@ def restore_all_alias(user: User) -> int:
for limit in limits:
key = f"alias_restore_all_{limit[1]}:{user.id}"
rate_limiter.check_bucket_limit(key, limit[0], limit[1])
query = (
Session.query(Alias)
.filter(Alias.user_id == user.id, Alias.delete_on != None) # noqa: E711
.enable_eagerloads(False)
.yield_per(50)
)
filters = [Alias.user_id == user.id, Alias.delete_on != None] # noqa: E711
trashed_aliases_count = Session.query(Alias).filter(*filters).count()
check_user_can_restore_num_aliases(user, trashed_aliases_count)
query = Session.query(Alias).filter(*filters).enable_eagerloads(False).yield_per(50)
count = 0
for alias in query.all():
__perform_alias_restore(user, alias)

View file

@ -7,6 +7,7 @@ from app import alias_delete
from app.config import PAGE_LIMIT
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.errors import CannotCreateAliasQuotaExceeded
from app.models import Alias
@ -35,13 +36,19 @@ def alias_trash():
elif action == "restore-one":
try:
alias_id = int(form.alias_id.data)
alias_delete.restore_alias(current_user, alias_id)
flash("Restored alias", "success")
try:
alias_delete.restore_alias(current_user, alias_id)
flash("Restored alias", "success")
except CannotCreateAliasQuotaExceeded:
flash("You do not have enough quota to restore this alias", "error")
except ValueError:
flash("Invalid alias", "warning")
elif action == "restore-all":
count = alias_delete.restore_all_alias(current_user)
flash(f"Restored {count} aliases", "success")
try:
count = alias_delete.restore_all_alias(current_user)
flash(f"Restored {count} aliases", "success")
except CannotCreateAliasQuotaExceeded:
flash("You do not have enough quota to restore all aliases", "error")
alias_in_trash = (
Session.query(Alias)

View file

@ -128,3 +128,10 @@ class ProtonAccountNotVerified(LinkException):
super().__init__(
"The Proton account you are trying to use has not been verified"
)
class CannotCreateAliasQuotaExceeded(SLException):
"""raised when an alias cannot be created because there is no quota left"""
def __init__(self):
super().__init__("You cannot create more aliases")

View file

@ -920,6 +920,9 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
Whether user can create a new alias. User can't create a new alias if
- has more than user.max_alias_for_free_account() aliases in the free plan, *even in the free trial*
"""
return self.can_create_num_aliases(1)
def can_create_num_aliases(self, num_aliases: int) -> bool:
if not self.is_active():
return False
@ -932,7 +935,9 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
active_alias_count = Alias.filter_by(
user_id=self.id, delete_on=None
).count()
return active_alias_count < self.max_alias_for_free_account()
return (
active_alias_count + num_aliases
) <= self.max_alias_for_free_account()
def can_send_or_receive(self) -> bool:
if self.disabled:

View file

@ -8,6 +8,7 @@ from app.alias_audit_log_utils import AliasAuditLogAction
from app.alias_delete import delete_alias, restore_all_alias, clear_trash
from app.alias_delete import perform_alias_deletion, move_alias_to_trash, restore_alias
from app.db import Session
from app.errors import CannotCreateAliasQuotaExceeded
from app.events.event_dispatcher import GlobalDispatcher
from app.models import (
UserAliasDeleteAction,
@ -253,3 +254,90 @@ def test_clear_trash():
deleted_alias = DeletedAlias.get_by(email=alias2.email)
assert deleted_alias is not None
assert deleted_alias.reason == AliasDeleteReason.MailboxDeleted
def test_cannot_restore_single_alias_if_over_quota():
user = create_new_user()
# Max out aliases
aliases = []
while user.can_create_new_alias():
aliases.append(Alias.create_new_random(user))
# Trash one alias
alias_to_trash = aliases[0]
move_alias_to_trash(alias_to_trash, user)
# Create new alias
Alias.create_new_random(user)
# Try to restore trashed alias
with pytest.raises(CannotCreateAliasQuotaExceeded):
restore_alias(user, alias_to_trash.id)
def test_can_restore_single_alias_just_to_quota():
user = create_new_user()
# Max out aliases
aliases = []
while user.can_create_new_alias():
aliases.append(Alias.create_new_random(user))
# Trash one alias
alias_to_trash = aliases[0]
move_alias_to_trash(alias_to_trash, user)
# Create new alias
new_alias = Alias.create_new_random(user)
# Trash that alias too
move_alias_to_trash(new_alias, user)
# Restore first alias
restored_alias = restore_alias(user, alias_to_trash.id)
assert restored_alias is not None
assert restored_alias.id == alias_to_trash.id
assert restored_alias.delete_on is None
assert restored_alias.delete_reason is None
def test_cannot_restore_many_aliases_over_quota():
user = create_new_user()
# Max out aliases
aliases = []
while user.can_create_new_alias():
aliases.append(Alias.create_new_random(user))
# Trash two aliases
alias_to_trash1 = aliases[0]
move_alias_to_trash(alias_to_trash1, user)
alias_to_trash2 = aliases[1]
move_alias_to_trash(alias_to_trash2, user)
# Create new alias
Alias.create_new_random(user)
# Try to restore trashed aliases
with pytest.raises(CannotCreateAliasQuotaExceeded):
restore_all_alias(user)
def test_can_restore_many_aliases_just_to_quota():
user = create_new_user()
# Max out aliases
aliases = []
while user.can_create_new_alias():
aliases.append(Alias.create_new_random(user))
# Trash two aliases
alias_to_trash1 = aliases[0]
move_alias_to_trash(alias_to_trash1, user)
alias_to_trash2 = aliases[1]
move_alias_to_trash(alias_to_trash2, user)
count = restore_all_alias(user)
assert count == 2