mirror of
https://github.com/simple-login/app.git
synced 2024-09-20 06:55:59 +08:00
Reformat base64 encoded messages to shorter lines (#1575)
* Reformat base64 encoded messages to shorter lines * Remove storing debug versions * Add example test email * Update linelength to 76 * Revert changes in pre-commit --------- Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
parent
62ba2844f3
commit
3fcb37f246
|
@ -21,3 +21,4 @@ repos:
|
|||
- id: djlint-jinja
|
||||
files: '.*\.html'
|
||||
entry: djlint --reformat
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ from attr import dataclass
|
|||
from app import config
|
||||
from app.email import headers
|
||||
from app.log import LOG
|
||||
from app.message_utils import message_to_bytes
|
||||
from app.message_utils import message_to_bytes, message_format_base64_parts
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -173,8 +173,12 @@ class MailSender:
|
|||
self._save_request_to_unsent_dir(send_request)
|
||||
return False
|
||||
|
||||
def _save_request_to_unsent_dir(self, send_request: SendRequest):
|
||||
file_name = f"DeliveryFail-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
|
||||
def _save_request_to_unsent_dir(
|
||||
self, send_request: SendRequest, prefix: str = "DeliveryFail"
|
||||
):
|
||||
file_name = (
|
||||
f"{prefix}-{int(time.time())}-{uuid.uuid4()}.{SendRequest.SAVE_EXTENSION}"
|
||||
)
|
||||
file_path = os.path.join(config.SAVE_UNSENT_DIR, file_name)
|
||||
file_contents = send_request.to_bytes()
|
||||
with open(file_path, "wb") as fd:
|
||||
|
@ -256,7 +260,7 @@ def sl_sendmail(
|
|||
send_request = SendRequest(
|
||||
envelope_from,
|
||||
envelope_to,
|
||||
msg,
|
||||
message_format_base64_parts(msg),
|
||||
mail_options,
|
||||
rcpt_options,
|
||||
is_forward,
|
||||
|
|
|
@ -1,21 +1,42 @@
|
|||
import re
|
||||
from email import policy
|
||||
from email.message import Message
|
||||
|
||||
from app.email import headers
|
||||
from app.log import LOG
|
||||
|
||||
# Spam assassin might flag as spam with a different line length
|
||||
BASE64_LINELENGTH = 76
|
||||
|
||||
|
||||
def message_to_bytes(msg: Message) -> bytes:
|
||||
"""replace Message.as_bytes() method by trying different policies"""
|
||||
for generator_policy in [None, policy.SMTP, policy.SMTPUTF8]:
|
||||
try:
|
||||
return msg.as_bytes(policy=generator_policy)
|
||||
except:
|
||||
except Exception:
|
||||
LOG.w("as_bytes() fails with %s policy", policy, exc_info=True)
|
||||
|
||||
msg_string = msg.as_string()
|
||||
try:
|
||||
return msg_string.encode()
|
||||
except:
|
||||
except Exception:
|
||||
LOG.w("as_string().encode() fails", exc_info=True)
|
||||
|
||||
return msg_string.encode(errors="replace")
|
||||
|
||||
|
||||
def message_format_base64_parts(msg: Message) -> Message:
|
||||
for part in msg.walk():
|
||||
if part.get(
|
||||
headers.CONTENT_TRANSFER_ENCODING
|
||||
) == "base64" and part.get_content_type() in ("text/plain", "text/html"):
|
||||
# Remove line breaks
|
||||
body = re.sub("[\r\n]", "", part.get_payload())
|
||||
# Split in 80 column lines
|
||||
chunks = [
|
||||
body[i : i + BASE64_LINELENGTH]
|
||||
for i in range(0, len(body), BASE64_LINELENGTH)
|
||||
]
|
||||
part.set_payload("\r\n".join(chunks))
|
||||
return msg
|
||||
|
|
39
tests/example_emls/bad_base64format.eml
Normal file
39
tests/example_emls/bad_base64format.eml
Normal file
File diff suppressed because one or more lines are too long
|
@ -2,7 +2,8 @@ import email
|
|||
from app.email_utils import (
|
||||
copy,
|
||||
)
|
||||
from app.message_utils import message_to_bytes
|
||||
from app.message_utils import message_to_bytes, message_format_base64_parts
|
||||
from tests.utils import load_eml_file
|
||||
|
||||
|
||||
def test_copy():
|
||||
|
@ -33,3 +34,13 @@ def test_to_bytes():
|
|||
|
||||
msg = email.message_from_string("éèà€")
|
||||
assert message_to_bytes(msg).decode() == "\néèà€"
|
||||
|
||||
|
||||
def test_base64_line_breaks():
|
||||
msg = load_eml_file("bad_base64format.eml")
|
||||
msg = message_format_base64_parts(msg)
|
||||
for part in msg.walk():
|
||||
if part.get("content-transfer-encoding") == "base64":
|
||||
body = part.get_payload()
|
||||
for line in body.splitlines():
|
||||
assert len(line) <= 76
|
||||
|
|
Loading…
Reference in a new issue