Spawn a delivery thread for EmailSubmission/set requests (closes #1540)

This commit is contained in:
mdecimus 2025-05-23 13:03:11 +02:00
parent e2ce576596
commit a138fe33d6
7 changed files with 100 additions and 83 deletions

View file

@ -34,6 +34,7 @@ This version includes **breaking changes** to the database layout and requires a
- Hide the current server version (#1435).
- Use the newest `X-Spam-Status` Header (#1308).
- MySQL Driver error: Transactions couldn't be nested (#1271).
- Spawn a delivery thread for `EmailSubmission/set` requests (#1540).
## [0.11.8] - 2025-04-30

View file

@ -582,7 +582,7 @@ impl EmailSubmissionSet for Server {
self.clone(),
instance.clone(),
SessionData::local(
Box::pin(self.get_access_token(account_id))
self.get_access_token(account_id)
.await
.caused_by(trc::location!())?,
None,
@ -592,71 +592,88 @@ impl EmailSubmissionSet for Server {
),
);
// MAIL FROM
let _ = Box::pin(session.handle_mail_from(mail_from)).await;
if let Some(error) = session.has_failed() {
return Ok(Err(SetError::new(SetErrorType::ForbiddenMailFrom)
.with_description(format!(
"Server rejected MAIL-FROM: {}",
error.trim()
))));
}
// RCPT TO
let mut responses = Vec::new();
let mut has_success = false;
for rcpt in rcpt_to {
let addr = rcpt.address.clone();
let _ = Box::pin(session.handle_rcpt_to(rcpt)).await;
let response = session.has_failed();
if response.is_none() {
has_success = true;
// Spawn SMTP session to avoid overflowing the stack
let handle = tokio::spawn(async move {
// MAIL FROM
let _ = session.handle_mail_from(mail_from).await;
if let Some(error) = session.has_failed() {
return Err(SetError::new(SetErrorType::ForbiddenMailFrom)
.with_description(format!("Server rejected MAIL-FROM: {}", error.trim())));
}
responses.push((addr, response));
}
// DATA
if has_success {
session.data.message = message;
let response = Box::pin(session.queue_message()).await;
if let smtp::core::State::Accepted(queue_id) = session.state {
submission.queue_id = Some(queue_id);
// RCPT TO
let mut responses = Vec::new();
let mut has_success = false;
for rcpt in rcpt_to {
let addr = rcpt.address.clone();
let _ = session.handle_rcpt_to(rcpt).await;
let response = session.has_failed();
if response.is_none() {
has_success = true;
}
responses.push((addr, response));
}
// DATA
if has_success {
session.data.message = message;
let response = session.queue_message().await;
if let smtp::core::State::Accepted(queue_id) = session.state {
Ok((true, responses, Some(queue_id)))
} else {
Err(
SetError::new(SetErrorType::ForbiddenToSend).with_description(format!(
"Server rejected DATA: {}",
std::str::from_utf8(&response).unwrap().trim()
)),
)
}
} else {
return Ok(Err(SetError::new(SetErrorType::ForbiddenToSend)
.with_description(format!(
"Server rejected DATA: {}",
std::str::from_utf8(&response).unwrap().trim()
))));
Ok((false, responses, None))
}
});
match handle.await {
Ok(Ok((has_success, responses, queue_id))) => {
// Set queue ID
if let Some(queue_id) = queue_id {
submission.queue_id = Some(queue_id);
}
// Set responses
submission.undo_status = if has_success {
UndoStatus::Final
} else {
UndoStatus::Pending
};
submission.delivery_status = responses
.into_iter()
.map(|(addr, response)| {
(
addr.to_string(),
DeliveryStatus {
delivered: if response.is_none() {
Delivered::Unknown
} else {
Delivered::No
},
smtp_reply: response
.map(|s| s.to_string())
.unwrap_or_else(|| "250 2.1.5 Queued".to_string()),
displayed: false,
},
)
})
.collect();
Ok(Ok(submission))
}
Ok(Err(err)) => Ok(Err(err)),
Err(err) => Err(trc::EventType::Server(trc::ServerEvent::ThreadError)
.reason(err)
.caused_by(trc::location!())
.details("Join Error")),
}
// Set responses
submission.undo_status = if has_success {
UndoStatus::Final
} else {
UndoStatus::Pending
};
submission.delivery_status = responses
.into_iter()
.map(|(addr, response)| {
(
addr.to_string(),
DeliveryStatus {
delivered: if response.is_none() {
Delivered::Unknown
} else {
Delivered::No
},
smtp_reply: response
.map(|s| s.to_string())
.unwrap_or_else(|| "250 2.1.5 Queued".to_string()),
displayed: false,
},
)
})
.collect();
Ok(Ok(submission))
}
}

View file

@ -240,7 +240,9 @@ async fn build_server(mut config: Config, stores: Stores) -> (Server, watch::Sen
const SERVER: &str = r#"
[server]
hostname = "'server{NODE_ID}.example.org'"
http.url = "'https://127.0.0.1:800{NODE_ID}'"
[http]
url = "'https://127.0.0.1:800{NODE_ID}'"
[cluster]
node-id = {NODE_ID}

View file

@ -19,7 +19,9 @@ use crate::{AssertConfig, add_test_certs};
const MOCK_HTTP_SERVER: &str = r#"
[server]
hostname = "'oidc.example.org'"
http.url = "'https://127.0.0.1:9090'"
[http]
url = "'https://127.0.0.1:9090'"
[server.listener.jmap]
bind = ['127.0.0.1:9090']

View file

@ -79,19 +79,8 @@ pub mod vacation_response;
pub mod webhooks;
pub mod websocket;
#[test]
fn jmap_tests() {
tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(3 * 1024 * 1024)
.enable_all()
.build()
.unwrap()
.block_on(async {
jmap_tests_().await;
})
}
async fn jmap_tests_() {
#[tokio::test(flavor = "multi_thread")]
async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(
&std::env::var("STORE")
@ -100,8 +89,8 @@ async fn jmap_tests_() {
)
.await;
/*webhooks::test(&mut params).await;
email_query::test(&mut params, delete).await;
webhooks::test(&mut params).await;
/*email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
email_parse::test(&mut params).await;
@ -113,11 +102,11 @@ async fn jmap_tests_() {
thread_merge::test(&mut params).await;
mailbox::test(&mut params).await;
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;
auth_acl::test(&mut params).await;*/
auth_limits::test(&mut params).await;
auth_oauth::test(&mut params).await;
event_source::test(&mut params).await;
push_subscription::test(&mut params).await;*/
push_subscription::test(&mut params).await;
sieve_script::test(&mut params).await;
vacation_response::test(&mut params).await;
email_submission::test(&mut params).await;
@ -736,7 +725,9 @@ impl<T: Debug> Response<T> {
const SERVER: &str = r#"
[server]
hostname = "'jmap.example.org'"
http.url = "'https://127.0.0.1:8899'"
[http]
url = "'https://127.0.0.1:8899'"
[server.listener.jmap]
bind = ["127.0.0.1:8899"]

View file

@ -40,7 +40,9 @@ use super::JMAPTest;
const SERVER: &str = r#"
[server]
hostname = "'jmap-push.example.org'"
http.url = "'https://127.0.0.1:9000'"
[http]
url = "'https://127.0.0.1:9000'"
[server.listener.jmap]
bind = ['127.0.0.1:9000']

View file

@ -966,7 +966,9 @@ fn generate_random_name(length: usize) -> String {
const SERVER: &str = r#"
[server]
hostname = "webdav.example.org"
http.url = "'https://127.0.0.1:8899'"
[http]
url = "'https://127.0.0.1:8899'"
[server.listener.webdav]
bind = ["127.0.0.1:8899"]