mail-server/crates/smtp/src/queue/mod.rs
2025-05-16 16:20:05 +02:00

584 lines
15 KiB
Rust

/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
fmt::Display,
net::{IpAddr, Ipv4Addr},
time::{Duration, Instant, SystemTime},
};
use common::expr::{self, functions::ResolveVariable, *};
use compact_str::ToCompactString;
use smtp_proto::{ArchivedResponse, Response};
use store::{SERIALIZE_QUEUE_MSG_V1, SerializedVersion, write::now};
use utils::BlobHash;
// Submodules of the queue module.
pub mod dsn;
pub mod manager;
pub mod quota;
pub mod spool;
pub mod throttle;
/// Identifier of a queued message; this is the type of `Message::queue_id`.
pub type QueueId = u64;
/// A payload of type `T` tagged with the time at which it becomes due.
///
/// The hand-written `Ord`/`PartialOrd`/`PartialEq` impls in this file compare
/// only `due` and ignore `inner`.
#[derive(Debug, Clone, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Deserialize)]
pub struct Schedule<T> {
/// Due time. The constructors build it from `now()` plus whole seconds, so it
/// is presumably a Unix timestamp in seconds — confirm against `store::write::now`.
pub due: u64,
/// Payload carried alongside the due time.
pub inner: T,
}
/// Small `Copy` record pairing a queue id with a due time.
#[derive(Debug, Clone, Copy)]
pub struct QueuedMessage {
/// Due time (presumably the same time base as `Schedule::due` — confirm).
pub due: u64,
/// Identifier of the queued message.
pub queue_id: u64,
}
/// Classification of where a message came from.
///
/// NOTE(review): the variants are not used in this part of the file; their exact
/// meaning should be confirmed against the code that constructs them.
#[derive(Debug, Clone, Copy)]
pub enum MessageSource {
Authenticated,
Unauthenticated,
Dsn,
Report,
Autogenerated,
}
/// A message held in the queue: sender, recipients, per-domain delivery state
/// and bookkeeping fields.
///
/// The type is rkyv-archivable and serde-deserializable; `span_id` is excluded
/// from both representations.
#[derive(
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Debug,
Clone,
PartialEq,
Eq,
serde::Deserialize,
)]
pub struct Message {
/// Identifier of this message in the queue.
pub queue_id: QueueId,
/// Creation time (presumably a Unix timestamp in seconds — confirm).
pub created: u64,
/// Hash referencing the message blob (presumably the stored message body — confirm).
pub blob_hash: BlobHash,
/// Envelope return path (sender address).
pub return_path: String,
/// Return path variant exposed as the `V_SENDER` expression variable
/// (presumably the lowercased form of `return_path`).
pub return_path_lcase: String,
/// Domain exposed as the `V_SENDER_DOMAIN` expression variable.
pub return_path_domain: String,
/// Envelope recipients; `QueueEnvelope::current_rcpt` indexes into this list.
pub recipients: Vec<Recipient>,
/// Per-domain delivery state; `QueueEnvelope::current_domain` indexes into this list.
pub domains: Vec<Domain>,
/// Message flag bits (meaning not visible in this file).
pub flags: u64,
/// Optional envelope identifier.
pub env_id: Option<String>,
/// Priority, exposed as the `V_PRIORITY` expression variable.
pub priority: i16,
/// Message size (presumably in bytes — confirm).
pub size: u64,
/// Quota keys associated with this message.
pub quota_keys: Vec<QuotaKey>,
// Not persisted: skipped by both rkyv and serde.
#[rkyv(with = rkyv::with::Skip)]
#[serde(skip)]
pub span_id: u64,
}
/// Tags serialized `Message` values with the `SERIALIZE_QUEUE_MSG_V1` version byte.
impl SerializedVersion for Message {
fn serialize_version() -> u8 {
SERIALIZE_QUEUE_MSG_V1
}
}
/// A quota entry attached to a message, either size-based or count-based.
///
/// Both variants carry a raw byte key and a numeric id.
#[derive(
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Debug,
Clone,
PartialEq,
Eq,
serde::Deserialize,
)]
pub enum QuotaKey {
// Size-based quota entry.
Size { key: Vec<u8>, id: u64 },
// Count-based quota entry.
Count { key: Vec<u8>, id: u64 },
}
/// Delivery state for one recipient domain of a queued message.
#[derive(
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Debug,
Clone,
PartialEq,
Eq,
serde::Deserialize,
)]
pub struct Domain {
/// Domain name, exposed as the `V_RECIPIENT_DOMAIN` expression variable.
pub domain: String,
/// Retry schedule; `inner` is exposed as `V_QUEUE_RETRY_NUM`.
pub retry: Schedule<u32>,
/// Notification schedule; `inner` is exposed as `V_QUEUE_NOTIFY_NUM`.
pub notify: Schedule<u32>,
/// Expiration time; `V_QUEUE_EXPIRES_IN` is computed as `expires - now()` (saturating).
pub expires: u64,
/// Latest delivery status for this domain.
pub status: Status<(), Error>,
}
/// One envelope recipient of a queued message and its delivery status.
#[derive(
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Debug,
Clone,
PartialEq,
Eq,
serde::Deserialize,
)]
pub struct Recipient {
/// Index of this recipient's domain (presumably into `Message::domains` — confirm).
pub domain_idx: u32,
/// Recipient address.
pub address: String,
/// Address variant exposed as `V_RECIPIENT` / `V_RECIPIENTS`
/// (presumably the lowercased form of `address`).
pub address_lcase: String,
/// Delivery status; success and failure both carry the remote host's response.
pub status: Status<HostResponse<String>, HostResponse<ErrorDetails>>,
/// Recipient flag bits (see the `RCPT_*` constants).
pub flags: u64,
/// Optional original-recipient value (presumably the DSN ORCPT parameter — confirm).
pub orcpt: Option<String>,
}
// Recipient flag bits stored in the upper half of a `u64`
// (presumably used with `Recipient::flags` — confirm against callers).
/// Bit 32.
pub const RCPT_DSN_SENT: u64 = 1 << 32;
/// Bit 33 (`2 << 32` equals `1 << 33`).
pub const RCPT_STATUS_CHANGED: u64 = 2 << 32;
/// Delivery status, generic over the success payload `T` and the failure payload `E`.
///
/// The serde attributes give each variant a snake_case wire name.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
serde::Serialize,
serde::Deserialize,
)]
pub enum Status<T, E> {
// No outcome recorded yet.
#[serde(rename = "scheduled")]
Scheduled,
// Finished successfully, carrying the success payload.
#[serde(rename = "completed")]
Completed(T),
// Failed with an error classified as temporary.
#[serde(rename = "temp_fail")]
TemporaryFailure(E),
// Failed with an error classified as permanent.
#[serde(rename = "perm_fail")]
PermanentFailure(E),
}
/// An SMTP response together with information about the host that sent it.
///
/// `T` is `String` for successful deliveries and `ErrorDetails` for failures
/// (see `Recipient::status` and `Error::UnexpectedResponse`).
#[derive(
Debug,
Clone,
PartialEq,
Eq,
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
serde::Deserialize,
)]
pub struct HostResponse<T> {
/// Host information; its shape depends on `T`.
pub hostname: T,
/// The response received from the host.
pub response: Response<String>,
}
/// Delivery errors recorded in a domain's status.
///
/// The per-variant notes mirror the messages produced by this type's `Display` impl.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
serde::Deserialize,
)]
pub enum Error {
// DNS lookup failed; carries the failure text.
DnsError(String),
// The remote host replied with an unexpected response.
UnexpectedResponse(HostResponse<ErrorDetails>),
// Connecting to the entity failed.
ConnectionError(ErrorDetails),
// TLS error reported for the entity.
TlsError(ErrorDetails),
// DANE failed to authenticate the entity.
DaneError(ErrorDetails),
// MTA-STS authentication failed; carries the failure text.
MtaStsError(String),
// Delivery was rate limited.
RateLimited,
// Too many concurrent connections to the remote server.
ConcurrencyLimited,
// Queue-level error; carries the error text.
Io(String),
}
/// Describes which entity an error relates to and what went wrong.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Default,
serde::Deserialize,
)]
pub struct ErrorDetails {
/// The entity the error refers to (quoted in the `Display` messages of `Error`,
/// e.g. "Connection to '<entity>' failed").
pub entity: String,
/// Free-form description of the failure.
pub details: String,
}
/// Total order over `due` only, deliberately reversed: the schedule with the
/// *earliest* due time compares as the greatest (so a max-heap pops it first).
impl<T> Ord for Schedule<T> {
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        self.due.cmp(&rhs.due).reverse()
    }
}

