mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-02-25 00:12:58 +08:00
Improved TLS/DMARC report API
This commit is contained in:
parent
d94a6a2ec6
commit
0bd4f8148e
6 changed files with 450 additions and 220 deletions
|
@ -27,7 +27,7 @@ use directory::{AuthResult, Type};
|
|||
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
|
||||
use hyper::{
|
||||
body::{self, Bytes},
|
||||
header::{self, AUTHORIZATION},
|
||||
header::{self, HeaderValue, AUTHORIZATION},
|
||||
server::conn::http1,
|
||||
service::service_fn,
|
||||
Method, StatusCode, Uri,
|
||||
|
@ -36,6 +36,7 @@ use hyper_util::rt::TokioIo;
|
|||
use mail_parser::{decoders::base64::base64_decode, DateTime};
|
||||
use mail_send::Credentials;
|
||||
use serde::{Deserializer, Serializer};
|
||||
use serde_json::json;
|
||||
use store::{
|
||||
write::{key::DeserializeBigEndian, now, Bincode, QueueClass, ReportEvent, ValueClass},
|
||||
Deserialize, IterateParams, ValueKey,
|
||||
|
@ -43,7 +44,10 @@ use store::{
|
|||
|
||||
use utils::listener::{limiter::InFlight, SessionData, SessionManager, SessionStream};
|
||||
|
||||
use crate::queue::{self, HostResponse, QueueId, Status};
|
||||
use crate::{
|
||||
queue::{self, ErrorDetails, HostResponse, QueueId, Status},
|
||||
reporting::{dmarc::DmarcFormat, tls::TlsFormat},
|
||||
};
|
||||
|
||||
use super::{SmtpAdminSessionManager, SMTP};
|
||||
|
||||
|
@ -54,6 +58,7 @@ pub struct Response<T> {
|
|||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
|
||||
pub struct Message {
|
||||
pub id: QueueId,
|
||||
pub return_path: String,
|
||||
pub domains: Vec<Domain>,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
|
@ -94,17 +99,30 @@ pub struct Recipient {
|
|||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Report {
|
||||
pub domain: String,
|
||||
#[serde(rename = "type")]
|
||||
pub type_: String,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
pub range_from: DateTime,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
pub range_to: DateTime,
|
||||
pub size: usize,
|
||||
#[serde(tag = "type")]
|
||||
pub enum Report {
|
||||
Tls {
|
||||
id: String,
|
||||
domain: String,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
range_from: DateTime,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
range_to: DateTime,
|
||||
report: TlsFormat,
|
||||
},
|
||||
Dmarc {
|
||||
id: String,
|
||||
domain: String,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
range_from: DateTime,
|
||||
#[serde(deserialize_with = "deserialize_datetime")]
|
||||
#[serde(serialize_with = "serialize_datetime")]
|
||||
range_to: DateTime,
|
||||
report: DmarcFormat,
|
||||
},
|
||||
}
|
||||
|
||||
impl SessionManager for SmtpAdminSessionManager {
|
||||
|
@ -148,7 +166,28 @@ async fn handle_request(
|
|||
let core = core.clone();
|
||||
|
||||
async move {
|
||||
let response = core.parse_request(&req, remote_addr).await;
|
||||
let mut response = core.parse_request(&req, remote_addr).await;
|
||||
|
||||
// Add CORS headers
|
||||
if let Ok(response) = &mut response {
|
||||
let headers = response.headers_mut();
|
||||
headers.insert(
|
||||
header::ACCESS_CONTROL_ALLOW_ORIGIN,
|
||||
HeaderValue::from_static("*"),
|
||||
);
|
||||
headers.insert(
|
||||
header::ACCESS_CONTROL_ALLOW_METHODS,
|
||||
HeaderValue::from_static(
|
||||
"POST, GET, PATCH, PUT, DELETE, HEAD, OPTIONS",
|
||||
),
|
||||
);
|
||||
headers.insert(
|
||||
header::ACCESS_CONTROL_ALLOW_HEADERS,
|
||||
HeaderValue::from_static(
|
||||
"Authorization, Content-Type, Accept, X-Requested-With",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
context = "management",
|
||||
|
@ -182,6 +221,17 @@ impl SMTP {
|
|||
req: &hyper::Request<hyper::body::Incoming>,
|
||||
remote_addr: IpAddr,
|
||||
) -> Result<hyper::Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
if req.method() == Method::OPTIONS {
|
||||
return Ok(hyper::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(
|
||||
Empty::<Bytes>::new()
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
)
|
||||
.unwrap());
|
||||
}
|
||||
|
||||
// Authenticate request
|
||||
let mut is_authenticated = false;
|
||||
if let Some((mechanism, payload)) = req
|
||||
|
@ -261,7 +311,7 @@ impl SMTP {
|
|||
|
||||
let mut path = req.uri().path().split('/');
|
||||
path.next();
|
||||
path.next(); // Skip the leading /admin
|
||||
path.next(); // Skip the leading /api
|
||||
Ok(self
|
||||
.handle_manage_request(
|
||||
req.uri(),
|
||||
|
@ -281,20 +331,33 @@ impl SMTP {
|
|||
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
|
||||
let (status, response) = match (method, path_1, path_2) {
|
||||
(&Method::GET, "queue", "list") => {
|
||||
let mut text = None;
|
||||
let mut from = None;
|
||||
let mut to = None;
|
||||
let mut before = None;
|
||||
let mut after = None;
|
||||
let mut error = None;
|
||||
let mut page: usize = 0;
|
||||
let mut limit: usize = 0;
|
||||
let mut values = false;
|
||||
|
||||
if let Some(query) = uri.query() {
|
||||
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
|
||||
match key.as_ref() {
|
||||
"text" => {
|
||||
if !value.is_empty() {
|
||||
text = value.into_owned().into();
|
||||
}
|
||||
}
|
||||
"from" => {
|
||||
from = value.into_owned().into();
|
||||
if !value.is_empty() {
|
||||
from = value.into_owned().into();
|
||||
}
|
||||
}
|
||||
"to" => {
|
||||
to = value.into_owned().into();
|
||||
if !value.is_empty() {
|
||||
to = value.into_owned().into();
|
||||
}
|
||||
}
|
||||
"after" => match value.parse_timestamp() {
|
||||
Ok(dt) => {
|
||||
|
@ -314,6 +377,15 @@ impl SMTP {
|
|||
break;
|
||||
}
|
||||
},
|
||||
"values" => {
|
||||
values = true;
|
||||
}
|
||||
"limit" => {
|
||||
limit = value.parse().unwrap_or_default();
|
||||
}
|
||||
"page" => {
|
||||
page = value.parse().unwrap_or_default();
|
||||
}
|
||||
_ => {
|
||||
error = format!("Invalid parameter {key:?}.").into();
|
||||
break;
|
||||
|
@ -324,39 +396,70 @@ impl SMTP {
|
|||
|
||||
match error {
|
||||
None => {
|
||||
let mut result = Vec::new();
|
||||
let mut result_ids = Vec::new();
|
||||
let mut result_values = Vec::new();
|
||||
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::Message(0)));
|
||||
let to_key =
|
||||
ValueKey::from(ValueClass::Queue(QueueClass::Message(u64::MAX)));
|
||||
let has_filters =
|
||||
from.is_some() || to.is_some() || before.is_some() || after.is_some();
|
||||
let has_filters = text.is_some()
|
||||
|| from.is_some()
|
||||
|| to.is_some()
|
||||
|| before.is_some()
|
||||
|| after.is_some();
|
||||
let mut offset = page.saturating_sub(1) * limit;
|
||||
let mut total = 0;
|
||||
let mut total_returned = 0;
|
||||
let _ =
|
||||
self.shared
|
||||
.default_data_store
|
||||
.iterate(
|
||||
IterateParams::new(from_key, to_key).ascending(),
|
||||
|key, value| {
|
||||
if has_filters {
|
||||
let message =
|
||||
Bincode::<queue::Message>::deserialize(value)?
|
||||
.inner;
|
||||
if from.as_ref().map_or(true, |from| {
|
||||
message.return_path.contains(from)
|
||||
}) && to.as_ref().map_or(true, |to| {
|
||||
message
|
||||
.recipients
|
||||
.iter()
|
||||
.any(|r| r.address_lcase.contains(to))
|
||||
}) && before.as_ref().map_or(true, |before| {
|
||||
message.next_delivery_event() < *before
|
||||
}) && after.as_ref().map_or(true, |after| {
|
||||
message.next_delivery_event() > *after
|
||||
}) {
|
||||
result.push(key.deserialize_be_u64(1)?);
|
||||
let message =
|
||||
Bincode::<queue::Message>::deserialize(value)?.inner;
|
||||
let matches =
|
||||
!has_filters
|
||||
|| (text
|
||||
.as_ref()
|
||||
.map(|text| {
|
||||
message.return_path.contains(text)
|
||||
|| message.recipients.iter().any(|r| {
|
||||
r.address_lcase.contains(text)
|
||||
})
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
from.as_ref().map_or(true, |from| {
|
||||
message.return_path.contains(from)
|
||||
}) && to.as_ref().map_or(true, |to| {
|
||||
message.recipients.iter().any(|r| {
|
||||
r.address_lcase.contains(to)
|
||||
})
|
||||
})
|
||||
})
|
||||
&& before.as_ref().map_or(true, |before| {
|
||||
message.next_delivery_event() < *before
|
||||
})
|
||||
&& after.as_ref().map_or(true, |after| {
|
||||
message.next_delivery_event() > *after
|
||||
}));
|
||||
|
||||
if matches {
|
||||
if offset == 0 {
|
||||
if limit == 0 || total_returned < limit {
|
||||
if values {
|
||||
result_values.push(Message::from(&message));
|
||||
} else {
|
||||
result_ids.push(key.deserialize_be_u64(1)?);
|
||||
}
|
||||
total_returned += 1;
|
||||
}
|
||||
} else {
|
||||
offset -= 1;
|
||||
}
|
||||
} else {
|
||||
result.push(key.deserialize_be_u64(1)?);
|
||||
|
||||
total += 1;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
|
@ -364,7 +467,22 @@ impl SMTP {
|
|||
|
||||
(
|
||||
StatusCode::OK,
|
||||
serde_json::to_string(&Response { data: result }).unwrap_or_default(),
|
||||
if values {
|
||||
serde_json::to_string(&json!({
|
||||
"data": {
|
||||
"items": result_values,
|
||||
"total": total,
|
||||
},
|
||||
}))
|
||||
} else {
|
||||
serde_json::to_string(&json!({
|
||||
"data": {
|
||||
"items": result_ids,
|
||||
"total": total,
|
||||
},
|
||||
}))
|
||||
}
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
Some(error) => error.into_bad_request(),
|
||||
|
@ -441,7 +559,9 @@ impl SMTP {
|
|||
}
|
||||
},
|
||||
"filter" => {
|
||||
item = value.into_owned().into();
|
||||
if !value.is_empty() {
|
||||
item = value.into_owned().into();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error = format!("Invalid parameter {key:?}.").into();
|
||||
|
@ -518,7 +638,9 @@ impl SMTP {
|
|||
}
|
||||
},
|
||||
"filter" => {
|
||||
item = value.into_owned().into();
|
||||
if !value.is_empty() {
|
||||
item = value.into_owned().into();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error = format!("Invalid parameter {key:?}.").into();
|
||||
|
@ -542,8 +664,8 @@ impl SMTP {
|
|||
// Cancel delivery for all recipients that match
|
||||
for rcpt in &mut message.recipients {
|
||||
if rcpt.address_lcase.contains(item) {
|
||||
rcpt.status = Status::Completed(HostResponse {
|
||||
hostname: String::new(),
|
||||
rcpt.status = Status::PermanentFailure(HostResponse {
|
||||
hostname: ErrorDetails::default(),
|
||||
response: smtp_proto::Response {
|
||||
code: 0,
|
||||
esc: [0, 0, 0],
|
||||
|
@ -734,16 +856,55 @@ impl SMTP {
|
|||
|
||||
let mut result = Vec::with_capacity(report_ids.len());
|
||||
for report_id in report_ids {
|
||||
if let Ok(Some(_)) = self
|
||||
.shared
|
||||
.default_data_store
|
||||
.get_value::<()>(ValueKey::from(ValueClass::Queue(report_id.clone())))
|
||||
.await
|
||||
{
|
||||
result.push(Report::from(report_id).into());
|
||||
} else {
|
||||
result.push(None);
|
||||
match report_id {
|
||||
QueueClass::DmarcReportHeader(event) => {
|
||||
if let Ok(Some(report)) = self
|
||||
.shared
|
||||
.default_data_store
|
||||
.get_value::<Bincode<DmarcFormat>>(ValueKey::from(
|
||||
ValueClass::Queue(QueueClass::DmarcReportHeader(event.clone())),
|
||||
))
|
||||
.await
|
||||
{
|
||||
let mut report = report.inner;
|
||||
if let Ok(records) =
|
||||
self.fetch_dmarc_records(&event, &report, None).await
|
||||
{
|
||||
report.records = records;
|
||||
result.push(Report::dmarc(event, report).into());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueueClass::TlsReportHeader(event) => {
|
||||
if let Ok(Some(report)) = self
|
||||
.shared
|
||||
.default_data_store
|
||||
.get_value::<Bincode<TlsFormat>>(ValueKey::from(ValueClass::Queue(
|
||||
QueueClass::TlsReportHeader(event.clone()),
|
||||
)))
|
||||
.await
|
||||
{
|
||||
let mut report = report.inner;
|
||||
if let Ok(policy) =
|
||||
self.fetch_tls_policy(&event, report.policy, None).await
|
||||
{
|
||||
report.policy = policy.policy;
|
||||
for record in policy.failure_details {
|
||||
report.records.push(record.into());
|
||||
}
|
||||
for _ in 0..policy.summary.total_success {
|
||||
report.records.push(None);
|
||||
}
|
||||
result.push(Report::tls(event, report).into());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
result.push(None);
|
||||
}
|
||||
|
||||
match error {
|
||||
|
@ -815,7 +976,7 @@ impl SMTP {
|
|||
|
||||
hyper::Response::builder()
|
||||
.status(status)
|
||||
.header(header::CONTENT_TYPE, "application/json; charset=utf-8")
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.body(
|
||||
Full::new(Bytes::from(response))
|
||||
.map_err(|never| match never {})
|
||||
|
@ -830,6 +991,7 @@ impl From<&queue::Message> for Message {
|
|||
let now = now();
|
||||
|
||||
Message {
|
||||
id: message.id,
|
||||
return_path: message.return_path.clone(),
|
||||
created: DateTime::from_timestamp(message.created as i64),
|
||||
size: message.size,
|
||||
|
@ -852,11 +1014,7 @@ impl From<&queue::Message> for Message {
|
|||
}
|
||||
},
|
||||
retry_num: domain.retry.inner,
|
||||
next_retry: if domain.retry.due > now {
|
||||
DateTime::from_timestamp(domain.retry.due as i64).into()
|
||||
} else {
|
||||
None
|
||||
},
|
||||
next_retry: Some(DateTime::from_timestamp(domain.retry.due as i64)),
|
||||
next_notify: if domain.notify.due > now {
|
||||
DateTime::from_timestamp(domain.notify.due as i64).into()
|
||||
} else {
|
||||
|
@ -890,24 +1048,24 @@ impl From<&queue::Message> for Message {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<QueueClass> for Report {
|
||||
fn from(value: QueueClass) -> Self {
|
||||
match value {
|
||||
QueueClass::DmarcReportHeader(event) => Report {
|
||||
domain: event.domain,
|
||||
type_: "dmarc".to_string(),
|
||||
range_from: DateTime::from_timestamp(event.seq_id as i64),
|
||||
range_to: DateTime::from_timestamp(event.due as i64),
|
||||
size: 0,
|
||||
},
|
||||
QueueClass::TlsReportHeader(event) => Report {
|
||||
domain: event.domain,
|
||||
type_: "tls".to_string(),
|
||||
range_from: DateTime::from_timestamp(event.seq_id as i64),
|
||||
range_to: DateTime::from_timestamp(event.due as i64),
|
||||
size: 0,
|
||||
},
|
||||
_ => unreachable!(),
|
||||
impl Report {
|
||||
fn dmarc(event: ReportEvent, report: DmarcFormat) -> Self {
|
||||
Self::Dmarc {
|
||||
domain: event.domain.clone(),
|
||||
range_from: DateTime::from_timestamp(event.seq_id as i64),
|
||||
range_to: DateTime::from_timestamp(event.due as i64),
|
||||
id: QueueClass::DmarcReportHeader(event).queue_id(),
|
||||
report,
|
||||
}
|
||||
}
|
||||
|
||||
fn tls(event: ReportEvent, report: TlsFormat) -> Self {
|
||||
Self::Tls {
|
||||
domain: event.domain.clone(),
|
||||
range_from: DateTime::from_timestamp(event.seq_id as i64),
|
||||
range_to: DateTime::from_timestamp(event.due as i64),
|
||||
id: QueueClass::TlsReportHeader(event).queue_id(),
|
||||
report,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ pub enum Error {
|
|||
Io(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct ErrorDetails {
|
||||
pub entity: String,
|
||||
pub details: String,
|
||||
|
|
|
@ -386,53 +386,22 @@ impl SMTP {
|
|||
let _ = serde::Serialize::serialize(&dmarc, &mut serialized_size);
|
||||
let config = &self.report.config.dmarc_aggregate;
|
||||
|
||||
// Group duplicates
|
||||
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(
|
||||
ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: 0,
|
||||
domain: event.domain.clone(),
|
||||
},
|
||||
)));
|
||||
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(
|
||||
ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: u64::MAX,
|
||||
domain: event.domain.clone(),
|
||||
},
|
||||
)));
|
||||
let mut record_map = AHashMap::with_capacity(dmarc.records.len());
|
||||
if let Err(err) = self
|
||||
.shared
|
||||
.default_data_store
|
||||
.iterate(
|
||||
IterateParams::new(from_key, to_key).ascending(),
|
||||
|_, v| match record_map.entry(Bincode::<Record>::deserialize(v)?.inner) {
|
||||
Entry::Occupied(mut e) => {
|
||||
*e.get_mut() += 1;
|
||||
Ok(true)
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
if serde::Serialize::serialize(e.key(), &mut serialized_size).is_ok() {
|
||||
e.insert(1u32);
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
// Fetch records
|
||||
let records = match self
|
||||
.fetch_dmarc_records(&event, &dmarc, Some(&mut serialized_size))
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
parent: &span,
|
||||
event = "error",
|
||||
"Failed to read DMARC report: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
Ok(records) => records,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
parent: &span,
|
||||
event = "error",
|
||||
"Failed to read DMARC records: {}",
|
||||
err
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Create report
|
||||
let mut report = Report::new()
|
||||
|
@ -466,8 +435,8 @@ impl SMTP {
|
|||
{
|
||||
report = report.with_extra_contact_info(contact_info);
|
||||
}
|
||||
for (record, count) in record_map {
|
||||
report.add_record(record.with_count(count));
|
||||
for record in records {
|
||||
report = report.with_record(record);
|
||||
}
|
||||
let from_addr = self
|
||||
.eval_if(
|
||||
|
@ -503,6 +472,64 @@ impl SMTP {
|
|||
self.delete_dmarc_report(event).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_dmarc_records(
|
||||
&self,
|
||||
event: &ReportEvent,
|
||||
dmarc: &DmarcFormat,
|
||||
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
|
||||
) -> store::Result<Vec<Record>> {
|
||||
// Group duplicates
|
||||
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(
|
||||
ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: 0,
|
||||
domain: event.domain.clone(),
|
||||
},
|
||||
)));
|
||||
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(
|
||||
ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: u64::MAX,
|
||||
domain: event.domain.clone(),
|
||||
},
|
||||
)));
|
||||
let mut record_map = AHashMap::with_capacity(dmarc.records.len());
|
||||
self.shared
|
||||
.default_data_store
|
||||
.iterate(
|
||||
IterateParams::new(from_key, to_key).ascending(),
|
||||
|_, v| match record_map.entry(Bincode::<Record>::deserialize(v)?.inner) {
|
||||
Entry::Occupied(mut e) => {
|
||||
*e.get_mut() += 1;
|
||||
Ok(true)
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
if serialized_size
|
||||
.as_deref_mut()
|
||||
.map_or(true, |serialized_size| {
|
||||
serde::Serialize::serialize(e.key(), serialized_size).is_ok()
|
||||
})
|
||||
{
|
||||
e.insert(1u32);
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut records = Vec::with_capacity(record_map.len());
|
||||
for (record, count) in record_map {
|
||||
records.push(record.with_count(count));
|
||||
}
|
||||
|
||||
Ok(records)
|
||||
}
|
||||
|
||||
pub async fn delete_dmarc_report(&self, event: ReportEvent) {
|
||||
let from_key = ReportEvent {
|
||||
due: event.due,
|
||||
|
|
|
@ -58,9 +58,9 @@ pub struct TlsRptOptions {
|
|||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct TlsFormat {
|
||||
rua: Vec<ReportUri>,
|
||||
policy: PolicyDetails,
|
||||
records: Vec<Option<FailureDetails>>,
|
||||
pub rua: Vec<ReportUri>,
|
||||
pub policy: PolicyDetails,
|
||||
pub records: Vec<Option<FailureDetails>>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "test_mode")]
|
||||
|
@ -146,81 +146,29 @@ impl SMTP {
|
|||
};
|
||||
let _ = serde::Serialize::serialize(&tls, &mut serialized_size);
|
||||
|
||||
// Group duplicates
|
||||
let mut total_success = 0;
|
||||
let mut total_failure = 0;
|
||||
|
||||
let from_key =
|
||||
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: 0,
|
||||
domain: event.domain.clone(),
|
||||
})));
|
||||
let to_key =
|
||||
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: u64::MAX,
|
||||
domain: event.domain.clone(),
|
||||
})));
|
||||
let mut record_map = AHashMap::with_capacity(tls.records.len());
|
||||
if let Err(err) = self
|
||||
.shared
|
||||
.default_data_store
|
||||
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
|
||||
if let Some(failure_details) =
|
||||
Bincode::<Option<FailureDetails>>::deserialize(v)?.inner
|
||||
{
|
||||
match record_map.entry(failure_details) {
|
||||
Entry::Occupied(mut e) => {
|
||||
total_failure += 1;
|
||||
*e.get_mut() += 1;
|
||||
Ok(true)
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
if serde::Serialize::serialize(e.key(), &mut serialized_size)
|
||||
.is_ok()
|
||||
{
|
||||
total_failure += 1;
|
||||
e.insert(1u32);
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
total_success += 1;
|
||||
Ok(true)
|
||||
}
|
||||
})
|
||||
// Fetch policy
|
||||
match self
|
||||
.fetch_tls_policy(event, tls.policy, Some(&mut serialized_size))
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
parent: &span,
|
||||
event = "error",
|
||||
"Failed to read TLS report: {}",
|
||||
err
|
||||
);
|
||||
Ok(policy) => {
|
||||
report.policies.push(policy);
|
||||
for entry in tls.rua {
|
||||
if !rua.contains(&entry) {
|
||||
rua.push(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
parent: &span,
|
||||
event = "error",
|
||||
"Failed to read TLS report: {}",
|
||||
err
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
report.policies.push(Policy {
|
||||
policy: tls.policy,
|
||||
summary: Summary {
|
||||
total_success,
|
||||
total_failure,
|
||||
},
|
||||
failure_details: record_map
|
||||
.into_iter()
|
||||
.map(|(mut r, count)| {
|
||||
r.failed_session_count = count;
|
||||
r
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
|
||||
rua = tls.rua;
|
||||
}
|
||||
|
||||
if report.policies.is_empty() {
|
||||
|
@ -362,6 +310,79 @@ impl SMTP {
|
|||
self.delete_tls_report(events).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch_tls_policy(
|
||||
&self,
|
||||
event: &ReportEvent,
|
||||
policy_details: PolicyDetails,
|
||||
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
|
||||
) -> store::Result<Policy> {
|
||||
// Group duplicates
|
||||
let mut total_success = 0;
|
||||
let mut total_failure = 0;
|
||||
|
||||
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: 0,
|
||||
domain: event.domain.clone(),
|
||||
})));
|
||||
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
|
||||
due: event.due,
|
||||
policy_hash: event.policy_hash,
|
||||
seq_id: u64::MAX,
|
||||
domain: event.domain.clone(),
|
||||
})));
|
||||
let mut record_map = AHashMap::new();
|
||||
self.shared
|
||||
.default_data_store
|
||||
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
|
||||
if let Some(failure_details) =
|
||||
Bincode::<Option<FailureDetails>>::deserialize(v)?.inner
|
||||
{
|
||||
match record_map.entry(failure_details) {
|
||||
Entry::Occupied(mut e) => {
|
||||
total_failure += 1;
|
||||
*e.get_mut() += 1;
|
||||
Ok(true)
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
if serialized_size
|
||||
.as_deref_mut()
|
||||
.map_or(true, |serialized_size| {
|
||||
serde::Serialize::serialize(e.key(), serialized_size).is_ok()
|
||||
})
|
||||
{
|
||||
total_failure += 1;
|
||||
e.insert(1u32);
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
total_success += 1;
|
||||
Ok(true)
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Policy {
|
||||
policy: policy_details,
|
||||
summary: Summary {
|
||||
total_success,
|
||||
total_failure,
|
||||
},
|
||||
failure_details: record_map
|
||||
.into_iter()
|
||||
.map(|(mut r, count)| {
|
||||
r.failed_session_count = count;
|
||||
r
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn schedule_tls(&self, event: Box<TlsEvent>) {
|
||||
let created = event.interval.to_timestamp();
|
||||
let deliver_at = created + event.interval.as_secs();
|
||||
|
|
|
@ -59,6 +59,13 @@ member-of = ["superusers"]
|
|||
|
||||
"#;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct List<T> {
|
||||
items: Vec<T>,
|
||||
total: usize,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
async fn manage_queue() {
|
||||
|
@ -185,10 +192,11 @@ async fn manage_queue() {
|
|||
);
|
||||
|
||||
// Fetch and validate messages
|
||||
let ids = send_manage_request::<Vec<QueueId>>("/admin/queue/list")
|
||||
let ids = send_manage_request::<List<QueueId>>("/api/queue/list")
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap_data();
|
||||
.unwrap_data()
|
||||
.items;
|
||||
assert_eq!(ids.len(), 6);
|
||||
let mut id_map = AHashMap::new();
|
||||
let mut id_map_rev = AHashMap::new();
|
||||
|
@ -269,31 +277,32 @@ async fn manage_queue() {
|
|||
// Test list search
|
||||
for (query, expected_ids) in [
|
||||
(
|
||||
"/admin/queue/list?from=bill1@foobar.net".to_string(),
|
||||
"/api/queue/list?from=bill1@foobar.net".to_string(),
|
||||
vec!["a"],
|
||||
),
|
||||
(
|
||||
"/admin/queue/list?to=foobar.org".to_string(),
|
||||
"/api/queue/list?to=foobar.org".to_string(),
|
||||
vec!["d", "e", "f"],
|
||||
),
|
||||
(
|
||||
"/admin/queue/list?from=bill3@foobar.net&to=rcpt5@example1.com".to_string(),
|
||||
"/api/queue/list?from=bill3@foobar.net&to=rcpt5@example1.com".to_string(),
|
||||
vec!["c"],
|
||||
),
|
||||
(
|
||||
format!("/admin/queue/list?before={test_search}"),
|
||||
format!("/api/queue/list?before={test_search}"),
|
||||
vec!["a", "b"],
|
||||
),
|
||||
(
|
||||
format!("/admin/queue/list?after={test_search}"),
|
||||
format!("/api/queue/list?after={test_search}"),
|
||||
vec!["d", "e", "f", "c"],
|
||||
),
|
||||
] {
|
||||
let expected_ids = HashSet::from_iter(expected_ids.into_iter().map(|s| s.to_string()));
|
||||
let ids = send_manage_request::<Vec<QueueId>>(&query)
|
||||
let ids = send_manage_request::<List<QueueId>>(&query)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap_data()
|
||||
.items
|
||||
.into_iter()
|
||||
.map(|id| id_map_rev.get(&id).unwrap().clone())
|
||||
.collect::<HashSet<_>>();
|
||||
|
@ -303,7 +312,7 @@ async fn manage_queue() {
|
|||
// Retry delivery
|
||||
assert_eq!(
|
||||
send_manage_request::<Vec<bool>>(&format!(
|
||||
"/admin/queue/retry?id={},{}",
|
||||
"/api/queue/retry?id={},{}",
|
||||
id_map.get("e").unwrap(),
|
||||
id_map.get("f").unwrap()
|
||||
))
|
||||
|
@ -314,7 +323,7 @@ async fn manage_queue() {
|
|||
);
|
||||
assert_eq!(
|
||||
send_manage_request::<Vec<bool>>(&format!(
|
||||
"/admin/queue/retry?id={}&filter=example1.org&at=2200-01-01T00:00:00Z",
|
||||
"/api/queue/retry?id={}&filter=example1.org&at=2200-01-01T00:00:00Z",
|
||||
id_map.get("a").unwrap(),
|
||||
))
|
||||
.await
|
||||
|
@ -377,7 +386,7 @@ async fn manage_queue() {
|
|||
] {
|
||||
assert_eq!(
|
||||
send_manage_request::<Vec<bool>>(&format!(
|
||||
"/admin/queue/cancel?id={}{}{}",
|
||||
"/api/queue/cancel?id={}{}{}",
|
||||
id_map.get(id).unwrap(),
|
||||
if !filter.is_empty() { "&filter=" } else { "" },
|
||||
filter
|
||||
|
@ -390,10 +399,11 @@ async fn manage_queue() {
|
|||
);
|
||||
}
|
||||
assert_eq!(
|
||||
send_manage_request::<Vec<QueueId>>("/admin/queue/list")
|
||||
send_manage_request::<List<QueueId>>("/api/queue/list")
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap_data()
|
||||
.items
|
||||
.len(),
|
||||
3
|
||||
);
|
||||
|
@ -418,7 +428,7 @@ async fn manage_queue() {
|
|||
if domain.name == "example2.org" {
|
||||
assert_eq!(&domain.status, &Status::Completed("".to_string()));
|
||||
for rcpt in &domain.recipients {
|
||||
assert!(matches!(&rcpt.status, Status::Completed(_)));
|
||||
assert!(matches!(&rcpt.status, Status::PermanentFailure(_)));
|
||||
}
|
||||
} else {
|
||||
assert_eq!(&domain.status, &Status::Scheduled);
|
||||
|
@ -432,7 +442,7 @@ async fn manage_queue() {
|
|||
if domain.name == "example2.com" {
|
||||
for rcpt in &domain.recipients {
|
||||
if rcpt.address == "rcpt6@example2.com" {
|
||||
assert!(matches!(&rcpt.status, Status::Completed(_)));
|
||||
assert!(matches!(&rcpt.status, Status::PermanentFailure(_)));
|
||||
} else {
|
||||
assert!(matches!(&rcpt.status, Status::Scheduled));
|
||||
}
|
||||
|
@ -476,7 +486,7 @@ fn assert_timestamp(timestamp: &DateTime, expected: i64, ctx: &str, message: &Me
|
|||
|
||||
async fn get_messages(ids: &[QueueId]) -> Vec<Option<Message>> {
|
||||
send_manage_request(&format!(
|
||||
"/admin/queue/status?id={}",
|
||||
"/api/queue/status?id={}",
|
||||
ids.iter()
|
||||
.map(|id| id.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
|
|
|
@ -152,15 +152,29 @@ async fn manage_reports() {
|
|||
let mut parts = id.split('!');
|
||||
let report = report.unwrap();
|
||||
let mut id_num = if parts.next().unwrap() == "t" {
|
||||
assert_eq!(report.type_, "tls");
|
||||
assert!(matches!(report, Report::Tls { .. }));
|
||||
2
|
||||
} else {
|
||||
assert_eq!(report.type_, "dmarc");
|
||||
assert!(matches!(report, Report::Dmarc { .. }));
|
||||
0
|
||||
};
|
||||
assert_eq!(parts.next().unwrap(), report.domain);
|
||||
let diff = report.range_to.to_timestamp() - report.range_from.to_timestamp();
|
||||
if report.domain == "foobar.org" {
|
||||
let (domain, range_to, range_from) = match report {
|
||||
Report::Dmarc {
|
||||
domain,
|
||||
range_to,
|
||||
range_from,
|
||||
..
|
||||
} => (domain, range_to, range_from),
|
||||
Report::Tls {
|
||||
domain,
|
||||
range_to,
|
||||
range_from,
|
||||
..
|
||||
} => (domain, range_to, range_from),
|
||||
};
|
||||
assert_eq!(parts.next().unwrap(), domain);
|
||||
let diff = range_to.to_timestamp() - range_from.to_timestamp();
|
||||
if domain == "foobar.org" {
|
||||
assert_eq!(diff, 86400);
|
||||
} else {
|
||||
assert_eq!(diff, 7 * 86400);
|
||||
|
|
Loading…
Reference in a new issue