Added retry_num, notify_num, last_error, last_status variables to queue expressions

This commit is contained in:
mdecimus 2024-05-06 12:55:31 +02:00
parent 54dbd9ec5e
commit 5b236e00ae
17 changed files with 427 additions and 183 deletions

View file

@ -86,7 +86,7 @@ pub(crate) const SMTP_RCPT_TO_VARS: &[u32; 15] = &[
V_PRIORITY,
V_HELO_DOMAIN,
];
pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 9] = &[
pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 14] = &[
V_SENDER,
V_SENDER_DOMAIN,
V_RECIPIENT_DOMAIN,
@ -96,22 +96,46 @@ pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 9] = &[
V_PRIORITY,
V_REMOTE_IP,
V_LOCAL_IP,
V_QUEUE_RETRY_NUM,
V_QUEUE_NOTIFY_NUM,
V_QUEUE_EXPIRES_IN,
V_QUEUE_LAST_STATUS,
V_QUEUE_LAST_ERROR,
];
pub(crate) const SMTP_QUEUE_RCPT_VARS: &[u32; 5] = &[
pub(crate) const SMTP_QUEUE_RCPT_VARS: &[u32; 10] = &[
V_RECIPIENT_DOMAIN,
V_RECIPIENTS,
V_SENDER,
V_SENDER_DOMAIN,
V_PRIORITY,
V_QUEUE_RETRY_NUM,
V_QUEUE_NOTIFY_NUM,
V_QUEUE_EXPIRES_IN,
V_QUEUE_LAST_STATUS,
V_QUEUE_LAST_ERROR,
];
pub(crate) const SMTP_QUEUE_SENDER_VARS: &[u32; 3] = &[V_SENDER, V_SENDER_DOMAIN, V_PRIORITY];
pub(crate) const SMTP_QUEUE_MX_VARS: &[u32; 6] = &[
pub(crate) const SMTP_QUEUE_SENDER_VARS: &[u32; 8] = &[
V_SENDER,
V_SENDER_DOMAIN,
V_PRIORITY,
V_QUEUE_RETRY_NUM,
V_QUEUE_NOTIFY_NUM,
V_QUEUE_EXPIRES_IN,
V_QUEUE_LAST_STATUS,
V_QUEUE_LAST_ERROR,
];
pub(crate) const SMTP_QUEUE_MX_VARS: &[u32; 11] = &[
V_RECIPIENT_DOMAIN,
V_RECIPIENTS,
V_SENDER,
V_SENDER_DOMAIN,
V_PRIORITY,
V_MX,
V_QUEUE_RETRY_NUM,
V_QUEUE_NOTIFY_NUM,
V_QUEUE_EXPIRES_IN,
V_QUEUE_LAST_STATUS,
V_QUEUE_LAST_ERROR,
];
impl SmtpConfig {

View file

@ -43,6 +43,11 @@ pub const V_PRIORITY: u32 = 12;
pub const V_PROTOCOL: u32 = 13;
pub const V_TLS: u32 = 14;
pub const V_RECIPIENTS: u32 = 15;
pub const V_QUEUE_RETRY_NUM: u32 = 16;
pub const V_QUEUE_NOTIFY_NUM: u32 = 17;
pub const V_QUEUE_EXPIRES_IN: u32 = 18;
pub const V_QUEUE_LAST_STATUS: u32 = 19;
pub const V_QUEUE_LAST_ERROR: u32 = 20;
pub const VARIABLES_MAP: &[(&str, u32)] = &[
("rcpt", V_RECIPIENT),
@ -61,6 +66,11 @@ pub const VARIABLES_MAP: &[(&str, u32)] = &[
("protocol", V_PROTOCOL),
("is_tls", V_TLS),
("recipients", V_RECIPIENTS),
("retry_num", V_QUEUE_RETRY_NUM),
("notify_num", V_QUEUE_NOTIFY_NUM),
("expires_in", V_QUEUE_EXPIRES_IN),
("last_status", V_QUEUE_LAST_STATUS),
("last_error", V_QUEUE_LAST_ERROR),
];
use regex::Regex;
@ -215,12 +225,24 @@ impl From<i64> for Variable<'_> {
}
}
impl From<u64> for Variable<'_> {
fn from(value: u64) -> Self {
Variable::Integer(value as i64)
}
}
impl From<i32> for Variable<'_> {
fn from(value: i32) -> Self {
Variable::Integer(value as i64)
}
}
impl From<u32> for Variable<'_> {
fn from(value: u32) -> Self {
Variable::Integer(value as i64)
}
}
impl From<u16> for Variable<'_> {
fn from(value: u16) -> Self {
Variable::Integer(value as i64)

View file

@ -46,7 +46,7 @@ use utils::config::Rate;
use crate::{
core::{Session, SessionAddress, State},
queue::{self, Message, SimpleEnvelope},
queue::{self, Message, QueueEnvelope, Schedule},
scripts::ScriptResult,
};
@ -751,7 +751,16 @@ impl<T: SessionStream> Session<T> {
.last()
.map_or(true, |d| d.domain != rcpt.domain)
{
let envelope = SimpleEnvelope::new(&message, &rcpt.domain);
let rcpt_idx = message.domains.len();
message.domains.push(queue::Domain {
retry: Schedule::now(),
notify: Schedule::now(),
expires: 0,
status: queue::Status::Scheduled,
domain: rcpt.domain,
});
let envelope = QueueEnvelope::new(&message, rcpt_idx);
// Set next retry time
let retry = if self.data.future_release == 0 {
@ -816,14 +825,11 @@ impl<T: SessionStream> Session<T> {
(notify, now() + expire_secs)
};
message.domains.push(queue::Domain {
retry,
notify,
expires,
status: queue::Status::Scheduled,
domain: rcpt.domain,
disable_tls: false,
});
// Update domain
let domain = message.domains.last_mut().unwrap();
domain.retry = retry;
domain.notify = notify;
domain.expires = expires;
}
message.recipients.push(queue::Recipient {

View file

@ -162,11 +162,10 @@ impl DeliveryAttempt {
let queue_config = &core.core.smtp.queue;
let mut on_hold = Vec::new();
let no_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let mut domains = std::mem::take(&mut message.domains);
let mut recipients = std::mem::take(&mut message.recipients);
'next_domain: for (domain_idx, domain) in domains.iter_mut().enumerate() {
'next_domain: for domain_idx in 0..message.domains.len() {
// Only process domains due for delivery
let domain = &message.domains[domain_idx];
if !matches!(&domain.status, Status::Scheduled | Status::TemporaryFailure(_)
if domain.retry.due <= now())
{
@ -182,13 +181,7 @@ impl DeliveryAttempt {
);
// Build envelope
let mut envelope = QueueEnvelope {
message: &message,
domain: &domain.domain,
mx: "",
remote_ip: no_ip,
local_ip: no_ip,
};
let mut envelope = QueueEnvelope::new(&message, domain_idx);
// Throttle recipient domain
let mut in_flight = Vec::new();
@ -197,7 +190,7 @@ impl DeliveryAttempt {
.is_allowed(throttle, &envelope, &mut in_flight, &span)
.await
{
domain.set_throttle_error(err, &mut on_hold);
message.domains[domain_idx].set_throttle_error(err, &mut on_hold);
continue 'next_domain;
}
}
@ -221,14 +214,12 @@ impl DeliveryAttempt {
.await;
// Update status for the current domain and continue with the next one
domain.set_status(
delivery_result,
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
);
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
continue 'next_domain;
}
Some(next_hop) => (
@ -239,7 +230,6 @@ impl DeliveryAttempt {
};
// Prepare TLS strategy
let mut disable_tls = false;
let mut tls_strategy = TlsStrategy {
mta_sts: core
.core
@ -271,7 +261,7 @@ impl DeliveryAttempt {
.smtp
.resolvers
.dns
.txt_lookup::<TlsRpt>(format!("_smtp._tls.{}.", envelope.domain))
.txt_lookup::<TlsRpt>(format!("_smtp._tls.{}.", domain.domain))
.await
{
Ok(record) => {
@ -300,7 +290,7 @@ impl DeliveryAttempt {
let mta_sts_policy = if tls_strategy.try_mta_sts() && is_smtp {
match core
.lookup_mta_sts_policy(
envelope.domain,
&domain.domain,
core.core
.eval_if(&queue_config.timeout.mta_sts, &envelope)
.await
@ -326,7 +316,7 @@ impl DeliveryAttempt {
if tls_strategy.is_mta_sts_required() {
core.schedule_report(TlsEvent {
policy: PolicyType::Sts(None),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(ResultType::Other)
.with_failure_reason_code("MTA-STS is required and no policy was found.")
.into(),
@ -340,7 +330,7 @@ impl DeliveryAttempt {
_ => {
core.schedule_report(TlsEvent {
policy: PolicyType::Sts(None),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(&err)
.with_failure_reason_code(err.to_string())
.into(),
@ -360,14 +350,12 @@ impl DeliveryAttempt {
"Failed to retrieve MTA-STS policy: {}",
err
);
domain.set_status(
err,
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
);
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(err, &schedule);
continue 'next_domain;
} else {
tracing::debug!(
@ -399,14 +387,12 @@ impl DeliveryAttempt {
event = "mx-lookup-failed",
reason = %err,
);
domain.set_status(
err,
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
);
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(err, &schedule);
continue 'next_domain;
}
};
@ -426,15 +412,16 @@ impl DeliveryAttempt {
event = "null-mx",
reason = "Domain does not accept messages (mull MX)",
);
domain.set_status(
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(
Status::PermanentFailure(Error::DnsError(
"Domain does not accept messages (null MX)".to_string(),
)),
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
&schedule,
);
continue 'next_domain;
}
@ -456,7 +443,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: mta_sts_policy.into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(ResultType::ValidationFailure)
.with_receiving_mx_hostname(envelope.mx)
.with_failure_reason_code("MX not authorized by policy.")
@ -543,7 +530,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: tlsa.into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(ResultType::TlsaInvalid)
.with_receiving_mx_hostname(envelope.mx)
.with_failure_reason_code("Invalid TLSA record.")
@ -573,7 +560,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: PolicyType::Tlsa(None),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(ResultType::DaneRequired)
.with_receiving_mx_hostname(envelope.mx)
.with_failure_reason_code(
@ -619,7 +606,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: PolicyType::Tlsa(None),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(
ResultType::DaneRequired,
)
@ -670,7 +657,7 @@ impl DeliveryAttempt {
.is_allowed(throttle, &envelope, &mut in_flight_host, &span)
.await
{
domain.set_throttle_error(err, &mut on_hold);
message.domains[domain_idx].set_throttle_error(err, &mut on_hold);
continue 'next_domain;
}
}
@ -815,7 +802,7 @@ impl DeliveryAttempt {
};
// Try starting TLS
if tls_strategy.try_start_tls() && !domain.disable_tls {
if tls_strategy.try_start_tls() {
smtp_client.timeout = core
.core
.eval_if(&queue_config.timeout.tls, &envelope)
@ -850,7 +837,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: dane_policy.into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(
ResultType::ValidationFailure,
)
@ -875,7 +862,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: (&mta_sts_policy, &dane_policy).into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: None,
tls_record: tls_report.record.clone(),
interval: tls_report.interval,
@ -917,7 +904,7 @@ impl DeliveryAttempt {
if let Some(tls_report) = &tls_report {
core.schedule_report(TlsEvent {
policy: (&mta_sts_policy, &dane_policy).into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(
ResultType::StartTlsNotSupported,
)
@ -963,7 +950,7 @@ impl DeliveryAttempt {
{
core.schedule_report(TlsEvent {
policy: (&mta_sts_policy, &dane_policy).into(),
domain: envelope.domain.to_string(),
domain: domain.domain.to_string(),
failure: FailureDetails::new(
ResultType::CertificateNotTrusted,
)
@ -980,7 +967,6 @@ impl DeliveryAttempt {
last_status = if is_strict_tls {
Status::from_tls_error(envelope.mx, error)
} else {
disable_tls = true;
Status::from_tls_error(envelope.mx, error)
.into_temporary()
};
@ -994,7 +980,7 @@ impl DeliveryAttempt {
context = "tls",
event = "disabled",
mx = envelope.mx,
reason = if domain.disable_tls {"TLS is disabled for this host"} else {"TLS is unavailable for this host, falling back to plain-text."},
reason = "TLS is disabled for this host.",
);
message
@ -1062,30 +1048,24 @@ impl DeliveryAttempt {
};
// Update status for the current domain and continue with the next one
domain.set_status(
delivery_result,
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
);
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(delivery_result, &schedule);
continue 'next_domain;
}
}
// Update status
domain.disable_tls = disable_tls;
domain.set_status(
last_status,
&core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]),
);
let schedule = core
.core
.eval_if::<Vec<Duration>, _>(&queue_config.retry, &envelope)
.await
.unwrap_or_else(|| vec![Duration::from_secs(60)]);
message.domains[domain_idx].set_status(last_status, &schedule);
}
message.domains = domains;
message.recipients = recipients;
// Send Delivery Status Notifications

View file

@ -36,7 +36,7 @@ use store::write::now;
use crate::core::SMTP;
use super::{
Domain, Error, ErrorDetails, HostResponse, Message, Recipient, SimpleEnvelope, Status,
Domain, Error, ErrorDetails, HostResponse, Message, QueueEnvelope, Recipient, Status,
RCPT_DSN_SENT, RCPT_STATUS_CHANGED,
};
@ -222,14 +222,14 @@ impl Message {
// Update next delay notification time
if has_delay {
let mut domains = std::mem::take(&mut self.domains);
for domain in &mut domains {
let mut changes = Vec::new();
for (domain_idx, domain) in self.domains.iter().enumerate() {
if matches!(
&domain.status,
Status::TemporaryFailure(_) | Status::Scheduled
) && domain.notify.due <= now
{
let envelope = SimpleEnvelope::new(self, &domain.domain);
let envelope = QueueEnvelope::new(self, domain_idx);
if let Some(next_notify) = core
.core
@ -239,14 +239,18 @@ impl Message {
notify.into_iter().nth((domain.notify.inner + 1) as usize)
})
{
domain.notify.inner += 1;
domain.notify.due = now + next_notify.as_secs();
changes.push((domain_idx, 1, now + next_notify.as_secs()));
} else {
domain.notify.due = domain.expires + 10;
changes.push((domain_idx, 0, domain.expires + 10));
}
}
}
self.domains = domains;
for (domain_idx, inner, due) in changes {
let domain = &mut self.domains[domain_idx];
domain.notify.inner += inner;
domain.notify.due = due;
}
}
// Obtain hostname and sender addresses

View file

@ -23,7 +23,7 @@
use std::{
fmt::Display,
net::IpAddr,
net::{IpAddr, Ipv4Addr},
time::{Duration, Instant, SystemTime},
};
@ -99,7 +99,6 @@ pub struct Domain {
pub notify: Schedule<u32>,
pub expires: u64,
pub status: Status<(), Error>,
pub disable_tls: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -193,49 +192,43 @@ impl<T: Default> Schedule<T> {
}
}
pub struct SimpleEnvelope<'x> {
pub message: &'x Message,
pub domain: &'x str,
pub recipient: &'x str,
}
impl<'x> SimpleEnvelope<'x> {
pub fn new(message: &'x Message, domain: &'x str) -> Self {
Self {
message,
domain,
recipient: "",
}
}
pub fn new_rcpt(message: &'x Message, domain: &'x str, recipient: &'x str) -> Self {
Self {
message,
domain,
recipient,
}
}
}
impl<'x> ResolveVariable for SimpleEnvelope<'x> {
fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
match variable {
V_SENDER => self.message.return_path_lcase.as_str().into(),
V_SENDER_DOMAIN => self.message.return_path_domain.as_str().into(),
V_PRIORITY => self.message.priority.to_string().into(),
V_RECIPIENT => self.recipient.into(),
V_RECIPIENT_DOMAIN => self.domain.into(),
_ => "".into(),
}
}
}
pub struct QueueEnvelope<'x> {
pub message: &'x Message,
pub domain: &'x str,
pub mx: &'x str,
pub remote_ip: IpAddr,
pub local_ip: IpAddr,
pub current_domain: usize,
pub current_rcpt: usize,
}
impl<'x> QueueEnvelope<'x> {
pub fn new(message: &'x Message, current_domain: usize) -> Self {
Self {
message,
current_domain,
current_rcpt: 0,
mx: "",
remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
}
}
pub fn new_rcpt(message: &'x Message, current_domain: usize, current_rcpt: usize) -> Self {
Self {
message,
current_domain,
current_rcpt,
mx: "",
remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
}
}
}
impl<'x> QueueEnvelope<'x> {
fn current_domain(&self) -> Option<&'x Domain> {
self.message.domains.get(self.current_domain)
}
}
impl<'x> ResolveVariable for QueueEnvelope<'x> {
@ -243,7 +236,18 @@ impl<'x> ResolveVariable for QueueEnvelope<'x> {
match variable {
V_SENDER => self.message.return_path_lcase.as_str().into(),
V_SENDER_DOMAIN => self.message.return_path_domain.as_str().into(),
V_RECIPIENT_DOMAIN => self.domain.into(),
V_RECIPIENT_DOMAIN => self
.current_domain()
.map(|d| d.domain.as_str())
.unwrap_or_default()
.into(),
V_RECIPIENT => self
.message
.recipients
.get(self.current_rcpt)
.map(|r| r.address_lcase.as_str())
.unwrap_or_default()
.into(),
V_RECIPIENTS => self
.message
.recipients
@ -251,6 +255,44 @@ impl<'x> ResolveVariable for QueueEnvelope<'x> {
.map(|r| Variable::from(r.address_lcase.as_str()))
.collect::<Vec<_>>()
.into(),
V_QUEUE_RETRY_NUM => self
.current_domain()
.map(|d| d.retry.inner)
.unwrap_or_default()
.into(),
V_QUEUE_NOTIFY_NUM => self
.current_domain()
.map(|d| d.notify.inner)
.unwrap_or_default()
.into(),
V_QUEUE_EXPIRES_IN => self
.current_domain()
.map(|d| d.expires.saturating_sub(now()))
.unwrap_or_default()
.into(),
V_QUEUE_LAST_STATUS => self
.current_domain()
.map(|d| d.status.to_string())
.unwrap_or_default()
.into(),
V_QUEUE_LAST_ERROR => self
.current_domain()
.map(|d| match &d.status {
Status::Scheduled | Status::Completed(_) => "none",
Status::TemporaryFailure(err) | Status::PermanentFailure(err) => match err {
Error::DnsError(_) => "dns",
Error::UnexpectedResponse(_) => "unexpected-reply",
Error::ConnectionError(_) => "connection",
Error::TlsError(_) => "tls",
Error::DaneError(_) => "dane",
Error::MtaStsError(_) => "mta-sts",
Error::RateLimited => "rate",
Error::ConcurrencyLimited => "concurrency",
Error::Io(_) => "io",
},
})
.unwrap_or_default()
.into(),
V_MX => self.mx.into(),
V_PRIORITY => self.message.priority.into(),
V_REMOTE_IP => self.remote_ip.to_string().into(),

View file

@ -29,7 +29,7 @@ use store::{
use crate::core::{throttle::NewKey, SMTP};
use super::{Message, QuotaKey, SimpleEnvelope, Status};
use super::{Message, QueueEnvelope, QuotaKey, Status};
impl SMTP {
pub async fn has_quota(&self, message: &mut Message) -> bool {
@ -47,13 +47,13 @@ impl SMTP {
}
for quota in &self.core.smtp.queue.quota.rcpt_domain {
for (pos, domain) in message.domains.iter().enumerate() {
for domain_idx in 0..message.domains.len() {
if !self
.check_quota(
quota,
&SimpleEnvelope::new(message, &domain.domain),
&QueueEnvelope::new(message, domain_idx),
message.size,
((pos + 1) << 32) as u64,
((domain_idx + 1) << 32) as u64,
&mut quota_keys,
)
.await
@ -64,17 +64,13 @@ impl SMTP {
}
for quota in &self.core.smtp.queue.quota.rcpt {
for (pos, rcpt) in message.recipients.iter().enumerate() {
for (rcpt_idx, rcpt) in message.recipients.iter().enumerate() {
if !self
.check_quota(
quota,
&SimpleEnvelope::new_rcpt(
message,
&message.domains[rcpt.domain_idx].domain,
&rcpt.address_lcase,
),
&QueueEnvelope::new_rcpt(message, rcpt.domain_idx, rcpt_idx),
message.size,
(pos + 1) as u64,
(rcpt_idx + 1) as u64,
&mut quota_keys,
)
.await

View file

@ -32,7 +32,7 @@ use utils::BlobHash;
use crate::core::SMTP;
use super::{
Domain, Event, Message, QueueId, QuotaKey, Recipient, Schedule, SimpleEnvelope, Status,
Domain, Event, Message, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status,
};
pub const LOCK_EXPIRY: u64 = 300;
@ -341,22 +341,26 @@ impl Message {
idx
} else {
let idx = self.domains.len();
let expires = core
.core
.eval_if(
&core.core.smtp.queue.expire,
&SimpleEnvelope::new(self, &rcpt_domain),
)
.await
.unwrap_or_else(|| Duration::from_secs(5 * 86400));
self.domains.push(Domain {
domain: rcpt_domain,
retry: Schedule::now(),
notify: Schedule::later(expires + Duration::from_secs(10)),
expires: now() + expires.as_secs(),
notify: Schedule::now(),
expires: 0,
status: Status::Scheduled,
disable_tls: false,
});
let expires = core
.core
.eval_if(&core.core.smtp.queue.expire, &QueueEnvelope::new(self, idx))
.await
.unwrap_or_else(|| Duration::from_secs(5 * 86400));
// Update expiration
let domain = self.domains.last_mut().unwrap();
domain.notify = Schedule::later(expires + Duration::from_secs(10));
domain.expires = now() + expires.as_secs();
idx
};
self.recipients.push(Recipient {

View file

@ -48,6 +48,7 @@ pub enum ConfigWarning {
Missing,
AppliedDefault { default: String },
Unread { value: String },
Build { error: String },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@ -218,6 +219,7 @@ impl Config {
ConfigWarning::Unread { value } => {
format!("WARNING: Unused setting {key:?} with value {value:?}")
}
ConfigWarning::Build { error } => format!("WARNING for {key:?}: {error}"),
};
if !use_stderr {
tracing::debug!("{}", message);

View file

@ -311,6 +311,15 @@ impl Config {
);
}
pub fn new_build_warning(&mut self, key: impl AsKey, details: impl Into<String>) {
self.warnings.insert(
key.as_key(),
ConfigWarning::Build {
error: details.into(),
},
);
}
pub fn new_missing_property(&mut self, key: impl AsKey) {
self.warnings.insert(key.as_key(), ConfigWarning::Missing);
}

View file

@ -133,7 +133,7 @@ impl PublicSuffix {
if r.status().is_success() {
r.bytes().await
} else {
config.new_build_error(
config.new_build_warning(
format!("{value}.{idx}"),
format!(
"Failed to fetch public suffixes from {value:?}: Status {status}",
@ -150,7 +150,7 @@ impl PublicSuffix {
match result {
Ok(bytes) => bytes.to_vec(),
Err(err) => {
config.new_build_error(
config.new_build_warning(
format!("{value}.{idx}"),
format!("Failed to fetch public suffixes from {value:?}: {err}",),
);
@ -161,7 +161,7 @@ impl PublicSuffix {
match std::fs::read(filename) {
Ok(bytes) => bytes,
Err(err) => {
config.new_build_error(
config.new_build_warning(
format!("{value}.{idx}"),
format!("Failed to read public suffixes from {value:?}: {err}",),
);
@ -179,7 +179,7 @@ impl PublicSuffix {
{
Ok(bytes) => bytes,
Err(err) => {
config.new_build_error(
config.new_build_warning(
format!("{value}.{idx}"),
format!(
"Failed to decompress public suffixes from {value:?}: {err}",
@ -199,7 +199,7 @@ impl PublicSuffix {
return PublicSuffix::from(list.as_str());
}
Err(err) => {
config.new_build_error(
config.new_build_warning(
format!("{value}.{idx}"),
format!(
"Failed to parse public suffixes from {value:?}: {err}",
@ -212,7 +212,7 @@ impl PublicSuffix {
}
#[cfg(not(feature = "test_mode"))]
config.new_build_error(key, "Failed to parse public suffixes from any source.");
config.new_build_warning(key, "Failed to parse public suffixes from any source.");
PublicSuffix::default()
}

View file

@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::time::{Duration, Instant};
use common::config::server::ServerProtocol;
use mail_auth::MX;
use store::write::now;
use crate::smtp::{outbound::TestServer, session::TestSession};
const LOCAL: &str = r#"
[queue.outbound]
next-hop = [{if = "retry_num > 0", then = "'fallback'"},
{else = false}]
[session.rcpt]
relay = true
max-recipients = 100
[session.extensions]
dsn = true
[remote.fallback]
address = fallback.foobar.org
port = 9925
protocol = 'smtp'
concurrency = 5
[remote.fallback.tls]
implicit = false
allow-invalid-certs = true
"#;
const REMOTE: &str = r#"
[session.rcpt]
relay = true
[session.ehlo]
reject-non-fqdn = false
[session.extensions]
dsn = true
chunking = false
"#;
#[tokio::test]
#[serial_test::serial]
async fn fallback_relay() {
/*let disable = 1;
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();*/
// Start test server
let mut remote = TestServer::new("smtp_fallback_remote", REMOTE, true).await;
let _rx = remote.start(&[ServerProtocol::Smtp]).await;
let mut local = TestServer::new("smtp_fallback_local", LOCAL, true).await;
// Add mock DNS entries
let core = local.build_smtp();
core.core.smtp.resolvers.dns.mx_add(
"foobar.org",
vec![MX {
exchanges: vec!["_dns_error.foobar.org".to_string()],
preference: 10,
}],
Instant::now() + Duration::from_secs(10),
);
/*core.core.smtp.resolvers.dns.ipv4_add(
"unreachable.foobar.org",
vec!["127.0.0.2".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);*/
core.core.smtp.resolvers.dns.ipv4_add(
"fallback.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
let mut session = local.new_session();
session.data.remote_ip_str = "10.0.0.1".to_string();
session.eval_session_params().await;
session.ehlo("mx.test.org").await;
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
local
.qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
let mut retry = local.qr.expect_message().await;
let prev_due = retry.domains[0].retry.due;
let next_due = now();
let queue_id = retry.id;
retry.domains[0].retry.due = next_due;
retry
.save_changes(&core, prev_due.into(), next_due.into())
.await;
local
.qr
.delivery_attempt(queue_id)
.await
.try_deliver(core.clone())
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
remote.qr.expect_message().await;
}

View file

@ -42,6 +42,7 @@ use super::{
pub mod dane;
pub mod extensions;
pub mod fallback_relay;
pub mod ip_lookup;
pub mod lmtp;
pub mod mta_sts;

View file

@ -33,7 +33,7 @@ use crate::smtp::{
inbound::TestQueueEvent, outbound::TestServer, queue::manager::new_message,
session::TestSession,
};
use smtp::queue::{Message, QueueEnvelope};
use smtp::queue::{Domain, Message, QueueEnvelope, Schedule, Status};
const CONFIG: &str = r#"
[session.rcpt]
@ -113,7 +113,7 @@ async fn throttle_outbound() {
for t in &throttle.sender {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "", ""),
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
&span,
)
@ -138,7 +138,7 @@ async fn throttle_outbound() {
for t in &throttle.sender {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "", ""),
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
&span,
)
@ -162,10 +162,17 @@ async fn throttle_outbound() {
// Expect concurrency throttle for recipient domain 'example.org'
test_message.return_path_domain = "test.net".to_string();
test_message.domains.push(Domain {
domain: "example.org".to_string(),
retry: Schedule::now(),
notify: Schedule::now(),
expires: 0,
status: Status::Scheduled,
});
for t in &throttle.rcpt {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "example.org", ""),
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
&span,
)
@ -191,11 +198,18 @@ async fn throttle_outbound() {
local.qr.read_event().await.unwrap_on_hold();
in_flight.clear();
// Expect rate limit throttle for recipient domain 'example.org'
// Expect rate limit throttle for recipient domain 'example.net'
test_message.domains.push(Domain {
domain: "example.net".to_string(),
retry: Schedule::now(),
notify: Schedule::now(),
expires: 0,
status: Status::Scheduled,
});
for t in &throttle.rcpt {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "example.net", ""),
&QueueEnvelope::test(&test_message, 1, ""),
&mut in_flight,
&span,
)
@ -236,10 +250,17 @@ async fn throttle_outbound() {
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
test_message.domains.push(Domain {
domain: "test.org".to_string(),
retry: Schedule::now(),
notify: Schedule::now(),
expires: 0,
status: Status::Scheduled,
});
for t in &throttle.host {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "test.org", "mx.test.org"),
&QueueEnvelope::test(&test_message, 2, "mx.test.org"),
&mut in_flight,
&span,
)
@ -276,7 +297,7 @@ async fn throttle_outbound() {
for t in &throttle.host {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, "example.net", "mx.test.net"),
&QueueEnvelope::test(&test_message, 1, "mx.test.net"),
&mut in_flight,
&span,
)
@ -301,17 +322,18 @@ async fn throttle_outbound() {
}
pub trait TestQueueEnvelope<'x> {
fn test(message: &'x Message, domain: &'x str, mx: &'x str) -> Self;
fn test(message: &'x Message, current_domain: usize, mx: &'x str) -> Self;
}
impl<'x> TestQueueEnvelope<'x> for QueueEnvelope<'x> {
fn test(message: &'x Message, domain: &'x str, mx: &'x str) -> Self {
fn test(message: &'x Message, current_domain: usize, mx: &'x str) -> Self {
QueueEnvelope {
message,
domain,
mx,
remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
current_domain,
current_rcpt: 0,
}
}
}

View file

@ -41,7 +41,8 @@ relay = true
hostname = "'badtls.foobar.org'"
[queue.outbound.tls]
starttls = "optional"
starttls = [ { if = "retry_num > 0 && last_error == 'tls'", then = "disable"},
{ else = "optional" }]
"#;
const REMOTE: &str = r#"
@ -104,7 +105,6 @@ async fn starttls_optional() {
.try_deliver(core.clone())
.await;
let mut retry = local.qr.expect_message().await;
assert!(retry.domains[0].disable_tls);
let prev_due = retry.domains[0].retry.due;
let next_due = now();
let queue_id = retry.id;

View file

@ -96,7 +96,6 @@ async fn generate_dsn() {
entity: "mx.domain.org".to_string(),
details: "Connection timeout".to_string(),
})),
disable_tls: false,
}],
flags: 0,
env_id: None,

View file

@ -165,7 +165,6 @@ fn domain(domain: &str, retry: u64, notify: u64, expires: u64) -> Domain {
notify: Schedule::later(Duration::from_secs(notify)),
expires: now() + expires,
status: Status::Scheduled,
disable_tls: false,
}
}