/// Delegates to the reversed total order above.
impl<T> PartialOrd for Schedule<T> {
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

/// Two schedules are equal when their due times match; `inner` is ignored.
impl<T> PartialEq for Schedule<T> {
    fn eq(&self, rhs: &Self) -> bool {
        rhs.due == self.due
    }
}

impl<T> Eq for Schedule<T> {}
impl<T: Default> Schedule<T> {
    /// Creates a schedule that is due immediately (`due` is the current value
    /// of `now()`), with a default payload.
    pub fn now() -> Self {
        Schedule {
            due: now(),
            inner: T::default(),
        }
    }

    /// Creates a schedule due `duration` from now, with a default payload.
    ///
    /// Only the whole-second part of `duration` is used. The addition
    /// saturates at `u64::MAX`, so an extremely large duration yields a
    /// schedule that is effectively never due instead of overflowing.
    pub fn later(duration: Duration) -> Self {
        Schedule {
            // A plain `+` would panic in debug builds and wrap around to an
            // already-due timestamp in release builds.
            due: now().saturating_add(duration.as_secs()),
            inner: T::default(),
        }
    }
}
/// Borrowed view of a queued message plus the delivery context (current domain,
/// current recipient, MX host and IP addresses) used to resolve expression variables.
pub struct QueueEnvelope<'x> {
/// The message being processed.
pub message: &'x Message,
/// MX host name, exposed as `V_MX`; empty until set by the caller.
pub mx: &'x str,
/// Remote IP address, exposed as `V_REMOTE_IP`.
pub remote_ip: IpAddr,
/// Local IP address, exposed as `V_LOCAL_IP`.
pub local_ip: IpAddr,
/// Index into `message.domains` of the domain being processed.
pub current_domain: usize,
/// Index into `message.recipients` of the recipient being processed.
pub current_rcpt: usize,
}
impl<'x> QueueEnvelope<'x> {
pub fn new(message: &'x Message, current_domain: usize) -> Self {
Self {
message,
current_domain,
current_rcpt: 0,
mx: "",
remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
}
}
pub fn new_rcpt(message: &'x Message, current_domain: usize, current_rcpt: usize) -> Self {
Self {
message,
current_domain,
current_rcpt,
mx: "",
remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
}
}
}
impl<'x> QueueEnvelope<'x> {
fn current_domain(&self) -> Option<&'x Domain> {
self.message.domains.get(self.current_domain)
}
}
impl<'x> ResolveVariable for QueueEnvelope<'x> {
fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
match variable {
V_SENDER => self.message.return_path_lcase.as_str().into(),
V_SENDER_DOMAIN => self.message.return_path_domain.as_str().into(),
V_RECIPIENT_DOMAIN => self
.current_domain()
.map(|d| d.domain.as_str())
.unwrap_or_default()
.into(),
V_RECIPIENT => self
.message
.recipients
.get(self.current_rcpt)
.map(|r| r.address_lcase.as_str())
.unwrap_or_default()
.into(),
V_RECIPIENTS => self
.message
.recipients
.iter()
.map(|r| Variable::from(r.address_lcase.as_str()))
.collect::<Vec<_>>()
.into(),
V_QUEUE_RETRY_NUM => self
.current_domain()
.map(|d| d.retry.inner)
.unwrap_or_default()
.into(),
V_QUEUE_NOTIFY_NUM => self
.current_domain()
.map(|d| d.notify.inner)
.unwrap_or_default()
.into(),
V_QUEUE_EXPIRES_IN => self
.current_domain()
.map(|d| d.expires.saturating_sub(now()))
.unwrap_or_default()
.into(),
V_QUEUE_LAST_STATUS => self
.current_domain()
.map(|d| d.status.to_compact_string())
.unwrap_or_default()
.into(),
V_QUEUE_LAST_ERROR => self
.current_domain()
.map(|d| match &d.status {
Status::Scheduled | Status::Completed(_) => "none",
Status::TemporaryFailure(err) | Status::PermanentFailure(err) => match err {
Error::DnsError(_) => "dns",
Error::UnexpectedResponse(_) => "unexpected-reply",
Error::ConnectionError(_) => "connection",
Error::TlsError(_) => "tls",
Error::DaneError(_) => "dane",
Error::MtaStsError(_) => "mta-sts",
Error::RateLimited => "rate",
Error::ConcurrencyLimited => "concurrency",
Error::Io(_) => "io",
},
})
.unwrap_or_default()
.into(),
V_MX => self.mx.into(),
V_PRIORITY => self.message.priority.into(),
V_REMOTE_IP => self.remote_ip.to_compact_string().into(),
V_LOCAL_IP => self.local_ip.to_compact_string().into(),
_ => "".into(),
}
}
fn resolve_global(&self, _: &str) -> Variable<'_> {
Variable::Integer(0)
}
}
/// Exposes the message-level variables (sender, sender domain, recipient list
/// and priority) to the expression engine. Any other variable id resolves to
/// an empty string.
impl ResolveVariable for Message {
    fn resolve_variable(&self, variable: u32) -> expr::Variable<'_> {
        match variable {
            V_SENDER => self.return_path_lcase.as_str().into(),
            V_SENDER_DOMAIN => self.return_path_domain.as_str().into(),
            V_RECIPIENTS => {
                let addresses: Vec<_> = self
                    .recipients
                    .iter()
                    .map(|rcpt| Variable::from(rcpt.address_lcase.as_str()))
                    .collect();
                addresses.into()
            }
            V_PRIORITY => self.priority.into(),
            _ => "".into(),
        }
    }

    /// Global variables are not supported here; always yields integer `0`.
    fn resolve_global(&self, _: &str) -> Variable<'_> {
        Variable::Integer(0)
    }
}
/// Borrowed recipient domain name, wrapped so it can act as a variable
/// resolver on its own.
pub struct RecipientDomain<'x>(&'x str);

