mirror of
https://github.com/simple-login/app.git
synced 2025-02-23 23:34:05 +08:00
WIP: Import test
This commit is contained in:
parent
c6646d5971
commit
44ae20816a
3 changed files with 121 additions and 81 deletions
84
app/import_utils.py
Normal file
84
app/import_utils.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
from .log import LOG
|
||||
|
||||
import requests
|
||||
|
||||
from app import s3
|
||||
from app.extensions import db
|
||||
from app.models import BatchImport
|
||||
|
||||
def handle_batch_import(batch_import: BatchImport):
|
||||
user = batch_import.user
|
||||
|
||||
batch_import.processed = True
|
||||
db.session.commit()
|
||||
|
||||
LOG.debug("Start batch import for %s %s", batch_import, user)
|
||||
file_url = s3.get_url(batch_import.file.path)
|
||||
|
||||
LOG.d("Download file %s from %s", batch_import.file, file_url)
|
||||
r = requests.get(file_url)
|
||||
lines = [line.decode() for line in r.iter_lines()]
|
||||
reader = csv.DictReader(lines)
|
||||
|
||||
for row in reader:
|
||||
try:
|
||||
full_alias = sanitize_email(row["alias"])
|
||||
note = row["note"]
|
||||
except KeyError:
|
||||
LOG.warning("Cannot parse row %s", row)
|
||||
continue
|
||||
|
||||
alias_domain = get_email_domain_part(full_alias)
|
||||
custom_domain = CustomDomain.get_by(domain=alias_domain)
|
||||
|
||||
if (
|
||||
not custom_domain
|
||||
or not custom_domain.verified
|
||||
or custom_domain.user_id != user.id
|
||||
):
|
||||
LOG.debug("domain %s can't be used %s", alias_domain, user)
|
||||
continue
|
||||
|
||||
if (
|
||||
Alias.get_by(email=full_alias)
|
||||
or DeletedAlias.get_by(email=full_alias)
|
||||
or DomainDeletedAlias.get_by(email=full_alias)
|
||||
):
|
||||
LOG.d("alias already used %s", full_alias)
|
||||
continue
|
||||
|
||||
mailboxes = []
|
||||
|
||||
if "mailboxes" in row:
|
||||
for mailbox_email in row["mailboxes"].split():
|
||||
mailbox_email = sanitize_email(mailbox_email)
|
||||
mailbox = Mailbox.get_by(email=mailbox_email)
|
||||
|
||||
if not mailbox or not mailbox.verified or mailbox.user_id != user.id:
|
||||
LOG.d("mailbox %s can't be used %s", mailbox, user)
|
||||
continue
|
||||
|
||||
mailboxes.append(mailbox.id)
|
||||
|
||||
if len(mailboxes) == 0:
|
||||
mailboxes = [user.default_mailbox_id]
|
||||
|
||||
alias = Alias.create(
|
||||
user_id=user.id,
|
||||
email=full_alias,
|
||||
note=note,
|
||||
mailbox_id=mailboxes[0],
|
||||
custom_domain_id=custom_domain.id,
|
||||
batch_import_id=batch_import.id,
|
||||
)
|
||||
db.session.commit()
|
||||
db.session.flush()
|
||||
LOG.d("Create %s", alias)
|
||||
|
||||
for i in range(1, len(mailboxes)):
|
||||
alias_mailbox = AliasMailbox.create(
|
||||
alias_id=alias.id,
|
||||
mailbox_id=mailboxes[i],
|
||||
)
|
||||
db.session.commit()
|
||||
LOG.d("Create %s", alias_mailbox)
|
|
@ -20,6 +20,7 @@ from app.email_utils import (
|
|||
render,
|
||||
get_email_domain_part,
|
||||
)
|
||||
from app.import_utils import handle_batch_import
|
||||
from app.utils import sanitize_email
|
||||
from app.extensions import db
|
||||
from app.log import LOG
|
||||
|
@ -113,84 +114,6 @@ def onboarding_mailbox(user):
|
|||
)
|
||||
|
||||
|
||||
def handle_batch_import(batch_import: BatchImport):
|
||||
user = batch_import.user
|
||||
|
||||
batch_import.processed = True
|
||||
db.session.commit()
|
||||
|
||||
LOG.debug("Start batch import for %s %s", batch_import, user)
|
||||
file_url = s3.get_url(batch_import.file.path)
|
||||
|
||||
LOG.d("Download file %s from %s", batch_import.file, file_url)
|
||||
r = requests.get(file_url)
|
||||
lines = [line.decode() for line in r.iter_lines()]
|
||||
reader = csv.DictReader(lines)
|
||||
|
||||
for row in reader:
|
||||
try:
|
||||
full_alias = sanitize_email(row["alias"])
|
||||
note = row["note"]
|
||||
except KeyError:
|
||||
LOG.warning("Cannot parse row %s", row)
|
||||
continue
|
||||
|
||||
alias_domain = get_email_domain_part(full_alias)
|
||||
custom_domain = CustomDomain.get_by(domain=alias_domain)
|
||||
|
||||
if (
|
||||
not custom_domain
|
||||
or not custom_domain.verified
|
||||
or custom_domain.user_id != user.id
|
||||
):
|
||||
LOG.debug("domain %s can't be used %s", alias_domain, user)
|
||||
continue
|
||||
|
||||
if (
|
||||
Alias.get_by(email=full_alias)
|
||||
or DeletedAlias.get_by(email=full_alias)
|
||||
or DomainDeletedAlias.get_by(email=full_alias)
|
||||
):
|
||||
LOG.d("alias already used %s", full_alias)
|
||||
continue
|
||||
|
||||
mailboxes = []
|
||||
|
||||
if "mailboxes" in row:
|
||||
for mailbox_email in row["mailboxes"].split():
|
||||
mailbox_email = sanitize_email(mailbox_email)
|
||||
mailbox = Mailbox.get_by(email=mailbox_email)
|
||||
|
||||
if not mailbox or not mailbox.verified or mailbox.user_id != user.id:
|
||||
LOG.d("mailbox %s can't be used %s", mailbox, user)
|
||||
continue
|
||||
|
||||
mailboxes.append(mailbox.id)
|
||||
|
||||
if len(mailboxes) == 0:
|
||||
mailboxes = [user.default_mailbox_id]
|
||||
|
||||
alias = Alias.create(
|
||||
user_id=user.id,
|
||||
email=full_alias,
|
||||
note=note,
|
||||
mailbox_id=mailboxes[0],
|
||||
custom_domain_id=custom_domain.id,
|
||||
batch_import_id=batch_import.id,
|
||||
)
|
||||
db.session.commit()
|
||||
db.session.flush()
|
||||
LOG.d("Create %s", alias)
|
||||
|
||||
for i in range(1, len(mailboxes)):
|
||||
alias_mailbox = AliasMailbox.create(
|
||||
alias_id=alias.id,
|
||||
mailbox_id=mailboxes[i],
|
||||
)
|
||||
db.session.commit()
|
||||
LOG.d("Create %s", alias_mailbox)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
# run a job 1h earlier or later is not a big deal ...
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
from io import BytesIO
|
||||
from os import path
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from app import alias_utils
|
||||
from app import alias_utils, s3
|
||||
from app.extensions import db
|
||||
from app.models import User, CustomDomain, Mailbox, Alias, AliasMailbox, ApiKey
|
||||
from app.models import User, CustomDomain, Mailbox, Alias, AliasMailbox, ApiKey, File, BatchImport
|
||||
from app.import_utils import handle_batch_import
|
||||
from app.utils import random_string
|
||||
|
||||
def test_export(flask_client):
|
||||
# Create users
|
||||
|
@ -101,3 +106,31 @@ def test_export(flask_client):
|
|||
ebay@my-domain.com,Used on eBay,True,destination@my-destination-domain.com
|
||||
facebook@my-domain.com,"Used on Facebook, Instagram.",True,destination@my-destination-domain.com destination2@my-destination-domain.com
|
||||
""".replace("\n", "\r\n").encode()
|
||||
|
||||
def test_import_no_mailboxes(flask_client):
|
||||
# Create user
|
||||
user = User.create(
|
||||
email="a@b.c",
|
||||
password="password",
|
||||
name="Test User",
|
||||
activated=True
|
||||
)
|
||||
db.session.commit()
|
||||
|
||||
alias_file = BytesIO(b"""alias,note,enabled
|
||||
ebay@my-domain.com,Used on eBay,True
|
||||
facebook@my-domain.com,"Used on Facebook, Instagram.",True
|
||||
""")
|
||||
|
||||
file_path = random_string(20) + ".csv"
|
||||
file = File.create(user_id=user.id, path=file_path)
|
||||
s3.upload_from_bytesio(file_path, alias_file)
|
||||
db.session.flush()
|
||||
|
||||
batch_import = BatchImport.create(
|
||||
user_id=user.id,
|
||||
file_id=file.id
|
||||
)
|
||||
db.session.commit()
|
||||
|
||||
handle_batch_import(batch_import)
|
Loading…
Reference in a new issue