Small round of MTA queueing improvements

This commit is contained in:
mdecimus 2025-07-12 19:42:27 +02:00
parent e28769c5ce
commit 036cb2ecd4
24 changed files with 278 additions and 266 deletions

View file

@ -42,7 +42,7 @@ pub const DEFAULT_QUEUE_NAME: QueueName = QueueName([b'd', b'e', b'f', b'a', b'u
#[derive(Clone)]
pub struct QueueConfig {
// Strategy resolver
pub gateway: IfBlock,
pub route: IfBlock,
pub queue: IfBlock,
pub connection: IfBlock,
pub tls: IfBlock,
@ -58,13 +58,13 @@ pub struct QueueConfig {
// Strategies
pub queue_strategy: AHashMap<String, QueueStrategy>,
pub connection_strategy: AHashMap<String, ConnectionStrategy>,
pub gateway_strategy: AHashMap<String, GatewayStrategy>,
pub routing_strategy: AHashMap<String, RoutingStrategy>,
pub tls_strategy: AHashMap<String, TlsStrategy>,
pub virtual_queues: AHashMap<QueueName, VirtualQueue>,
}
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub enum GatewayStrategy {
pub enum RoutingStrategy {
Local,
Mx(MxConfig),
Relay(RelayConfig),
@ -109,8 +109,8 @@ pub struct QueueStrategy {
serde::Deserialize,
)]
pub enum QueueExpiry {
Duration(u64),
Count(u32),
Ttl(u64),
Attempts(u32),
}
#[derive(Clone, Debug)]
@ -188,17 +188,38 @@ pub enum RequireOptional {
impl Default for QueueConfig {
fn default() -> Self {
Self {
gateway: IfBlock::new::<()>(
"queue.strategy.gateway",
route: IfBlock::new::<()>(
"queue.strategy.route",
#[cfg(not(feature = "test_mode"))]
[("is_local_domain('*', rcpt_domain)", "'local'")],
#[cfg(feature = "test_mode")]
[],
"'mx'",
),
queue: IfBlock::new::<()>("queue.strategy.schedule", [], "'default'"),
queue: IfBlock::new::<()>(
"queue.strategy.schedule",
#[cfg(not(feature = "test_mode"))]
[
("is_local_domain('*', rcpt_domain)", "'local'"),
("source == 'dsn'", "'dsn'"),
("source == 'report'", "'report'"),
],
#[cfg(feature = "test_mode")]
[],
#[cfg(not(feature = "test_mode"))]
"'remote'",
#[cfg(feature = "test_mode")]
"'default'",
),
connection: IfBlock::new::<()>("queue.strategy.connection", [], "'default'"),
tls: IfBlock::new::<()>("queue.strategy.tls", [], "'default'"),
tls: IfBlock::new::<()>(
"queue.strategy.tls",
#[cfg(not(feature = "test_mode"))]
[("retry_num > 0 && last_error == 'tls'", "'invalid-tls'")],
#[cfg(feature = "test_mode")]
[],
"'default'",
),
dsn: Dsn {
name: IfBlock::new::<()>("report.dsn.from-name", [], "'Mail Delivery Subsystem'"),
address: IfBlock::new::<()>(
@ -218,7 +239,7 @@ impl Default for QueueConfig {
queue_strategy: Default::default(),
virtual_queues: Default::default(),
connection_strategy: Default::default(),
gateway_strategy: Default::default(),
routing_strategy: Default::default(),
tls_strategy: Default::default(),
}
}
@ -232,7 +253,7 @@ impl QueueConfig {
let host_vars = TokenMap::default().with_variables(SMTP_QUEUE_HOST_VARS);
for (value, key, token_map) in [
(&mut queue.gateway, "queue.strategy.gateway", &rcpt_vars),
(&mut queue.route, "queue.strategy.route", &rcpt_vars),
(&mut queue.queue, "queue.strategy.schedule", &rcpt_vars),
(
&mut queue.connection,
@ -257,7 +278,7 @@ impl QueueConfig {
queue.virtual_queues = parse_virtual_queues(config);
queue.queue_strategy = parse_queue_strategies(config, &queue.virtual_queues);
queue.connection_strategy = parse_connection_strategies(config);
queue.gateway_strategy = parse_gateway_strategies(config);
queue.routing_strategy = parse_routing_strategies(config);
queue.tls_strategy = parse_tls_strategies(config);
// Parse rate limiters
@ -333,8 +354,8 @@ fn parse_queue_strategy(
config.property::<Duration>(("queue.schedule", id, "expire")),
config.property::<u32>(("queue.schedule", id, "max-attempts")),
) {
(Some(duration), None) => QueueExpiry::Duration(duration.as_secs()),
(None, Some(count)) => QueueExpiry::Count(count),
(Some(duration), None) => QueueExpiry::Ttl(duration.as_secs()),
(None, Some(count)) => QueueExpiry::Attempts(count),
(Some(_), Some(_)) => {
config.new_parse_error(
("queue.schedule", id, "expire"),
@ -342,7 +363,7 @@ fn parse_queue_strategy(
);
return None;
}
(None, None) => QueueExpiry::Duration(60 * 60 * 24 * 3), // Default to 3 days
(None, None) => QueueExpiry::Ttl(60 * 60 * 24 * 3), // Default to 3 days
},
virtual_queue,
})
@ -373,59 +394,59 @@ fn parse_virtual_queue(config: &mut Config, id: &str) -> Option<VirtualQueue> {
})
}
fn parse_gateway_strategies(config: &mut Config) -> AHashMap<String, GatewayStrategy> {
fn parse_routing_strategies(config: &mut Config) -> AHashMap<String, RoutingStrategy> {
let mut entries = AHashMap::new();
for key in config.sub_keys("queue.gateway", ".type") {
if let Some(strategy) = parse_gateway(config, &key) {
for key in config.sub_keys("queue.route", ".type") {
if let Some(strategy) = parse_route(config, &key) {
entries.insert(key, strategy);
}
}
entries
}
fn parse_gateway(config: &mut Config, id: &str) -> Option<GatewayStrategy> {
match config.value_require_non_empty(("queue.gateway", id, "type"))? {
"relay" => GatewayStrategy::Relay(RelayConfig {
address: config.property_require(("queue.gateway", id, "address"))?,
fn parse_route(config: &mut Config, id: &str) -> Option<RoutingStrategy> {
match config.value_require_non_empty(("queue.route", id, "type"))? {
"relay" => RoutingStrategy::Relay(RelayConfig {
address: config.property_require(("queue.route", id, "address"))?,
port: config
.property_require(("queue.gateway", id, "port"))
.property_require(("queue.route", id, "port"))
.unwrap_or(25),
protocol: config
.property_require(("queue.gateway", id, "protocol"))
.property_require(("queue.route", id, "protocol"))
.unwrap_or(ServerProtocol::Smtp),
auth: if let (Some(username), Some(secret)) = (
config.value(("queue.gateway", id, "auth.username")),
config.value(("queue.gateway", id, "auth.secret")),
config.value(("queue.route", id, "auth.username")),
config.value(("queue.route", id, "auth.secret")),
) {
Credentials::new(username.to_string(), secret.to_string()).into()
} else {
None
},
tls_implicit: config
.property(("queue.gateway", id, "tls.implicit"))
.property(("queue.route", id, "tls.implicit"))
.unwrap_or(true),
tls_allow_invalid_certs: config
.property(("queue.gateway", id, "tls.allow-invalid-certs"))
.property(("queue.route", id, "tls.allow-invalid-certs"))
.unwrap_or(false),
})
.into(),
"local" => GatewayStrategy::Local.into(),
"mx" => GatewayStrategy::Mx(MxConfig {
"local" => RoutingStrategy::Local.into(),
"mx" => RoutingStrategy::Mx(MxConfig {
max_mx: config
.property_require(("queue.gateway", id, "limits.mx"))
.property_require(("queue.route", id, "limits.mx"))
.unwrap_or(5),
max_multi_homed: config
.property_require(("queue.gateway", id, "limits.multihomed"))
.property_require(("queue.route", id, "limits.multihomed"))
.unwrap_or(2),
ip_lookup_strategy: config
.property_require(("queue.gateway", id, "ip-lookup"))
.property_require(("queue.route", id, "ip-lookup"))
.unwrap_or(IpLookupStrategy::Ipv4thenIpv6),
})
.into(),
invalid => {
let details =
format!("Invalid gateway type: {invalid:?}. Expected 'relay', 'local', or 'mx'.");
config.new_parse_error(("queue.gateway", id, "type"), details);
format!("Invalid route type: {invalid:?}. Expected 'relay', 'local', or 'mx'.");
config.new_parse_error(("queue.route", id, "type"), details);
None
}
}
@ -499,30 +520,16 @@ fn parse_connection(config: &mut Config, id: &str) -> Option<ConnectionStrategy>
let mut source_ipv4 = Vec::new();
let mut source_ipv6 = Vec::new();
for ip_num in config.sub_keys(("queue.connection", id, "source-ip"), ".address") {
if let Some(ip) = config.property_require::<IpAddr>((
"queue.connection",
id,
"source-ip",
ip_num.as_str(),
"address",
)) {
let ip_and_host = IpAndHost {
ip,
host: config.property::<String>((
"queue.connection",
id,
"source-ip",
ip_num.as_str(),
"ehlo-hostname",
)),
};
for (_, ip) in config.properties::<IpAddr>(("queue.connection", id, "source-ips")) {
let ip_and_host = IpAndHost {
ip,
host: config.property::<String>(("queue.source-ip", ip.to_string(), "ehlo-hostname")),
};
if ip.is_ipv4() {
source_ipv4.push(ip_and_host);
} else {
source_ipv6.push(ip_and_host);
}
if ip.is_ipv4() {
source_ipv4.push(ip_and_host);
} else {
source_ipv6.push(ip_and_host);
}
}

View file

@ -10,8 +10,8 @@ use crate::{
config::smtp::{
auth::{ArcSealer, DkimSigner, LazySignature, ResolvedSignature, build_signature},
queue::{
ConnectionStrategy, DEFAULT_QUEUE_NAME, GatewayStrategy, MxConfig, QueueExpiry,
QueueName, QueueStrategy, RequireOptional, TlsStrategy, VirtualQueue,
ConnectionStrategy, DEFAULT_QUEUE_NAME, MxConfig, QueueExpiry, QueueName,
QueueStrategy, RequireOptional, RoutingStrategy, TlsStrategy, VirtualQueue,
},
},
ipc::{BroadcastEvent, StateEvent},
@ -190,9 +190,9 @@ impl Server {
})
}
pub fn get_gateway_or_default(&self, name: &str, session_id: u64) -> &GatewayStrategy {
static LOCAL_GATEWAY: GatewayStrategy = GatewayStrategy::Local;
static MX_GATEWAY: GatewayStrategy = GatewayStrategy::Mx(MxConfig {
pub fn get_route_or_default(&self, name: &str, session_id: u64) -> &RoutingStrategy {
static LOCAL_GATEWAY: RoutingStrategy = RoutingStrategy::Local;
static MX_GATEWAY: RoutingStrategy = RoutingStrategy::Mx(MxConfig {
max_mx: 5,
max_multi_homed: 2,
ip_lookup_strategy: IpLookupStrategy::Ipv4thenIpv6,
@ -200,7 +200,7 @@ impl Server {
self.core
.smtp
.queue
.gateway_strategy
.routing_strategy
.get(name)
.unwrap_or_else(|| match name {
"local" => &LOCAL_GATEWAY,
@ -252,7 +252,7 @@ impl Server {
86400, // 1 day
259200, // 3 days
],
expiry: QueueExpiry::Duration(432000), // 5 days
expiry: QueueExpiry::Ttl(432000), // 5 days
virtual_queue: QueueName::default(),
});
self.core

View file

@ -247,7 +247,7 @@ impl QueueManagement for Server {
.is_some_and(|expires| expires > time)
{
recipient.expires =
QueueExpiry::Count(recipient.retry.inner + 10);
QueueExpiry::Attempts(recipient.retry.inner + 10);
}
has_changes = true;
}
@ -302,7 +302,8 @@ impl QueueManagement for Server {
.expiration_time(message.message.created)
.is_some_and(|expires| expires > time)
{
recipient.expires = QueueExpiry::Count(recipient.retry.inner + 10);
recipient.expires =
QueueExpiry::Attempts(recipient.retry.inner + 10);
}
found = true;
}
@ -605,7 +606,7 @@ impl Message {
} else {
None
},
expires: if let ArchivedQueueExpiry::Duration(time) = &rcpt.expires {
expires: if let ArchivedQueueExpiry::Ttl(time) = &rcpt.expires {
DateTime::from_timestamp((u64::from(*time) + message.created) as i64).into()
} else {
None

View file

@ -133,7 +133,8 @@ pub(crate) async fn migrate_calendar_events(server: &Server) -> trc::Result<()>
num_migrated += 1;
}
Err(err) => {
if archive.unarchive_untrusted::<CalendarEvent>().is_err() {
if let Err(err_) = archive.unarchive_untrusted::<CalendarEvent>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}

View file

@ -157,9 +157,13 @@ async fn migrate_v0_12(server: &Server, migrate_tasks: bool) -> trc::Result<()>
}
}
migrate_calendar_events(server)
.await
.caused_by(trc::location!())
if migrate_tasks {
migrate_calendar_events(server)
.await
.caused_by(trc::location!())
} else {
Ok(())
}
}
async fn migrate_v0_11(server: &Server) -> trc::Result<()> {

View file

@ -278,7 +278,7 @@ where
retry: domain.retry.clone(),
notify: domain.notify.clone(),
queue: QueueName::default(),
expires: QueueExpiry::Duration(domain.expires.saturating_sub(now())),
expires: QueueExpiry::Ttl(domain.expires.saturating_sub(now())),
}
})
.collect(),

View file

@ -745,7 +745,7 @@ impl<T: SessionStream> Session<T> {
orcpt: rcpt.dsn_info,
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Count(0),
expires: QueueExpiry::Attempts(0),
queue: QueueName::default(),
});
@ -779,18 +779,18 @@ impl<T: SessionStream> Session<T> {
(
queue::Schedule::later(future_release + next_notify),
match queue.expiry {
QueueExpiry::Duration(time) => QueueExpiry::Duration(future_release + time),
QueueExpiry::Count(count) => QueueExpiry::Count(count),
QueueExpiry::Ttl(time) => QueueExpiry::Ttl(future_release + time),
QueueExpiry::Attempts(count) => QueueExpiry::Attempts(count),
},
)
} else if (message.flags & MAIL_BY_RETURN) != 0 {
(
queue::Schedule::later(future_release + next_notify),
QueueExpiry::Duration(self.data.delivery_by as u64),
QueueExpiry::Ttl(self.data.delivery_by as u64),
)
} else {
let (notify, expires) = match queue.expiry {
QueueExpiry::Duration(expire_secs) => (
QueueExpiry::Ttl(expire_secs) => (
(if self.data.delivery_by.is_positive() {
let notify_at = self.data.delivery_by as u64;
if expire_secs > notify_at {
@ -806,11 +806,11 @@ impl<T: SessionStream> Session<T> {
next_notify
}
}),
QueueExpiry::Duration(expire_secs),
QueueExpiry::Ttl(expire_secs),
),
QueueExpiry::Count(_) => (
QueueExpiry::Attempts(_) => (
next_notify,
QueueExpiry::Duration(self.data.delivery_by.unsigned_abs()),
QueueExpiry::Ttl(self.data.delivery_by.unsigned_abs()),
),
};

View file

@ -25,7 +25,7 @@ use crate::reporting::SmtpReporting;
use crate::{queue::ErrorDetails, reporting::tls::TlsRptOptions};
use ahash::AHashMap;
use common::Server;
use common::config::smtp::queue::GatewayStrategy;
use common::config::smtp::queue::RoutingStrategy;
use common::config::{server::ServerProtocol, smtp::report::AggregateFrequency};
use common::ipc::{PolicyType, QueueEvent, QueueEventStatus, TlsEvent};
use compact_str::ToCompactString;
@ -215,10 +215,10 @@ impl QueuedMessage {
}
}
// Group recipients by gateway
// Group recipients by route
let queue_config = &server.core.smtp.queue;
let now_ = now();
let mut gateways: AHashMap<(&str, &GatewayStrategy), Vec<usize>> = AHashMap::new();
let mut routes: AHashMap<(&str, &RoutingStrategy), Vec<usize>> = AHashMap::new();
for (rcpt_idx, rcpt) in message.message.recipients.iter().enumerate() {
if matches!(
&rcpt.status,
@ -227,16 +227,16 @@ impl QueuedMessage {
&& rcpt.queue == message.queue_name
{
let envelope = QueueEnvelope::new(&message.message, rcpt);
let gateway = server.get_gateway_or_default(
let route = server.get_route_or_default(
&server
.eval_if::<String, _>(&queue_config.gateway, &envelope, message.span_id)
.eval_if::<String, _>(&queue_config.route, &envelope, message.span_id)
.await
.unwrap_or_else(|| "default".to_string()),
message.span_id,
);
gateways
.entry((rcpt.address_lcase.domain_part(), gateway))
routes
.entry((rcpt.address_lcase.domain_part(), route))
.or_default()
.push(rcpt_idx);
}
@ -244,7 +244,7 @@ impl QueuedMessage {
let no_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let mut delivery_results: Vec<DeliveryResult> = Vec::new();
'next_gateway: for ((domain, gateway), rcpt_idxs) in gateways {
'next_route: for ((domain, route), rcpt_idxs) in routes {
trc::event!(
Delivery(DeliveryEvent::DomainDeliveryStart),
SpanId = message.span_id,
@ -269,21 +269,21 @@ impl QueuedMessage {
);
delivery_results.push(DeliveryResult::rate_limited(rcpt_idxs, retry_at));
continue 'next_gateway;
continue 'next_route;
}
}
// Obtain next hop
let (mut remote_hosts, mx_config, is_smtp) = match gateway {
GatewayStrategy::Local => {
let (mut remote_hosts, mx_config, is_smtp) = match route {
RoutingStrategy::Local => {
// Deliver message locally
message
.deliver_local(&rcpt_idxs, &mut delivery_results, &server)
.await;
continue 'next_gateway;
continue 'next_route;
}
GatewayStrategy::Mx(mx_config) => (Vec::with_capacity(0), Some(mx_config), true),
GatewayStrategy::Relay(relay_config) => (
RoutingStrategy::Mx(mx_config) => (Vec::with_capacity(0), Some(mx_config), true),
RoutingStrategy::Relay(relay_config) => (
vec![NextHop::Relay(relay_config)],
None,
relay_config.protocol == ServerProtocol::Smtp,
@ -480,7 +480,7 @@ impl QueuedMessage {
Status::from_mta_sts_error(domain, err),
rcpt_idxs,
));
continue 'next_gateway;
continue 'next_route;
}
None
@ -528,7 +528,7 @@ impl QueuedMessage {
Status::from_mail_auth_error(domain, err),
rcpt_idxs,
));
continue 'next_gateway;
continue 'next_route;
}
};
@ -561,7 +561,7 @@ impl QueuedMessage {
}),
rcpt_idxs,
));
continue 'next_gateway;
continue 'next_route;
}
}
@ -850,7 +850,7 @@ impl QueuedMessage {
);
delivery_results
.push(DeliveryResult::rate_limited(rcpt_idxs, retry_at));
continue 'next_gateway;
continue 'next_route;
}
}
@ -1236,8 +1236,8 @@ impl QueuedMessage {
.await
}
// Continue with the next domain/gateway
continue 'next_gateway;
// Continue with the next domain/route
continue 'next_route;
}
}

View file

@ -24,10 +24,12 @@ use tokio::sync::mpsc;
pub struct Queue {
pub core: Arc<Inner>,
pub locked_messages: LockedMessages,
pub locked: AHashMap<(QueueId, QueueName), LockedMessage>,
pub locked_revision: u64,
pub stats: AHashMap<QueueName, QueueStats>,
pub next_wake_up: Instant,
pub next_refresh: Instant,
pub rx: mpsc::Receiver<QueueEvent>,
pub is_paused: bool,
}
#[derive(Debug)]
@ -37,12 +39,6 @@ pub struct QueueStats {
pub last_warning: Instant,
}
#[derive(Debug)]
pub struct LockedMessages {
pub locked: AHashMap<(QueueId, QueueName), LockedMessage>,
pub revision: u64,
}
#[derive(Debug)]
pub struct LockedMessage {
pub expires: u64,
@ -63,88 +59,43 @@ impl Queue {
pub fn new(core: Arc<Inner>, rx: mpsc::Receiver<QueueEvent>) -> Self {
Queue {
core,
locked_messages: LockedMessages::default(),
locked: AHashMap::with_capacity(128),
locked_revision: 0,
stats: AHashMap::new(),
next_wake_up: Instant::now(),
next_refresh: Instant::now(),
is_paused: false,
rx,
}
}
pub async fn start(&mut self) {
let mut is_paused = false;
loop {
let refresh_queue = match tokio::time::timeout(
self.next_wake_up.duration_since(Instant::now()),
let mut refresh_queue;
match tokio::time::timeout(
self.next_refresh.duration_since(Instant::now()),
self.rx.recv(),
)
.await
{
Ok(Some(QueueEvent::WorkerDone {
queue_id,
queue_name,
status,
})) => {
let queue_stats = self.stats.get_mut(&queue_name).unwrap();
queue_stats.in_flight -= 1;
Ok(Some(event)) => {
refresh_queue = self.handle_event(event).await;
match status {
QueueEventStatus::Completed => {
self.locked_messages.locked.remove(&(queue_id, queue_name));
!self.locked_messages.locked.is_empty() || !queue_stats.has_capacity()
}
QueueEventStatus::Locked => {
let expires = LOCK_EXPIRY + rand::rng().random_range(5..10);
let due_in = Instant::now() + Duration::from_secs(expires);
if due_in < self.next_wake_up {
self.next_wake_up = due_in;
}
self.locked_messages.locked.insert(
(queue_id, queue_name),
LockedMessage {
expires: now() + expires,
revision: self.locked_messages.revision,
},
);
self.locked_messages.locked.len() > 1 || !queue_stats.has_capacity()
}
QueueEventStatus::Deferred => {
self.locked_messages.locked.remove(&(queue_id, queue_name));
true
}
while let Ok(event) = self.rx.try_recv() {
refresh_queue = self.handle_event(event).await || refresh_queue;
}
}
Ok(Some(QueueEvent::Refresh)) => true,
Ok(Some(QueueEvent::Paused(paused))) => {
self.core
.data
.queue_status
.store(!paused, Ordering::Relaxed);
is_paused = paused;
false
Err(_) => {
refresh_queue = true;
}
Ok(Some(QueueEvent::ReloadSettings)) => {
let server = self.core.build_server();
for (name, settings) in &server.core.smtp.queue.virtual_queues {
if let Some(stats) = self.stats.get_mut(name) {
stats.max_in_flight = settings.threads;
} else {
self.stats.insert(*name, QueueStats::new(settings.threads));
}
}
false
}
Err(_) => true,
Ok(Some(QueueEvent::Stop)) | Ok(None) => {
Ok(None) => {
break;
}
};
if !is_paused {
if !self.is_paused {
// Deliver scheduled messages
if refresh_queue || self.next_wake_up <= Instant::now() {
if refresh_queue || self.next_refresh <= Instant::now() {
// Process queue events
let server = self.core.build_server();
let mut queue_events = server.next_event(self).await;
@ -183,24 +134,89 @@ impl Queue {
Limit = stats.max_in_flight,
);
}
self.locked_messages
.locked
self.locked
.remove(&(queue_event.queue_id, queue_event.queue_name));
}
}
// Remove expired locks
let now = now();
self.locked_messages.locked.retain(|_, locked| {
locked.expires > now && locked.revision == self.locked_messages.revision
self.locked.retain(|_, locked| {
locked.expires > now && locked.revision == self.locked_revision
});
self.next_wake_up = Instant::now()
self.next_refresh = Instant::now()
+ Duration::from_secs(queue_events.next_refresh.saturating_sub(now));
}
} else {
// Queue is paused
self.next_wake_up = Instant::now() + Duration::from_secs(86400);
self.next_refresh = Instant::now() + Duration::from_secs(86400);
}
}
}
async fn handle_event(&mut self, event: QueueEvent) -> bool {
match event {
QueueEvent::WorkerDone {
queue_id,
queue_name,
status,
} => {
let queue_stats = self.stats.get_mut(&queue_name).unwrap();
queue_stats.in_flight -= 1;
match status {
QueueEventStatus::Completed => {
self.locked.remove(&(queue_id, queue_name));
!self.locked.is_empty() || !queue_stats.has_capacity()
}
QueueEventStatus::Locked => {
let expires = LOCK_EXPIRY + rand::rng().random_range(5..10);
let due_in = Instant::now() + Duration::from_secs(expires);
if due_in < self.next_refresh {
self.next_refresh = due_in;
}
self.locked.insert(
(queue_id, queue_name),
LockedMessage {
expires: now() + expires,
revision: self.locked_revision,
},
);
self.locked.len() > 1 || !queue_stats.has_capacity()
}
QueueEventStatus::Deferred => {
self.locked.remove(&(queue_id, queue_name));
true
}
}
}
QueueEvent::Refresh => true,
QueueEvent::Paused(paused) => {
self.core
.data
.queue_status
.store(!paused, Ordering::Relaxed);
self.is_paused = paused;
false
}
QueueEvent::ReloadSettings => {
let server = self.core.build_server();
for (name, settings) in &server.core.smtp.queue.virtual_queues {
if let Some(stats) = self.stats.get_mut(name) {
stats.max_in_flight = settings.threads;
} else {
self.stats.insert(*name, QueueStats::new(settings.threads));
}
}
false
}
QueueEvent::Stop => {
self.rx.close();
self.is_paused = true;
false
}
}
}
@ -324,15 +340,15 @@ impl Message {
impl Recipient {
pub fn expiration_time(&self, created: u64) -> Option<u64> {
match self.expires {
QueueExpiry::Duration(time) => Some(created + time),
QueueExpiry::Count(_) => None,
QueueExpiry::Ttl(time) => Some(created + time),
QueueExpiry::Attempts(_) => None,
}
}
pub fn is_expired(&self, created: u64, now: u64) -> bool {
match self.expires {
QueueExpiry::Duration(time) => created + time <= now,
QueueExpiry::Count(count) => self.retry.inner >= count,
QueueExpiry::Ttl(time) => created + time <= now,
QueueExpiry::Attempts(count) => self.retry.inner >= count,
}
}
}
@ -355,12 +371,3 @@ impl QueueStats {
self.in_flight < self.max_in_flight
}
}
impl Default for LockedMessages {
fn default() -> Self {
LockedMessages {
locked: AHashMap::with_capacity(128),
revision: 0,
}
}
}

View file

@ -294,8 +294,8 @@ impl<'x> ResolveVariable for QueueEnvelope<'x> {
V_QUEUE_RETRY_NUM => self.rcpt.retry.inner.into(),
V_QUEUE_NOTIFY_NUM => self.rcpt.notify.inner.into(),
V_QUEUE_EXPIRES_IN => match &self.rcpt.expires {
QueueExpiry::Duration(time) => (*time + self.message.created).saturating_sub(now()),
QueueExpiry::Count(count) => (*count) as u64,
QueueExpiry::Ttl(time) => (*time + self.message.created).saturating_sub(now()),
QueueExpiry::Attempts(count) => (*count) as u64,
}
.into(),
V_QUEUE_LAST_STATUS => self.rcpt.status.to_compact_string().into(),

View file

@ -130,7 +130,7 @@ impl SmtpSpool for Server {
next_refresh: now + QUEUE_REFRESH,
};
queue.locked_messages.revision += 1;
queue.locked_revision += 1;
let result = self
.store()
.iterate(
@ -148,10 +148,10 @@ impl SmtpSpool for Server {
.stats
.get(&queue_name)
.is_none_or(|stats| stats.has_capacity())
&& match queue.locked_messages.locked.entry((queue_id, queue_name)) {
&& match queue.locked.entry((queue_id, queue_name)) {
Entry::Occupied(mut entry) => {
let locked = entry.get_mut();
locked.revision = queue.locked_messages.revision;
locked.revision = queue.locked_revision;
if locked.expires <= now {
locked.expires = now + INFINITE_LOCK;
@ -167,7 +167,7 @@ impl SmtpSpool for Server {
Entry::Vacant(entry) => {
entry.insert(LockedMessage {
expires: now + INFINITE_LOCK,
revision: queue.locked_messages.revision,
revision: queue.locked_revision,
});
true
}
@ -507,7 +507,7 @@ impl MessageWrapper {
orcpt: None,
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Count(0),
expires: QueueExpiry::Attempts(0),
queue: QueueName::default(),
});
let queue = server.get_queue_or_default(

View file

@ -4,30 +4,26 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{future::Future, io, time::SystemTime};
use crate::{
core::Session,
inbound::DkimSign,
queue::{DomainPart, MessageSource, MessageWrapper, spool::SmtpSpool},
};
use common::{
Server, USER_AGENT,
config::smtp::report::{AddressMatch, AggregateFrequency},
expr::if_block::IfBlock,
ipc::ReportingEvent,
};
use mail_auth::{
common::headers::HeaderWriter,
report::{AuthFailureType, DeliveryResult, Feedback, FeedbackType},
};
use mail_parser::DateTime;
use std::{future::Future, io, time::SystemTime};
use store::write::{ReportEvent, key::KeySerializer};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::Session,
inbound::DkimSign,
queue::{DomainPart, FROM_REPORT, MessageSource, MessageWrapper, spool::SmtpSpool},
};
pub mod analysis;
pub mod dkim;
pub mod dmarc;
@ -137,19 +133,21 @@ impl SmtpReporting for Server {
if !deliver_now {
#[cfg(not(feature = "test_mode"))]
{
use common::config::smtp::queue::QueueExpiry;
use rand::Rng;
let delivery_time = rand::rng().random_range(0u64..10800u64);
for domain in &mut message.domains {
domain.retry.due += delivery_time;
domain.expires += delivery_time;
domain.notify.due += delivery_time;
for rcpt in &mut message.message.recipients {
rcpt.retry.due += delivery_time;
rcpt.notify.due += delivery_time;
if let QueueExpiry::Ttl(expires) = &mut rcpt.expires {
*expires += delivery_time;
}
}
}
}
// Queue message
message.message.flags |= FROM_REPORT;
message
.queue(
signature.as_deref(),

View file

@ -255,7 +255,7 @@ impl RunScript for Server {
(alimit as u64).saturating_sub(message.message.created);
if expires > 0 {
for domain in &mut message.message.recipients {
domain.expires = QueueExpiry::Duration(expires);
domain.expires = QueueExpiry::Ttl(expires);
}
}
}

View file

@ -291,7 +291,7 @@ directory = "'{STORE}'"
type = "system"
[queue.strategy]
gateway = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
route = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
{ else = "'mx'" } ]
[store."foundationdb"]

View file

@ -679,17 +679,17 @@ hash = 64
type = "system"
[queue.strategy]
gateway = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
route = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
{ if = "contains(['remote.org', 'foobar.com', 'test.com', 'other_domain.com'], rcpt_domain)", then = "'mock-smtp'" },
{ else = "'mx'" } ]
[queue.gateway."mock-smtp"]
[queue.route."mock-smtp"]
type = "relay"
address = "localhost"
port = 9999
protocol = "smtp"
[queue.gateway."mock-smtp".tls]
[queue.route."mock-smtp".tls]
enable = false
allow-invalid-certs = true

View file

@ -802,17 +802,17 @@ hash = 64
type = "system"
[queue.strategy]
gateway = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
route = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
{ if = "contains(['remote.org', 'foobar.com', 'test.com', 'other_domain.com'], rcpt_domain)", then = "'mock-smtp'" },
{ else = "'mx'" } ]
[queue.gateway."mock-smtp"]
[queue.route."mock-smtp"]
type = "relay"
address = "localhost"
port = 9999
protocol = "smtp"
[queue.gateway."mock-smtp".tls]
[queue.route."mock-smtp".tls]
enable = false
allow-invalid-certs = true

View file

@ -450,7 +450,7 @@ pub async fn test(params: &mut JMAPTest) {
&mut smtp_rx,
MockMessage::new(
"<jdoe@example.com>",
["<sms_gateway@remote.org>"],
["<sms_route@remote.org>"],
"@It's TPS-o-clock",
),
)

View file

@ -36,40 +36,34 @@ const CONFIG: &str = r#"
[queue.connection.test.timeout]
connect = "10s"
[[queue.connection.test.source-ip]]
address = "10.0.0.1"
ehlo-hostname = "test1.example.com"
[[queue.connection.test.source-ip]]
address = "10.0.0.2"
ehlo-hostname = "test2.example.com"
[[queue.connection.test.source-ip]]
address = "10.0.0.3"
ehlo-hostname = "test3.example.com"
[[queue.connection.test.source-ip]]
address = "10.0.0.4"
ehlo-hostname = "test4.example.com"
[[queue.connection.test.source-ip]]
address = "a:b::1"
ehlo-hostname = "test5.example.com"
[[queue.connection.test.source-ip]]
address = "a:b::2"
ehlo-hostname = "test6.example.com"
[[queue.connection.test.source-ip]]
address = "a:b::3"
ehlo-hostname = "test7.example.com"
[[queue.connection.test.source-ip]]
address = "a:b::4"
ehlo-hostname = "test8.example.com"
[queue.connection.test]
ehlo-hostname = "test.example.com"
source-ips = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4",
"a:b::1", "a:b::2", "a:b::3", "a:b::4"]
[queue.source-ip."10.0.0.1"]
ehlo-hostname = "test1.example.com"
[queue.source-ip."10.0.0.2"]
ehlo-hostname = "test2.example.com"
[queue.source-ip."10.0.0.3"]
ehlo-hostname = "test3.example.com"
[queue.source-ip."10.0.0.4"]
ehlo-hostname = "test4.example.com"
[queue.source-ip."a:b::1"]
ehlo-hostname = "test5.example.com"
[queue.source-ip."a:b::2"]
ehlo-hostname = "test6.example.com"
[queue.source-ip."a:b::3"]
ehlo-hostname = "test7.example.com"
[queue.source-ip."a:b::4"]
ehlo-hostname = "test8.example.com"
[queue.test-v4.type]
type = "mx"
@ -160,7 +154,7 @@ async fn strategies() {
address_lcase: "recipient@foobar.com".to_string(),
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Duration(3600),
expires: QueueExpiry::Ttl(3600),
queue: QueueName::new("test").unwrap(),
status: Status::TemporaryFailure(ErrorDetails {
entity: "test.example.com".to_string(),

View file

@ -14,7 +14,7 @@ use crate::smtp::{DnsCache, TestSMTP, session::TestSession};
const LOCAL: &str = r#"
[queue.strategy]
gateway = [{if = "retry_num > 0", then = "'fallback'"},
route = [{if = "retry_num > 0", then = "'fallback'"},
{else = "'mx'"}]
[session.rcpt]
@ -24,14 +24,14 @@ max-recipients = 100
[session.extensions]
dsn = true
[queue.gateway.fallback]
[queue.route.fallback]
type = "relay"
address = fallback.foobar.org
port = 9925
protocol = 'smtp'
concurrency = 5
[queue.gateway.fallback.tls]
[queue.route.fallback.tls]
implicit = false
allow-invalid-certs = true

View file

@ -15,7 +15,7 @@ const LOCAL: &str = r#"
[session.rcpt]
relay = true
[queue.gateway.mx]
[queue.route.mx]
ip-lookup = "ipv6_then_ipv4"
"#;

View file

@ -32,7 +32,7 @@ dsn = true
const LOCAL: &str = r#"
[queue.strategy]
gateway = [{if = "rcpt_domain = 'foobar.org'", then = "'lmtp'"},
route = [{if = "rcpt_domain = 'foobar.org'", then = "'lmtp'"},
{else = "'mx'"}]
schedule = [{if = "rcpt_domain = 'foobar.org'", then = "'foobar'"},
{else = "'default'"}]
@ -60,14 +60,14 @@ queue-name = "default"
connect = "1s"
data = "50ms"
[queue.gateway.lmtp]
[queue.route.lmtp]
type = "relay"
address = lmtp.foobar.org
port = 9924
protocol = 'lmtp'
concurrency = 5
[queue.gateway.lmtp.tls]
[queue.route.lmtp.tls]
implicit = true
allow-invalid-certs = true
"#;

View file

@ -84,7 +84,7 @@ async fn generate_dsn() {
orcpt: None,
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Duration(10),
expires: QueueExpiry::Ttl(10),
queue: QueueName::default(),
}],
flags: 0,
@ -138,7 +138,7 @@ async fn generate_dsn() {
orcpt: None,
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Duration(10),
expires: QueueExpiry::Ttl(10),
queue: QueueName::default(),
});
core.send_dsn(&mut message).await;
@ -157,7 +157,7 @@ async fn generate_dsn() {
orcpt: Some("jdoe@example.org".into()),
retry: Schedule::now(),
notify: Schedule::now(),
expires: QueueExpiry::Duration(10),
expires: QueueExpiry::Ttl(10),
queue: QueueName::default(),
});
core.send_dsn(&mut message).await;

View file

@ -27,7 +27,7 @@ pub fn build_rcpt(address: &str, retry: u64, notify: u64, expires: u64) -> Recip
address_lcase: address.to_string(),
retry: Schedule::later(retry),
notify: Schedule::later(notify),
expires: QueueExpiry::Duration(expires),
expires: QueueExpiry::Ttl(expires),
status: Status::Scheduled,
flags: 0,
orcpt: None,

View file

@ -1069,7 +1069,7 @@ wait = "1ms"
type = "system"
[queue.strategy]
gateway = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
route = [ { if = "rcpt_domain == 'example.com'", then = "'local'" },
{ else = "'mx'" } ]
[session.data.add-headers]