impl<'x> RecipientDomain<'x> {
    /// Wraps `domain` without copying it.
    pub fn new(domain: &'x str) -> Self {
        RecipientDomain(domain)
    }
}
impl<'x> ResolveVariable for RecipientDomain<'x> {
fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
match variable {
V_RECIPIENT_DOMAIN => self.0.into(),
_ => "".into(),
}
}
fn resolve_global(&self, _: &str) -> Variable<'_> {
Variable::Integer(0)
}
}
/// Converts the monotonic instant `time` into a Unix timestamp in seconds.
///
/// The result is the current wall-clock time (seconds since the Unix epoch,
/// or 0 if the system clock is before the epoch) plus the whole seconds
/// between `now` and `time`. If `time` is not after `now`, no offset is added.
#[inline(always)]
pub fn instant_to_timestamp(now: Instant, time: Instant) -> u64 {
    let wall_clock_secs = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    let offset_secs = match time.checked_duration_since(now) {
        Some(delta) => delta.as_secs(),
        None => 0,
    };
    wall_clock_secs + offset_secs
}
/// Conversion from a Unix timestamp (seconds) to a monotonic [`Instant`].
pub trait InstantFromTimestamp {
    /// Returns the `Instant` corresponding to this timestamp.
    fn to_instant(&self) -> Instant;
}

