From 5b236e00aeffbf85425b5fd0ceb75b2e2901310c Mon Sep 17 00:00:00 2001 From: mdecimus Date: Mon, 6 May 2024 12:55:31 +0200 Subject: [PATCH] Added retry_num, notify_num, last_error, last_status variables to queue expressions --- crates/common/src/config/smtp/mod.rs | 32 +++++- crates/common/src/expr/mod.rs | 22 ++++ crates/smtp/src/inbound/data.rs | 26 +++-- crates/smtp/src/outbound/delivery.rs | 132 +++++++++------------ crates/smtp/src/queue/dsn.rs | 20 ++-- crates/smtp/src/queue/mod.rs | 124 +++++++++++++------- crates/smtp/src/queue/quota.rs | 18 ++- crates/smtp/src/queue/spool.rs | 28 +++-- crates/utils/src/config/mod.rs | 2 + crates/utils/src/config/utils.rs | 9 ++ crates/utils/src/suffixlist.rs | 12 +- tests/src/smtp/outbound/fallback_relay.rs | 134 ++++++++++++++++++++++ tests/src/smtp/outbound/mod.rs | 1 + tests/src/smtp/outbound/throttle.rs | 44 +++++-- tests/src/smtp/outbound/tls.rs | 4 +- tests/src/smtp/queue/dsn.rs | 1 - tests/src/smtp/queue/manager.rs | 1 - 17 files changed, 427 insertions(+), 183 deletions(-) create mode 100644 tests/src/smtp/outbound/fallback_relay.rs diff --git a/crates/common/src/config/smtp/mod.rs b/crates/common/src/config/smtp/mod.rs index 5aca0155..f5eba933 100644 --- a/crates/common/src/config/smtp/mod.rs +++ b/crates/common/src/config/smtp/mod.rs @@ -86,7 +86,7 @@ pub(crate) const SMTP_RCPT_TO_VARS: &[u32; 15] = &[ V_PRIORITY, V_HELO_DOMAIN, ]; -pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 9] = &[ +pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 14] = &[ V_SENDER, V_SENDER_DOMAIN, V_RECIPIENT_DOMAIN, @@ -96,22 +96,46 @@ pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 9] = &[ V_PRIORITY, V_REMOTE_IP, V_LOCAL_IP, + V_QUEUE_RETRY_NUM, + V_QUEUE_NOTIFY_NUM, + V_QUEUE_EXPIRES_IN, + V_QUEUE_LAST_STATUS, + V_QUEUE_LAST_ERROR, ]; -pub(crate) const SMTP_QUEUE_RCPT_VARS: &[u32; 5] = &[ +pub(crate) const SMTP_QUEUE_RCPT_VARS: &[u32; 10] = &[ V_RECIPIENT_DOMAIN, V_RECIPIENTS, V_SENDER, V_SENDER_DOMAIN, V_PRIORITY, + V_QUEUE_RETRY_NUM, + V_QUEUE_NOTIFY_NUM, + V_QUEUE_EXPIRES_IN, + V_QUEUE_LAST_STATUS, + V_QUEUE_LAST_ERROR, ]; -pub(crate) const SMTP_QUEUE_SENDER_VARS: &[u32; 3] = &[V_SENDER, V_SENDER_DOMAIN, V_PRIORITY]; -pub(crate) const SMTP_QUEUE_MX_VARS: &[u32; 6] = &[ +pub(crate) const SMTP_QUEUE_SENDER_VARS: &[u32; 8] = &[ + V_SENDER, + V_SENDER_DOMAIN, + V_PRIORITY, + V_QUEUE_RETRY_NUM, + V_QUEUE_NOTIFY_NUM, + V_QUEUE_EXPIRES_IN, + V_QUEUE_LAST_STATUS, + V_QUEUE_LAST_ERROR, +]; +pub(crate) const SMTP_QUEUE_MX_VARS: &[u32; 11] = &[ V_RECIPIENT_DOMAIN, V_RECIPIENTS, V_SENDER, V_SENDER_DOMAIN, V_PRIORITY, V_MX, + V_QUEUE_RETRY_NUM, + V_QUEUE_NOTIFY_NUM, + V_QUEUE_EXPIRES_IN, + V_QUEUE_LAST_STATUS, + V_QUEUE_LAST_ERROR, ]; impl SmtpConfig { diff --git a/crates/common/src/expr/mod.rs b/crates/common/src/expr/mod.rs index 4b8ea2fb..bad63d02 100644 --- a/crates/common/src/expr/mod.rs +++ b/crates/common/src/expr/mod.rs @@ -43,6 +43,11 @@ pub const V_PRIORITY: u32 = 12; pub const V_PROTOCOL: u32 = 13; pub const V_TLS: u32 = 14; pub const V_RECIPIENTS: u32 = 15; +pub const V_QUEUE_RETRY_NUM: u32 = 16; +pub const V_QUEUE_NOTIFY_NUM: u32 = 17; +pub const V_QUEUE_EXPIRES_IN: u32 = 18; +pub const V_QUEUE_LAST_STATUS: u32 = 19; +pub const V_QUEUE_LAST_ERROR: u32 = 20; pub const VARIABLES_MAP: &[(&str, u32)] = &[ ("rcpt", V_RECIPIENT), @@ -61,6 +66,11 @@ pub const VARIABLES_MAP: &[(&str, u32)] = &[ ("protocol", V_PROTOCOL), ("is_tls", V_TLS), ("recipients", V_RECIPIENTS), + ("retry_num", V_QUEUE_RETRY_NUM), + ("notify_num", V_QUEUE_NOTIFY_NUM), + ("expires_in", V_QUEUE_EXPIRES_IN), + ("last_status", V_QUEUE_LAST_STATUS), + ("last_error", V_QUEUE_LAST_ERROR), ]; use regex::Regex; @@ -215,12 +225,24 @@ impl From for Variable<'_> { } } +impl From for Variable<'_> { + fn from(value: u64) -> Self { + Variable::Integer(value as i64) + } +} + impl From for Variable<'_> { fn from(value: i32) -> Self { Variable::Integer(value as i64) } } +impl From for Variable<'_> { + fn from(value: u32) -> Self { + Variable::Integer(value as i64) + } +} + impl From for Variable<'_> { fn from(value: u16) -> Self { Variable::Integer(value as i64) diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index 8c2ef8f6..13bb1bc5 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -46,7 +46,7 @@ use utils::config::Rate; use crate::{ core::{Session, SessionAddress, State}, - queue::{self, Message, SimpleEnvelope}, + queue::{self, Message, QueueEnvelope, Schedule}, scripts::ScriptResult, }; @@ -751,7 +751,16 @@ impl Session { .last() .map_or(true, |d| d.domain != rcpt.domain) { - let envelope = SimpleEnvelope::new(&message, &rcpt.domain); + let rcpt_idx = message.domains.len(); + message.domains.push(queue::Domain { + retry: Schedule::now(), + notify: Schedule::now(), + expires: 0, + status: queue::Status::Scheduled, + domain: rcpt.domain, + }); + + let envelope = QueueEnvelope::new(&message, rcpt_idx); // Set next retry time let retry = if self.data.future_release == 0 { @@ -816,14 +825,11 @@ impl Session { (notify, now() + expire_secs) }; - message.domains.push(queue::Domain { - retry, - notify, - expires, - status: queue::Status::Scheduled, - domain: rcpt.domain, - disable_tls: false, - }); + // Update domain + let domain = message.domains.last_mut().unwrap(); + domain.retry = retry; + domain.notify = notify; + domain.expires = expires; } message.recipients.push(queue::Recipient { diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index f046b37f..2cad4304 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -162,11 +162,10 @@ impl DeliveryAttempt { let queue_config = &core.core.smtp.queue; let mut on_hold = Vec::new(); let no_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - - let mut domains = std::mem::take(&mut message.domains); let mut recipients = std::mem::take(&mut message.recipients); - 'next_domain: for (domain_idx, domain) in domains.iter_mut().enumerate() { + 'next_domain: for domain_idx in 0..message.domains.len() { // Only process domains due for delivery + let domain = &message.domains[domain_idx]; if !matches!(&domain.status, Status::Scheduled | Status::TemporaryFailure(_) if domain.retry.due <= now()) { @@ -182,13 +181,7 @@ impl DeliveryAttempt { ); // Build envelope - let mut envelope = QueueEnvelope { - message: &message, - domain: &domain.domain, - mx: "", - remote_ip: no_ip, - local_ip: no_ip, - }; + let mut envelope = QueueEnvelope::new(&message, domain_idx); // Throttle recipient domain let mut in_flight = Vec::new(); @@ -197,7 +190,7 @@ impl DeliveryAttempt { .is_allowed(throttle, &envelope, &mut in_flight, &span) .await { - domain.set_throttle_error(err, &mut on_hold); + message.domains[domain_idx].set_throttle_error(err, &mut on_hold); continue 'next_domain; } } @@ -221,14 +214,12 @@ impl DeliveryAttempt { .await; // Update status for the current domain and continue with the next one - domain.set_status( - delivery_result, - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), - ); + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status(delivery_result, &schedule); continue 'next_domain; } Some(next_hop) => ( @@ -239,7 +230,6 @@ impl DeliveryAttempt { }; // Prepare TLS strategy - let mut disable_tls = false; let mut tls_strategy = TlsStrategy { mta_sts: core .core @@ -271,7 +261,7 @@ impl DeliveryAttempt { .smtp .resolvers .dns - .txt_lookup::(format!("_smtp._tls.{}.", envelope.domain)) + .txt_lookup::(format!("_smtp._tls.{}.", domain.domain)) .await { Ok(record) => { @@ -300,7 +290,7 @@ impl DeliveryAttempt { let mta_sts_policy = if tls_strategy.try_mta_sts() && is_smtp { match core .lookup_mta_sts_policy( - envelope.domain, + &domain.domain, core.core .eval_if(&queue_config.timeout.mta_sts, &envelope) .await @@ -326,7 +316,7 @@ impl DeliveryAttempt { if tls_strategy.is_mta_sts_required() { core.schedule_report(TlsEvent { policy: PolicyType::Sts(None), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new(ResultType::Other) .with_failure_reason_code("MTA-STS is required and no policy was found.") .into(), @@ -340,7 +330,7 @@ impl DeliveryAttempt { _ => { core.schedule_report(TlsEvent { policy: PolicyType::Sts(None), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new(&err) .with_failure_reason_code(err.to_string()) .into(), @@ -360,14 +350,12 @@ impl DeliveryAttempt { "Failed to retrieve MTA-STS policy: {}", err ); - domain.set_status( - err, - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), - ); + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status(err, &schedule); continue 'next_domain; } else { tracing::debug!( @@ -399,14 +387,12 @@ impl DeliveryAttempt { event = "mx-lookup-failed", reason = %err, ); - domain.set_status( - err, - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), - ); + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status(err, &schedule); continue 'next_domain; } }; @@ -426,15 +412,16 @@ impl DeliveryAttempt { event = "null-mx", reason = "Domain does not accept messages (mull MX)", ); - domain.set_status( + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status( Status::PermanentFailure(Error::DnsError( "Domain does not accept messages (null MX)".to_string(), )), - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), + &schedule, ); continue 'next_domain; } @@ -456,7 +443,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: mta_sts_policy.into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new(ResultType::ValidationFailure) .with_receiving_mx_hostname(envelope.mx) .with_failure_reason_code("MX not authorized by policy.") @@ -543,7 +530,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: tlsa.into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new(ResultType::TlsaInvalid) .with_receiving_mx_hostname(envelope.mx) .with_failure_reason_code("Invalid TLSA record.") @@ -573,7 +560,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: PolicyType::Tlsa(None), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new(ResultType::DaneRequired) .with_receiving_mx_hostname(envelope.mx) .with_failure_reason_code( @@ -619,7 +606,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: PolicyType::Tlsa(None), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new( ResultType::DaneRequired, ) @@ -670,7 +657,7 @@ impl DeliveryAttempt { .is_allowed(throttle, &envelope, &mut in_flight_host, &span) .await { - domain.set_throttle_error(err, &mut on_hold); + message.domains[domain_idx].set_throttle_error(err, &mut on_hold); continue 'next_domain; } } @@ -815,7 +802,7 @@ impl DeliveryAttempt { }; // Try starting TLS - if tls_strategy.try_start_tls() && !domain.disable_tls { + if tls_strategy.try_start_tls() { smtp_client.timeout = core .core .eval_if(&queue_config.timeout.tls, &envelope) @@ -850,7 +837,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: dane_policy.into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new( ResultType::ValidationFailure, ) @@ -875,7 +862,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: (&mta_sts_policy, &dane_policy).into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: None, tls_record: tls_report.record.clone(), interval: tls_report.interval, @@ -917,7 +904,7 @@ impl DeliveryAttempt { if let Some(tls_report) = &tls_report { core.schedule_report(TlsEvent { policy: (&mta_sts_policy, &dane_policy).into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new( ResultType::StartTlsNotSupported, ) @@ -963,7 +950,7 @@ impl DeliveryAttempt { { core.schedule_report(TlsEvent { policy: (&mta_sts_policy, &dane_policy).into(), - domain: envelope.domain.to_string(), + domain: domain.domain.to_string(), failure: FailureDetails::new( ResultType::CertificateNotTrusted, ) @@ -980,7 +967,6 @@ impl DeliveryAttempt { last_status = if is_strict_tls { Status::from_tls_error(envelope.mx, error) } else { - disable_tls = true; Status::from_tls_error(envelope.mx, error) .into_temporary() }; @@ -994,7 +980,7 @@ impl DeliveryAttempt { context = "tls", event = "disabled", mx = envelope.mx, - reason = if domain.disable_tls {"TLS is disabled for this host"} else {"TLS is unavailable for this host, falling back to plain-text."}, + reason = "TLS is disabled for this host.", ); message @@ -1062,30 +1048,24 @@ impl DeliveryAttempt { }; // Update status for the current domain and continue with the next one - domain.set_status( - delivery_result, - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), - ); + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status(delivery_result, &schedule); continue 'next_domain; } } // Update status - domain.disable_tls = disable_tls; - domain.set_status( - last_status, - &core - .core - .eval_if::, _>(&queue_config.retry, &envelope) - .await - .unwrap_or_else(|| vec![Duration::from_secs(60)]), - ); + let schedule = core + .core + .eval_if::, _>(&queue_config.retry, &envelope) + .await + .unwrap_or_else(|| vec![Duration::from_secs(60)]); + message.domains[domain_idx].set_status(last_status, &schedule); } - message.domains = domains; message.recipients = recipients; // Send Delivery Status Notifications diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index 1f40f5d0..c65fbd17 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -36,7 +36,7 @@ use store::write::now; use crate::core::SMTP; use super::{ - Domain, Error, ErrorDetails, HostResponse, Message, Recipient, SimpleEnvelope, Status, + Domain, Error, ErrorDetails, HostResponse, Message, QueueEnvelope, Recipient, Status, RCPT_DSN_SENT, RCPT_STATUS_CHANGED, }; @@ -222,14 +222,14 @@ impl Message { // Update next delay notification time if has_delay { - let mut domains = std::mem::take(&mut self.domains); - for domain in &mut domains { + let mut changes = Vec::new(); + for (domain_idx, domain) in self.domains.iter().enumerate() { if matches!( &domain.status, Status::TemporaryFailure(_) | Status::Scheduled ) && domain.notify.due <= now { - let envelope = SimpleEnvelope::new(self, &domain.domain); + let envelope = QueueEnvelope::new(self, domain_idx); if let Some(next_notify) = core .core @@ -239,14 +239,18 @@ impl Message { notify.into_iter().nth((domain.notify.inner + 1) as usize) }) { - domain.notify.inner += 1; - domain.notify.due = now + next_notify.as_secs(); + changes.push((domain_idx, 1, now + next_notify.as_secs())); } else { - domain.notify.due = domain.expires + 10; + changes.push((domain_idx, 0, domain.expires + 10)); } } } - self.domains = domains; + + for (domain_idx, inner, due) in changes { + let domain = &mut self.domains[domain_idx]; + domain.notify.inner += inner; + domain.notify.due = due; + } } // Obtain hostname and sender addresses diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index b6bcc2ea..b7d92562 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -23,7 +23,7 @@ use std::{ fmt::Display, - net::IpAddr, + net::{IpAddr, Ipv4Addr}, time::{Duration, Instant, SystemTime}, }; @@ -99,7 +99,6 @@ pub struct Domain { pub notify: Schedule, pub expires: u64, pub status: Status<(), Error>, - pub disable_tls: bool, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -193,49 +192,43 @@ impl Schedule { } } -pub struct SimpleEnvelope<'x> { - pub message: &'x Message, - pub domain: &'x str, - pub recipient: &'x str, -} - -impl<'x> SimpleEnvelope<'x> { - pub fn new(message: &'x Message, domain: &'x str) -> Self { - Self { - message, - domain, - recipient: "", - } - } - - pub fn new_rcpt(message: &'x Message, domain: &'x str, recipient: &'x str) -> Self { - Self { - message, - domain, - recipient, - } - } -} - -impl<'x> ResolveVariable for SimpleEnvelope<'x> { - fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> { - match variable { - V_SENDER => self.message.return_path_lcase.as_str().into(), - V_SENDER_DOMAIN => self.message.return_path_domain.as_str().into(), - V_PRIORITY => self.message.priority.to_string().into(), - V_RECIPIENT => self.recipient.into(), - V_RECIPIENT_DOMAIN => self.domain.into(), - _ => "".into(), - } - } -} - pub struct QueueEnvelope<'x> { pub message: &'x Message, - pub domain: &'x str, pub mx: &'x str, pub remote_ip: IpAddr, pub local_ip: IpAddr, + pub current_domain: usize, + pub current_rcpt: usize, +} + +impl<'x> QueueEnvelope<'x> { + pub fn new(message: &'x Message, current_domain: usize) -> Self { + Self { + message, + current_domain, + current_rcpt: 0, + mx: "", + remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + } + } + + pub fn new_rcpt(message: &'x Message, current_domain: usize, current_rcpt: usize) -> Self { + Self { + message, + current_domain, + current_rcpt, + mx: "", + remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + } + } +} + +impl<'x> QueueEnvelope<'x> { + fn current_domain(&self) -> Option<&'x Domain> { + self.message.domains.get(self.current_domain) + } } impl<'x> ResolveVariable for QueueEnvelope<'x> { @@ -243,7 +236,18 @@ impl<'x> ResolveVariable for QueueEnvelope<'x> { match variable { V_SENDER => self.message.return_path_lcase.as_str().into(), V_SENDER_DOMAIN => self.message.return_path_domain.as_str().into(), - V_RECIPIENT_DOMAIN => self.domain.into(), + V_RECIPIENT_DOMAIN => self + .current_domain() + .map(|d| d.domain.as_str()) + .unwrap_or_default() + .into(), + V_RECIPIENT => self + .message + .recipients + .get(self.current_rcpt) + .map(|r| r.address_lcase.as_str()) + .unwrap_or_default() + .into(), V_RECIPIENTS => self .message .recipients @@ -251,6 +255,44 @@ impl<'x> ResolveVariable for QueueEnvelope<'x> { .map(|r| Variable::from(r.address_lcase.as_str())) .collect::>() .into(), + V_QUEUE_RETRY_NUM => self + .current_domain() + .map(|d| d.retry.inner) + .unwrap_or_default() + .into(), + V_QUEUE_NOTIFY_NUM => self + .current_domain() + .map(|d| d.notify.inner) + .unwrap_or_default() + .into(), + V_QUEUE_EXPIRES_IN => self + .current_domain() + .map(|d| d.expires.saturating_sub(now())) + .unwrap_or_default() + .into(), + V_QUEUE_LAST_STATUS => self + .current_domain() + .map(|d| d.status.to_string()) + .unwrap_or_default() + .into(), + V_QUEUE_LAST_ERROR => self + .current_domain() + .map(|d| match &d.status { + Status::Scheduled | Status::Completed(_) => "none", + Status::TemporaryFailure(err) | Status::PermanentFailure(err) => match err { + Error::DnsError(_) => "dns", + Error::UnexpectedResponse(_) => "unexpected-reply", + Error::ConnectionError(_) => "connection", + Error::TlsError(_) => "tls", + Error::DaneError(_) => "dane", + Error::MtaStsError(_) => "mta-sts", + Error::RateLimited => "rate", + Error::ConcurrencyLimited => "concurrency", + Error::Io(_) => "io", + }, + }) + .unwrap_or_default() + .into(), V_MX => self.mx.into(), V_PRIORITY => self.message.priority.into(), V_REMOTE_IP => self.remote_ip.to_string().into(), diff --git a/crates/smtp/src/queue/quota.rs b/crates/smtp/src/queue/quota.rs index 0a5b58fc..3492d68a 100644 --- a/crates/smtp/src/queue/quota.rs +++ b/crates/smtp/src/queue/quota.rs @@ -29,7 +29,7 @@ use store::{ use crate::core::{throttle::NewKey, SMTP}; -use super::{Message, QuotaKey, SimpleEnvelope, Status}; +use super::{Message, QueueEnvelope, QuotaKey, Status}; impl SMTP { pub async fn has_quota(&self, message: &mut Message) -> bool { @@ -47,13 +47,13 @@ impl SMTP { } for quota in &self.core.smtp.queue.quota.rcpt_domain { - for (pos, domain) in message.domains.iter().enumerate() { + for domain_idx in 0..message.domains.len() { if !self .check_quota( quota, - &SimpleEnvelope::new(message, &domain.domain), + &QueueEnvelope::new(message, domain_idx), message.size, - ((pos + 1) << 32) as u64, + ((domain_idx + 1) << 32) as u64, &mut quota_keys, ) .await @@ -64,17 +64,13 @@ impl SMTP { } for quota in &self.core.smtp.queue.quota.rcpt { - for (pos, rcpt) in message.recipients.iter().enumerate() { + for (rcpt_idx, rcpt) in message.recipients.iter().enumerate() { if !self .check_quota( quota, - &SimpleEnvelope::new_rcpt( - message, - &message.domains[rcpt.domain_idx].domain, - &rcpt.address_lcase, - ), + &QueueEnvelope::new_rcpt(message, rcpt.domain_idx, rcpt_idx), message.size, - (pos + 1) as u64, + (rcpt_idx + 1) as u64, &mut quota_keys, ) .await diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 70efbf2b..39ce8121 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -32,7 +32,7 @@ use utils::BlobHash; use crate::core::SMTP; use super::{ - Domain, Event, Message, QueueId, QuotaKey, Recipient, Schedule, SimpleEnvelope, Status, + Domain, Event, Message, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status, }; pub const LOCK_EXPIRY: u64 = 300; @@ -341,22 +341,26 @@ impl Message { idx } else { let idx = self.domains.len(); - let expires = core - .core - .eval_if( - &core.core.smtp.queue.expire, - &SimpleEnvelope::new(self, &rcpt_domain), - ) - .await - .unwrap_or_else(|| Duration::from_secs(5 * 86400)); + self.domains.push(Domain { domain: rcpt_domain, retry: Schedule::now(), - notify: Schedule::later(expires + Duration::from_secs(10)), - expires: now() + expires.as_secs(), + notify: Schedule::now(), + expires: 0, status: Status::Scheduled, - disable_tls: false, }); + + let expires = core + .core + .eval_if(&core.core.smtp.queue.expire, &QueueEnvelope::new(self, idx)) + .await + .unwrap_or_else(|| Duration::from_secs(5 * 86400)); + + // Update expiration + let domain = self.domains.last_mut().unwrap(); + domain.notify = Schedule::later(expires + Duration::from_secs(10)); + domain.expires = now() + expires.as_secs(); + idx }; self.recipients.push(Recipient { diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs index b8b2e55c..26b01f79 100644 --- a/crates/utils/src/config/mod.rs +++ b/crates/utils/src/config/mod.rs @@ -48,6 +48,7 @@ pub enum ConfigWarning { Missing, AppliedDefault { default: String }, Unread { value: String }, + Build { error: String }, } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -218,6 +219,7 @@ impl Config { ConfigWarning::Unread { value } => { format!("WARNING: Unused setting {key:?} with value {value:?}") } + ConfigWarning::Build { error } => format!("WARNING for {key:?}: {error}"), }; if !use_stderr { tracing::debug!("{}", message); diff --git a/crates/utils/src/config/utils.rs b/crates/utils/src/config/utils.rs index 12cbce80..d3608591 100644 --- a/crates/utils/src/config/utils.rs +++ b/crates/utils/src/config/utils.rs @@ -311,6 +311,15 @@ impl Config { ); } + pub fn new_build_warning(&mut self, key: impl AsKey, details: impl Into) { + self.warnings.insert( + key.as_key(), + ConfigWarning::Build { + error: details.into(), + }, + ); + } + pub fn new_missing_property(&mut self, key: impl AsKey) { self.warnings.insert(key.as_key(), ConfigWarning::Missing); } diff --git a/crates/utils/src/suffixlist.rs b/crates/utils/src/suffixlist.rs index 8613febc..5a687035 100644 --- a/crates/utils/src/suffixlist.rs +++ b/crates/utils/src/suffixlist.rs @@ -133,7 +133,7 @@ impl PublicSuffix { if r.status().is_success() { r.bytes().await } else { - config.new_build_error( + config.new_build_warning( format!("{value}.{idx}"), format!( "Failed to fetch public suffixes from {value:?}: Status {status}", @@ -150,7 +150,7 @@ impl PublicSuffix { match result { Ok(bytes) => bytes.to_vec(), Err(err) => { - config.new_build_error( + config.new_build_warning( format!("{value}.{idx}"), format!("Failed to fetch public suffixes from {value:?}: {err}",), ); @@ -161,7 +161,7 @@ impl PublicSuffix { match std::fs::read(filename) { Ok(bytes) => bytes, Err(err) => { - config.new_build_error( + config.new_build_warning( format!("{value}.{idx}"), format!("Failed to read public suffixes from {value:?}: {err}",), ); @@ -179,7 +179,7 @@ impl PublicSuffix { { Ok(bytes) => bytes, Err(err) => { - config.new_build_error( + config.new_build_warning( format!("{value}.{idx}"), format!( "Failed to decompress public suffixes from {value:?}: {err}", @@ -199,7 +199,7 @@ impl PublicSuffix { return PublicSuffix::from(list.as_str()); } Err(err) => { - config.new_build_error( + config.new_build_warning( format!("{value}.{idx}"), format!( "Failed to parse public suffixes from {value:?}: {err}", @@ -212,7 +212,7 @@ impl PublicSuffix { } #[cfg(not(feature = "test_mode"))] - config.new_build_error(key, "Failed to parse public suffixes from any source."); + config.new_build_warning(key, "Failed to parse public suffixes from any source."); PublicSuffix::default() } diff --git a/tests/src/smtp/outbound/fallback_relay.rs b/tests/src/smtp/outbound/fallback_relay.rs new file mode 100644 index 00000000..92d5315e --- /dev/null +++ b/tests/src/smtp/outbound/fallback_relay.rs @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::time::{Duration, Instant}; + +use common::config::server::ServerProtocol; +use mail_auth::MX; +use store::write::now; + +use crate::smtp::{outbound::TestServer, session::TestSession}; + +const LOCAL: &str = r#" +[queue.outbound] +next-hop = [{if = "retry_num > 0", then = "'fallback'"}, + {else = false}] + +[session.rcpt] +relay = true +max-recipients = 100 + +[session.extensions] +dsn = true + +[remote.fallback] +address = fallback.foobar.org +port = 9925 +protocol = 'smtp' +concurrency = 5 + +[remote.fallback.tls] +implicit = false +allow-invalid-certs = true + +"#; + +const REMOTE: &str = r#" +[session.rcpt] +relay = true + +[session.ehlo] +reject-non-fqdn = false + +[session.extensions] +dsn = true +chunking = false +"#; + +#[tokio::test] +#[serial_test::serial] +async fn fallback_relay() { + /*let disable = 1; + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap();*/ + + // Start test server + let mut remote = TestServer::new("smtp_fallback_remote", REMOTE, true).await; + let _rx = remote.start(&[ServerProtocol::Smtp]).await; + let mut local = TestServer::new("smtp_fallback_local", LOCAL, true).await; + + // Add mock DNS entries + let core = local.build_smtp(); + core.core.smtp.resolvers.dns.mx_add( + "foobar.org", + vec![MX { + exchanges: vec!["_dns_error.foobar.org".to_string()], + preference: 10, + }], + Instant::now() + Duration::from_secs(10), + ); + /*core.core.smtp.resolvers.dns.ipv4_add( + "unreachable.foobar.org", + vec!["127.0.0.2".parse().unwrap()], + Instant::now() + Duration::from_secs(10), + );*/ + core.core.smtp.resolvers.dns.ipv4_add( + "fallback.foobar.org", + vec!["127.0.0.1".parse().unwrap()], + Instant::now() + Duration::from_secs(10), + ); + + let mut session = local.new_session(); + session.data.remote_ip_str = "10.0.0.1".to_string(); + session.eval_session_params().await; + session.ehlo("mx.test.org").await; + session + .send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250") + .await; + local + .qr + .expect_message_then_deliver() + .await + .try_deliver(core.clone()) + .await; + let mut retry = local.qr.expect_message().await; + let prev_due = retry.domains[0].retry.due; + let next_due = now(); + let queue_id = retry.id; + retry.domains[0].retry.due = next_due; + retry + .save_changes(&core, prev_due.into(), next_due.into()) + .await; + local + .qr + .delivery_attempt(queue_id) + .await + .try_deliver(core.clone()) + .await; + tokio::time::sleep(Duration::from_millis(100)).await; + remote.qr.expect_message().await; +} diff --git a/tests/src/smtp/outbound/mod.rs b/tests/src/smtp/outbound/mod.rs index 8867beef..c4a66a06 100644 --- a/tests/src/smtp/outbound/mod.rs +++ b/tests/src/smtp/outbound/mod.rs @@ -42,6 +42,7 @@ use super::{ pub mod dane; pub mod extensions; +pub mod fallback_relay; pub mod ip_lookup; pub mod lmtp; pub mod mta_sts; diff --git a/tests/src/smtp/outbound/throttle.rs b/tests/src/smtp/outbound/throttle.rs index 317e0770..b0030199 100644 --- a/tests/src/smtp/outbound/throttle.rs +++ b/tests/src/smtp/outbound/throttle.rs @@ -33,7 +33,7 @@ use crate::smtp::{ inbound::TestQueueEvent, outbound::TestServer, queue::manager::new_message, session::TestSession, }; -use smtp::queue::{Message, QueueEnvelope}; +use smtp::queue::{Domain, Message, QueueEnvelope, Schedule, Status}; const CONFIG: &str = r#" [session.rcpt] @@ -113,7 +113,7 @@ async fn throttle_outbound() { for t in &throttle.sender { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "", ""), + &QueueEnvelope::test(&test_message, 0, ""), &mut in_flight, &span, ) @@ -138,7 +138,7 @@ async fn throttle_outbound() { for t in &throttle.sender { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "", ""), + &QueueEnvelope::test(&test_message, 0, ""), &mut in_flight, &span, ) @@ -162,10 +162,17 @@ async fn throttle_outbound() { // Expect concurrency throttle for recipient domain 'example.org' test_message.return_path_domain = "test.net".to_string(); + test_message.domains.push(Domain { + domain: "example.org".to_string(), + retry: Schedule::now(), + notify: Schedule::now(), + expires: 0, + status: Status::Scheduled, + }); for t in &throttle.rcpt { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "example.org", ""), + &QueueEnvelope::test(&test_message, 0, ""), &mut in_flight, &span, ) @@ -191,11 +198,18 @@ async fn throttle_outbound() { local.qr.read_event().await.unwrap_on_hold(); in_flight.clear(); - // Expect rate limit throttle for recipient domain 'example.org' + // Expect rate limit throttle for recipient domain 'example.net' + test_message.domains.push(Domain { + domain: "example.net".to_string(), + retry: Schedule::now(), + notify: Schedule::now(), + expires: 0, + status: Status::Scheduled, + }); for t in &throttle.rcpt { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "example.net", ""), + &QueueEnvelope::test(&test_message, 1, ""), &mut in_flight, &span, ) @@ -236,10 +250,17 @@ async fn throttle_outbound() { vec!["127.0.0.1".parse().unwrap()], Instant::now() + Duration::from_secs(10), ); + test_message.domains.push(Domain { + domain: "test.org".to_string(), + retry: Schedule::now(), + notify: Schedule::now(), + expires: 0, + status: Status::Scheduled, + }); for t in &throttle.host { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "test.org", "mx.test.org"), + &QueueEnvelope::test(&test_message, 2, "mx.test.org"), &mut in_flight, &span, ) @@ -276,7 +297,7 @@ async fn throttle_outbound() { for t in &throttle.host { core.is_allowed( t, - &QueueEnvelope::test(&test_message, "example.net", "mx.test.net"), + &QueueEnvelope::test(&test_message, 1, "mx.test.net"), &mut in_flight, &span, ) @@ -301,17 +322,18 @@ async fn throttle_outbound() { } pub trait TestQueueEnvelope<'x> { - fn test(message: &'x Message, domain: &'x str, mx: &'x str) -> Self; + fn test(message: &'x Message, current_domain: usize, mx: &'x str) -> Self; } impl<'x> TestQueueEnvelope<'x> for QueueEnvelope<'x> { - fn test(message: &'x Message, domain: &'x str, mx: &'x str) -> Self { + fn test(message: &'x Message, current_domain: usize, mx: &'x str) -> Self { QueueEnvelope { message, - domain, mx, remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + current_domain, + current_rcpt: 0, } } } diff --git a/tests/src/smtp/outbound/tls.rs b/tests/src/smtp/outbound/tls.rs index 0ac13609..b583125a 100644 --- a/tests/src/smtp/outbound/tls.rs +++ b/tests/src/smtp/outbound/tls.rs @@ -41,7 +41,8 @@ relay = true hostname = "'badtls.foobar.org'" [queue.outbound.tls] -starttls = "optional" +starttls = [ { if = "retry_num > 0 && last_error == 'tls'", then = "disable"}, + { else = "optional" }] "#; const REMOTE: &str = r#" @@ -104,7 +105,6 @@ async fn starttls_optional() { .try_deliver(core.clone()) .await; let mut retry = local.qr.expect_message().await; - assert!(retry.domains[0].disable_tls); let prev_due = retry.domains[0].retry.due; let next_due = now(); let queue_id = retry.id; diff --git a/tests/src/smtp/queue/dsn.rs b/tests/src/smtp/queue/dsn.rs index be65dc2f..d27c7b39 100644 --- a/tests/src/smtp/queue/dsn.rs +++ b/tests/src/smtp/queue/dsn.rs @@ -96,7 +96,6 @@ async fn generate_dsn() { entity: "mx.domain.org".to_string(), details: "Connection timeout".to_string(), })), - disable_tls: false, }], flags: 0, env_id: None, diff --git a/tests/src/smtp/queue/manager.rs b/tests/src/smtp/queue/manager.rs index 48ced7ee..c2ce6df3 100644 --- a/tests/src/smtp/queue/manager.rs +++ b/tests/src/smtp/queue/manager.rs @@ -165,7 +165,6 @@ fn domain(domain: &str, retry: u64, notify: u64, expires: u64) -> Domain { notify: Schedule::later(Duration::from_secs(notify)), expires: now() + expires, status: Status::Scheduled, - disable_tls: false, } }