SMTP server passing tests.

This commit is contained in:
Mauro D 2023-05-17 10:45:43 +00:00
parent 77ced9e7fd
commit e0e8347de1
77 changed files with 5583 additions and 718 deletions

2
.gitignore vendored
View file

@ -1,6 +1,6 @@
/target
/Cargo.lock
.vscode
*.failed
*_failed
stalwart.toml
run.sh

4870
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -21,7 +21,8 @@ jmap = { path = "crates/jmap" }
jmap_proto = { path = "crates/jmap-proto" }
smtp = { path = "crates/smtp" }
utils = { path = "crates/utils" }
tests = { path = "tests" }
tokio = { version = "1.23", features = ["full"] }
tracing = "0.1"
[workspace]
members = [

View file

@ -22,10 +22,11 @@ use utils::listener::{ServerInstance, SessionData, SessionManager};
use crate::{
auth::oauth::OAuthMetadata,
blob::{DownloadResponse, UploadResponse},
services::state,
JMAP,
};
use super::{session::Session, HtmlResponse, HttpResponse, JsonResponse};
use super::{session::Session, HtmlResponse, HttpResponse, JmapSessionManager, JsonResponse};
impl JMAP {
pub async fn parse_request(
@ -205,7 +206,7 @@ impl JMAP {
}
}
impl SessionManager for super::SessionManager {
impl SessionManager for JmapSessionManager {
fn spawn(&self, session: SessionData<TcpStream>) {
let jmap = self.inner.clone();
@ -242,6 +243,13 @@ impl SessionManager for super::SessionManager {
}
});
}
fn shutdown(&self) {
let jmap = self.inner.clone();
tokio::spawn(async move {
let _ = jmap.state_tx.send(state::Event::Stop).await;
});
}
}
async fn handle_request<T: AsyncRead + AsyncWrite + Unpin + 'static>(

View file

@ -14,13 +14,13 @@ pub mod request;
pub mod session;
#[derive(Clone)]
pub struct SessionManager {
pub struct JmapSessionManager {
pub inner: Arc<JMAP>,
}
impl From<Arc<JMAP>> for SessionManager {
fn from(inner: Arc<JMAP>) -> Self {
SessionManager { inner }
impl JmapSessionManager {
pub fn new(inner: Arc<JMAP>) -> Self {
Self { inner }
}
}

View file

@ -1 +1,59 @@
fn main() {}
use std::time::Duration;
use jmap::{api::JmapSessionManager, JMAP};
use smtp::core::{SmtpAdminSessionManager, SmtpSessionManager, SMTP};
use utils::{
config::{Config, ServerProtocol},
enable_tracing, wait_for_shutdown, UnwrapFailure,
};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let config = Config::init();
let servers = config.parse_servers().failed("Invalid configuration");
// Bind ports and drop privileges
servers.bind(&config);
// Enable tracing
let _tracer = enable_tracing(&config).failed("Failed to enable tracing");
tracing::info!(
"Starting Stalwart mail server v{}...",
env!("CARGO_PKG_VERSION")
);
// Init servers
let smtp = SMTP::init(&config, &servers).await;
let jmap = JMAP::init(&config).await;
// Spawn servers
let shutdown_tx = servers.spawn(|server, shutdown_rx| {
match &server.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
server.spawn(SmtpSessionManager::new(smtp.clone()), shutdown_rx)
}
ServerProtocol::Http => {
server.spawn(SmtpAdminSessionManager::new(smtp.clone()), shutdown_rx)
}
ServerProtocol::Jmap => {
server.spawn(JmapSessionManager::new(jmap.clone()), shutdown_rx)
}
ServerProtocol::Imap => unimplemented!("IMAP is not implemented yet"),
};
});
// Wait for shutdown signal
wait_for_shutdown().await;
tracing::info!(
"Shutting down Stalwart mail server v{}...",
env!("CARGO_PKG_VERSION")
);
// Stop services
let _ = shutdown_tx.send(true);
// Wait for services to finish
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}

View file

@ -32,12 +32,6 @@ sha1 = "0.10"
sha2 = "0.10.6"
rayon = "1.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.11.0", features = ["http-proto", "reqwest-client", "reqwest-rustls"] }
opentelemetry-semantic-conventions = { version = "0.10.0" }
parking_lot = "0.12"
regex = "1.7.0"
dashmap = "5.4"

View file