impl InstantFromTimestamp for u64 {
    /// Maps a Unix timestamp in seconds onto the monotonic clock.
    ///
    /// Timestamps at or before the current wall-clock time map to
    /// `Instant::now()`. Future timestamps map to `Instant::now()` plus the
    /// difference in whole seconds. The difference is clamped to 100 years so
    /// that far-future values such as `u64::MAX` cannot overflow `Instant`
    /// arithmetic, which would panic.
    fn to_instant(&self) -> Instant {
        // Upper bound on the delay added to `Instant::now()`; far beyond any
        // practical schedule, yet small enough for every platform's `Instant`.
        const MAX_DELAY_SECS: u64 = 100 * 365 * 24 * 60 * 60;

        let current_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());

        // Zero when the timestamp is not in the future.
        let delay_secs = self.saturating_sub(current_timestamp).min(MAX_DELAY_SECS);
        Instant::now() + Duration::from_secs(delay_secs)
    }
}
/// Extracts the domain portion of an e-mail address.
pub trait DomainPart {
    /// Returns everything after the last `'@'`, or an empty string when the
    /// value contains no `'@'`.
    fn domain_part(&self) -> &str;
}

impl<T: AsRef<str>> DomainPart for T {
    #[inline(always)]
    fn domain_part(&self) -> &str {
        let address = self.as_ref();
        match address.rfind('@') {
            // '@' is a single byte, so `at + 1` is always a char boundary.
            Some(at) => &address[at + 1..],
            None => "",
        }
    }
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::UnexpectedResponse(response) => {
write!(
f,
"Unexpected response from '{}': {}",
response.hostname.entity, response.response
)
}
Error::DnsError(err) => {
write!(f, "DNS lookup failed: {err}")
}
Error::ConnectionError(details) => {
write!(
f,
"Connection to '{}' failed: {}",
details.entity, details.details
)
}
Error::TlsError(details) => {
write!(
f,
"TLS error from '{}': {}",
details.entity, details.details
)
}
Error::DaneError(details) => {
write!(
f,
"DANE failed to authenticate '{}': {}",
details.entity, details.details
)
}
Error::MtaStsError(details) => {
write!(f, "MTA-STS auth failed: {details}")
}
Error::RateLimited => {
write!(f, "Rate limited")
}
Error::ConcurrencyLimited => {
write!(f, "Too many concurrent connections to remote server")
}
Error::Io(err) => {
write!(f, "Queue error: {err}")
}
}
}
}
impl Display for ArchivedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ArchivedError::UnexpectedResponse(response) => {
write!(
f,
"Unexpected response from '{}': {}",
response.hostname.entity,
response.response.to_string()
)
}
ArchivedError::DnsError(err) => {
write!(f, "DNS lookup failed: {err}")
}
ArchivedError::ConnectionError(details) => {
write!(
f,
"Connection to '{}' failed: {}",
details.entity, details.details
)
}
ArchivedError::TlsError(details) => {
write!(
f,
"TLS error from '{}': {}",
details.entity, details.details
)
}
ArchivedError::DaneError(details) => {
write!(
f,
"DANE failed to authenticate '{}': {}",
details.entity, details.details
)
}
ArchivedError::MtaStsError(details) => {
write!(f, "MTA-STS auth failed: {details}")
}
ArchivedError::RateLimited => {
write!(f, "Rate limited")
}
ArchivedError::ConcurrencyLimited => {
write!(f, "Too many concurrent connections to remote server")
}
ArchivedError::Io(err) => {
write!(f, "Queue error: {err}")
}
}
}
}
/// Human-readable domain status: a fixed label, followed by the error text
/// for the two failure states.
impl Display for Status<(), Error> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, err) = match self {
            Status::Scheduled => return f.write_str("Scheduled"),
            Status::Completed(_) => return f.write_str("Completed"),
            Status::TemporaryFailure(err) => ("Temporary Failure", err),
            Status::PermanentFailure(err) => ("Permanent Failure", err),
        };
        write!(f, "{label}: {err}")
    }
}
/// Human-readable recipient status: a fixed label, followed by the remote
/// host's response for every state except `Scheduled`.
impl Display for Status<HostResponse<String>, HostResponse<ErrorDetails>> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, response) = match self {
            Status::Scheduled => return f.write_str("Scheduled"),
            Status::Completed(host) => ("Delivered", &host.response),
            Status::TemporaryFailure(host) => ("Temporary Failure", &host.response),
            Status::PermanentFailure(host) => ("Permanent Failure", &host.response),
        };
        write!(f, "{label}: {response}")
    }
}
/// String rendering for archived SMTP responses.
pub trait DisplayArchivedResponse {
    /// Renders the response as a single descriptive line.
    fn to_string(&self) -> String;
}

impl DisplayArchivedResponse for ArchivedResponse<String> {
    /// Formats the reply code, the three-part enhanced status code and the
    /// message text.
    fn to_string(&self) -> String {
        format!(
            "Code: {code}, Enhanced code: {class}.{subject}.{detail}, Message: {message}",
            code = self.code,
            class = self.esc[0],
            subject = self.esc[1],
            detail = self.esc[2],
            message = self.message,
        )
    }
}