Test fixes

This commit is contained in:
Mauro D 2024-02-10 15:39:05 +01:00
parent d16119f54b
commit 3d7b9b0334
33 changed files with 518 additions and 508 deletions

View file

@ -208,9 +208,6 @@ pub struct RelayHost {
}
pub struct QueueConfig {
pub path: PathBuf,
pub hash: IfBlock,
// Schedule
pub retry: IfBlock,
pub notify: IfBlock,
@ -240,8 +237,6 @@ pub struct QueueOutboundSourceIp {
}
pub struct ReportConfig {
pub path: PathBuf,
pub hash: IfBlock,
pub submitter: IfBlock,
pub analysis: ReportAnalysis,

View file

@ -74,13 +74,6 @@ impl ConfigQueue for Config {
let default_hostname = self.value_require("server.hostname")?;
let config = QueueConfig {
path: self.property_require("queue.path")?,
hash: self
.parse_if_block("queue.hash", |name| {
map_expr_token::<NoConstants>(name, sender_envelope_keys)
})?
.unwrap_or_else(|| IfBlock::new(32)),
retry: self
.parse_if_block("queue.schedule.retry", |name| {
map_expr_token::<Duration>(name, host_envelope_keys)

View file

@ -89,17 +89,11 @@ impl ConfigReport for Config {
sender_envelope_keys,
)?,
tls: self.parse_aggregate_report("tls", default_hostname, rcpt_envelope_keys)?,
path: self.property_require("report.path")?,
submitter: self
.parse_if_block("report.submitter", |name| {
map_expr_token::<NoConstants>(name, &[V_RECIPIENT_DOMAIN])
})?
.unwrap_or_else(|| IfBlock::new(default_hostname.to_string())),
hash: self
.parse_if_block("report.hash", |name| {
map_expr_token::<NoConstants>(name, sender_envelope_keys)
})?
.unwrap_or_else(|| IfBlock::new(32)),
analysis: ReportAnalysis {
addresses,
forward: self.property("report.analysis.forward")?.unwrap_or(false),

View file

@ -65,7 +65,7 @@ pub struct Schedule<T> {
pub inner: T,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Message {
pub id: QueueId,
pub created: u64,
@ -85,7 +85,7 @@ pub struct Message {
pub quota_keys: Vec<QuotaKey>,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QuotaKey {
Size { key: Vec<u8>, id: u64 },
Count { key: Vec<u8>, id: u64 },

View file

@ -36,7 +36,7 @@ use crate::{
use super::{pool::SqliteConnectionManager, SqliteStore};
impl SqliteStore {
pub async fn open(config: &Config, prefix: impl AsKey) -> crate::Result<Self> {
pub fn open(config: &Config, prefix: impl AsKey) -> crate::Result<Self> {
let prefix = prefix.as_key();
let db = Self {
conn_pool: Pool::builder()

View file

@ -137,7 +137,7 @@ impl ConfigStore for Config {
}
#[cfg(feature = "sqlite")]
"sqlite" => {
let db: Store = SqliteStore::open(self, prefix).await?.into();
let db: Store = SqliteStore::open(self, prefix)?.into();
config.stores.insert(store_id.clone(), db.clone());
config
.fts_stores

View file

@ -36,7 +36,7 @@ use rustls::ServerConfig;
use rustls_pemfile::{certs, pkcs8_private_keys};
use rustls_pki_types::PrivateKeyDer;
use std::{borrow::Cow, io::BufReader, path::PathBuf, sync::Arc};
use store::{config::ConfigStore, LookupKey, LookupStore, LookupValue, Store, Stores};
use store::{config::ConfigStore, LookupStore, Store, Stores};
use tokio_rustls::TlsAcceptor;
use utils::config::Servers;
@ -599,15 +599,13 @@ async fn lookup_local() {
("suffix", "coco", false),
] {
assert_eq!(
matches!(
lookups
.get(&format!("local/{lookup}"))
.unwrap()
.key_get::<String>(LookupKey::Key(item.as_bytes().to_vec()))
.await
.unwrap(),
LookupValue::Value { .. }
),
lookups
.get(&format!("local/{lookup}"))
.unwrap()
.key_get::<String>(item.as_bytes().to_vec())
.await
.unwrap()
.is_some(),
expect,
"failed for {lookup}, item {item}"
);

View file

@ -150,8 +150,9 @@ async fn data() {
session
.send_message("john@doe.org", &["bill@foobar.org"], "test:no_msgid", "250")
.await;
qr.read_event().await.assert_reload();
assert_eq!(
qr.read_event().await.unwrap_message().read_message(),
qr.last_queued_message().await.read_message(&core).await,
load_test_message("no_msgid", "messages")
);
@ -168,10 +169,11 @@ async fn data() {
session
.send_message("john@doe.org", &["mike@test.com"], "test:no_msgid", "250")
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("From: ")
.assert_contains("To: ")
.assert_contains("Subject: ")

View file

@ -180,13 +180,15 @@ async fn dmarc() {
session.mail_from("bill@example.com", "550 5.7.23").await;
// Expect SPF auth failure report
let message = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let message = qr.last_queued_message().await;
assert_eq!(
message.recipients.last().unwrap().address,
"spf-failures@example.com"
);
message
.read_lines()
.read_lines(&core)
.await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("To: spf-failures@example.com")
.assert_contains("Feedback-Type: auth-failure")
@ -194,7 +196,7 @@ async fn dmarc() {
// Second DKIM failure report should be rate limited
session.mail_from("bill@example.com", "550 5.7.23").await;
qr.assert_empty_queue();
qr.assert_no_events();
// Invalid DKIM signatures should be rejected
session.data.remote_ip_str = "10.0.0.1".to_string();
@ -210,13 +212,15 @@ async fn dmarc() {
.await;
// Expect DKIM auth failure report
let message = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let message = qr.last_queued_message().await;
assert_eq!(
message.recipients.last().unwrap().address,
"dkim-failures@example.com"
);
message
.read_lines()
.read_lines(&core)
.await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("To: dkim-failures@example.com")
.assert_contains("Feedback-Type: auth-failure")
@ -231,7 +235,7 @@ async fn dmarc() {
"550 5.7.20",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Invalid ARC should be rejected
session
@ -242,7 +246,7 @@ async fn dmarc() {
"550 5.7.29",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Unaligned DMARC should be rejected
core.resolvers.dns.txt_add(
@ -260,13 +264,15 @@ async fn dmarc() {
.await;
// Expect DMARC auth failure report
let message = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let message = qr.last_queued_message().await;
assert_eq!(
message.recipients.last().unwrap().address,
"dmarc-failures@example.com"
);
message
.read_lines()
.read_lines(&core)
.await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("To: dmarc-failures@example.com")
.assert_contains("Feedback-Type: auth-failure")
@ -289,7 +295,7 @@ async fn dmarc() {
"550 5.7.1",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Messagess passing DMARC should be accepted
session
@ -300,10 +306,11 @@ async fn dmarc() {
"250",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("dkim=pass")
.assert_contains("spf=pass")
.assert_contains("dmarc=pass")

View file

@ -97,7 +97,7 @@ async fn milter_session() {
"503 5.5.3",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Test discard
session
@ -108,7 +108,7 @@ async fn milter_session() {
"250 2.0.0",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Test temp fail
session
@ -119,7 +119,7 @@ async fn milter_session() {
"451 4.3.5",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Test shutdown
session
@ -130,7 +130,7 @@ async fn milter_session() {
"421 4.3.0",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Test reply code
session
@ -141,7 +141,7 @@ async fn milter_session() {
"321",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Test accept with header addition
session
@ -152,10 +152,11 @@ async fn milter_session() {
"250 2.0.0",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("X-Hello: World")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Are you hungry yet?");
@ -169,10 +170,11 @@ async fn milter_session() {
"250 2.0.0",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("Subject: [SPAM] Saying Hello")
.assert_count("References: ", 1)
.assert_contains("Are you hungry yet?");
@ -186,10 +188,11 @@ async fn milter_session() {
"250 2.0.0",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("X-Spam: Yes")
.assert_contains("123456");
}

View file

@ -23,10 +23,15 @@
use std::time::Duration;
use store::{
write::{key::DeserializeBigEndian, Bincode, QueueClass, QueueEvent, ValueClass},
Deserialize, IterateParams, ValueKey, U64_LEN,
};
use tokio::sync::mpsc::error::TryRecvError;
use smtp::{
queue::{self, Message, OnHold, Schedule, WorkerResult},
core::SMTP,
queue::{self, DeliveryAttempt, Message, OnHold, QueueId},
reporting::{self, DmarcEvent, TlsEvent},
};
@ -64,16 +69,106 @@ impl QueueReceiver {
Err(_) => None,
}
}
pub fn assert_empty_queue(&mut self) {
pub fn assert_no_events(&mut self) {
match self.queue_rx.try_recv() {
Err(TryRecvError::Empty) => (),
Ok(queue::Event::Queue(message)) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
Ok(event) => panic!("Expected empty queue but got {event:?}"),
Err(err) => panic!("Queue error: {err:?}"),
}
}
pub async fn assert_queue_is_empty(&mut self) {
assert_eq!(self.read_queued_messages().await, vec![]);
assert_eq!(self.read_queued_events().await, vec![]);
}
pub async fn expect_message(&mut self) -> Message {
self.read_event().await.assert_reload();
self.last_queued_message().await
}
pub async fn expect_message_then_deliver(&mut self) -> DeliveryAttempt {
let message = self.expect_message().await;
let event = QueueEvent {
due: self.message_due(message.id).await,
queue_id: message.id,
};
DeliveryAttempt::new(message, event)
}
pub async fn read_queued_events(&self) -> Vec<QueueEvent> {
let mut events = Vec::new();
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: 0,
queue_id: 0,
})));
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: u64::MAX,
queue_id: u64::MAX,
})));
self.store
.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
events.push(QueueEvent {
due: key.deserialize_be_u64(1)?,
queue_id: key.deserialize_be_u64(U64_LEN + 1)?,
});
Ok(true)
},
)
.await
.unwrap();
events
}
pub async fn read_queued_messages(&self) -> Vec<Message> {
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::Message(0)));
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::Message(u64::MAX)));
let mut messages = Vec::new();
self.store
.iterate(
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let value = Bincode::<Message>::deserialize(value)?;
assert_eq!(key.deserialize_be_u64(1)?, value.inner.id);
messages.push(value.inner);
Ok(true)
},
)
.await
.unwrap();
messages
}
pub async fn last_queued_message(&self) -> Message {
self.read_queued_messages()
.await
.into_iter()
.next()
.expect("No messages found in queue")
}
pub async fn message_due(&self, queue_id: QueueId) -> u64 {
self.read_queued_events()
.await
.iter()
.find_map(|event| {
if event.queue_id == queue_id {
Some(event.due)
} else {
None
}
})
.expect("No event found in queue for message")
}
}
impl ReportReceiver {
@ -102,65 +197,21 @@ impl ReportReceiver {
}
pub trait TestQueueEvent {
fn unwrap_message(self) -> Box<Message>;
fn unwrap_schedule(self) -> Schedule<Box<Message>>;
fn unwrap_result(self) -> WorkerResult;
fn unwrap_done(self);
fn unwrap_on_hold(self) -> OnHold<Box<Message>>;
fn unwrap_retry(self) -> Schedule<Box<Message>>;
fn assert_reload(self);
fn unwrap_on_hold(self) -> OnHold<QueueEvent>;
}
impl TestQueueEvent for queue::Event {
fn unwrap_message(self) -> Box<Message> {
fn assert_reload(self) {
match self {
queue::Event::Queue(message) => message.inner,
queue::Event::Reload => (),
e => panic!("Unexpected event: {e:?}"),
}
}
fn unwrap_schedule(self) -> Schedule<Box<Message>> {
fn unwrap_on_hold(self) -> OnHold<QueueEvent> {
match self {
queue::Event::Queue(message) => message,
e => panic!("Unexpected event: {e:?}"),
}
}
fn unwrap_result(self) -> WorkerResult {
match self {
queue::Event::Done(result) => result,
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
e => panic!("Unexpected event: {e:?}"),
}
}
fn unwrap_done(self) {
match self {
queue::Event::Done(WorkerResult::Done) => (),
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
e => panic!("Unexpected event: {e:?}"),
}
}
fn unwrap_on_hold(self) -> OnHold<Box<Message>> {
match self {
queue::Event::Done(WorkerResult::OnHold(value)) => value,
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
e => panic!("Unexpected event: {e:?}"),
}
}
fn unwrap_retry(self) -> Schedule<Box<Message>> {
match self {
queue::Event::Done(WorkerResult::Retry(value)) => value,
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
queue::Event::OnHold(value) => value,
e => panic!("Unexpected event: {e:?}"),
}
}
@ -188,20 +239,26 @@ impl TestReportingEvent for reporting::Event {
}
pub trait TestMessage {
fn read_message(&self) -> String;
fn read_lines(&self) -> Vec<String>;
async fn read_message(&self, core: &SMTP) -> String;
async fn read_lines(&self, core: &SMTP) -> Vec<String>;
}
impl TestMessage for Message {
fn read_message(&self) -> String {
let mut buf = vec![0u8; self.size];
let mut file = std::fs::File::open(&self.path).unwrap();
std::io::Read::read_exact(&mut file, &mut buf).unwrap();
String::from_utf8(buf).unwrap()
async fn read_message(&self, core: &SMTP) -> String {
String::from_utf8(
core.shared
.default_blob_store
.get_blob(self.blob_hash.as_slice(), 0..u32::MAX)
.await
.unwrap()
.expect("Message blob not found"),
)
.unwrap()
}
fn read_lines(&self) -> Vec<String> {
self.read_message()
async fn read_lines(&self, core: &SMTP) -> Vec<String> {
self.read_message(core)
.await
.split('\n')
.map(|l| l.to_string())
.collect()

View file

@ -228,13 +228,14 @@ async fn sieve_scripts() {
// Expect a modified message
session.data("test:multipart", "250").await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("X-Part-Number: 5")
.assert_contains("THIS IS A PIECE OF HTML TEXT");
qr.assert_empty_queue();
qr.assert_no_events();
// Expect rejection for bill@foobar.net
session
@ -245,7 +246,7 @@ async fn sieve_scripts() {
"503 5.5.3 Bill cannot receive messages",
)
.await;
qr.assert_empty_queue();
qr.assert_no_events();
// Expect message delivery plus a notification
session
@ -256,7 +257,8 @@ async fn sieve_scripts() {
"250",
)
.await;
let notification = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let notification = qr.last_queued_message().await;
assert_eq!(notification.return_path, "");
assert_eq!(notification.recipients.len(), 2);
assert_eq!(
@ -268,22 +270,24 @@ async fn sieve_scripts() {
"jane@example.org"
);
notification
.read_lines()
.read_lines(&core)
.await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("From: \"Sieve Daemon\" <sieve@foobar.org>")
.assert_contains("To: <john@example.net>")
.assert_contains("Cc: <jane@example.org>")
.assert_contains("Subject: You have got mail")
.assert_contains("One Two Three Four");
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("One Two Three Four")
.assert_contains("multi-part message in MIME format")
.assert_not_contains("X-Part-Number: 5")
.assert_not_contains("THIS IS A PIECE OF HTML TEXT");
qr.assert_empty_queue();
qr.assert_no_events();
// Expect a modified message delivery plus a notification
session
@ -295,10 +299,11 @@ async fn sieve_scripts() {
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("From: \"Sieve Daemon\" <sieve@foobar.org>")
.assert_contains("To: <john@example.net>")
@ -306,10 +311,11 @@ async fn sieve_scripts() {
.assert_contains("Subject: You have got mail")
.assert_contains("One Two Three Four");
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("X-Part-Number: 5")
.assert_contains("THIS IS A PIECE OF HTML TEXT")
.assert_not_contains("X-My-Header: true");
@ -323,7 +329,8 @@ async fn sieve_scripts() {
"250",
)
.await;
let redirect = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let redirect = qr.last_queued_message().await;
assert_eq!(redirect.return_path, "");
assert_eq!(redirect.recipients.len(), 1);
assert_eq!(
@ -331,13 +338,14 @@ async fn sieve_scripts() {
"redirect@here.email"
);
redirect
.read_lines()
.read_lines(&core)
.await
.assert_contains("From: no-reply@my.domain")
.assert_contains("To: Suzie Q <suzie@shopping.example.net>")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>")
.assert_not_contains("From: Joe SixPack <joe@football.example.com>");
qr.assert_empty_queue();
qr.assert_no_events();
// Expect an intact redirected message
session
@ -348,7 +356,8 @@ async fn sieve_scripts() {
"250",
)
.await;
let redirect = qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
let redirect = qr.last_queued_message().await;
assert_eq!(redirect.return_path, "");
assert_eq!(redirect.recipients.len(), 1);
assert_eq!(
@ -356,13 +365,14 @@ async fn sieve_scripts() {
"redirect@somewhere.email"
);
redirect
.read_lines()
.read_lines(&core)
.await
.assert_not_contains("From: no-reply@my.domain")
.assert_contains("To: Suzie Q <suzie@shopping.example.net>")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Message-ID: <20030712040037.46341.5F8J@football.example.com>")
.assert_contains("From: Joe SixPack <joe@football.example.com>");
qr.assert_empty_queue();
qr.assert_no_events();
// Test pipes
session.data.remote_ip_str = "10.0.0.123".parse().unwrap();
@ -375,11 +385,12 @@ async fn sieve_scripts() {
"250",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("X-My-Header: true")
.assert_contains("Authentication-Results");
qr.assert_empty_queue();
qr.assert_no_events();
}

View file

@ -192,10 +192,11 @@ async fn sign_and_seal() {
"250",
)
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains(
"DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com; c=simple/relaxed;",
);
@ -204,10 +205,11 @@ async fn sign_and_seal() {
session
.send_message("bill@foobar.org", &["jdoe@example.com"], "test:arc", "250")
.await;
qr.read_event()
qr.read_event().await.assert_reload();
qr.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("ARC-Seal: i=3; a=ed25519-sha256; s=ed; d=example.com; cv=pass;")
.assert_contains(
"ARC-Message-Signature: i=3; a=ed25519-sha256; s=ed; d=example.com; c=relaxed/simple;",

View file

@ -40,10 +40,7 @@ use crate::smtp::{
};
use smtp::{
core::{management::Message, Session, SMTP},
queue::{
manager::{Queue, SpawnQueue},
QueueId, Status,
},
queue::{manager::SpawnQueue, QueueId, Status},
};
const DIRECTORY: &str = r#"
@ -108,7 +105,7 @@ async fn manage_queue() {
core.queue.config.expire = IfBlock::new(Duration::from_secs(3000));
let local_qr = core.init_test_queue("smtp_manage_queue_local");
let core = Arc::new(core);
local_qr.queue_rx.spawn(core.clone(), Queue::default());
local_qr.queue_rx.spawn(core.clone());
let _rx_manage = start_test_server(core.clone(), &[ServerProtocol::Http]);
// Send test messages
@ -172,11 +169,11 @@ async fn manage_queue() {
// Expect delivery to success@foobar.org
tokio::time::sleep(Duration::from_millis(100)).await;
remote_qr.read_event().await.assert_reload();
assert_eq!(
remote_qr
.read_event()
.last_queued_message()
.await
.unwrap_message()
.recipients
.into_iter()
.map(|r| r.address)
@ -323,11 +320,11 @@ async fn manage_queue() {
// Expect delivery to john@foobar.org
tokio::time::sleep(Duration::from_millis(100)).await;
remote_qr.read_event().await.assert_reload();
assert_eq!(
remote_qr
.read_event()
.last_queued_message()
.await
.unwrap_message()
.recipients
.into_iter()
.map(|r| r.address)

View file

@ -44,10 +44,7 @@ use crate::smtp::{
use smtp::{
config::AggregateFrequency,
core::{management::Report, SMTP},
reporting::{
scheduler::{Scheduler, SpawnReport},
DmarcEvent, TlsEvent,
},
reporting::{scheduler::SpawnReport, DmarcEvent, TlsEvent},
};
const DIRECTORY: &str = r#"
@ -77,8 +74,6 @@ async fn manage_reports() {
let mut core = SMTP::test();
let temp_dir = make_temp_dir("smtp_report_management_test", true);
let config = &mut core.report.config;
config.path = temp_dir.temp_dir.clone();
config.hash = IfBlock::new(16);
config.dmarc_aggregate.max_size = IfBlock::new(1024);
config.tls.max_size = IfBlock::new(1024);
let directory = Config::new(DIRECTORY)
@ -90,7 +85,7 @@ async fn manage_reports() {
let (report_tx, report_rx) = mpsc::channel(1024);
core.report.tx = report_tx;
let core = Arc::new(core);
report_rx.spawn(core.clone(), Scheduler::default());
report_rx.spawn(core.clone());
let _rx_manage = start_test_server(core.clone(), &[ServerProtocol::Http]);
// Send test reporting events

View file

@ -33,7 +33,7 @@ use mail_auth::{
use mail_send::smtp::tls::build_tls_connector;
use sieve::Runtime;
use smtp_proto::{AUTH_LOGIN, AUTH_PLAIN};
use store::{LookupStore, Store};
use store::{backend::sqlite::SqliteStore, LookupStore, Store};
use tokio::sync::mpsc;
use smtp::{
@ -55,7 +55,10 @@ use smtp::{
},
outbound::dane::DnssecResolver,
};
use utils::config::{if_block::IfBlock, utils::ConstantValue, Config};
use utils::{
config::{if_block::IfBlock, utils::ConstantValue, Config},
snowflake::SnowflakeIdGenerator,
};
pub mod config;
pub mod inbound;
@ -201,6 +204,7 @@ impl TestConfig for SMTP {
blocked_ips: Arc::new(Default::default()),
}),
default_lookup_store: LookupStore::Store(store.clone()),
default_blob_store: store::BlobStore::Store(store.clone()),
default_data_store: store,
},
}
@ -300,13 +304,8 @@ impl TestConfig for QueueCore {
ThrottleKeyHasherBuilder::default(),
16,
),
quota: DashMap::with_capacity_and_hasher_and_shard_amount(
10,
ThrottleKeyHasherBuilder::default(),
16,
),
tx: mpsc::channel(1024).0,
id_seq: 0.into(),
snowflake_id: SnowflakeIdGenerator::new(),
connectors: TlsConnectors {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
@ -318,8 +317,6 @@ impl TestConfig for QueueCore {
impl TestConfig for QueueConfig {
fn test() -> Self {
Self {
path: Default::default(),
hash: IfBlock::new(10),
retry: IfBlock::new(Duration::from_secs(10)),
notify: IfBlock::new(Duration::from_secs(20)),
expire: IfBlock::new(Duration::from_secs(10)),
@ -404,8 +401,6 @@ impl TestConfig for ReportCore {
impl TestConfig for ReportConfig {
fn test() -> Self {
Self {
path: Default::default(),
hash: IfBlock::new(10),
submitter: IfBlock::new("example.org".to_string()),
analysis: ReportAnalysis {
addresses: vec![],
@ -502,6 +497,7 @@ pub fn add_test_certs(config: &str) -> String {
pub struct QueueReceiver {
_temp_dir: TempDir,
store: Store,
pub queue_rx: mpsc::Receiver<smtp::queue::Event>,
}
@ -514,17 +510,27 @@ pub trait TestSMTP {
fn init_test_report(&mut self) -> ReportReceiver;
}
const QUEUE_STORE_CONFIG: &str = r#"[store."sqlite"]
type = "sqlite"
path = "{TMP}/queue.db"
"#;
impl TestSMTP for SMTP {
fn init_test_queue(&mut self, test_name: &str) -> QueueReceiver {
let _temp_dir = make_temp_dir(test_name, true);
self.queue.config.path = _temp_dir.temp_dir.clone();
let config =
Config::new(&QUEUE_STORE_CONFIG.replace("{TMP}", _temp_dir.temp_dir.to_str().unwrap()))
.unwrap();
let store = Store::SQLite(SqliteStore::open(&config, "store.sqlite").unwrap().into());
self.shared.default_data_store = store.clone();
self.shared.default_blob_store = store.clone().into();
let (queue_tx, queue_rx) = mpsc::channel(128);
self.queue.tx = queue_tx;
QueueReceiver {
_temp_dir,
store,
queue_rx,
_temp_dir,
}
}

View file

@ -114,17 +114,20 @@ async fn dane_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("<bill@foobar.org> (DANE failed to authenticate")
.assert_contains("No TLSA records found");
local_qr.read_event().await.unwrap_done();
local_qr.read_event().await.assert_reload();
local_qr.assert_queue_is_empty().await;
// Expect TLS failure report
let report = rr.read_report().await.unwrap_tls();
@ -162,17 +165,20 @@ async fn dane_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.expect_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("<bill@foobar.org> (DANE failed to authenticate")
.assert_contains("No matching certificates found");
local_qr.read_event().await.unwrap_done();
local_qr.read_event().await.assert_reload();
local_qr.assert_queue_is_empty().await;
// Expect TLS failure report
let report = rr.read_report().await.unwrap_tls();
@ -181,7 +187,7 @@ async fn dane_verify() {
report.failure.as_ref().unwrap().result_type,
ResultType::ValidationFailure
);
remote_qr.assert_empty_queue();
remote_qr.assert_no_events();
// DANE successful delivery
let tlsa = Arc::new(Tlsa {
@ -205,15 +211,18 @@ async fn dane_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
local_qr.read_event().await.unwrap_done();
remote_qr
.read_event()
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.read_event().await.assert_reload();
local_qr.assert_queue_is_empty().await;
remote_qr
.last_queued_message()
.await
.read_lines(&core)
.await
.unwrap_message()
.read_lines()
.assert_contains("using TLSv1.3 with cipher");
// Expect TLS success report

View file

@ -94,15 +94,15 @@ async fn extensions() {
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr.expect_message_then_deliver().await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core).await
.assert_contains("<bill@foobar.org> (delivered to")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: delivered");
@ -111,21 +111,21 @@ async fn extensions() {
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core).await
.assert_contains("using TLSv1.3 with cipher");
// Test SIZE extension
session
.send_message("john@test.org", &["bill@foobar.org"], "test:arc", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr.expect_message_then_deliver().await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core).await
.assert_contains("<bill@foobar.org> (host 'mx.foobar.org' rejected command 'MAIL FROM:")
.assert_contains("Action: failed")
.assert_contains("Diagnostic-Code: smtp;552")
@ -142,11 +142,11 @@ async fn extensions() {
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr.expect_message_then_deliver().await
.try_deliver(core.clone())
.await;
local_qr.read_event().await.unwrap_done();
let message = remote_qr.read_event().await.unwrap_message();
let message = remote_qr.expect_message().await();
assert_eq!(message.env_id, Some("abc123".to_string()));
assert!((message.flags & MAIL_RET_HDRS) != 0);
assert!((message.flags & MAIL_REQUIRETLS) != 0);

View file

@ -93,12 +93,12 @@ async fn ip_lookup_strategy() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr.expect_message_then_deliver().await
.try_deliver(core.clone())
.await;
if matches!(strategy, IpLookupStrategy::Ipv6thenIpv4) {
local_qr.read_event().await.unwrap_done();
remote_qr.read_event().await.unwrap_message();
remote_qr.expect_message().await();
} else {
let status = local_qr.read_event().await.unwrap_retry().inner.domains[0]
.status

View file

@ -35,7 +35,7 @@ use crate::smtp::{
use smtp::{
config::shared::ConfigShared,
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
queue::{manager::Queue, DeliveryAttempt, Event},
};
use utils::config::{if_block::IfBlock, Config, ServerProtocol};
@ -119,8 +119,10 @@ async fn lmtp_delivery() {
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
let mut dsn = Vec::new();
loop {
@ -144,7 +146,7 @@ async fn lmtp_delivery() {
if !queue.scheduled.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.try_deliver(core.clone())
.await;
}
}
@ -155,7 +157,8 @@ async fn lmtp_delivery() {
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<bill@foobar.org> (delivered to")
.assert_contains("<jane@foobar.org> (delivered to")
.assert_contains("<john@foobar.org> (delivered to")
@ -164,19 +167,22 @@ async fn lmtp_delivery() {
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: failed");

View file

@ -101,14 +101,17 @@ async fn mta_sts_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<bill@foobar.org> (MTA-STS failed to authenticate")
.assert_contains("Record not found");
local_qr.read_event().await.unwrap_done();
@ -135,14 +138,17 @@ async fn mta_sts_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<bill@foobar.org> (MTA-STS failed to authenticate")
.assert_contains("No 'mx' entries found");
local_qr.read_event().await.unwrap_done();
@ -166,14 +172,17 @@ async fn mta_sts_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core)
.await
.assert_contains("<bill@foobar.org> (MTA-STS failed to authenticate")
.assert_contains("not authorized by policy");
local_qr.read_event().await.unwrap_done();
@ -213,15 +222,18 @@ async fn mta_sts_verify() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.read_event().await.unwrap_done();
remote_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core)
.await
.assert_contains("using TLSv1.3 with cipher");
// Expect TLS success report

View file

@ -37,7 +37,7 @@ use crate::smtp::{
};
use smtp::{
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
queue::{manager::Queue, DeliveryAttempt, Event},
};
const SMUGGLER: &str = r#"From: Joe SixPack <john@foobar.net>
@ -139,11 +139,11 @@ async fn smtp_delivery() {
"250",
)
.await;
let message = local_qr.read_event().await.unwrap_message();
let message = local_qr.expect_message().await();
let num_domains = message.domains.len();
assert_eq!(num_domains, 3);
DeliveryAttempt::from(message)
.try_deliver(core.clone(), &mut queue)
.try_deliver(core.clone())
.await;
let mut dsn = Vec::new();
let mut domain_retries = vec![0; num_domains];
@ -171,7 +171,7 @@ async fn smtp_delivery() {
if !queue.scheduled.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.try_deliver(core.clone())
.await;
}
}
@ -190,7 +190,7 @@ async fn smtp_delivery() {
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<ok@foobar.net> (delivered to")
.assert_contains("<ok@foobar.org> (delivered to")
.assert_contains("<invalid@domain.org> (failed to lookup")
@ -199,25 +199,25 @@ async fn smtp_delivery() {
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<delay@foobar.net> (host ")
.assert_contains("<delay@foobar.org> (host ")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<delay@foobar.org> (host ")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<delay@foobar.org> (host ");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<delay@foobar.net> (host ")
.assert_contains("Action: failed");
@ -261,8 +261,8 @@ async fn smtp_delivery() {
session
.send_message("john@doe.org", &["bill@foobar.com"], &message, "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr.expect_message_then_deliver().await
.try_deliver(core.clone())
.await;
let event = local_qr.read_event().await;
@ -272,7 +272,7 @@ async fn smtp_delivery() {
event
);
let message = remote_qr.read_event().await.unwrap_message().read_message();
let message = remote_qr.expect_message().await().read_message();
assert!(
message.contains("This is a smuggled message"),

View file

@ -118,8 +118,10 @@ async fn throttle_outbound() {
assert!(!in_flight.is_empty());
// Expect concurrency throttle for sender domain 'foobar.org'
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.assert_empty_queue();
in_flight.clear();
@ -142,8 +144,10 @@ async fn throttle_outbound() {
session
.send_message("john@foobar.net", &["bill@test.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.assert_empty_queue();
assert!([1799, 1800].contains(
@ -177,8 +181,10 @@ async fn throttle_outbound() {
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.read_event().await.unwrap_on_hold();
in_flight.clear();
@ -203,8 +209,10 @@ async fn throttle_outbound() {
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
assert!([2399, 2400].contains(
&local_qr
@ -244,8 +252,10 @@ async fn throttle_outbound() {
session
.send_message("john@test.net", &["jane@test.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
local_qr.read_event().await.unwrap_on_hold();
in_flight.clear();
@ -278,8 +288,10 @@ async fn throttle_outbound() {
session
.send_message("john@test.net", &["jane@test.net"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
assert!([2999, 3000].contains(
&local_qr

View file

@ -88,20 +88,23 @@ async fn starttls_optional() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
local_qr
.expect_message_then_deliver()
.await
.try_deliver(core.clone())
.await;
let mut retry = local_qr.read_event().await.unwrap_retry();
assert!(retry.inner.domains[0].disable_tls);
retry.inner.domains[0].retry.due = Instant::now();
DeliveryAttempt::from(retry.inner)
.try_deliver(core.clone(), &mut queue)
.try_deliver(core.clone())
.await;
local_qr.read_event().await.unwrap_done();
remote_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.read_lines(&core)
.await
.assert_not_contains("using TLSv1.3 with cipher");
}

View file

@ -121,7 +121,7 @@ async fn generate_dsn() {
// Failure DSN
attempt.message.recipients[0].flags = flags;
core.send_dsn(&mut attempt).await;
compare_dsn(qr.read_event().await.unwrap_message(), "failure.eml").await;
compare_dsn(qr.expect_message().await(), "failure.eml").await;
// Success DSN
attempt.message.recipients.push(Recipient {
@ -140,7 +140,7 @@ async fn generate_dsn() {
orcpt: None,
});
core.send_dsn(&mut attempt).await;
compare_dsn(qr.read_event().await.unwrap_message(), "success.eml").await;
compare_dsn(qr.expect_message().await(), "success.eml").await;
// Delay DSN
attempt.message.recipients.push(Recipient {
@ -152,7 +152,7 @@ async fn generate_dsn() {
orcpt: "jdoe@example.org".to_string().into(),
});
core.send_dsn(&mut attempt).await;
compare_dsn(qr.read_event().await.unwrap_message(), "delay.eml").await;
compare_dsn(qr.expect_message().await(), "delay.eml").await;
// Mixed DSN
for rcpt in &mut attempt.message.recipients {
@ -160,7 +160,7 @@ async fn generate_dsn() {
}
attempt.message.domains[0].notify.due = Instant::now();
core.send_dsn(&mut attempt).await;
compare_dsn(qr.read_event().await.unwrap_message(), "mixed.eml").await;
compare_dsn(qr.expect_message().await(), "mixed.eml").await;
// Load queue
let queue = core.queue.read_queue().await;

View file

@ -33,7 +33,7 @@ use crate::smtp::{
};
use smtp::{
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
queue::{manager::Queue, DeliveryAttempt, Event},
};
use utils::config::if_block::IfBlock;
@ -75,17 +75,17 @@ async fn queue_retry() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
let attempt = DeliveryAttempt::from(qr.read_event().await.unwrap_message());
let attempt = DeliveryAttempt::from(qr.expect_message().await());
// Expect a failed DSN
let path = attempt.message.path.clone();
attempt.try_deliver(core.clone(), &mut queue).await;
let message = qr.read_event().await.unwrap_message();
attempt.try_deliver(core.clone()).await;
let message = qr.expect_message().await();
assert_eq!(message.return_path, "");
assert_eq!(message.domains.first().unwrap().domain, "test.org");
assert_eq!(message.recipients.first().unwrap().address, "john@test.org");
message
.read_lines()
.read_lines(&core).await
.assert_contains("Content-Type: multipart/report")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: failed");
@ -102,11 +102,11 @@ async fn queue_retry() {
"250",
)
.await;
let attempt = DeliveryAttempt::from(qr.read_event().await.unwrap_message());
let attempt = DeliveryAttempt::from(qr.expect_message().await());
let path = attempt.message.path.clone();
let mut dsn = Vec::new();
let mut num_retries = 0;
attempt.try_deliver(core.clone(), &mut queue).await;
attempt.try_deliver(core.clone()).await;
loop {
match qr.try_read_event().await {
Some(Event::Queue(message)) => {
@ -127,7 +127,7 @@ async fn queue_retry() {
if !queue.scheduled.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.try_deliver(core.clone())
.await;
}
}
@ -139,28 +139,28 @@ async fn queue_retry() {
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<bill@foobar.org> (failed to lookup 'foobar.org'")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: failed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.read_lines(&core).await
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: failed");

View file

@ -110,7 +110,7 @@ async fn queue_serialize() {
)
.await
);
let mut message = qr.read_event().await.unwrap_message();
let mut message = qr.expect_message().await();
// Deserialize
assert_msg_eq(

View file

@ -96,5 +96,6 @@ async fn report_analyze() {
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
qr.read_event().await.unwrap_message();
qr.read_event().await.assert_reload();
qr.last_queued_message().await;
}

View file

@ -43,11 +43,7 @@ use crate::smtp::{
use smtp::{
config::{AggregateFrequency, ConfigContext},
core::SMTP,
reporting::{
dmarc::GenerateDmarcReport,
scheduler::{ReportType, Scheduler},
DmarcEvent,
},
reporting::DmarcEvent,
};
#[tokio::test]
@ -64,15 +60,12 @@ async fn report_dmarc() {
core.shared.signers = ConfigContext::new(&[]).parse_signatures().signers;
let temp_dir = make_temp_dir("smtp_report_dmarc_test", true);
let config = &mut core.report.config;
config.path = temp_dir.temp_dir.clone();
config.hash = IfBlock::new(16);
config.dmarc_aggregate.sign = "\"['rsa']\"".parse_if();
config.dmarc_aggregate.max_size = IfBlock::new(4096);
config.submitter = IfBlock::new("mx.example.org".to_string());
config.dmarc_aggregate.address = IfBlock::new("reports@example.org".to_string());
config.dmarc_aggregate.org_name = IfBlock::new("Foobar, Inc.".to_string());
config.dmarc_aggregate.contact_info = IfBlock::new("https://foobar.org/contact".to_string());
let mut scheduler = Scheduler::default();
// Authorize external report for foobar.org
core.resolvers.dns.txt_add(
@ -94,40 +87,32 @@ async fn report_dmarc() {
);
assert_eq!(dmarc_record.rua().len(), 2);
for _ in 0..2 {
scheduler
.schedule_dmarc(
Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}),
&core,
)
.await;
}
scheduler
.schedule_dmarc(
Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("a:b:c::e:f".parse().unwrap())
.with_action_disposition(ActionDisposition::Reject)
.with_dmarc_dkim_result(DmarcResult::Fail)
.with_dmarc_spf_result(DmarcResult::Pass),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}),
&core,
)
core.schedule_dmarc(Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}))
.await;
}
core.schedule_dmarc(Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("a:b:c::e:f".parse().unwrap())
.with_action_disposition(ActionDisposition::Reject)
.with_dmarc_dkim_result(DmarcResult::Fail)
.with_dmarc_spf_result(DmarcResult::Pass),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}))
.await;
assert_eq!(scheduler.reports.len(), 1);
tokio::time::sleep(Duration::from_millis(200)).await;
let report_path;
@ -140,7 +125,7 @@ async fn report_dmarc() {
}
// Expect report
let message = qr.read_event().await.unwrap_message();
let message = qr.expect_message().await();
qr.assert_empty_queue();
assert_eq!(message.recipients.len(), 1);
assert_eq!(
@ -149,7 +134,7 @@ async fn report_dmarc() {
);
assert_eq!(message.return_path, "reports@example.org");
message
.read_lines()
.read_lines(&core).await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("To: <reports@foobar.net>")
.assert_contains("Report Domain: foobar.org")

View file

@ -36,11 +36,7 @@ use crate::smtp::{make_temp_dir, TestConfig};
use smtp::{
config::AggregateFrequency,
core::SMTP,
reporting::{
dmarc::DmarcFormat,
scheduler::{ReportType, Scheduler},
DmarcEvent, PolicyType, TlsEvent,
},
reporting::{dmarc::DmarcFormat, DmarcEvent, PolicyType, TlsEvent},
};
#[tokio::test]
@ -56,123 +52,92 @@ async fn report_scheduler() {
let mut core = SMTP::test();
let temp_dir = make_temp_dir("smtp_report_scheduler_test", true);
let config = &mut core.report.config;
config.path = temp_dir.temp_dir.clone();
config.hash = IfBlock::new(16);
config.dmarc_aggregate.max_size = IfBlock::new(500);
config.tls.max_size = IfBlock::new(550);
let mut scheduler = Scheduler::default();
// Schedule two events with a same policy and another one with a different policy
let dmarc_record =
Arc::new(Dmarc::parse(b"v=DMARC1; p=quarantine; rua=mailto:dmarc@foobar.org").unwrap());
scheduler
.schedule_dmarc(
Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}),
&core,
)
.await;
core.schedule_dmarc(Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}))
.await;
// No records should be added once the 550 bytes max size is reached
for _ in 0..10 {
scheduler
.schedule_dmarc(
Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}),
&core,
)
.await;
core.schedule_dmarc(Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("192.168.1.2".parse().unwrap())
.with_action_disposition(ActionDisposition::Pass)
.with_dmarc_dkim_result(DmarcResult::Pass)
.with_dmarc_spf_result(DmarcResult::Fail)
.with_envelope_from("hello@example.org")
.with_envelope_to("other@example.org")
.with_header_from("bye@example.org"),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}))
.await;
}
let dmarc_record =
Arc::new(Dmarc::parse(b"v=DMARC1; p=reject; rua=mailto:dmarc@foobar.org").unwrap());
scheduler
.schedule_dmarc(
Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("a:b:c::e:f".parse().unwrap())
.with_action_disposition(ActionDisposition::Reject)
.with_dmarc_dkim_result(DmarcResult::Fail)
.with_dmarc_spf_result(DmarcResult::Pass),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}),
&core,
)
.await;
core.schedule_dmarc(Box::new(DmarcEvent {
domain: "foobar.org".to_string(),
report_record: Record::new()
.with_source_ip("a:b:c::e:f".parse().unwrap())
.with_action_disposition(ActionDisposition::Reject)
.with_dmarc_dkim_result(DmarcResult::Fail)
.with_dmarc_spf_result(DmarcResult::Pass),
dmarc_record: dmarc_record.clone(),
interval: AggregateFrequency::Weekly,
}))
.await;
// Schedule TLS event
let tls_record = Arc::new(TlsRpt::parse(b"v=TLSRPTv1;rua=mailto:reports@foobar.org").unwrap());
scheduler
.schedule_tls(
Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Tlsa(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}),
&core,
)
.await;
scheduler
.schedule_tls(
Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Tlsa(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}),
&core,
)
.await;
scheduler
.schedule_tls(
Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Sts(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}),
&core,
)
.await;
scheduler
.schedule_tls(
Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::None,
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}),
&core,
)
.await;
core.schedule_tls(Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Tlsa(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}))
.await;
core.schedule_tls(Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Tlsa(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}))
.await;
core.schedule_tls(Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::Sts(None),
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}))
.await;
core.schedule_tls(Box::new(TlsEvent {
domain: "foobar.org".to_string(),
policy: PolicyType::None,
failure: None,
tls_record: tls_record.clone(),
interval: AggregateFrequency::Daily,
}))
.await;
// Verify sizes and counts
let mut total_tls = 0;

View file

@ -40,11 +40,7 @@ use crate::smtp::{
use smtp::{
config::{AggregateFrequency, ConfigContext},
core::SMTP,
reporting::{
scheduler::{ReportType, Scheduler},
tls::{GenerateTlsReport, TLS_HTTP_REPORT},
TlsEvent,
},
reporting::{tls::TLS_HTTP_REPORT, TlsEvent},
};
#[tokio::test]
@ -142,14 +138,14 @@ async fn report_tls() {
}
// Expect report
let message = qr.read_event().await.unwrap_message();
let message = qr.expect_message().await();
assert_eq!(
message.recipients.last().unwrap().address,
"reports@foobar.org"
);
assert_eq!(message.return_path, "reports@example.org");
message
.read_lines()
.read_lines(&core).await
.assert_contains("DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com;")
.assert_contains("To: <reports@foobar.org>")
.assert_contains("Report Domain: foobar.org")

View file

@ -25,9 +25,9 @@ use ahash::AHashMap;
use store::{
config::ConfigStore,
write::{blob::BlobQuota, now, BatchBuilder, BlobOp},
BlobClass, BlobHash, BlobStore, Serialize,
BlobClass, BlobStore, Serialize,
};
use utils::config::Config;
use utils::{config::Config, BlobHash};
use crate::store::{TempDir, CONFIG};

View file

@ -21,7 +21,7 @@
* for more details.
*/
use store::{config::ConfigStore, LookupKey, LookupStore, LookupValue};
use store::{config::ConfigStore, LookupStore};
use utils::config::Config;
use crate::store::{TempDir, CONFIG};
@ -33,6 +33,9 @@ pub async fn lookup_tests() {
Config::new(&CONFIG.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())).unwrap();
let stores = config.parse_stores().await.unwrap();
let todo = "test expiry counter + ratelimit";
let todo = "use lookup ratelimit everywhere";
for (store_id, store) in stores.lookup_stores {
println!("Testing lookup store {}...", store_id);
if let LookupStore::Store(store) = &store {
@ -40,13 +43,7 @@ pub async fn lookup_tests() {
} else {
// Reset redis counter
store
.key_set(
"abc".as_bytes().to_vec(),
LookupValue::Value {
value: "0".as_bytes().to_vec(),
expires: 0,
},
)
.key_set("abc".as_bytes().to_vec(), "0".as_bytes().to_vec(), None)
.await
.unwrap();
}
@ -54,44 +51,26 @@ pub async fn lookup_tests() {
// Test key
let key = "xyz".as_bytes().to_vec();
store
.key_set(
key.clone(),
LookupValue::Value {
value: "world".to_string().into_bytes(),
expires: 0,
},
)
.key_set(key.clone(), "world".to_string().into_bytes(), None)
.await
.unwrap();
store.purge_expired().await.unwrap();
assert!(matches!(store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap(), LookupValue::Value { value,.. } if value == "world"));
assert_eq!(
store.key_get::<String>(key.clone()).await.unwrap(),
Some("world".to_string())
);
// Test value expiry
store
.key_set(
key.clone(),
LookupValue::Value {
value: "hello".to_string().into_bytes(),
expires: 1,
},
)
.key_set(key.clone(), "hello".to_string().into_bytes(), 1.into())
.await
.unwrap();
assert!(matches!(store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap(), LookupValue::Value { value,.. } if value == "hello"));
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
assert_eq!(
LookupValue::None,
store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap()
store.key_get::<String>(key.clone()).await.unwrap(),
Some("hello".to_string())
);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
assert_eq!(None, store.key_get::<String>(key.clone()).await.unwrap());
store.purge_expired().await.unwrap();
if let LookupStore::Store(store) = &store {
@ -100,27 +79,9 @@ pub async fn lookup_tests() {
// Test counter
let key = "abc".as_bytes().to_vec();
store
.key_set(key.clone(), LookupValue::Counter { num: 1 })
.await
.unwrap();
assert_eq!(
LookupValue::Counter { num: 1 },
store
.key_get::<String>(LookupKey::Counter(key.clone()))
.await
.unwrap()
);
store
.key_set(key.clone(), LookupValue::Counter { num: 2 })
.await
.unwrap();
assert_eq!(
LookupValue::Counter { num: 3 },
store
.key_get::<String>(LookupKey::Counter(key.clone()))
.await
.unwrap()
);
store.counter_incr(key.clone(), 1, None).await.unwrap();
assert_eq!(1, store.counter_get(key.clone()).await.unwrap());
store.counter_incr(key.clone(), 2, None).await.unwrap();
assert_eq!(3, store.counter_get(key.clone()).await.unwrap());
}
}