@ -40,6 +40,7 @@ pub trait ConfigCondition {
ctx: &ConfigContext,
available_keys: &[EnvelopeKey],
) -> super::Result<Conditions>;
#[cfg(feature = "test_mode")]
fn parse_conditions(
&self,
ctx: &ConfigContext,
@ -381,14 +382,15 @@ mod tests {
file.push("rules.toml");
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
let mut context = ConfigContext::default();
let list = Arc::new(Lookup::default());
context.lookup.insert("test-list".to_string(), list.clone());
context.servers.push(Server {
let servers = vec![Server {
id: "smtp".to_string(),
internal_id: 123,
..Default::default()
});
}];
let mut context = ConfigContext::new(&servers);
let list = Arc::new(Lookup::default());
context.lookup.insert("test-list".to_string(), list.clone());
let mut conditions = config.parse_conditions(&context).unwrap();
let expected_rules = AHashMap::from_iter([
(

View file

@ -298,7 +298,7 @@ mod tests {
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
// Create context and add some conditions
let context = ConfigContext::default();
let context = ConfigContext::new(&[]);
let available_keys = vec![
EnvelopeKey::Recipient,
EnvelopeKey::RecipientDomain,

View file

@ -111,7 +111,7 @@ mod tests {
.replace("{LIST2}", list2.as_path().to_str().unwrap());
let config = Config::parse(&toml).unwrap();
let mut context = ConfigContext::default();
let mut context = ConfigContext::new(&[]);
config.parse_remote_hosts(&mut context).unwrap();
config.parse_lists(&mut context).unwrap();

View file

@ -52,7 +52,7 @@ use regex::Regex;
use sieve::Sieve;
use smtp_proto::MtPriority;
use tokio::sync::mpsc;
use utils::config::{Server, ServerProtocol};
use utils::config::{Rate, Server, ServerProtocol};
use crate::lookup::{self, Lookup, SqlDatabase};
@ -200,12 +200,6 @@ pub const THROTTLE_REMOTE_IP: u16 = 1 << 7;
pub const THROTTLE_LOCAL_IP: u16 = 1 << 8;
pub const THROTTLE_HELO_DOMAIN: u16 = 1 << 9;
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct Rate {
pub requests: u64,
pub period: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IpAddrMask {
V4 { addr: Ipv4Addr, mask: u32 },
@ -527,8 +521,8 @@ pub enum VerifyStrategy {
}
#[derive(Default)]
pub struct ConfigContext {
pub servers: Vec<Server>,
pub struct ConfigContext<'x> {
pub servers: &'x [Server],
pub hosts: AHashMap<String, Host>,
pub scripts: AHashMap<String, Arc<Sieve>>,
pub lookup: AHashMap<String, Arc<Lookup>>,
@ -537,4 +531,13 @@ pub struct ConfigContext {
pub sealers: AHashMap<String, Arc<ArcSealer>>,
}
impl<'x> ConfigContext<'x> {
pub fn new(servers: &'x [Server]) -> Self {
Self {
servers,
..Default::default()
}
}
}
pub type Result<T> = std::result::Result<T, String>;

View file

@ -23,7 +23,7 @@
use super::{condition::ConfigCondition, *};
use utils::config::{
utils::{AsKey, ParseKey, ParseValue},
utils::{AsKey, ParseValue},
Config,
};
@ -119,36 +119,6 @@ impl ConfigThrottle for Config {
}
}
impl ParseValue for Rate {
fn parse_value(key: impl AsKey, value: &str) -> super::Result<Self> {
if let Some((requests, period)) = value.split_once('/') {
Ok(Rate {
requests: requests
.trim()
.parse::<u64>()
.ok()
.and_then(|r| if r > 0 { Some(r) } else { None })
.ok_or_else(|| {
format!(
"Invalid rate value {:?} for property {:?}.",
value,
key.as_key()
)
})?,
period: period.parse_key(key)?,
})
} else if ["false", "none", "unlimited"].contains(&value) {
Ok(Rate::default())
} else {
Err(format!(
"Invalid rate value {:?} for property {:?}.",
value,
key.as_key()
))
}
}
}
impl ParseValue for EnvelopeKey {
fn parse_value(key: impl AsKey, value: &str) -> super::Result<Self> {
Ok(match value {
@ -228,7 +198,7 @@ mod tests {
];
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
let context = ConfigContext::default();
let context = ConfigContext::new(&[]);
let throttle = config
.parse_throttle("throttle", &context, &available_keys, u16::MAX)
.unwrap();

View file

@ -245,17 +245,19 @@ mod tests {
file.push("rules-eval.toml");
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
let mut context = ConfigContext::default();
context.servers.push(Server {
id: "smtp".to_string(),
internal_id: 123,
..Default::default()
});
context.servers.push(Server {
id: "smtps".to_string(),
internal_id: 456,
..Default::default()
});
let servers = vec![
Server {
id: "smtp".to_string(),
internal_id: 123,
..Default::default()
},
Server {
id: "smtps".to_string(),
internal_id: 456,
..Default::default()
},
];
let mut context = ConfigContext::new(&servers);
config.parse_lists(&mut context).unwrap();
let conditions = config.parse_conditions(&context).unwrap();

View file

@ -50,7 +50,7 @@ use crate::{
},
};
use super::{Core, HttpAdminSessionManager};
use super::{SmtpAdminSessionManager, SMTP};
#[derive(Debug)]
pub enum QueueRequest {
@ -155,7 +155,7 @@ pub struct Report {
pub size: usize,
}
impl SessionManager for HttpAdminSessionManager {
impl SessionManager for SmtpAdminSessionManager {
fn spawn(&self, session: utils::listener::SessionData<tokio::net::TcpStream>) {
let core = self.inner.clone();
tokio::spawn(async move {
@ -179,11 +179,15 @@ impl SessionManager for HttpAdminSessionManager {
}
});
}
fn shutdown(&self) {
// No-op
}
}
async fn handle_request(
stream: impl AsyncRead + AsyncWrite + Unpin + 'static,
core: Arc<Core>,
core: Arc<SMTP>,
remote_addr: IpAddr,
_in_flight: InFlight,
) {
@ -223,7 +227,7 @@ async fn handle_request(
}
}
impl Core {
impl SMTP {
async fn parse_request(
&self,
req: &hyper::Request<hyper::body::Incoming>,

View file

@ -70,27 +70,27 @@ pub mod worker;
#[derive(Clone)]
pub struct SmtpSessionManager {
pub inner: Arc<Core>,
pub inner: Arc<SMTP>,
}
#[derive(Clone)]
pub struct HttpAdminSessionManager {
pub inner: Arc<Core>,
pub struct SmtpAdminSessionManager {
pub inner: Arc<SMTP>,
}
impl SmtpSessionManager {
pub fn new(inner: Arc<Core>) -> Self {
pub fn new(inner: Arc<SMTP>) -> Self {
Self { inner }
}
}
impl HttpAdminSessionManager {
pub fn new(inner: Arc<Core>) -> Self {
impl SmtpAdminSessionManager {
pub fn new(inner: Arc<SMTP>) -> Self {
Self { inner }
}
}
pub struct Core {
pub struct SMTP {
pub worker_pool: rayon::ThreadPool,
pub session: SessionCore,
pub queue: QueueCore,
@ -163,7 +163,7 @@ pub enum State {
pub struct Session<T: AsyncWrite + AsyncRead> {
pub state: State,
pub instance: Arc<ServerInstance>,
pub core: Arc<Core>,
pub core: Arc<SMTP>,
pub span: Span,
pub stream: T,
pub data: SessionData,

View file

@ -43,7 +43,7 @@ use crate::{
queue::{DomainPart, InstantFromTimestamp, Message},
};
use super::{Core, Session};
use super::{Session, SMTP};
pub enum ScriptResult {
Accept,
@ -113,7 +113,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
}
impl Core {
impl SMTP {
fn run_script_blocking(
&self,
script: Arc<Sieve>,

View file

@ -24,6 +24,7 @@
use ::utils::listener::limiter::{ConcurrencyLimiter, RateLimiter};
use dashmap::mapref::entry::Entry;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::config::Rate;
use std::{
hash::{BuildHasher, Hash, Hasher},

View file

@ -25,9 +25,9 @@ use std::sync::{atomic::Ordering, Arc};
use tokio::sync::oneshot;
use super::Core;
use super::SMTP;
impl Core {
impl SMTP {
pub async fn spawn_worker<U, V>(&self, f: U) -> Option<V>
where
U: FnOnce() -> V + Send + 'static,
@ -73,7 +73,7 @@ pub trait SpawnCleanup {
fn spawn_cleanup(&self);
}
impl SpawnCleanup for Arc<Core> {
impl SpawnCleanup for Arc<SMTP> {
fn spawn_cleanup(&self) {
let core = self.clone();
self.worker_pool.spawn(move || {

View file

@ -30,8 +30,11 @@ use tokio::{
use tokio_rustls::server::TlsStream;
use utils::listener::SessionManager;
use crate::core::{
scripts::ScriptResult, Session, SessionData, SessionParameters, SmtpSessionManager, State,
use crate::{
core::{
scripts::ScriptResult, Session, SessionData, SessionParameters, SmtpSessionManager, State,
},
queue, reporting,
};
use super::IsTls;
@ -65,6 +68,15 @@ impl SessionManager for SmtpSessionManager {
}
});
}
fn shutdown(&self) {
// We spawn to avoid using async_trait
let core = self.inner.clone();
tokio::spawn(async move {
let _ = core.queue.tx.send(queue::Event::Stop).await;
let _ = core.report.tx.send(reporting::Event::Stop).await;
});
}
}
impl Session<TcpStream> {

View file

@ -21,6 +21,26 @@
* for more details.
*/
use crate::core::{
throttle::ThrottleKeyHasherBuilder, QueueCore, ReportCore, SessionCore, TlsConnectors, SMTP,
};
use std::sync::Arc;
use config::{
auth::ConfigAuth, database::ConfigDatabase, list::ConfigList, queue::ConfigQueue,
remote::ConfigHost, report::ConfigReport, resolver::ConfigResolver, scripts::ConfigSieve,
session::ConfigSession, ConfigContext,
};
use dashmap::DashMap;
use mail_send::smtp::tls::build_tls_connector;
use queue::manager::SpawnQueue;
use reporting::scheduler::SpawnReport;
use tokio::sync::mpsc;
use utils::{
config::{Config, Servers},
UnwrapFailure,
};
pub mod config;
pub mod core;
pub mod inbound;
@ -31,35 +51,122 @@ pub mod reporting;
pub static USER_AGENT: &str = concat!("StalwartSMTP/", env!("CARGO_PKG_VERSION"),);
pub trait UnwrapFailure<T> {
fn failed(self, action: &str) -> T;
}
impl SMTP {
pub async fn init(config: &Config, servers: &Servers) -> Arc<Self> {
// Read configuration parameters
let mut config_ctx = ConfigContext::new(&servers.inner);
config
.parse_remote_hosts(&mut config_ctx)
.failed("Configuration error");
config
.parse_databases(&mut config_ctx)
.failed("Configuration error");
config
.parse_lists(&mut config_ctx)
.failed("Configuration error");
config
.parse_signatures(&mut config_ctx)
.failed("Configuration error");
let sieve_config = config
.parse_sieve(&mut config_ctx)
.failed("Configuration error");
let session_config = config
.parse_session_config(&config_ctx)
.failed("Configuration error");
let queue_config = config
.parse_queue(&config_ctx)
.failed("Configuration error");
let mail_auth_config = config
.parse_mail_auth(&config_ctx)
.failed("Configuration error");
let report_config = config
.parse_reports(&config_ctx)
.failed("Configuration error");
impl<T> UnwrapFailure<T> for Option<T> {
fn failed(self, message: &str) -> T {
match self {
Some(result) => result,
None => {
eprintln!("{message}");
std::process::exit(1);
// Build core
let (queue_tx, queue_rx) = mpsc::channel(1024);
let (report_tx, report_rx) = mpsc::channel(1024);
let core = Arc::new(SMTP {
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(
config
.property::<usize>("global.thread-pool")
.failed("Failed to parse thread pool size")
.filter(|v| *v > 0)
.unwrap_or_else(num_cpus::get),
)
.build()
.unwrap(),
resolvers: config.build_resolvers().failed("Failed to build resolvers"),
session: SessionCore {
config: session_config,
throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
},
queue: QueueCore {
config: queue_config,
throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
id_seq: 0.into(),
quota: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
tx: queue_tx,
connectors: TlsConnectors {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
},
},
report: ReportCore {
tx: report_tx,
config: report_config,
},
mail_auth: mail_auth_config,
sieve: sieve_config,
});
// Spawn queue manager
queue_rx.spawn(core.clone(), core.queue.read_queue().await);
// Spawn report manager
report_rx.spawn(core.clone(), core.report.read_reports().await);
// Spawn remote hosts
for host in config_ctx.hosts.into_values() {
if host.lookup {
host.spawn(config);
}
}
core
}
}
impl<T, E: std::fmt::Display> UnwrapFailure<T> for Result<T, E> {
fn failed(self, message: &str) -> T {
match self {
Ok(result) => result,
Err(err) => {
eprintln!("{message}: {err}");
std::process::exit(1);
}
}
}
}
pub fn failed(message: &str) -> ! {
eprintln!("{message}");
std::process::exit(1);
}

View file

@ -1,385 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{collections::HashMap, fs, sync::Arc, time::Duration};
use dashmap::DashMap;
use mail_send::smtp::tls::build_tls_connector;
use opentelemetry::{
sdk::{
trace::{self, Sampler},
Resource,
},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use stalwart_smtp::{
config::{Config, ConfigContext, ServerProtocol},
core::{
throttle::{ConcurrencyLimiter, ThrottleKeyHasherBuilder},
Core, QueueCore, ReportCore, SessionCore, TlsConnectors,
},
failed,
queue::{self, manager::SpawnQueue},
reporting::{self, scheduler::SpawnReport},
UnwrapFailure,
};
use tokio::sync::{mpsc, watch};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter};
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Read configuration parameters
let config = parse_config();
let mut config_context = ConfigContext::default();
config
.parse_servers(&mut config_context)
.failed("Configuration error");
config
.parse_remote_hosts(&mut config_context)
.failed("Configuration error");
config
.parse_databases(&mut config_context)
.failed("Configuration error");
config
.parse_lists(&mut config_context)
.failed("Configuration error");
config
.parse_signatures(&mut config_context)
.failed("Configuration error");
let sieve_config = config
.parse_sieve(&mut config_context)
.failed("Configuration error");
let session_config = config
.parse_session_config(&config_context)
.failed("Configuration error");
let queue_config = config
.parse_queue(&config_context)
.failed("Configuration error");
let mail_auth_config = config
.parse_mail_auth(&config_context)
.failed("Configuration error");
let report_config = config
.parse_reports(&config_context)
.failed("Configuration error");
// Build core
let (queue_tx, queue_rx) = mpsc::channel(1024);
let (report_tx, report_rx) = mpsc::channel(1024);
let core = Arc::new(Core {
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(
config
.property::<usize>("global.thread-pool")
.failed("Failed to parse thread pool size")
.filter(|v| *v > 0)
.unwrap_or_else(num_cpus::get),
)
.build()
.unwrap(),
resolvers: config.build_resolvers().failed("Failed to build resolvers"),
session: SessionCore {
config: session_config,
throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
},
queue: QueueCore {
config: queue_config,
throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
id_seq: 0.into(),
quota: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("global.shared-map.capacity")
.failed("Failed to parse shared map capacity")
.unwrap_or(2),
ThrottleKeyHasherBuilder::default(),
config
.property::<u64>("global.shared-map.shard")
.failed("Failed to parse shared map shard amount")
.unwrap_or(32)
.next_power_of_two() as usize,
),
tx: queue_tx,
connectors: TlsConnectors {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
},
},
report: ReportCore {
tx: report_tx,
config: report_config,
},
mail_auth: mail_auth_config,
sieve: sieve_config,
});
// Bind ports before dropping privileges
for server in &config_context.servers {
for listener in &server.listeners {
listener
.socket
.bind(listener.addr)
.failed(&format!("Failed to bind to {}", listener.addr));
}
}
// Drop privileges
#[cfg(not(target_env = "msvc"))]
{
if let Some(run_as_user) = config.value("server.run-as.user") {
let mut pd = privdrop::PrivDrop::default().user(run_as_user);
if let Some(run_as_group) = config.value("server.run-as.group") {
pd = pd.group(run_as_group);
}
pd.apply().failed("Failed to drop privileges");
}
}
// Enable tracing
let _tracer = enable_tracing(&config).failed("Failed to enable tracing");
tracing::info!(
"Starting Stalwart SMTP server v{}...",
env!("CARGO_PKG_VERSION")
);
// Spawn queue manager
queue_rx.spawn(core.clone(), core.queue.read_queue().await);
// Spawn report manager
report_rx.spawn(core.clone(), core.report.read_reports().await);
// Spawn remote hosts
for host in config_context.hosts.into_values() {
if host.lookup {
host.spawn(&config);
}
}
// Spawn listeners
let (shutdown_tx, shutdown_rx) = watch::channel(false);
for server in config_context.servers {
match server.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => server
.spawn(core.clone(), shutdown_rx.clone())
.failed("Failed to start listener"),
ServerProtocol::Http => server
.spawn_management(core.clone(), shutdown_rx.clone())
.failed("Failed to start management interface"),
ServerProtocol::Imap => {
eprintln!("Invalid protocol 'imap' for listener '{}'.", server.id);
std::process::exit(0);
}
}
}
// Wait for shutdown signal
#[cfg(not(target_env = "msvc"))]
{
use tokio::signal::unix::{signal, SignalKind};
let mut h_term = signal(SignalKind::terminate()).failed("start signal handler");
let mut h_int = signal(SignalKind::interrupt()).failed("start signal handler");
tokio::select! {
_ = h_term.recv() => tracing::debug!("Received SIGTERM."),
_ = h_int.recv() => tracing::debug!("Received SIGINT."),
};
}
#[cfg(target_env = "msvc")]
{
match tokio::signal::ctrl_c().await {
Ok(()) => {}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}
}
}
// Shutdown the system
tracing::info!(
"Shutting down Stalwart SMTP server v{}...",
env!("CARGO_PKG_VERSION")
);
// Stop services
shutdown_tx.send(true).ok();
core.queue.tx.send(queue::Event::Stop).await.ok();
core.report.tx.send(reporting::Event::Stop).await.ok();
// Wait for services to finish
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
fn enable_tracing(config: &Config) -> stalwart_smtp::config::Result<Option<WorkerGuard>> {
let level = config.value("global.tracing.level").unwrap_or("info");
let env_filter = EnvFilter::builder()
.parse(format!("stalwart_smtp={}", level))
.failed("Failed to log level");
match config.value("global.tracing.method").unwrap_or_default() {
"log" => {
let path = config.value_require("global.tracing.path")?;
let prefix = config.value_require("global.tracing.prefix")?;
let file_appender = match config.value("global.tracing.rotate").unwrap_or("daily") {
"daily" => tracing_appender::rolling::daily(path, prefix),
"hourly" => tracing_appender::rolling::hourly(path, prefix),
"minutely" => tracing_appender::rolling::minutely(path, prefix),
"never" => tracing_appender::rolling::never(path, prefix),
rotate => {
return Err(format!("Unsupported log rotation strategy {rotate:?}"));
}
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.with_writer(non_blocking)
.finish(),
)
.failed("Failed to set subscriber");
Ok(guard.into())
}
"stdout" => {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.finish(),
)
.failed("Failed to set subscriber");
Ok(None)
}
"otel" | "open-telemetry" => {
let tracer = match config.value_require("global.tracing.transport")? {
"grpc" => {
let mut exporter = opentelemetry_otlp::new_exporter().tonic();
if let Some(endpoint) = config.value("global.tracing.endpoint") {
exporter = exporter.with_endpoint(endpoint);
}
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
}
"http" => {
let mut headers = HashMap::new();
for (_, value) in config.values("global.tracing.headers") {
if let Some((key, value)) = value.split_once(':') {
headers.insert(key.trim().to_string(), value.trim().to_string());
} else {
return Err(format!("Invalid open-telemetry header {value:?}"));
}
}
let mut exporter = opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(config.value_require("global.tracing.endpoint")?);
if !headers.is_empty() {
exporter = exporter.with_headers(headers);
}
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
}
transport => {
return Err(format!(
"Unsupported open-telemetry transport {transport:?}"
));
}
}
.with_trace_config(
trace::config()
.with_resource(Resource::new(vec![
KeyValue::new(SERVICE_NAME, "stalwart-smtp".to_string()),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION").to_string()),
]))
.with_sampler(Sampler::AlwaysOn),
)
.install_batch(opentelemetry::runtime::Tokio)
.failed("Failed to create tracer");
tracing::subscriber::set_global_default(
tracing_subscriber::Registry::default()
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.with(env_filter),
)
.failed("Failed to set subscriber");
Ok(None)
}
_ => Ok(None),
}
}
fn parse_config() -> Config {
let mut config_path = None;
let mut found_param = false;
for arg in std::env::args().skip(1) {
if let Some((key, value)) = arg.split_once('=') {
if key.starts_with("--config") {
config_path = value.trim().to_string().into();
break;
} else {
failed(&format!("Invalid command line argument: {key}"));
}
} else if found_param {
config_path = arg.into();
break;
} else if arg.starts_with("--config") {
found_param = true;
} else {
failed(&format!("Invalid command line argument: {arg}"));
}
}
Config::parse(
&fs::read_to_string(config_path.failed("Missing parameter --config=<path-to-config>."))
.failed("Could not read configuration file"),
)
.failed("Invalid configuration file")
}

View file

@ -37,7 +37,7 @@ use utils::config::ServerProtocol;
use crate::{
config::{AggregateFrequency, TlsStrategy},
core::Core,
core::SMTP,
queue::ErrorDetails,
reporting::{tls::TlsRptOptions, PolicyType, TlsEvent},
};
@ -54,7 +54,7 @@ use crate::queue::{
};
impl DeliveryAttempt {
pub async fn try_deliver(mut self, core: Arc<Core>, queue: &mut Queue) {
pub async fn try_deliver(mut self, core: Arc<SMTP>, queue: &mut Queue) {
// Check that the message still has recipients to be delivered
let has_pending_delivery = self.has_pending_delivery();

View file

@ -27,13 +27,13 @@ use mail_auth::MX;
use rand::{seq::SliceRandom, Rng};
use crate::{
core::{Core, Envelope},
core::{Envelope, SMTP},
queue::{Error, ErrorDetails, Status},
};
use super::RemoteHost;
impl Core {
impl SMTP {
pub(super) async fn resolve_host(
&self,
remote_host: &RemoteHost<'_>,

View file

@ -32,12 +32,12 @@ pub static STS_TEST_POLICY: parking_lot::Mutex<Vec<u8>> = parking_lot::Mutex::ne
use mail_auth::{common::lru::DnsCache, mta_sts::MtaSts, report::tlsrpt::ResultType};
use crate::core::Core;
use crate::core::SMTP;
use super::{Error, Policy};
#[allow(unused_variables)]
impl Core {
impl SMTP {
pub async fn lookup_mta_sts_policy<'x>(
&self,
domain: &str,

View file

@ -33,7 +33,7 @@ use tokio::sync::mpsc;
use crate::core::{
management::{self},
Core, QueueCore,
QueueCore, SMTP,
};
use super::{
@ -51,7 +51,7 @@ pub struct Queue {
}
impl SpawnQueue for mpsc::Receiver<Event> {
fn spawn(mut self, core: Arc<Core>, mut queue: Queue) {
fn spawn(mut self, core: Arc<SMTP>, mut queue: Queue) {
tokio::spawn(async move {
loop {
let result = tokio::time::timeout(queue.wake_up_time(), self.recv()).await;
@ -557,5 +557,5 @@ impl Default for Queue {
}
pub trait SpawnQueue {
fn spawn(self, core: Arc<Core>, queue: Queue);
fn spawn(self, core: Arc<SMTP>, queue: Queue);
}

View file

@ -37,7 +37,7 @@ use mail_auth::{
};
use mail_parser::{DateTime, HeaderValue, Message, MimeHeaders, PartType};
use crate::core::Core;
use crate::core::SMTP;
enum Compression {
None,
@ -61,7 +61,7 @@ pub trait AnalyzeReport {
fn analyze_report(&self, message: Arc<Vec<u8>>);
}
impl AnalyzeReport for Arc<Core> {
impl AnalyzeReport for Arc<SMTP> {
fn analyze_report(&self, message: Arc<Vec<u8>>) {
let core = self.clone();
self.worker_pool.spawn(move || {

View file

@ -25,8 +25,9 @@ use mail_auth::{
common::verify::VerifySignature, AuthenticatedMessage, AuthenticationResults, DkimOutput,
};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::config::Rate;
use crate::{config::Rate, core::Session};
use crate::core::Session;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn send_dkim_report(

View file

@ -39,7 +39,7 @@ use tokio::{
use crate::{
config::AggregateFrequency,
core::{Core, Session},
core::{Session, SMTP},
queue::{DomainPart, InstantFromTimestamp, Schedule},
};
@ -296,7 +296,7 @@ pub trait GenerateDmarcReport {
fn generate_dmarc_report(&self, domain: ReportPolicy<String>, path: ReportPath<PathBuf>);
}
impl GenerateDmarcReport for Arc<Core> {
impl GenerateDmarcReport for Arc<SMTP> {
fn generate_dmarc_report(&self, domain: ReportPolicy<String>, path: ReportPath<PathBuf>) {
let core = self.clone();
let handle = Handle::current();
@ -426,7 +426,7 @@ impl GenerateDmarcReport for Arc<Core> {
}
impl Scheduler {
pub async fn schedule_dmarc(&mut self, event: Box<DmarcEvent>, core: &Core) {
pub async fn schedule_dmarc(&mut self, event: Box<DmarcEvent>, core: &SMTP) {
let max_size = core
.report
.config

View file

@ -37,7 +37,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
config::{AddressMatch, AggregateFrequency, DkimSigner, IfBlock},
core::{management, Core, Session},
core::{management, Session, SMTP},
outbound::{dane::Tlsa, mta_sts::Policy},
queue::{DomainPart, Message},
USER_AGENT,
@ -123,7 +123,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
}
impl Core {
impl SMTP {
pub async fn send_report(
&self,
from_addr: &str,

View file

@ -46,7 +46,7 @@ use tokio::{
use crate::{
config::AggregateFrequency,
core::{management::ReportRequest, worker::SpawnCleanup, Core, ReportCore},
core::{management::ReportRequest, worker::SpawnCleanup, ReportCore, SMTP},
queue::{InstantFromTimestamp, Schedule},
};
@ -83,7 +83,7 @@ pub struct ReportPolicy<T> {
}
impl SpawnReport for mpsc::Receiver<Event> {
fn spawn(mut self, core: Arc<Core>, mut scheduler: Scheduler) {
fn spawn(mut self, core: Arc<SMTP>, mut scheduler: Scheduler) {
tokio::spawn(async move {
let mut last_cleanup = Instant::now();
@ -184,7 +184,7 @@ impl SpawnReport for mpsc::Receiver<Event> {
}
}
impl Core {
impl SMTP {
pub async fn build_report_path(
&self,
domain: ReportType<&str, &str>,
@ -645,5 +645,5 @@ impl ToTimestamp for Duration {
}
pub trait SpawnReport {
fn spawn(self, core: Arc<Core>, scheduler: Scheduler);
fn spawn(self, core: Arc<SMTP>, scheduler: Scheduler);
}

View file

@ -23,8 +23,9 @@
use mail_auth::{report::AuthFailureType, AuthenticationResults, SpfOutput};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::config::Rate;
use crate::{config::Rate, core::Session};
use crate::core::Session;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn send_spf_report(

View file

@ -40,7 +40,7 @@ use tokio::runtime::Handle;
use crate::{
config::AggregateFrequency,
core::Core,
core::SMTP,
outbound::mta_sts::{Mode, MxPattern},
queue::{InstantFromTimestamp, Schedule},
USER_AGENT,
@ -74,7 +74,7 @@ pub trait GenerateTlsReport {
#[cfg(feature = "test_mode")]
pub static TLS_HTTP_REPORT: parking_lot::Mutex<Vec<u8>> = parking_lot::Mutex::new(Vec::new());
impl GenerateTlsReport for Arc<Core> {
impl GenerateTlsReport for Arc<SMTP> {
fn generate_tls_report(&self, domain: String, path: ReportPath<Vec<ReportPolicy<PathBuf>>>) {
let core = self.clone();
let handle = Handle::current();
@ -277,7 +277,7 @@ impl GenerateTlsReport for Arc<Core> {
}
impl Scheduler {
pub async fn schedule_tls(&mut self, event: Box<TlsEvent>, core: &Core) {
pub async fn schedule_tls(&mut self, event: Box<TlsEvent>, core: &SMTP) {
let max_size = core
.report
.config

View file

@ -13,6 +13,15 @@ serde = { version = "1.0", features = ["derive"]}
tracing = "0.1"
mail-auth = { git = "https://github.com/stalwartlabs/mail-auth" }
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.11.0", features = ["http-proto", "reqwest-client", "reqwest-rustls"] }
opentelemetry-semantic-conventions = { version = "0.10.0" }
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"
[features]
test_mode = []

View file

@ -277,14 +277,20 @@ impl Config {
.value_or_default(("server.listener", id, "hostname"), "server.hostname")
.ok_or("Hostname directive not found.")?
.to_string(),
data: if matches!(protocol, ServerProtocol::Smtp | ServerProtocol::Lmtp) {
self.value_or_default(("server.listener", id, "data"), "server.data")
data: match protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => self
.value_or_default(("server.listener", id, "greeting"), "server.greeting")
.unwrap_or("Stalwart SMTP at your service")
.to_string()
} else {
self.value_or_default(("server.listener", id, "url"), "server.url")
.to_string(),
ServerProtocol::Jmap => self
.value_or_default(("server.listener", id, "url"), "server.url")
.failed(&format!("No 'url' directive found for listener {id:?}"))
.to_string()
.to_string(),
ServerProtocol::Imap | ServerProtocol::Http => self
.value_or_default(("server.listener", id, "url"), "server.url")
.unwrap_or_default()
.to_string(),
},
max_connections: self
.property_or_default(

View file

@ -31,6 +31,8 @@ use std::{collections::BTreeMap, fmt::Display, net::SocketAddr, time::Duration};
use rustls::ServerConfig;
use tokio::net::TcpSocket;
use crate::{failed, UnwrapFailure};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
pub keys: BTreeMap<String, String>,
@ -90,3 +92,36 @@ impl Display for ServerProtocol {
}
pub type Result<T> = std::result::Result<T, String>;
impl Config {
pub fn init() -> Self {
let mut config_path = None;
let mut found_param = false;
for arg in std::env::args().skip(1) {
if let Some((key, value)) = arg.split_once('=') {
if key.starts_with("--config") {
config_path = value.trim().to_string().into();
break;
} else {
failed(&format!("Invalid command line argument: {key}"));
}
} else if found_param {
config_path = arg.into();
break;
} else if arg.starts_with("--config") {
found_param = true;
} else {
failed(&format!("Invalid command line argument: {arg}"));
}
}
Config::parse(
&std::fs::read_to_string(
config_path.failed("Missing parameter --config=<path-to-config>."),
)
.failed("Could not read configuration file"),
)
.failed("Invalid configuration file")
}
}

View file

@ -21,11 +21,27 @@
* for more details.
*/
use std::collections::HashMap;
use config::Config;
pub mod codec;
pub mod config;
pub mod listener;
pub mod map;
use opentelemetry::{
sdk::{
trace::{self, Sampler},
Resource,
},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter};
pub trait UnwrapFailure<T> {
fn failed(self, action: &str) -> T;
}
@ -47,8 +63,14 @@ impl<T, E: std::fmt::Display> UnwrapFailure<T> for Result<T, E> {
match self {
Ok(result) => result,
Err(err) => {
eprintln!("{message}: {err}");
std::process::exit(1);
#[cfg(feature = "test_mode")]
panic!("{message}: {err}");
#[cfg(not(feature = "test_mode"))]
{
eprintln!("{message}: {err}");
std::process::exit(1);
}
}
}
}
@ -58,3 +80,127 @@ pub fn failed(message: &str) -> ! {
eprintln!("{message}");
std::process::exit(1);
}
pub fn enable_tracing(config: &Config) -> config::Result<Option<WorkerGuard>> {
let level = config.value("global.tracing.level").unwrap_or("info");
let env_filter = EnvFilter::builder()
.parse(format!("stalwart_smtp={}", level))
.failed("Failed to log level");
match config.value("global.tracing.method").unwrap_or_default() {
"log" => {
let path = config.value_require("global.tracing.path")?;
let prefix = config.value_require("global.tracing.prefix")?;
let file_appender = match config.value("global.tracing.rotate").unwrap_or("daily") {
"daily" => tracing_appender::rolling::daily(path, prefix),
"hourly" => tracing_appender::rolling::hourly(path, prefix),
"minutely" => tracing_appender::rolling::minutely(path, prefix),
"never" => tracing_appender::rolling::never(path, prefix),
rotate => {
return Err(format!("Unsupported log rotation strategy {rotate:?}"));
}
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.with_writer(non_blocking)
.finish(),
)
.failed("Failed to set subscriber");
Ok(guard.into())
}
"stdout" => {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.finish(),
)
.failed("Failed to set subscriber");
Ok(None)
}
"otel" | "open-telemetry" => {
let tracer = match config.value_require("global.tracing.transport")? {
"grpc" => {
let mut exporter = opentelemetry_otlp::new_exporter().tonic();
if let Some(endpoint) = config.value("global.tracing.endpoint") {
exporter = exporter.with_endpoint(endpoint);
}
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
}
"http" => {
let mut headers = HashMap::new();
for (_, value) in config.values("global.tracing.headers") {
if let Some((key, value)) = value.split_once(':') {
headers.insert(key.trim().to_string(), value.trim().to_string());
} else {
return Err(format!("Invalid open-telemetry header {value:?}"));
}
}
let mut exporter = opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(config.value_require("global.tracing.endpoint")?);
if !headers.is_empty() {
exporter = exporter.with_headers(headers);
}
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
}
transport => {
return Err(format!(
"Unsupported open-telemetry transport {transport:?}"
));
}
}
.with_trace_config(
trace::config()
.with_resource(Resource::new(vec![
KeyValue::new(SERVICE_NAME, "stalwart-smtp".to_string()),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION").to_string()),
]))
.with_sampler(Sampler::AlwaysOn),
)
.install_batch(opentelemetry::runtime::Tokio)
.failed("Failed to create tracer");
tracing::subscriber::set_global_default(
tracing_subscriber::Registry::default()
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.with(env_filter),
)
.failed("Failed to set subscriber");
Ok(None)
}
_ => Ok(None),
}
}
pub async fn wait_for_shutdown() {
#[cfg(not(target_env = "msvc"))]
{
use tokio::signal::unix::{signal, SignalKind};
let mut h_term = signal(SignalKind::terminate()).failed("start signal handler");
let mut h_int = signal(SignalKind::interrupt()).failed("start signal handler");
tokio::select! {
_ = h_term.recv() => tracing::debug!("Received SIGTERM."),
_ = h_int.recv() => tracing::debug!("Received SIGINT."),
};
}
#[cfg(target_env = "msvc")]
{
match tokio::signal::ctrl_c().await {
Ok(()) => {}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}
}
}
}

View file

@ -103,6 +103,7 @@ impl Server {
instance = instance.id,
protocol = ?instance.protocol,
"Listener shutting down.");
manager.shutdown();
break;
}
};
@ -113,11 +114,7 @@ impl Server {
}
impl Servers {
pub fn spawn(
self,
config: &Config,
spawn: impl Fn(Server, watch::Receiver<bool>),
) -> watch::Sender<bool> {
pub fn bind(&self, config: &Config) {
// Bind as root
for server in &self.inner {
for listener in &server.listeners {
@ -139,7 +136,9 @@ impl Servers {
pd.apply().failed("Failed to drop privileges");
}
}
}
pub fn spawn(self, spawn: impl Fn(Server, watch::Receiver<bool>)) -> watch::Sender<bool> {
// Spawn listeners
let (shutdown_tx, shutdown_rx) = watch::channel(false);
for server in self.inner {

View file

@ -37,4 +37,5 @@ pub struct SessionData<T: AsyncRead + AsyncWrite + Unpin + 'static> {
pub trait SessionManager: Sync + Send + 'static + Clone {
fn spawn(&self, session: SessionData<TcpStream>);
fn shutdown(&self);
}

View file

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
resolver = "2"
[dependencies]
[dev-dependencies]
store = { path = "../crates/store", features = ["test_mode"] }
jmap = { path = "../crates/jmap", features = ["test_mode"] }
jmap_proto = { path = "../crates/jmap-proto" }
@ -13,7 +13,7 @@ smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send" }
mail-auth = { git = "https://github.com/stalwartlabs/mail-auth", features = ["test"] }
sieve-rs = { git = "https://github.com/stalwartlabs/sieve" }
utils = { path = "../crates/utils" }
utils = { path = "../crates/utils", features = ["test_mode"] }
#jmap-client = { git = "https://github.com/stalwartlabs/jmap-client", features = ["websockets", "debug", "async"] }
jmap-client = { path = "/home/vagrant/code/jmap-client", features = ["websockets", "debug", "async"] }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "serde_support", "ludicrous_mode"] }

View file

@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};
use jmap::{api::SessionManager, JMAP};
use jmap::{api::JmapSessionManager, JMAP};
use jmap_client::client::{Client, Credentials};
use jmap_proto::types::id::Id;
use tokio::sync::watch;
@ -147,8 +147,9 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
let servers = settings.parse_servers().unwrap();
// Start JMAP server
let manager = SessionManager::from(JMAP::init(&settings).await);
let shutdown_tx = servers.spawn(&settings, |server, shutdown_rx| {
servers.bind(&settings);
let manager = JmapSessionManager::new(JMAP::init(&settings).await);
let shutdown_tx = servers.spawn(|server, shutdown_rx| {
server.spawn(manager.clone(), shutdown_rx);
});

View file

@ -80,7 +80,8 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Start JMAP server
let manager = SessionManager::from(push_server.clone());
let _shutdown_tx = servers.spawn(&settings, |server, shutdown_rx| {
servers.bind(&settings);
let _shutdown_tx = servers.spawn(|server, shutdown_rx| {
server.spawn(manager.clone(), shutdown_rx);
});
@ -308,9 +309,7 @@ impl utils::listener::SessionManager for SessionManager {
});
}
fn max_concurrent(&self) -> u64 {
100
}
fn shutdown(&self) {}
}
async fn expect_push(event_rx: &mut mpsc::Receiver<PushMessage>) -> PushMessage {

View file

@ -1,11 +1,11 @@
use std::path::PathBuf;
#[cfg(test)]
//pub mod jmap;
#[cfg(test)]
//pub mod store;
pub mod jmap;
#[cfg(test)]
pub mod smtp;
#[cfg(test)]
pub mod store;
pub fn add_test_certs(config: &str) -> String {
let mut cert_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));

View file

@ -32,14 +32,14 @@ use crate::smtp::{
};
use smtp::{
config::ConfigContext,
core::{Core, Session, State},
core::{Session, State, SMTP},
lookup::Lookup,
};
#[tokio::test]
async fn auth() {
let mut core = Core::test();
let mut ctx = ConfigContext::default();
let mut core = SMTP::test();
let mut ctx = ConfigContext::new(&[]);
ctx.lookup.insert(
"plain".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
@ -72,7 +72,7 @@ async fn auth() {
core.session.config.extensions.future_release =
r"[{if = 'authenticated-as', ne = '', then = '1d'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
// EHLO should not avertise plain text auth without TLS
let mut session = Session::test(core);

View file

@ -25,11 +25,11 @@ use crate::smtp::{
session::{TestSession, VerifyResponse},
TestConfig,
};
use smtp::core::{Core, Session};
use smtp::core::{Session, SMTP};
#[tokio::test]
async fn basic_commands() {
let mut session = Session::test(Core::test());
let mut session = Session::test(SMTP::test());
// STARTTLS should be available on clear text connections
session.stream.tls = false;

View file

@ -28,17 +28,17 @@ use ahash::AHashSet;
use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
session::{load_test_message, TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
lookup::Lookup,
};
#[tokio::test]
async fn data() {
let mut core = Core::test();
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_data_test");
@ -59,7 +59,7 @@ async fn data() {
let mut config = &mut core.session.config;
config.data.add_auth_results = "[{if = 'remote-ip', eq = '10.0.0.3', then = true},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.data.add_date = config.data.add_auth_results.clone();
config.data.add_message_id = config.data.add_auth_results.clone();
config.data.add_received = config.data.add_auth_results.clone();
@ -68,7 +68,7 @@ async fn data() {
config.data.max_received_headers = IfBlock::new(3);
config.data.max_messages = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 1},
{else = 100}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
core.queue.config.quota = r"[[queue.quota]]
match = {if = 'sender', eq = 'john@doe.org'}
@ -85,7 +85,7 @@ async fn data() {
key = ['rcpt']
size = 450
"
.parse_quota(&ConfigContext::default());
.parse_quota(&ConfigContext::new(&[]));
// Test queue message builder
let mut session = Session::test(core);

View file

@ -34,22 +34,23 @@ use mail_auth::{
report::DmarcResult,
spf::Spf,
};
use utils::config::Rate;
use crate::smtp::{
inbound::{sign::TextConfigContext, TestMessage, TestQueueEvent, TestReportingEvent},
session::{TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{AggregateFrequency, ConfigContext, IfBlock, Rate, VerifyStrategy},
core::{Core, Session},
config::{AggregateFrequency, ConfigContext, IfBlock, VerifyStrategy},
core::{Session, SMTP},
lookup::Lookup,
};
#[tokio::test]
async fn dmarc() {
let mut core = Core::test();
let ctx = ConfigContext::default().parse_signatures();
let mut core = SMTP::test();
let ctx = ConfigContext::new(&[]).parse_signatures();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_dmarc_test");
@ -147,13 +148,13 @@ async fn dmarc() {
let mut config = &mut core.mail_auth;
config.spf.verify_ehlo = "[{if = 'remote-ip', eq = '10.0.0.2', then = 'strict'},
{ else = 'relaxed' }]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.spf.verify_mail_from = config.spf.verify_ehlo.clone();
config.dmarc.verify = IfBlock::new(VerifyStrategy::Strict);
config.arc.verify = config.dmarc.verify.clone();
config.dkim.verify = "[{if = 'sender-domain', eq = 'test.net', then = 'relaxed'},
{ else = 'strict' }]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
let mut config = &mut core.report.config;
config.spf.sign = "['rsa']"

View file

@ -25,10 +25,10 @@ use std::time::{Duration, Instant};
use smtp::{
config::IfBlock,
core::{Core, Session},
core::{Session, SMTP},
};
use crate::smtp::{inbound::TestQueueEvent, session::TestSession, TestConfig, TestCore};
use crate::smtp::{inbound::TestQueueEvent, session::TestSession, TestConfig, TestSMTP};
#[tokio::test]
async fn dnsrbl() {
@ -39,7 +39,7 @@ async fn dnsrbl() {
)
.unwrap();*/
let mut core = Core::test();
let mut core = SMTP::test();
for entry in [
"1.0.0.10.zen.spamhaus.org",
"2.0.0.10.b.barracudacentral.org",

View file

@ -31,12 +31,12 @@ use crate::smtp::{
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
};
#[tokio::test]
async fn ehlo() {
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.txt_add(
"mx1.foobar.org",
Spf::parse(b"v=spf1 ip4:10.0.0.1 -all").unwrap(),
@ -51,16 +51,16 @@ async fn ehlo() {
let mut config = &mut core.session.config;
config.data.max_message_size = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 1024},
{else = 2048}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.future_release = r"[{if = 'remote-ip', eq = '10.0.0.1', then = '1h'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.mt_priority = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 'nsep'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
core.mail_auth.spf.verify_ehlo = r"[{if = 'remote-ip', eq = '10.0.0.2', then = 'strict'},
{else = 'relaxed'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.ehlo.reject_non_fqdn = IfBlock::new(true);
// Reject non-FQDN domains

View file

@ -31,26 +31,26 @@ use crate::smtp::{
};
use smtp::{
config::ConfigContext,
core::{Core, Session},
core::{Session, SMTP},
};
#[tokio::test]
async fn limits() {
let mut core = Core::test();
let mut core = SMTP::test();
let mut config = &mut core.session.config;
config.transfer_limit = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 10},
{else = 1024}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.timeout = r"[{if = 'remote-ip', eq = '10.0.0.2', then = '500ms'},
{else = '30m'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.duration = r"[{if = 'remote-ip', eq = '10.0.0.3', then = '500ms'},
{else = '60m'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
let (_tx, rx) = watch::channel(true);
// Exceed max line length
let mut session = Session::test(core);
let mut session = Session::test_with_shutdown(core, rx);
session.data.remote_ip = "10.0.0.1".parse().unwrap();
let mut buf = vec![b'A'; 2049];
session.ingest(&buf).await.unwrap();

View file

@ -35,12 +35,12 @@ use crate::smtp::{
};
use smtp::{
config::{ConfigContext, IfBlock, VerifyStrategy},
core::{Core, Session},
core::{Session, SMTP},
};
#[tokio::test]
async fn mail() {
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.txt_add(
"foobar.org",
Spf::parse(b"v=spf1 ip4:10.0.0.1 -all").unwrap(),
@ -72,32 +72,32 @@ async fn mail() {
core.mail_auth.spf.verify_ehlo = IfBlock::new(VerifyStrategy::Relaxed);
core.mail_auth.spf.verify_mail_from = r"[{if = 'remote-ip', eq = '10.0.0.2', then = 'strict'},
{else = 'relaxed'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
core.mail_auth.iprev.verify = r"[{if = 'remote-ip', eq = '10.0.0.2', then = 'strict'},
{else = 'relaxed'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.future_release = r"[{if = 'remote-ip', eq = '10.0.0.2', then = '1d'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.deliver_by = r"[{if = 'remote-ip', eq = '10.0.0.2', then = '1d'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.requiretls = r"[{if = 'remote-ip', eq = '10.0.0.2', then = true},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.extensions.mt_priority = r"[{if = 'remote-ip', eq = '10.0.0.2', then = 'nsep'},
{else = false}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.data.max_message_size = r"[{if = 'remote-ip', eq = '10.0.0.2', then = 2048},
{else = 1024}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.throttle.mail_from = r"[[throttle]]
match = {if = 'remote-ip', eq = '10.0.0.1'}
key = 'sender'
rate = '2/1s'
"
.parse_throttle(&ConfigContext::default());
.parse_throttle(&ConfigContext::new(&[]));
// Be rude and do not say EHLO
let core = Arc::new(core);

View file

@ -32,13 +32,13 @@ use crate::smtp::{
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session, State},
core::{Session, State, SMTP},
lookup::Lookup,
};
#[tokio::test]
async fn rcpt() {
let mut core = Core::test();
let mut core = SMTP::test();
let list_addresses = Lookup::Local(AHashSet::from_iter([
"jane@foobar.org".to_string(),
@ -54,25 +54,25 @@ async fn rcpt() {
config.lookup_addresses = IfBlock::new(Some(Arc::new(list_addresses)));
config.max_recipients = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 3},
{else = 5}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.relay = r"[{if = 'remote-ip', eq = '10.0.0.1', then = false},
{else = true}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config_ext.dsn = r"[{if = 'remote-ip', eq = '10.0.0.1', then = false},
{else = true}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.errors_max = r"[{if = 'remote-ip', eq = '10.0.0.1', then = 3},
{else = 100}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.errors_wait = r"[{if = 'remote-ip', eq = '10.0.0.1', then = '5ms'},
{else = '1s'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
core.session.config.throttle.rcpt_to = r"[[throttle]]
match = {if = 'remote-ip', eq = '10.0.0.1'}
key = 'sender'
rate = '2/1s'
"
.parse_throttle(&ConfigContext::default());
.parse_throttle(&ConfigContext::new(&[]));
// RCPT without MAIL FROM
let mut session = Session::test(core);

View file

@ -26,14 +26,14 @@ use std::path::PathBuf;
use crate::smtp::{
inbound::{sign::TextConfigContext, TestMessage, TestQueueEvent},
session::{TestSession, VerifyResponse},
TestConfig, TestCore,
TestConfig, TestSMTP,
};
use smtp::{
config::{
database::ConfigDatabase, list::ConfigList, scripts::ConfigSieve, session::ConfigSession,
ConfigContext, EnvelopeKey, IfBlock,
},
core::{Core, Session},
core::{Session, SMTP},
};
use utils::config::Config;
@ -160,9 +160,9 @@ async fn sieve_scripts() {
pipe_path.push("pipe");
// Prepare config
let mut core = Core::test();
let mut core = SMTP::test();
let mut qr = core.init_test_queue("smtp_sieve_test");
let mut ctx = ConfigContext::default().parse_signatures();
let mut ctx = ConfigContext::new(&[]).parse_signatures();
let config = Config::parse(
&CONFIG
.replace("%PATH%", qr._temp_dir.temp_dir.as_path().to_str().unwrap())

View file

@ -36,11 +36,11 @@ use utils::config::Config;
use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
session::{TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{auth::ConfigAuth, ConfigContext, IfBlock, VerifyStrategy},
core::{Core, Session},
core::{Session, SMTP},
lookup::Lookup,
};
@ -96,7 +96,7 @@ set-body-length = false
#[tokio::test]
async fn sign_and_seal() {
let mut core = Core::test();
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_sign_test");
@ -157,7 +157,7 @@ async fn sign_and_seal() {
config.data.add_received_spf = IfBlock::new(true);
let mut config = &mut core.mail_auth;
let ctx = ConfigContext::default().parse_signatures();
let ctx = ConfigContext::new(&[]).parse_signatures();
config.spf.verify_ehlo = IfBlock::new(VerifyStrategy::Relaxed);
config.spf.verify_mail_from = config.spf.verify_ehlo.clone();
config.dkim.verify = config.spf.verify_ehlo.clone();
@ -207,11 +207,11 @@ async fn sign_and_seal() {
);
}
pub trait TextConfigContext {
fn parse_signatures(self) -> ConfigContext;
pub trait TextConfigContext<'x> {
fn parse_signatures(self) -> ConfigContext<'x>;
}
impl TextConfigContext for ConfigContext {
impl<'x> TextConfigContext<'x> for ConfigContext<'x> {
fn parse_signatures(mut self) -> Self {
Config::parse(SIGNATURES)
.unwrap()

View file

@ -26,12 +26,12 @@ use std::time::Duration;
use crate::smtp::{session::TestSession, ParseTestConfig, TestConfig};
use smtp::{
config::ConfigContext,
core::{Core, Session, SessionAddress},
core::{Session, SessionAddress, SMTP},
};
#[tokio::test]
async fn throttle_inbound() {
let mut core = Core::test();
let mut core = SMTP::test();
let mut config = &mut core.session.config;
config.throttle.connect = r"[[throttle]]
match = {if = 'remote-ip', eq = '10.0.0.1'}
@ -39,17 +39,17 @@ async fn throttle_inbound() {
concurrency = 2
rate = '3/1s'
"
.parse_throttle(&ConfigContext::default());
.parse_throttle(&ConfigContext::new(&[]));
config.throttle.mail_from = r"[[throttle]]
key = 'sender'
rate = '2/1s'
"
.parse_throttle(&ConfigContext::default());
.parse_throttle(&ConfigContext::new(&[]));
config.throttle.rcpt_to = r"[[throttle]]
key = ['remote-ip', 'rcpt']
rate = '2/1s'
"
.parse_throttle(&ConfigContext::default());
.parse_throttle(&ConfigContext::new(&[]));
// Test connection concurrency limit
let mut session = Session::test(core);

View file

@ -31,14 +31,14 @@ use crate::smtp::{
};
use smtp::{
config::ConfigContext,
core::{Core, Session},
core::{Session, SMTP},
lookup::Lookup,
};
#[tokio::test]
async fn vrfy_expn() {
let mut core = Core::test();
let mut ctx = ConfigContext::default();
let mut core = SMTP::test();
let mut ctx = ConfigContext::new(&[]);
ctx.lookup.insert(
"vrfy".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([

View file

@ -79,7 +79,7 @@ async fn lookup_imap() {
let shutdown = spawn_mock_imap_server(5);
// Spawn lookup client
let mut ctx = ConfigContext::default();
let mut ctx = ConfigContext::new(&[]);
let config = Config::parse(REMOTE).unwrap();
config.parse_remote_hosts(&mut ctx).unwrap();
let lookup = ctx.hosts.remove("imap").unwrap().spawn(&config);

View file

@ -71,7 +71,7 @@ async fn lookup_smtp() {
let shutdown = spawn_mock_lmtp_server(5);
// Spawn lookup client
let mut ctx = ConfigContext::default();
let mut ctx = ConfigContext::new(&[]);
let config = Config::parse(REMOTE).unwrap();
config.parse_remote_hosts(&mut ctx).unwrap();
let lookup = ctx.hosts.remove("lmtp").unwrap().spawn(&config);

View file

@ -33,7 +33,7 @@ use crate::smtp::{
};
use smtp::{
config::{database::ConfigDatabase, ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
lookup::SqlDatabase,
};
@ -69,9 +69,9 @@ async fn lookup_sql() {
.unwrap();*/
// Parse settings
let mut core = Core::test();
let mut core = SMTP::test();
let _temp_dir = make_temp_dir("sql_lookup_test", true);
let mut ctx = ConfigContext::default();
let mut ctx = ConfigContext::new(&[]);
let config =
Config::parse(&CONFIG.replace("%PATH%", _temp_dir.temp_dir.as_path().to_str().unwrap()))
.unwrap();

View file

@ -2,7 +2,7 @@ use std::time::{Duration, Instant};
use mail_auth::{IpLookupStrategy, MX};
use smtp::{config::IfBlock, core::Core, outbound::RemoteHost};
use smtp::{config::IfBlock, core::SMTP, outbound::RemoteHost};
use super::ToRemoteHost;
@ -20,7 +20,7 @@ async fn lookup_ip() {
"10.0.0.3".parse().unwrap(),
"10.0.0.4".parse().unwrap(),
];
let mut core = Core::test();
let mut core = SMTP::test();
core.queue.config.source_ip.ipv4 = IfBlock::new(ipv4.clone());
core.queue.config.source_ip.ipv6 = IfBlock::new(ipv6.clone());
core.resolvers.dns.ipv4_add(

View file

@ -34,11 +34,11 @@ use utils::config::ServerProtocol;
use crate::smtp::{
inbound::TestQueueEvent, management::send_manage_request, outbound::start_test_server,
session::TestSession, TestConfig, TestCore,
session::TestSession, TestConfig, TestSMTP,
};
use smtp::{
config::IfBlock,
core::{management::Message, Core, Session},
core::{management::Message, Session, SMTP},
lookup::Lookup,
queue::{
manager::{Queue, SpawnQueue},
@ -57,13 +57,13 @@ async fn manage_queue() {
.unwrap();*/
// Start remote test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_manage_queue_remote");
let _rx_remote = start_test_server(core.into(), &[ServerProtocol::Smtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![MX {

View file

@ -41,7 +41,7 @@ use crate::smtp::{
};
use smtp::{
config::{AggregateFrequency, IfBlock},
core::{management::Report, Core},
core::{management::Report, SMTP},
lookup::Lookup,
reporting::{
scheduler::{Scheduler, SpawnReport},
@ -60,7 +60,7 @@ async fn manage_reports() {
.unwrap();*/
// Start reporting service
let mut core = Core::test();
let mut core = SMTP::test();
let temp_dir = make_temp_dir("smtp_report_management_test", true);
let config = &mut core.report.config;
config.path = IfBlock::new(temp_dir.temp_dir.clone());

View file

@ -45,8 +45,8 @@ use smtp::{
SessionThrottle, SpfAuthConfig, Throttle, VerifyStrategy,
},
core::{
throttle::ThrottleKeyHasherBuilder, Core, QueueCore, ReportCore, Resolvers, SessionCore,
SieveConfig, SieveCore, TlsConnectors,
throttle::ThrottleKeyHasherBuilder, QueueCore, ReportCore, Resolvers, SessionCore,
SieveConfig, SieveCore, TlsConnectors, SMTP,
},
lookup::Lookup,
outbound::dane::DnssecResolver,
@ -134,9 +134,9 @@ pub trait TestConfig {
fn test() -> Self;
}
impl TestConfig for Core {
impl TestConfig for SMTP {
fn test() -> Self {
Core {
SMTP {
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.build()
@ -470,12 +470,12 @@ pub struct ReportReceiver {
pub report_rx: mpsc::Receiver<smtp::reporting::Event>,
}
pub trait TestCore {
pub trait TestSMTP {
fn init_test_queue(&mut self, test_name: &str) -> QueueReceiver;
fn init_test_report(&mut self) -> ReportReceiver;
}
impl TestCore for Core {
impl TestSMTP for SMTP {
fn init_test_queue(&mut self, test_name: &str) -> QueueReceiver {
let _temp_dir = make_temp_dir(test_name, true);
self.queue.config.path = IfBlock::new(_temp_dir.temp_dir.clone());

View file

@ -38,11 +38,11 @@ use crate::smtp::{
inbound::{TestMessage, TestQueueEvent, TestReportingEvent},
outbound::start_test_server,
session::{TestSession, VerifyResponse},
TestConfig, TestCore,
TestConfig, TestSMTP,
};
use smtp::{
config::{AggregateFrequency, IfBlock, RequireOptional},
core::{Core, Session},
core::{Session, SMTP},
outbound::dane::{Tlsa, TlsaEntry},
queue::{manager::Queue, DeliveryAttempt},
reporting::PolicyType,
@ -59,13 +59,13 @@ async fn dane_verify() {
.unwrap();*/
// Start test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_dane_remote");
let _rx = start_test_server(core.into(), &[ServerProtocol::Smtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![MX {

View file

@ -34,11 +34,11 @@ use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
outbound::start_test_server,
session::{TestSession, VerifyResponse},
TestConfig, TestCore,
TestConfig, TestSMTP,
};
use smtp::{
config::IfBlock,
core::{Core, Session},
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt},
};
@ -53,7 +53,7 @@ async fn extensions() {
.unwrap();*/
// Start test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.data.max_message_size = IfBlock::new(1500);
core.session.config.extensions.dsn = IfBlock::new(true);
@ -62,7 +62,7 @@ async fn extensions() {
let _rx = start_test_server(core.into(), &[ServerProtocol::Smtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![MX {

View file

@ -30,11 +30,11 @@ use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
outbound::start_test_server,
session::{TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{remote::ConfigHost, ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
};
use utils::config::{Config, ServerProtocol};
@ -62,14 +62,14 @@ async fn lmtp_delivery() {
.unwrap();*/
// Start test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("lmtp_delivery_remote");
let _rx = start_test_server(core.into(), &[ServerProtocol::Lmtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.ipv4_add(
"lmtp.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
@ -79,7 +79,7 @@ async fn lmtp_delivery() {
// Multiple delivery attempts
let mut local_qr = core.init_test_queue("lmtp_delivery_local");
let mut ctx = ConfigContext::default();
let mut ctx = ConfigContext::new(&[]);
let config = Config::parse(REMOTE).unwrap();
config.parse_remote_hosts(&mut ctx).unwrap();
core.queue.config.next_hop = "[{if = 'rcpt-domain', eq = 'foobar.org', then = 'lmtp'},

View file

@ -25,7 +25,7 @@ use std::sync::Arc;
use tokio::sync::watch;
use ::smtp::core::{Core, HttpAdminSessionManager, SmtpSessionManager};
use ::smtp::core::{SmtpAdminSessionManager, SmtpSessionManager, SMTP};
use utils::config::{Config, ServerProtocol};
use super::add_test_certs;
@ -68,23 +68,27 @@ cert = 'file://{CERT}'
private-key = 'file://{PK}'
";
pub fn start_test_server(core: Arc<Core>, protocols: &[ServerProtocol]) -> watch::Sender<bool> {
pub fn start_test_server(core: Arc<SMTP>, protocols: &[ServerProtocol]) -> watch::Sender<bool> {
// Spawn listeners
let config = Config::parse(&add_test_certs(SERVER)).unwrap();
let servers = config.parse_servers().unwrap();
let mut servers = config.parse_servers().unwrap();
// Filter out protocols
servers
.inner
.retain(|server| protocols.contains(&server.protocol));
// Start servers
servers.bind(&config);
let smtp_manager = SmtpSessionManager::new(core.clone());
let smtp_admin_manager = HttpAdminSessionManager::new(core);
servers.spawn(&config, |server, shutdown_rx| {
if protocols.contains(&server.protocol) {
match &server.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
server.spawn(smtp_manager.clone(), shutdown_rx)
}
ServerProtocol::Http => server.spawn(smtp_admin_manager.clone(), shutdown_rx),
ServerProtocol::Imap | ServerProtocol::Jmap => unreachable!(),
};
}
let smtp_admin_manager = SmtpAdminSessionManager::new(core);
servers.spawn(|server, shutdown_rx| {
match &server.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
server.spawn(smtp_manager.clone(), shutdown_rx)
}
ServerProtocol::Http => server.spawn(smtp_admin_manager.clone(), shutdown_rx),
ServerProtocol::Imap | ServerProtocol::Jmap => unreachable!(),
};
})
}

View file

@ -38,11 +38,11 @@ use crate::smtp::{
inbound::{TestMessage, TestQueueEvent, TestReportingEvent},
outbound::start_test_server,
session::{TestSession, VerifyResponse},
TestConfig, TestCore,
TestConfig, TestSMTP,
};
use smtp::{
config::{AggregateFrequency, IfBlock, RequireOptional},
core::{Core, Session},
core::{Session, SMTP},
outbound::mta_sts::{lookup::STS_TEST_POLICY, Policy},
queue::{manager::Queue, DeliveryAttempt},
reporting::PolicyType,
@ -59,13 +59,13 @@ async fn mta_sts_verify() {
.unwrap();*/
// Start test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_mta_sts_remote");
let _rx = start_test_server(core.into(), &[ServerProtocol::Smtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![MX {

View file

@ -33,11 +33,11 @@ use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
outbound::start_test_server,
session::{TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
};
@ -52,14 +52,14 @@ async fn smtp_delivery() {
.unwrap();*/
// Start test server
let mut core = Core::test();
let mut core = SMTP::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_delivery_remote");
let _rx = start_test_server(core.into(), &[ServerProtocol::Smtp]);
// Add mock DNS entries
let mut core = Core::test();
let mut core = SMTP::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![
@ -112,10 +112,10 @@ async fn smtp_delivery() {
config.retry = IfBlock::new(vec![Duration::from_millis(100)]);
config.notify = "[{if = 'rcpt-domain', eq = 'foobar.org', then = ['100ms', '200ms']},
{else = ['100ms']}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.expire = "[{if = 'rcpt-domain', eq = 'foobar.org', then = '650ms'},
{else = '750ms'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
let core = Arc::new(core);
let mut queue = Queue::default();

View file

@ -31,11 +31,11 @@ use mail_auth::MX;
use crate::smtp::{
inbound::TestQueueEvent, queue::manager::new_message, session::TestSession, ParseTestConfig,
TestConfig, TestCore,
TestConfig, TestSMTP,
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Message, QueueEnvelope},
};
@ -83,10 +83,10 @@ async fn throttle_outbound() {
// Build test message
let mut test_message = new_message(0);
test_message.return_path_domain = "foobar.org".to_string();
let mut core = Core::test();
let mut core = SMTP::test();
let mut local_qr = core.init_test_queue("smtp_throttle_outbound");
core.session.config.rcpt.relay = IfBlock::new(true);
core.queue.config.throttle = THROTTLE.parse_queue_throttle(&ConfigContext::default());
core.queue.config.throttle = THROTTLE.parse_queue_throttle(&ConfigContext::new(&[]));
core.queue.config.retry = IfBlock::new(vec![Duration::from_secs(86400)]);
core.queue.config.notify = IfBlock::new(vec![Duration::from_secs(86400)]);
core.queue.config.expire = IfBlock::new(Duration::from_secs(86400));

View file

@ -32,11 +32,11 @@ use tokio::{fs::File, io::AsyncReadExt};
use crate::smtp::{
inbound::{sign::TextConfigContext, TestQueueEvent},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::ConfigContext,
core::Core,
core::SMTP,
queue::{
DeliveryAttempt, Domain, Error, ErrorDetails, HostResponse, Message, Recipient, Schedule,
Status,
@ -105,8 +105,8 @@ async fn generate_dsn() {
};
// Load config
let mut core = Core::test();
let ctx = ConfigContext::default().parse_signatures();
let mut core = SMTP::test();
let ctx = ConfigContext::new(&[]).parse_signatures();
let mut config = &mut core.queue.config.dsn;
config.sign = "['rsa']"
.parse_if::<Vec<String>>(&ctx)

View file

@ -29,11 +29,11 @@ use std::{
use crate::smtp::{
inbound::{TestMessage, TestQueueEvent},
session::{TestSession, VerifyResponse},
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
};
@ -46,7 +46,7 @@ async fn queue_retry() {
)
.unwrap();*/
let mut core = Core::test();
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_queue_retry_test");
@ -64,10 +64,10 @@ async fn queue_retry() {
]);
config.notify = "[{if = 'sender-domain', eq = 'test.org', then = ['150ms', '200ms']},
{else = ['15h', '22h']}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
config.expire = "[{if = 'sender-domain', eq = 'test.org', then = '600ms'},
{else = '1d'}]"
.parse_if(&ConfigContext::default());
.parse_if(&ConfigContext::new(&[]));
// Create test message
let core = Arc::new(core);

View file

@ -29,18 +29,18 @@ use std::{
use smtp_proto::{Response, MAIL_REQUIRETLS, MAIL_SMTPUTF8, RCPT_CONNEG, RCPT_NOTIFY_FAILURE};
use smtp::{
core::Core,
core::SMTP,
queue::{
Domain, Error, ErrorDetails, HostResponse, Message, Recipient, Schedule, Status,
RCPT_STATUS_CHANGED,
},
};
use crate::smtp::{inbound::TestQueueEvent, TestConfig, TestCore};
use crate::smtp::{inbound::TestQueueEvent, TestConfig, TestSMTP};
#[tokio::test]
async fn queue_serialize() {
let mut core = Core::test();
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_queue_serialize_test");

View file

@ -24,16 +24,16 @@
use std::{fs, sync::Arc, time::Duration};
use crate::smtp::{
inbound::TestQueueEvent, make_temp_dir, session::TestSession, TestConfig, TestCore,
inbound::TestQueueEvent, make_temp_dir, session::TestSession, TestConfig, TestSMTP,
};
use smtp::{
config::{AddressMatch, IfBlock},
core::{Core, Session},
core::{Session, SMTP},
};
#[tokio::test]
async fn report_analyze() {
let mut core = Core::test();
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_analyze_report_test");

View file

@ -37,11 +37,11 @@ use crate::smtp::{
inbound::{sign::TextConfigContext, TestMessage, TestQueueEvent},
make_temp_dir,
session::VerifyResponse,
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{AggregateFrequency, ConfigContext, IfBlock},
core::Core,
core::SMTP,
reporting::{
dmarc::GenerateDmarcReport,
scheduler::{ReportType, Scheduler},
@ -59,8 +59,8 @@ async fn report_dmarc() {
.unwrap();*/
// Create scheduler
let mut core = Core::test();
let ctx = ConfigContext::default().parse_signatures();
let mut core = SMTP::test();
let ctx = ConfigContext::new(&[]).parse_signatures();
let temp_dir = make_temp_dir("smtp_report_dmarc_test", true);
let config = &mut core.report.config;
config.path = IfBlock::new(temp_dir.temp_dir.clone());

View file

@ -34,7 +34,7 @@ use tokio::fs;
use crate::smtp::{make_temp_dir, TestConfig};
use smtp::{
config::{AggregateFrequency, IfBlock},
core::Core,
core::SMTP,
reporting::{
dmarc::DmarcFormat,
scheduler::{ReportType, Scheduler},
@ -52,7 +52,7 @@ async fn report_scheduler() {
.unwrap();*/
// Create scheduler
let mut core = Core::test();
let mut core = SMTP::test();
let temp_dir = make_temp_dir("smtp_report_scheduler_test", true);
let config = &mut core.report.config;
config.path = IfBlock::new(temp_dir.temp_dir.clone());

View file

@ -34,11 +34,11 @@ use crate::smtp::{
inbound::{sign::TextConfigContext, TestMessage, TestQueueEvent},
make_temp_dir,
session::VerifyResponse,
ParseTestConfig, TestConfig, TestCore,
ParseTestConfig, TestConfig, TestSMTP,
};
use smtp::{
config::{AggregateFrequency, ConfigContext, IfBlock},
core::Core,
core::SMTP,
reporting::{
scheduler::{ReportType, Scheduler},
tls::{GenerateTlsReport, TLS_HTTP_REPORT},
@ -56,8 +56,8 @@ async fn report_tls() {
.unwrap();*/
// Create scheduler
let mut core = Core::test();
let ctx = ConfigContext::default().parse_signatures();
let mut core = SMTP::test();
let ctx = ConfigContext::new(&[]).parse_signatures();
let temp_dir = make_temp_dir("smtp_report_tls_test", true);
let config = &mut core.report.config;
config.path = IfBlock::new(temp_dir.temp_dir.clone());

View file

@ -29,7 +29,7 @@ use tokio::{
};
use smtp::{
core::{Core, Session, SessionAddress, SessionData, SessionParameters, State},
core::{Session, SessionAddress, SessionData, SessionParameters, State, SMTP},
inbound::IsTls,
};
use utils::{
@ -98,7 +98,8 @@ impl Unpin for DummyIo {}
#[async_trait::async_trait]
pub trait TestSession {
fn test(core: impl Into<Arc<Core>>) -> Self;
fn test(core: impl Into<Arc<SMTP>>) -> Self;
fn test_with_shutdown(core: impl Into<Arc<SMTP>>, shutdown_rx: watch::Receiver<bool>) -> Self;
fn response(&mut self) -> Vec<String>;
fn write_rx(&mut self, data: &str);
async fn rset(&mut self);
@ -113,10 +114,10 @@ pub trait TestSession {
#[async_trait::async_trait]
impl TestSession for Session<DummyIo> {
fn test(core: impl Into<Arc<Core>>) -> Self {
fn test_with_shutdown(core: impl Into<Arc<SMTP>>, shutdown_rx: watch::Receiver<bool>) -> Self {
Self {
state: State::default(),
instance: Arc::new(ServerInstance::test()),
instance: Arc::new(ServerInstance::test_with_shutdown(shutdown_rx)),
core: core.into(),
span: tracing::info_span!("test"),
stream: DummyIo {
@ -130,6 +131,10 @@ impl TestSession for Session<DummyIo> {
}
}
fn test(core: impl Into<Arc<SMTP>>) -> Self {
Self::test_with_shutdown(core, watch::channel(false).1)
}
fn response(&mut self) -> Vec<String> {
if !self.stream.tx_buf.is_empty() {
let response = std::str::from_utf8(&self.stream.tx_buf)
@ -330,8 +335,12 @@ impl VerifyResponse for Vec<String> {
}
}
impl TestConfig for ServerInstance {
fn test() -> Self {
pub trait TestServerInstance {
fn test_with_shutdown(shutdown_rx: watch::Receiver<bool>) -> Self;
}
impl TestServerInstance for ServerInstance {
fn test_with_shutdown(shutdown_rx: watch::Receiver<bool>) -> Self {
Self {
id: "smtp".to_string(),
listener_id: 1,
@ -341,7 +350,13 @@ impl TestConfig for ServerInstance {
tls_acceptor: None,
is_tls_implicit: false,
limiter: ConcurrencyLimiter::new(100),
shutdown_rx: watch::channel(false).1,
shutdown_rx,
}
}
}
impl TestConfig for ServerInstance {
fn test() -> Self {
Self::test_with_shutdown(watch::channel(false).1)
}
}