/* * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC * * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ use super::FutureTimestamp; use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD}; use common::{ Server, auth::AccessToken, config::smtp::queue::{ArchivedQueueExpiry, QueueExpiry, QueueName}, ipc::QueueEvent, }; use directory::{Permission, Type, backend::internal::manage::ManageDirectory}; use http_proto::{request::decode_path_element, *}; use hyper::Method; use mail_auth::{ dmarc::URI, mta_sts::ReportUri, report::{self, tlsrpt::TlsReport}, }; use mail_parser::DateTime; use serde::{Deserializer, Serializer}; use serde_json::json; use smtp::{ queue::{ self, ArchivedMessage, ArchivedStatus, DisplayArchivedResponse, ErrorDetails, QueueId, Status, spool::SmtpSpool, }, reporting::{dmarc::DmarcReporting, tls::TlsReporting}, }; use std::{future::Future, sync::atomic::Ordering}; use store::{ Deserialize, IterateParams, ValueKey, write::{ AlignedBytes, Archive, QueueClass, ReportEvent, ValueClass, key::DeserializeBigEndian, now, }, }; use trc::AddContext; use utils::url_params::UrlParams; #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct Message { pub id: QueueId, pub return_path: String, pub recipients: Vec, #[serde(deserialize_with = "deserialize_datetime")] #[serde(serialize_with = "serialize_datetime")] pub created: DateTime, pub size: u64, #[serde(skip_serializing_if = "is_zero")] #[serde(default)] pub priority: i16, #[serde(skip_serializing_if = "Option::is_none")] pub env_id: Option, pub blob_hash: String, } #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct Recipient { pub address: String, pub queue: String, pub status: Status, pub retry_num: u32, #[serde(skip_serializing_if = "Option::is_none")] #[serde(deserialize_with = "deserialize_maybe_datetime")] #[serde(serialize_with = "serialize_maybe_datetime")] pub next_retry: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(deserialize_with = "deserialize_maybe_datetime")] #[serde(serialize_with = "serialize_maybe_datetime")] pub next_notify: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(deserialize_with = "deserialize_maybe_datetime")] #[serde(serialize_with = "serialize_maybe_datetime")] pub expires: Option, #[serde(skip_serializing_if = "Option::is_none")] pub orcpt: Option, } #[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "camelCase")] pub enum Report { Tls { id: String, domain: String, #[serde(deserialize_with = "deserialize_datetime")] #[serde(serialize_with = "serialize_datetime")] range_from: DateTime, #[serde(deserialize_with = "deserialize_datetime")] #[serde(serialize_with = "serialize_datetime")] range_to: DateTime, report: TlsReport, rua: Vec, }, Dmarc { id: String, domain: String, #[serde(deserialize_with = "deserialize_datetime")] #[serde(serialize_with = "serialize_datetime")] range_from: DateTime, #[serde(deserialize_with = "deserialize_datetime")] #[serde(serialize_with = "serialize_datetime")] range_to: DateTime, report: report::Report, rua: Vec, }, } pub trait QueueManagement: Sync + Send { fn handle_manage_queue( &self, req: &HttpRequest, path: Vec<&str>, access_token: &AccessToken, ) -> impl Future> + Send; } impl QueueManagement for Server { async fn handle_manage_queue( &self, req: &HttpRequest, path: Vec<&str>, access_token: &AccessToken, ) -> trc::Result { let params = UrlParams::new(req.uri().query()); let mut tenant_domains: Option> = None; // SPDX-SnippetBegin // SPDX-FileCopyrightText: 2020 Stalwart Labs LLC // SPDX-License-Identifier: LicenseRef-SEL // Limit to tenant domains #[cfg(feature = "enterprise")] if self.core.is_enterprise_edition() { if let Some(tenant) = access_token.tenant { tenant_domains = self .core .storage .data .list_principals(None, tenant.id.into(), &[Type::Domain], false, 0, 0) .await .map(|principals| { principals .items .into_iter() .map(|p| p.name) .collect::>() }) .caused_by(trc::location!())? .into(); } } // SPDX-SnippetEnd match ( path.get(1).copied().unwrap_or_default(), path.get(2).copied().map(decode_path_element), req.method(), ) { ("messages", None, &Method::GET) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueList)?; let result = fetch_queued_messages(self, ¶ms, &tenant_domains).await?; let queue_status = self.inner.data.queue_status.load(Ordering::Relaxed); Ok(if !result.values.is_empty() { JsonResponse::new(json!({ "data":{ "items": result.values, "total": result.total, "status": queue_status, }, })) } else { JsonResponse::new(json!({ "data": { "items": result.ids, "total": result.total, "status": queue_status, }, })) } .into_http_response()) } ("messages", Some(queue_id), &Method::GET) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueGet)?; let queue_id = queue_id.parse().unwrap_or_default(); if let Some(message_) = self.read_message_archive(queue_id).await? { let message = message_.unarchive::()?; if message.is_tenant_domain(&tenant_domains) { return Ok(JsonResponse::new(json!({ "data": Message::from_archive(queue_id, message), })) .into_http_response()); } } Err(trc::ResourceEvent::NotFound.into_err()) } ("messages", None, &Method::PATCH) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueUpdate)?; let time = params .parse::("at") .map(|t| t.into_inner()) .unwrap_or_else(now); let result = fetch_queued_messages(self, ¶ms, &tenant_domains).await?; let found = !result.ids.is_empty(); if found { let server = self.clone(); tokio::spawn(async move { for id in result.ids { if let Some(mut message) = server.read_message(id, QueueName::default()).await { let mut has_changes = false; for recipient in &mut message.message.recipients { if matches!( recipient.status, Status::Scheduled | Status::TemporaryFailure(_) ) { recipient.retry.due = time; if recipient .expiration_time(message.message.created) .is_some_and(|expires| expires > time) { recipient.expires = QueueExpiry::Attempts(recipient.retry.inner + 10); } has_changes = true; } } if has_changes { message.save_changes(&server, None).await; } } } let _ = server.inner.ipc.queue_tx.send(QueueEvent::Refresh).await; }); } Ok(JsonResponse::new(json!({ "data": found, })) .into_http_response()) } ("messages", Some(queue_id), &Method::PATCH) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueUpdate)?; let time = params .parse::("at") .map(|t| t.into_inner()) .unwrap_or_else(now); let item = params.get("filter"); if let Some(mut message) = self .read_message(queue_id.parse().unwrap_or_default(), QueueName::default()) .await .filter(|message| { tenant_domains .as_ref() .is_none_or(|domains| message.has_domain(domains)) }) { let mut found = false; for recipient in &mut message.message.recipients { if matches!( recipient.status, Status::Scheduled | Status::TemporaryFailure(_) ) && item .as_ref() .is_none_or(|item| recipient.address.contains(item)) { recipient.retry.due = time; if recipient .expiration_time(message.message.created) .is_some_and(|expires| expires > time) { recipient.expires = QueueExpiry::Attempts(recipient.retry.inner + 10); } found = true; } } if found { message.save_changes(self, None).await; let _ = self.inner.ipc.queue_tx.send(QueueEvent::Refresh).await; } Ok(JsonResponse::new(json!({ "data": found, })) .into_http_response()) } else { Err(trc::ResourceEvent::NotFound.into_err()) } } ("messages", None, &Method::DELETE) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueDelete)?; let result = fetch_queued_messages(self, ¶ms, &tenant_domains).await?; let found = !result.ids.is_empty(); if found { let server = self.clone(); tokio::spawn(async move { let is_active = server.inner.data.queue_status.load(Ordering::Relaxed); if is_active { let _ = server .inner .ipc .queue_tx .send(QueueEvent::Paused(true)) .await; } for id in result.ids { if let Some(message) = server.read_message(id, QueueName::default()).await { message.remove(&server, None).await; } } if is_active { let _ = server .inner .ipc .queue_tx .send(QueueEvent::Paused(false)) .await; } }); } Ok(JsonResponse::new(json!({ "data": found, })) .into_http_response()) } ("messages", Some(queue_id), &Method::DELETE) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueDelete)?; if let Some(mut message) = self .read_message(queue_id.parse().unwrap_or_default(), QueueName::default()) .await .filter(|message| { tenant_domains .as_ref() .is_none_or(|domains| message.has_domain(domains)) }) { let mut found = false; if let Some(item) = params.get("filter") { // Cancel delivery for all recipients that match for rcpt in &mut message.message.recipients { if rcpt.address.contains(item) { rcpt.status = Status::PermanentFailure(ErrorDetails { entity: "localhost".to_string(), details: queue::Error::Io("Delivery canceled.".to_string()), }); found = true; } } if found { // Delete message if there are no pending deliveries if message.message.recipients.iter().any(|recipient| { matches!( recipient.status, Status::TemporaryFailure(_) | Status::Scheduled ) }) { message.save_changes(self, None).await; } else { message.remove(self, None).await; } } } else { message.remove(self, None).await; found = true; } Ok(JsonResponse::new(json!({ "data": found, })) .into_http_response()) } else { Err(trc::ResourceEvent::NotFound.into_err()) } } ("reports", None, &Method::GET) => { // Validate the access token access_token.assert_has_permission(Permission::OutgoingReportList)?; let result = fetch_queued_reports(self, ¶ms, &tenant_domains).await?; Ok(JsonResponse::new(json!({ "data": { "items": result.ids.into_iter().map(|id| id.queue_id()).collect::>(), "total": result.total, }, })) .into_http_response()) } ("reports", Some(report_id), &Method::GET) => { // Validate the access token access_token.assert_has_permission(Permission::OutgoingReportGet)?; let mut result = None; if let Some(report_id) = parse_queued_report_id(report_id.as_ref()) { match report_id { QueueClass::DmarcReportHeader(event) if tenant_domains.as_ref().is_none_or(|domains| { domains.iter().any(|dd| dd == &event.domain) }) => { let mut rua = Vec::new(); if let Some(report) = self .generate_dmarc_aggregate_report(&event, &mut rua, None, 0) .await? { result = Report::dmarc(event, report, rua).into(); } } QueueClass::TlsReportHeader(event) if tenant_domains.as_ref().is_none_or(|domains| { domains.iter().any(|dd| dd == &event.domain) }) => { let mut rua = Vec::new(); if let Some(report) = self .generate_tls_aggregate_report(&[event.clone()], &mut rua, None, 0) .await? { result = Report::tls(event, report, rua).into(); } } _ => (), } } if let Some(result) = result { Ok(JsonResponse::new(json!({ "data": result, })) .into_http_response()) } else { Err(trc::ResourceEvent::NotFound.into_err()) } } ("reports", None, &Method::DELETE) => { // Validate the access token access_token.assert_has_permission(Permission::OutgoingReportDelete)?; let result = fetch_queued_reports(self, ¶ms, &tenant_domains).await?; let found = !result.ids.is_empty(); if found { let server = self.clone(); tokio::spawn(async move { for id in result.ids { match id { QueueClass::DmarcReportHeader(event) => { server.delete_dmarc_report(event).await; } QueueClass::TlsReportHeader(event) => { server.delete_tls_report(vec![event]).await; } _ => (), } } }); } Ok(JsonResponse::new(json!({ "data": found, })) .into_http_response()) } ("reports", Some(report_id), &Method::DELETE) => { // Validate the access token access_token.assert_has_permission(Permission::OutgoingReportDelete)?; if let Some(report_id) = parse_queued_report_id(report_id.as_ref()) { let result = match report_id { QueueClass::DmarcReportHeader(event) if tenant_domains.as_ref().is_none_or(|domains| { domains.iter().any(|dd| dd == &event.domain) }) => { self.delete_dmarc_report(event).await; true } QueueClass::TlsReportHeader(event) if tenant_domains.as_ref().is_none_or(|domains| { domains.iter().any(|dd| dd == &event.domain) }) => { self.delete_tls_report(vec![event]).await; true } _ => false, }; Ok(JsonResponse::new(json!({ "data": result, })) .into_http_response()) } else { Err(trc::ResourceEvent::NotFound.into_err()) } } ("status", None, &Method::GET) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueGet)?; Ok(JsonResponse::new(json!({ "data": self.inner.data.queue_status.load(Ordering::Relaxed), })) .into_http_response()) } ("status", Some(action), &Method::PATCH) => { // Validate the access token access_token.assert_has_permission(Permission::MessageQueueUpdate)?; let prev_status = self.inner.data.queue_status.load(Ordering::Relaxed); let _ = self .inner .ipc .queue_tx .send(QueueEvent::Paused(action == "stop")) .await; Ok(JsonResponse::new(json!({ "data": prev_status, })) .into_http_response()) } _ => Err(trc::ResourceEvent::NotFound.into_err()), } } } impl Message { fn from_archive(id: u64, message: &ArchivedMessage) -> Self { let now = now(); Message { id, return_path: message.return_path.to_string(), created: DateTime::from_timestamp(u64::from(message.created) as i64), size: message.size.into(), priority: message.priority.into(), env_id: message.env_id.as_ref().map(|id| id.to_string()), recipients: message .recipients .iter() .map(|rcpt| Recipient { address: rcpt.address.to_string(), queue: rcpt.queue.to_string(), status: match &rcpt.status { ArchivedStatus::Scheduled => Status::Scheduled, ArchivedStatus::Completed(status) => { Status::Completed(status.response.to_string()) } ArchivedStatus::TemporaryFailure(status) => { Status::TemporaryFailure(status.to_string()) } ArchivedStatus::PermanentFailure(status) => { Status::PermanentFailure(status.to_string()) } }, retry_num: rcpt.retry.inner.into(), next_retry: Some(DateTime::from_timestamp(u64::from(rcpt.retry.due) as i64)), next_notify: if rcpt.notify.due > now { DateTime::from_timestamp(u64::from(rcpt.notify.due) as i64).into() } else { None }, expires: if let ArchivedQueueExpiry::Ttl(time) = &rcpt.expires { DateTime::from_timestamp((u64::from(*time) + message.created) as i64).into() } else { None }, orcpt: rcpt.orcpt.as_ref().map(|orcpt| orcpt.to_string()), }) .collect(), blob_hash: URL_SAFE_NO_PAD.encode::<&[u8]>(message.blob_hash.0.as_slice()), } } } struct QueuedMessages { ids: Vec, values: Vec, total: usize, } async fn fetch_queued_messages( server: &Server, params: &UrlParams<'_>, tenant_domains: &Option>, ) -> trc::Result { let queue = params.get("queue").and_then(QueueName::new); let text = params.get("text"); let from = params.get("from"); let to = params.get("to"); let before = params .parse::("before") .map(|t| t.into_inner()); let after = params .parse::("after") .map(|t| t.into_inner()); let page = params.parse::("page").unwrap_or_default(); let limit = params.parse::("limit").unwrap_or_default(); let values = params.has_key("values"); let range_start = params.parse::("range-start").unwrap_or_default(); let range_end = params.parse::("range-end").unwrap_or(u64::MAX); let max_total = params.parse::("max-total").unwrap_or_default(); let mut result = QueuedMessages { ids: Vec::new(), values: Vec::new(), total: 0, }; let from_key = ValueKey::from(ValueClass::Queue(QueueClass::Message(range_start))); let to_key = ValueKey::from(ValueClass::Queue(QueueClass::Message(range_end))); let has_filters = text.is_some() || from.is_some() || to.is_some() || before.is_some() || after.is_some() || queue.is_some(); let mut offset = page.saturating_sub(1) * limit; let mut total_returned = 0; server .core .storage .data .iterate( IterateParams::new(from_key, to_key).ascending(), |key, value| { let message_ = as Deserialize>::deserialize(value) .add_context(|ctx| ctx.ctx(trc::Key::Key, key))?; let message = message_ .unarchive::() .add_context(|ctx| ctx.ctx(trc::Key::Key, key))?; let matches = tenant_domains .as_ref() .is_none_or(|domains| message.has_domain(domains)) && (!has_filters || (text .as_ref() .map(|text| { message.return_path.contains(text) || message.recipients.iter().any(|r| r.address.contains(text)) }) .unwrap_or_else(|| { from.as_ref() .is_none_or(|from| message.return_path.contains(from)) && to.as_ref().is_none_or(|to| { message.recipients.iter().any(|r| r.address.contains(to)) }) }) && before.as_ref().is_none_or(|before| { message .next_delivery_event(queue) .is_some_and(|next| next < *before) }) && after.as_ref().is_none_or(|after| { message .next_delivery_event(queue) .is_some_and(|next| next > *after) }) && queue .as_ref() .is_none_or(|q| message.recipients.iter().any(|r| &r.queue == q)))); if matches { if offset == 0 { if limit == 0 || total_returned < limit { let queue_id = key.deserialize_be_u64(0)?; if values { result.values.push(Message::from_archive(queue_id, message)); } else { result.ids.push(queue_id); } total_returned += 1; } } else { offset -= 1; } result.total += 1; } Ok(max_total == 0 || result.total < max_total) }, ) .await .caused_by(trc::location!()) .map(|_| result) } struct QueuedReports { ids: Vec, total: usize, } async fn fetch_queued_reports( server: &Server, params: &UrlParams<'_>, tenant_domains: &Option>, ) -> trc::Result { let domain = params.get("domain").map(|d| d.to_lowercase()); let type_ = params.get("type").and_then(|t| match t { "dmarc" => 0u8.into(), "tls" => 1u8.into(), _ => None, }); let page: usize = params.parse("page").unwrap_or_default(); let limit: usize = params.parse("limit").unwrap_or_default(); let range_start = params.parse::("range-start").unwrap_or_default(); let range_end = params.parse::("range-end").unwrap_or(u64::MAX); let max_total = params.parse::("max-total").unwrap_or_default(); let mut result = QueuedReports { ids: Vec::new(), total: 0, }; let from_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportHeader( ReportEvent { due: range_start, policy_hash: 0, seq_id: 0, domain: String::new(), }, ))); let to_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportHeader( ReportEvent { due: range_end, policy_hash: 0, seq_id: 0, domain: String::new(), }, ))); let mut offset = page.saturating_sub(1) * limit; let mut total_returned = 0; server .core .storage .data .iterate( IterateParams::new(from_key, to_key).ascending().no_values(), |key, _| { if type_.is_none_or(|t| t == *key.last().unwrap()) { let event = ReportEvent::deserialize(key)?; if tenant_domains .as_ref() .is_none_or(|domains| domains.iter().any(|dd| dd == &event.domain)) && event.seq_id != 0 && domain.as_ref().is_none_or(|d| event.domain.contains(d)) { if offset == 0 { if limit == 0 || total_returned < limit { result.ids.push(if *key.last().unwrap() == 0 { QueueClass::DmarcReportHeader(event) } else { QueueClass::TlsReportHeader(event) }); total_returned += 1; } } else { offset -= 1; } result.total += 1; } } Ok(max_total == 0 || result.total < max_total) }, ) .await .caused_by(trc::location!()) .map(|_| result) } impl Report { fn dmarc(event: ReportEvent, report: report::Report, rua: Vec) -> Self { Self::Dmarc { domain: event.domain.clone(), range_from: DateTime::from_timestamp(event.seq_id as i64), range_to: DateTime::from_timestamp(event.due as i64), id: QueueClass::DmarcReportHeader(event).queue_id(), report, rua, } } fn tls(event: ReportEvent, report: TlsReport, rua: Vec) -> Self { Self::Tls { domain: event.domain.clone(), range_from: DateTime::from_timestamp(event.seq_id as i64), range_to: DateTime::from_timestamp(event.due as i64), id: QueueClass::TlsReportHeader(event).queue_id(), report, rua, } } } trait GenerateQueueId { fn queue_id(&self) -> String; } impl GenerateQueueId for QueueClass { fn queue_id(&self) -> String { match self { QueueClass::DmarcReportHeader(h) => { format!("d!{}!{}!{}!{}", h.domain, h.policy_hash, h.seq_id, h.due) } QueueClass::TlsReportHeader(h) => { format!("t!{}!{}!{}!{}", h.domain, h.policy_hash, h.seq_id, h.due) } _ => unreachable!(), } } } fn parse_queued_report_id(id: &str) -> Option { let mut parts = id.split('!'); let type_ = parts.next()?; let event = ReportEvent { domain: parts.next()?.to_string(), policy_hash: parts.next().and_then(|p| p.parse::().ok())?, seq_id: parts.next().and_then(|p| p.parse::().ok())?, due: parts.next().and_then(|p| p.parse::().ok())?, }; match type_ { "d" => Some(QueueClass::DmarcReportHeader(event)), "t" => Some(QueueClass::TlsReportHeader(event)), _ => None, } } fn serialize_maybe_datetime(value: &Option, serializer: S) -> Result where S: Serializer, { match value { Some(value) => serializer.serialize_some(&value.to_rfc3339()), None => serializer.serialize_none(), } } fn deserialize_maybe_datetime<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, { if let Some(value) = as serde::Deserialize>::deserialize(deserializer)? { if let Some(value) = DateTime::parse_rfc3339(value) { Ok(Some(value)) } else { Err(serde::de::Error::custom( "Failed to parse RFC3339 timestamp", )) } } else { Ok(None) } } fn serialize_datetime(value: &DateTime, serializer: S) -> Result where S: Serializer, { serializer.serialize_str(&value.to_rfc3339()) } fn deserialize_datetime<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { use serde::Deserialize; if let Some(value) = DateTime::parse_rfc3339(<&str>::deserialize(deserializer)?) { Ok(value) } else { Err(serde::de::Error::custom( "Failed to parse RFC3339 timestamp", )) } } fn is_zero(num: &i16) -> bool { *num == 0 } trait IsTenantDomain { fn is_tenant_domain(&self, tenant_domains: &Option>) -> bool; } impl IsTenantDomain for ArchivedMessage { fn is_tenant_domain(&self, tenant_domains: &Option>) -> bool { tenant_domains .as_ref() .is_none_or(|domains| self.has_domain(domains)) } }