From beffa408e6a2d2309ba717daaf98653ad60d8d59 Mon Sep 17 00:00:00 2001 From: Mauro D Date: Wed, 31 May 2023 18:03:04 +0000 Subject: [PATCH] Directory implementation - part 2 --- crates/directory/Cargo.toml | 4 +- crates/directory/src/config.rs | 63 ++++++ crates/directory/src/imap/client.rs | 30 +++ crates/directory/src/imap/config.rs | 36 ++++ crates/directory/src/imap/lookup.rs | 5 +- crates/directory/src/imap/mod.rs | 2 + crates/directory/src/imap/pool.rs | 2 +- crates/directory/src/imap/tls.rs | 2 + crates/directory/src/ldap/config.rs | 127 +++++++++++ crates/directory/src/ldap/mod.rs | 7 +- crates/directory/src/lib.rs | 74 +++++-- crates/directory/src/smtp/config.rs | 46 ++++ crates/directory/src/smtp/mod.rs | 1 + crates/directory/src/sql/config.rs | 94 +++++++++ crates/directory/src/sql/mod.rs | 1 + tests/src/directory/imap.rs | 184 ++++++++++++++++ tests/src/directory/ldap.rs | 0 tests/src/directory/mod.rs | 312 ++++++++++++++++++++++++++++ tests/src/directory/smtp.rs | 258 +++++++++++++++++++++++ tests/src/directory/sql.rs | 110 ++++++++++ 20 files changed, 1338 insertions(+), 20 deletions(-) create mode 100644 crates/directory/src/config.rs create mode 100644 crates/directory/src/imap/config.rs create mode 100644 crates/directory/src/smtp/config.rs create mode 100644 crates/directory/src/sql/config.rs create mode 100644 tests/src/directory/imap.rs create mode 100644 tests/src/directory/ldap.rs create mode 100644 tests/src/directory/mod.rs create mode 100644 tests/src/directory/smtp.rs create mode 100644 tests/src/directory/sql.rs diff --git a/crates/directory/Cargo.toml b/crates/directory/Cargo.toml index 97584eab..77847798 100644 --- a/crates/directory/Cargo.toml +++ b/crates/directory/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" resolver = "2" [dependencies] +utils = { path = "../utils" } smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" } mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] } tokio = { version = "1.23", features = ["net"] } @@ -12,10 +13,11 @@ tokio-rustls = { version = "0.24.0"} rustls = "0.21.0" sqlx = { version = "0.7.0-alpha.3", features = [ "runtime-tokio-rustls", "postgres", "mysql", "sqlite" ] } ldap3 = { version = "0.11.1", default-features = false, features = ["tls-rustls"] } -bb8 = "0.8.0" +bb8 = "0.8.1" async-trait = "0.1.68" ahash = { version = "0.8" } tracing = "0.1" +lru-cache = "0.1.2" [dev-dependencies] tokio = { version = "1.23", features = ["full"] } diff --git a/crates/directory/src/config.rs b/crates/directory/src/config.rs new file mode 100644 index 00000000..0a0bf3a8 --- /dev/null +++ b/crates/directory/src/config.rs @@ -0,0 +1,63 @@ +use bb8::{ManageConnection, Pool}; +use std::{sync::Arc, time::Duration}; +use utils::config::Config; + +use ahash::AHashMap; + +use crate::{ + imap::ImapDirectory, ldap::LdapDirectory, smtp::SmtpDirectory, sql::SqlDirectory, Directory, +}; + +pub trait ConfigDirectory { + fn parse_directory(&self) -> utils::config::Result>>; +} + +impl ConfigDirectory for Config { + fn parse_directory(&self) -> utils::config::Result>> { + let mut directories = AHashMap::new(); + for id in self.sub_keys("directory") { + directories.insert( + id.to_string(), + match self.value_require(("directory", id, "protocol"))? { + "ldap" => LdapDirectory::from_config(self, ("directory", id))?, + "sql" => SqlDirectory::from_config(self, ("directory", id))?, + "imap" => ImapDirectory::from_config(self, ("directory", id))?, + "smtp" => SmtpDirectory::from_config(self, ("directory", id), false)?, + "lmtp" => SmtpDirectory::from_config(self, ("directory", id), true)?, + unknown => { + return Err(format!("Unknown directory type: {unknown:?}")); + } + }, + ); + } + + Ok(directories) + } +} + +pub(crate) fn build_pool( + config: &Config, + prefix: &str, + manager: M, +) -> utils::config::Result> { + Ok(Pool::builder() + .min_idle( + config + .property((prefix, "pool.min-connections"))? + .and_then(|v| if v > 0 { Some(v) } else { None }), + ) + .max_size(config.property_or_static((prefix, "pool.max-connections"), "10")?) + .max_lifetime( + config + .property_or_static::((prefix, "pool.max-lifetime"), "30m")? + .into(), + ) + .idle_timeout( + config + .property_or_static::((prefix, "pool.idle-timeout"), "10m")? + .into(), + ) + .connection_timeout(config.property_or_static((prefix, "pool.connect-timeout"), "30s")?) + .test_on_check_out(true) + .build_unchecked(manager)) +} diff --git a/crates/directory/src/imap/client.rs b/crates/directory/src/imap/client.rs index 4566f7f4..576be086 100644 --- a/crates/directory/src/imap/client.rs +++ b/crates/directory/src/imap/client.rs @@ -166,3 +166,33 @@ impl ImapClient { self.stream.flush().await } } + +#[cfg(test)] +mod test { + use mail_send::smtp::tls::build_tls_connector; + use smtp_proto::{AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH, AUTH_XOAUTH2}; + use std::time::Duration; + + use crate::imap::ImapClient; + + #[ignore] + #[tokio::test] + async fn imap_auth() { + let connector = build_tls_connector(false); + + let mut client = ImapClient::connect( + "imap.gmail.com:993", + Duration::from_secs(5), + &connector, + "imap.gmail.com", + true, + ) + .await + .unwrap(); + assert_eq!( + AUTH_PLAIN | AUTH_XOAUTH | AUTH_XOAUTH2 | AUTH_OAUTHBEARER, + client.authentication_mechanisms().await.unwrap() + ); + client.logout().await.unwrap(); + } +} diff --git a/crates/directory/src/imap/config.rs b/crates/directory/src/imap/config.rs new file mode 100644 index 00000000..1ee78545 --- /dev/null +++ b/crates/directory/src/imap/config.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; + +use mail_send::smtp::tls::build_tls_connector; +use utils::config::{utils::AsKey, Config}; + +use crate::{config::build_pool, imap::ImapConnectionManager, Directory}; + +use super::ImapDirectory; + +impl ImapDirectory { + pub fn from_config( + config: &Config, + prefix: impl AsKey, + ) -> utils::config::Result> { + let prefix = prefix.as_key(); + let address = config.value_require((&prefix, "address"))?; + let tls_implicit: bool = config.property_or_static((&prefix, "tls.implicit"), "false")?; + let port: u16 = config + .property_or_static((&prefix, "port"), if tls_implicit { "443" } else { "143" })?; + + let manager = ImapConnectionManager { + addr: format!("{address}:{port}"), + timeout: config.property_or_static((&prefix, "timeout"), "30s")?, + tls_connector: build_tls_connector( + config.property_or_static((&prefix, "tls.allow-invalid-certs"), "false")?, + ), + tls_hostname: address.to_string(), + tls_implicit, + mechanisms: 0.into(), + }; + + Ok(Arc::new(ImapDirectory { + pool: build_pool(config, &prefix, manager)?, + })) + } +} diff --git a/crates/directory/src/imap/lookup.rs b/crates/directory/src/imap/lookup.rs index 8a688b93..43df2d87 100644 --- a/crates/directory/src/imap/lookup.rs +++ b/crates/directory/src/imap/lookup.rs @@ -40,7 +40,10 @@ impl Directory for ImapDirectory { }; match client.authenticate(mechanism, credentials).await { - Ok(_) => Ok(Some(Principal::default())), + Ok(_) => { + client.is_valid = false; + Ok(Some(Principal::default())) + } Err(err) => match &err { ImapError::AuthenticationFailed => Ok(None), _ => Err(err.into()), diff --git a/crates/directory/src/imap/mod.rs b/crates/directory/src/imap/mod.rs index d5a58b48..c3e63853 100644 --- a/crates/directory/src/imap/mod.rs +++ b/crates/directory/src/imap/mod.rs @@ -1,4 +1,5 @@ pub mod client; +pub mod config; pub mod lookup; pub mod pool; pub mod tls; @@ -25,6 +26,7 @@ pub struct ImapConnectionManager { pub struct ImapClient { stream: T, mechanisms: u64, + is_valid: bool, timeout: Duration, } diff --git a/crates/directory/src/imap/pool.rs b/crates/directory/src/imap/pool.rs index 474d3295..1307245e 100644 --- a/crates/directory/src/imap/pool.rs +++ b/crates/directory/src/imap/pool.rs @@ -39,6 +39,6 @@ impl ManageConnection for ImapConnectionManager { /// Synchronously determine if the connection is no longer usable, if possible. fn has_broken(&self, conn: &mut Self::Connection) -> bool { - false + !conn.is_valid } } diff --git a/crates/directory/src/imap/tls.rs b/crates/directory/src/imap/tls.rs index a56fbf5e..93f3b5bf 100644 --- a/crates/directory/src/imap/tls.rs +++ b/crates/directory/src/imap/tls.rs @@ -44,6 +44,7 @@ impl ImapClient { .await?, timeout: self.timeout, mechanisms: self.mechanisms, + is_valid: true, }) }) .await @@ -65,6 +66,7 @@ impl ImapClient> { stream, timeout, mechanisms: 0, + is_valid: true, }), Err(err) => Err(ImapError::Io(err)), } diff --git a/crates/directory/src/ldap/config.rs b/crates/directory/src/ldap/config.rs index e69de29b..fbda4fba 100644 --- a/crates/directory/src/ldap/config.rs +++ b/crates/directory/src/ldap/config.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use ldap3::LdapConnSettings; +use utils::config::{utils::AsKey, Config}; + +use crate::{config::build_pool, Directory}; + +use super::{Bind, LdapConnectionManager, LdapDirectory, LdapFilter, LdapMappings}; + +impl LdapDirectory { + pub fn from_config( + config: &Config, + prefix: impl AsKey, + ) -> utils::config::Result> { + let prefix = prefix.as_key(); + let bind_dn = if let Some(dn) = config.value((&prefix, "bind.dn")) { + Bind::new( + dn.to_string(), + config.value_require((&prefix, "bind.secret"))?.to_string(), + ) + .into() + } else { + None + }; + + let manager = LdapConnectionManager::new( + config.value_require((&prefix, "address"))?.to_string(), + LdapConnSettings::new() + .set_conn_timeout(config.property_or_static((&prefix, "timeout"), "30s")?) + .set_starttls(config.property_or_static((&prefix, "tls"), "false")?) + .set_no_tls_verify( + config.property_or_static((&prefix, "allow-invalid-certs"), "false")?, + ), + bind_dn, + ); + + let mut mappings = LdapMappings { + base_dn: config.value_require((&prefix, "address"))?.to_string(), + filter_login: LdapFilter::from_config(config, (&prefix, "filter.login"))?, + filter_name: LdapFilter::from_config(config, (&prefix, "filter.name"))?, + filter_email: LdapFilter::from_config(config, (&prefix, "filter.email"))?, + filter_id: LdapFilter::from_config(config, (&prefix, "filter.id"))?, + filter_verify: LdapFilter::from_config(config, (&prefix, "filter.verify"))?, + filter_expand: LdapFilter::from_config(config, (&prefix, "filter.expand"))?, + obj_user: config + .value_require((&prefix, "object-classes.user"))? + .to_string(), + obj_group: config + .value_require((&prefix, "object-classes.group"))? + .to_string(), + attr_name: config + .values((&prefix, "attributes.name")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_description: config + .values((&prefix, "attributes.description")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_secret: config + .values((&prefix, "attributes.secret")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_groups: config + .values((&prefix, "attributes.groups")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_id: config + .values((&prefix, "attributes.id")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_email_address: config + .values((&prefix, "attributes.email")) + .map(|(_, v)| v.to_string()) + .collect(), + attr_quota: config + .values((&prefix, "attributes.")) + .map(|(_, v)| v.to_string()) + .collect(), + attrs_principal: vec!["objectClass".to_string()], + attrs_email: config + .values((&prefix, "attributes.email-alias")) + .map(|(_, v)| v.to_string()) + .collect(), + }; + + for attr in [ + &mappings.attr_id, + &mappings.attr_name, + &mappings.attr_description, + &mappings.attr_secret, + &mappings.attr_quota, + &mappings.attr_groups, + ] { + mappings.attrs_principal.extend(attr.iter().cloned()); + } + + mappings + .attrs_email + .extend(mappings.attr_email_address.iter().cloned()); + + Ok(Arc::new(LdapDirectory { + mappings, + pool: build_pool(config, &prefix, manager)?, + })) + } +} + +impl LdapFilter { + fn from_config(config: &Config, key: impl AsKey) -> utils::config::Result { + if let Some(value) = config.value(key.clone()) { + let filter = LdapFilter { + filter: value.split('?').map(|s| s.to_string()).collect(), + }; + if filter.filter.len() >= 2 { + Ok(filter) + } else { + Err(format!( + "Missing '?' parameter placeholder in filter {:?} with value {:?}", + key.as_key(), + value + )) + } + } else { + Ok(Self::default()) + } + } +} diff --git a/crates/directory/src/ldap/mod.rs b/crates/directory/src/ldap/mod.rs index 012e0511..8f65c365 100644 --- a/crates/directory/src/ldap/mod.rs +++ b/crates/directory/src/ldap/mod.rs @@ -1,5 +1,5 @@ use bb8::Pool; -use ldap3::LdapConnSettings; +use ldap3::{ldap_escape, LdapConnSettings}; pub mod config; pub mod lookup; @@ -10,6 +10,7 @@ pub struct LdapDirectory { mappings: LdapMappings, } +#[derive(Debug, Default)] pub struct LdapMappings { base_dn: String, filter_login: LdapFilter, @@ -31,13 +32,15 @@ pub struct LdapMappings { attrs_email: Vec, } +#[derive(Debug, Default)] struct LdapFilter { filter: Vec, } impl LdapFilter { pub fn build(&self, value: &str) -> String { - self.filter.join(value) + let value = ldap_escape(value); + self.filter.join(value.as_ref()) } } diff --git a/crates/directory/src/lib.rs b/crates/directory/src/lib.rs index 615b51b7..bb6bbd2d 100644 --- a/crates/directory/src/lib.rs +++ b/crates/directory/src/lib.rs @@ -3,12 +3,13 @@ use imap::ImapError; use ldap3::LdapError; use mail_send::Credentials; +pub mod config; pub mod imap; pub mod ldap; pub mod smtp; pub mod sql; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Principal { pub id: u32, pub name: String, @@ -29,6 +30,7 @@ pub enum Type { Other, } +#[derive(Debug)] pub enum DirectoryError { Ldap(LdapError), Sql(sqlx::Error), @@ -39,7 +41,7 @@ pub enum DirectoryError { } #[async_trait::async_trait] -pub trait Directory { +pub trait Directory: Sync + Send { async fn authenticate(&self, credentials: &Credentials) -> Result>; async fn principal_by_name(&self, name: &str) -> Result>; async fn principal_by_id(&self, id: u32) -> Result>; @@ -54,17 +56,11 @@ pub trait Directory { pub type Result = std::result::Result; -impl From for DirectoryError { - fn from(error: LdapError) -> Self { - DirectoryError::Ldap(error) - } -} - impl From> for DirectoryError { fn from(error: RunError) -> Self { match error { - RunError::User(error) => DirectoryError::Ldap(error), - RunError::TimedOut => DirectoryError::TimedOut, + RunError::User(error) => error.into(), + RunError::TimedOut => DirectoryError::timeout("ldap"), } } } @@ -72,8 +68,8 @@ impl From> for DirectoryError { impl From> for DirectoryError { fn from(error: RunError) -> Self { match error { - RunError::User(error) => DirectoryError::Imap(error), - RunError::TimedOut => DirectoryError::TimedOut, + RunError::User(error) => error.into(), + RunError::TimedOut => DirectoryError::timeout("imap"), } } } @@ -81,26 +77,64 @@ impl From> for DirectoryError { impl From> for DirectoryError { fn from(error: RunError) -> Self { match error { - RunError::User(error) => DirectoryError::Smtp(error), - RunError::TimedOut => DirectoryError::TimedOut, + RunError::User(error) => error.into(), + RunError::TimedOut => DirectoryError::timeout("smtp"), } } } +impl From for DirectoryError { + fn from(error: LdapError) -> Self { + tracing::warn!( + context = "directory", + event = "error", + protocol = "ldap", + reason = %error, + "LDAP directory error" + ); + + DirectoryError::Ldap(error) + } +} + impl From for DirectoryError { fn from(error: sqlx::Error) -> Self { + tracing::warn!( + context = "directory", + event = "error", + protocol = "sql", + reason = %error, + "SQL directory error" + ); + DirectoryError::Sql(error) } } impl From for DirectoryError { fn from(error: ImapError) -> Self { + tracing::warn!( + context = "directory", + event = "error", + protocol = "imap", + reason = %error, + "IMAP directory error" + ); + DirectoryError::Imap(error) } } impl From for DirectoryError { fn from(error: mail_send::Error) -> Self { + tracing::warn!( + context = "directory", + event = "error", + protocol = "smtp", + reason = %error, + "SMTP directory error" + ); + DirectoryError::Smtp(error) } } @@ -108,7 +142,7 @@ impl From for DirectoryError { impl DirectoryError { pub fn unsupported(protocol: &str, method: &str) -> Self { tracing::warn!( - context = "remote", + context = "directory", event = "error", protocol = protocol, method = method, @@ -116,6 +150,16 @@ impl DirectoryError { ); DirectoryError::Unsupported } + + pub fn timeout(protocol: &str) -> Self { + tracing::warn!( + context = "directory", + event = "error", + protocol = protocol, + "Directory timed out" + ); + DirectoryError::TimedOut + } } #[cfg(test)] diff --git a/crates/directory/src/smtp/config.rs b/crates/directory/src/smtp/config.rs new file mode 100644 index 00000000..b5a95e01 --- /dev/null +++ b/crates/directory/src/smtp/config.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use mail_send::{smtp::tls::build_tls_connector, SmtpClientBuilder}; +use utils::config::{utils::AsKey, Config}; + +use crate::{config::build_pool, smtp::SmtpConnectionManager, Directory}; + +use super::SmtpDirectory; + +impl SmtpDirectory { + pub fn from_config( + config: &Config, + prefix: impl AsKey, + is_lmtp: bool, + ) -> utils::config::Result> { + let prefix = prefix.as_key(); + let address = config.value_require((&prefix, "address"))?; + let tls_implicit: bool = config.property_or_static((&prefix, "tls.implicit"), "false")?; + let port: u16 = config + .property_or_static((&prefix, "port"), if tls_implicit { "465" } else { "25" })?; + + let manager = SmtpConnectionManager { + builder: SmtpClientBuilder { + addr: format!("{address}:{port}"), + timeout: config.property_or_static((&prefix, "timeout"), "30s")?, + tls_connector: build_tls_connector( + config.property_or_static((&prefix, "tls.allow-invalid-certs"), "false")?, + ), + tls_hostname: address.to_string(), + tls_implicit, + is_lmtp, + credentials: None, + local_host: config + .value("server.hostname") + .unwrap_or("[127.0.0.1]") + .to_string(), + }, + max_rcpt: config.property_or_static((&prefix, "limits.rcpt"), "10")?, + max_auth_errors: config.property_or_static((&prefix, "limits.auth-errors"), "3")?, + }; + + Ok(Arc::new(SmtpDirectory { + pool: build_pool(config, &prefix, manager)?, + })) + } +} diff --git a/crates/directory/src/smtp/mod.rs b/crates/directory/src/smtp/mod.rs index e40fcb75..e1976217 100644 --- a/crates/directory/src/smtp/mod.rs +++ b/crates/directory/src/smtp/mod.rs @@ -1,3 +1,4 @@ +pub mod config; pub mod lookup; pub mod pool; diff --git a/crates/directory/src/sql/config.rs b/crates/directory/src/sql/config.rs new file mode 100644 index 00000000..919bad28 --- /dev/null +++ b/crates/directory/src/sql/config.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use sqlx::any::AnyPoolOptions; +use utils::config::{utils::AsKey, Config}; + +use crate::Directory; + +use super::{SqlDirectory, SqlMappings}; + +impl SqlDirectory { + pub fn from_config( + config: &Config, + prefix: impl AsKey, + ) -> utils::config::Result> { + let prefix = prefix.as_key(); + let address = config.value_require((&prefix, "address"))?; + + let pool = AnyPoolOptions::new() + .max_connections( + config + .property((&prefix, "pool.max-connections"))? + .unwrap_or(10), + ) + .min_connections( + config + .property((&prefix, "pool.min-connections"))? + .unwrap_or(0), + ) + .idle_timeout(config.property((&prefix, "pool.idle-timeout"))?) + .connect_lazy(address) + .map_err(|err| format!("Failed to create connection pool for {address:?}: {err}"))?; + + let mappings = SqlMappings { + query_login: config + .value((&prefix, "query.login")) + .unwrap_or_default() + .to_string(), + query_name: config + .value((&prefix, "query.name")) + .unwrap_or_default() + .to_string(), + query_id: config + .value((&prefix, "query.id")) + .unwrap_or_default() + .to_string(), + query_members: config + .value((&prefix, "query.members")) + .unwrap_or_default() + .to_string(), + query_recipients: config + .value((&prefix, "query.recipients")) + .unwrap_or_default() + .to_string(), + query_emails: config + .value((&prefix, "query.emails")) + .unwrap_or_default() + .to_string(), + query_verify: config + .value((&prefix, "query.verify")) + .unwrap_or_default() + .to_string(), + query_expand: config + .value((&prefix, "query.expand")) + .unwrap_or_default() + .to_string(), + column_name: config + .value((&prefix, "column.name")) + .unwrap_or_default() + .to_string(), + column_description: config + .value((&prefix, "column.description")) + .unwrap_or_default() + .to_string(), + column_secret: config + .value((&prefix, "column.secret")) + .unwrap_or_default() + .to_string(), + column_id: config + .value((&prefix, "column.id")) + .unwrap_or_default() + .to_string(), + column_quota: config + .value((&prefix, "column.quota")) + .unwrap_or_default() + .to_string(), + column_type: config + .value((&prefix, "column.type")) + .unwrap_or_default() + .to_string(), + }; + + Ok(Arc::new(SqlDirectory { pool, mappings })) + } +} diff --git a/crates/directory/src/sql/mod.rs b/crates/directory/src/sql/mod.rs index 2530a0c7..6cbbf424 100644 --- a/crates/directory/src/sql/mod.rs +++ b/crates/directory/src/sql/mod.rs @@ -1,5 +1,6 @@ use sqlx::{Any, Pool}; +pub mod config; pub mod lookup; pub struct SqlDirectory { diff --git a/tests/src/directory/imap.rs b/tests/src/directory/imap.rs new file mode 100644 index 00000000..10e67395 --- /dev/null +++ b/tests/src/directory/imap.rs @@ -0,0 +1,184 @@ +use std::sync::Arc; + +use mail_parser::decoders::base64::base64_decode; +use mail_send::Credentials; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + sync::watch, +}; +use tokio_rustls::TlsAcceptor; + +use utils::listener::limiter::{ConcurrencyLimiter, InFlight}; + +use crate::directory::{parse_config, Item, LookupResult}; + +use super::dummy_tls_acceptor; + +#[tokio::test] +async fn imap_directory() { + // Enable logging + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .finish(), + ) + .unwrap(); + + // Obtain directory handle + let handle = parse_config().remove("imap").unwrap(); + + // Spawn mock LMTP server + let shutdown = spawn_mock_imap_server(5); + + // Basic lookup + let tests = vec![ + ( + Item::Authenticate(Credentials::Plain { + username: "john".to_string(), + secret: "ok".to_string(), + }), + LookupResult::True, + ), + ( + Item::Authenticate(Credentials::Plain { + username: "john".to_string(), + secret: "bad".to_string(), + }), + LookupResult::False, + ), + ]; + + for (item, expected) in &tests { + assert_eq!( + &LookupResult::from( + handle + .authenticate(item.as_credentials()) + .await + .unwrap() + .is_some() + ), + expected + ); + } + + // Concurrent requests + let mut requests = Vec::new(); + for n in 0..10 { + let (item, expected) = &tests[n % tests.len()]; + let item = item.append(n); + let item_clone = item.clone(); + let handle = handle.clone(); + requests.push(( + tokio::spawn(async move { + LookupResult::from( + handle + .authenticate(item.as_credentials()) + .await + .unwrap() + .is_some(), + ) + }), + item_clone, + expected.append(n), + )); + let fix = "true"; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + for (result, item, expected_result) in requests { + assert_eq!( + result.await.unwrap(), + expected_result, + "Failed for {item:?}" + ); + } + + // Shutdown + shutdown.send(false).ok(); +} + +pub fn spawn_mock_imap_server(max_concurrency: u64) -> watch::Sender { + let (tx, mut rx) = watch::channel(true); + + tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:9198") + .await + .unwrap_or_else(|e| { + panic!("Failed to bind mock IMAP server to 127.0.0.1:9198: {e}"); + }); + let acceptor = dummy_tls_acceptor(); + let limited = ConcurrencyLimiter::new(max_concurrency); + loop { + tokio::select! { + stream = listener.accept() => { + match stream { + Ok((stream, _)) => { + //println!("--- Accepted connection --- "); + let acceptor = acceptor.clone(); + let in_flight = limited.is_allowed(); + tokio::spawn(accept_imap(stream, acceptor, in_flight)); + } + Err(err) => { + panic!("Something went wrong: {err}" ); + } + } + }, + _ = rx.changed() => { + break; + } + }; + } + }); + + tx +} + +async fn accept_imap(stream: TcpStream, acceptor: Arc, in_flight: Option) { + let mut stream = acceptor.accept(stream).await.unwrap(); + stream + .write_all(b"* OK Clueless host service ready\r\n") + .await + .unwrap(); + + if in_flight.is_none() { + eprintln!("WARNING: Concurrency exceeded!"); + } + + let mut buf_u8 = vec![0u8; 1024]; + + loop { + let br = if let Ok(br) = stream.read(&mut buf_u8).await { + br + } else { + break; + }; + let buf = std::str::from_utf8(&buf_u8[0..br]).unwrap(); + let (op, buf) = buf.split_once(' ').unwrap(); + + //print!("-> {}", buf); + let response = if buf.starts_with("CAPABILITY") { + format!( + "* CAPABILITY IMAP4rev2 IMAP4rev1 AUTH=PLAIN\r\n{op} OK CAPABILITY completed\r\n", + ) + } else if buf.starts_with("NOOP") { + format!("{op} OK NOOP completed\r\n") + } else if buf.starts_with("AUTHENTICATE PLAIN") { + let buf = base64_decode(buf.rsplit_once(' ').unwrap().1.as_bytes()).unwrap(); + if String::from_utf8_lossy(&buf).contains("ok") { + format!("{op} OK Great success!\r\n") + } else { + format!("{op} BAD No soup for you!\r\n") + } + } else if buf.starts_with("LOGOUT") { + format!("* BYE\r\n{op} OK LOGOUT completed\r\n") + } else { + panic!("Unknown command: {}", buf.trim()); + }; + //print!("<- {}", response); + stream.write_all(response.as_bytes()).await.unwrap(); + + if buf.contains("bye") || buf.starts_with("LOGOUT") { + return; + } + } +} diff --git a/tests/src/directory/ldap.rs b/tests/src/directory/ldap.rs new file mode 100644 index 00000000..e69de29b diff --git a/tests/src/directory/mod.rs b/tests/src/directory/mod.rs new file mode 100644 index 00000000..8accc88e --- /dev/null +++ b/tests/src/directory/mod.rs @@ -0,0 +1,312 @@ +pub mod imap; +pub mod ldap; +pub mod smtp; +pub mod sql; + +use ahash::AHashMap; +use directory::{config::ConfigDirectory, Directory}; +use mail_send::Credentials; +use rustls::{Certificate, PrivateKey, ServerConfig}; +use rustls_pemfile::{certs, pkcs8_private_keys}; +use std::{io::BufReader, sync::Arc}; +use tokio_rustls::TlsAcceptor; + +const CONFIG: &str = r#" +[directory."sql"] +protocol = "sql" +address = "sqlite::memory:" + +[directory."sql".query] +login = "SELECT id, secret, description, quota FROM accounts WHERE name = ? AND active = true AND type = 'individual'" +name = "SELECT id, type, description, quota FROM accounts WHERE name = ?" +id = "SELECT name, type, description, quota FROM accounts WHERE id = ?" +members = "SELECT gid FROM group_members WHERE uid = ?" +recipients = "SELECT id FROM emails WHERE address = ?" +emails = "SELECT address FROM emails WHERE id = ? AND type != 'list' ORDER BY type DESC" +verify = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' AND type != 'list' LIMIT 5" +expand = "SELECT p.address FROM emails AS p JOIN emails AS l ON p.id = l.id WHERE p.type = 'primary' AND l.address = ? AND l.type = 'list' LIMIT 50" + +[directory."sql".columns] +name = "name" +description = "description" +secret = "secret" +id = "id" +email = "address" +quota = "quota" +type = "type" + +[directory."ldap"] +protocol = "ldap" +address = "ldap://localhost:3893" +base-dn = "dc=example,dc=com" + +[directory."ldap".bind] +dn = "cn=serviceuser,ou=svcaccts,dc=example,dc=com" +secret = "mysecret" + +[directory."ldap".filter] +login = "(&(objectClass=posixAccount)(accountStatus=active)(cn=?))" +name = "(&(!(objectClass=posixAccount)(objectClass=posixGroup))(cn=?))" +email = "(&(!(objectClass=posixAccount)(objectClass=posixGroup))(!(mail=?)(mailAliases=?)(mailLists=?)))" +id = "(|(&(objectClass=posixAccount)(uidNumber=?))(&(objectClass=posixGroup)(gidNumber=?)))" +verify = "(&(!(objectClass=posixAccount)(objectClass=posixGroup))(!(mail=*?*)(mailAliases=*?*)))" +expand = "(&(!(objectClass=posixAccount)(objectClass=posixGroup))(mailLists=?))" + +[directory."ldap".object-classes] +user = "posixAccount" +group = "posixGroup" + +[directory."ldap".attributes] +name = "cn" +description = "description" +secret = "userPassword" +groups = "memberOf" +id = ["uidNumber", "gidNumber"] +email = "mail" +email-alias = "mailAliases" +quota = "diskQuota" + +[directory."imap"] +protocol = "imap" +address = "127.0.0.1" +port = 9198 + +[directory."imap".pool] +max-connections = 5 + +[directory."imap".tls] +implicit = true +allow-invalid-certs = true + +[directory."smtp"] +protocol = "lmtp" +address = "127.0.0.1" +port = 9199 + +[directory."smtp".limits] +auth-errors = 3 +rcpt = 5 + +[directory."smtp".pool] +max-connections = 5 + +[directory."smtp".tls] +implicit = true +allow-invalid-certs = true + +"#; + +pub fn parse_config() -> AHashMap> { + utils::config::Config::parse(CONFIG) + .unwrap() + .parse_directory() + .unwrap() +} + +const CERT: &str = "-----BEGIN CERTIFICATE----- +MIIFCTCCAvGgAwIBAgIUCgHGQYUqtelbHGVSzCVwBL3fyEUwDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIyMDUxNjExNDAzNFoXDTIzMDUx +NjExNDAzNFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF +AAOCAg8AMIICCgKCAgEAtwS0Fzl3SjaCuKEXgZ/fdWbDoj/qDphyNCAKNevQ0+D0 +STNkWCO04aFSH0zcL8zoD9gokNos0i7OU9//ZhZQmex4V6EFdZn8bFwUWN/scUvW +HEFXVjtHldO2isZgIxH9LuwRv7KAgkISuWahqerOVDhe7SeQUV0AJGNEh3cT9PZr +gSY931BxB7n+5k8eoSk8Z1gtBzQzL62kVGpHDKfw8yX8m65owF9eLUBrNzgxmXfC +xpuHwj7hmVhS09PPKeN/RsFS8PsYO7bo0u8jEKalteumjRT7RyUEbioqfo6ZFOGj +FHPIq/uKXS9zN1fpoyNh3ur5hMznQhrqlwBM9KlM7GdBJ0pZ3ad0YjT8IL/GnGKR +85J2WZdLqaQdUZo7nV67FhqdDlNE4MdwiykTMjfmLRXGAVhAzJHKyRKNwmkI2aqe +S7aqeNgvuDBwY80Q9a2rb5py1Aw+L8yCkUBuHboToDpxSVRDNN8DrWNmmsXnxsOG +wRDODy4GICKyxlP+RFSM8xWSQ6y9ktS2OfDBm+Eqcw+3pZKhdz2wgxLkUBJ8X1eh +kJrCA/6LTuhy6m6mMjAfoSOFU7fu88jxaWPgvP7GKyH+LM/t9eucobz2ks5rtSjz +V4Dc5DCS94/OpVRHwHdaFSPbJKBN9Ev8gnNrAyx/aBPGoHBPG/QUiU7dcUNIPt0C +AwEAAaNTMFEwHQYDVR0OBBYEFI167IxBmErB11EqiPPqFLa31ZaMMB8GA1UdIwQY +MBaAFI167IxBmErB11EqiPPqFLa31ZaMMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI +hvcNAQELBQADggIBALU00IOiH5ubEauVCmakms5ermNTZfculnhnDfWTLMeh2+a7 +G4cqADErfMhm/mmLbrw33t9s6tCAhQltvewKR40ST9uMPSyiQbYaCXd5DXnuI6Ox +JtNW+UOWIaMf8abnkdLvREOvb8dVQS1i3xq14tAjY5XgpGwCPP8m54b7N3Q7soLn +e5PDhPNTnhRIn2RLuYoZmQmMA5fcqEUDYff4epUww7PhrM1QckZligI3566NlGOf +j1G9JrivBtY0eaJtamIFnGMBT0ThDudxVja2Nv0C2Elry0p4T/o4nc4M67BJ/y1R +vjNLAgFhbxssemU3lZqSd+pykpJBwDBjFSPrZZmQcbk7H6Uz8V1xr/xuzfw6fA13 +NWZ5vLgP/DQ13sM+XFlxThKfbPMPVe/UCTvfGtNW+3XyBgPntEkR+fNEawQmzbYl +R+X1ymT9MZnEZqRMf7/UD/SYek1aUJefoew3upjMgxYVvh4F8dqJ+39F+xoFzIA2 +1dDAEMzXtjA3zKhZ2cycZbEzpJvYA3eGLuR16Suqfi4kPvfwK0mOhCxQmpayt7/X +vuEzW6dPCH8Hgbb0WvsSppGOvhdbDaZFNfFc5eNSxhyKzu3H3ACNImZRtZE+yixx +0fR8+xz9kDLf8xupV+X9heyFGHSyYU2Lveaevtr2Ij3weLRgJ6LbNALoeKXk +-----END CERTIFICATE----- +"; +const PK: &str = "-----BEGIN PRIVATE KEY----- +MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC3BLQXOXdKNoK4 +oReBn991ZsOiP+oOmHI0IAo169DT4PRJM2RYI7ThoVIfTNwvzOgP2CiQ2izSLs5T +3/9mFlCZ7HhXoQV1mfxsXBRY3+xxS9YcQVdWO0eV07aKxmAjEf0u7BG/soCCQhK5 +ZqGp6s5UOF7tJ5BRXQAkY0SHdxP09muBJj3fUHEHuf7mTx6hKTxnWC0HNDMvraRU +akcMp/DzJfybrmjAX14tQGs3ODGZd8LGm4fCPuGZWFLT088p439GwVLw+xg7tujS +7yMQpqW166aNFPtHJQRuKip+jpkU4aMUc8ir+4pdL3M3V+mjI2He6vmEzOdCGuqX +AEz0qUzsZ0EnSlndp3RiNPwgv8acYpHzknZZl0uppB1RmjudXrsWGp0OU0Tgx3CL +KRMyN+YtFcYBWEDMkcrJEo3CaQjZqp5Ltqp42C+4MHBjzRD1ratvmnLUDD4vzIKR +QG4duhOgOnFJVEM03wOtY2aaxefGw4bBEM4PLgYgIrLGU/5EVIzzFZJDrL2S1LY5 +8MGb4SpzD7elkqF3PbCDEuRQEnxfV6GQmsID/otO6HLqbqYyMB+hI4VTt+7zyPFp +Y+C8/sYrIf4sz+3165yhvPaSzmu1KPNXgNzkMJL3j86lVEfAd1oVI9skoE30S/yC +c2sDLH9oE8agcE8b9BSJTt1xQ0g+3QIDAQABAoICABq5oxqpF5RMtXYEgAw7rkPU +h8jPkHwlIrgd3Z/WGZ53APUXfhWo0ScJiZZsgNKyF0kJBZNxaI4gq5xv3zmnFIoF +j+Ur7EIqBERGheoceMhqjI9/syMycNeeHM/S/ALjA5ewfT8C7+UVhOpx5DWNxidi +O+phlp9q9zRZEo69grqIqVYooWxUsMyyCljTQOPDw8BLjfe5VagmsRJqmolslLDM +4UBSjZVZ18S/3Wgo2oVQia660244BHWCAkZQbbXuNI2+eUAbSoSdxw3WQcaSrywL +hzyezbqr2yPDIIVuiUgVUt0Ps0P57VCCN07jlYhvCEGnClysFzD+ATefoZ0wg7za +dQu2E+d166rAjnssyhzcHMn3pxgSdtXD+dQR/xfIGbPABucCupEFqKmhLdMm9+ud +lHay87qzMpIa8cITJwEQROfXqWAhNUU98pKCOx1SVXBqQC7QVqGQ5solDf0eMSVh +ngQ6Dz2WUI2ty75LteiFwlyTgnU9nyPN0NXsrMEET2BHWre7ufTQqiULtQ7+9BwH +AMxEKvrQHjMUjdfbXuzdyc5w5mPYJZfFVSQ1HMslx66h9yCpRIsBZvUGvoaP8Tpe +nQ66FTYRbiOkkdJ7k8DtrnhsJI1oOGjnvj/rvZ8D2pvrlJcIH2AyN3MOL8Jp5Oj1 +nCFt77TwpF92pgl0g9gBAoIBAQDcarmP54QboaIQ9S2gE/4gSVC5i44iDJuSRdI8 +K081RQcWiNzqQXTRc5nqJ7KzLyPiGlg+6rWsBKLos5l4t+MdhhH+KUvk/OtT/g8V +0NZBNXLIbSb8j8ix4v3/f2qKHN3Co6QOlxb3gFvobKDdoKqUNiSH1zTZ8/Y/BzkM +jqWKhTdaLz6eyzhKfOTA4LO8kJ3VF8HUM1N9/e8Gjorl+gZpJUXUQS0+AIi8W76C +OwDrVb3BPGVnApQJfWF78h4g20RwXrx/GYUW2vOMcLjXXDV5U7+nobPUoJnLxoZC +16o88y0Ivan8dBNXsc1epyPvvEqp6MJbAyyVuNeuRJcgYA0BAoIBAQDUkGRV7fLG +wCr5rNysUO+FKzVtTJnf9KEsqAqUmmVnG4oubxAJJtiB5n2+DT+CtO8Nrtz05BbR +uxfWm+lbEw6lVMj63bywtp0NdULg7/2t+oq2Svv16KrZIRJttXMkdEiFFmkVAEhX +l8Fyl6PJPfSMwbPdXEUPUAaNrXweVFffXczHc4W2G212ZzDB0z7QQSgEntbTDFB/ +2Cg5dvuojlM9zw0fuEyLwItZs7n16j/ONZLgBHyroMU9ZPxbnLrVyoZlqtob+RWm +Ju2fSIL9QqG6O4td1TqcUBGvFQYjGvKA+q5fsG26NBJ0Ac48cNK6PS4lMkN3Av2J +ccloYaMEHAXdAoIBAE8WMCy1Ok6byUXiYxOL+OPmyoM40q/e7DcovE2AkLQhZ3Cr +fPDEucCphPFiexkV8f8fysgQeU0WgMmUH54UBPbD81LJyISKR3nkr875Ftdg8SV/ +HL0EblN9ifuR4U1bHCrJgoUFq2T09oVH7NR44Ju7bZIcIseNZK6qzcp2qGkycXD3 +gLWDX1hCxeV6+qLPFQKvuomEPRH4+jnVDXuFIaW6jPqixDP6BxXmqU2bFDJcmnBq +VkwGvc1F4qORdUP+yOi05VeJdZqEx1x92aTUXg+BgEQKnjbNxUE7o1L6hQfHjUIU +o5iEoagWkQTEXf2YBwY+EPaNBgNWxnSuAbfJHwECggEBALOF95ezTVWauzD/U6ic ++o3n/kl/Zn4FJ5KFodn7xCSe18d7uXlhO34KYqx+l+MWWMefpbGWacdcUjfImf93 +SulLgCqP12sP7/iLzp4XUpL7hOeM0NvRU2nqSpwpoUNqik0Mrlc0U+TWoGTduVCf +aMjwV65e3VyfY8mIeclLxqM5n1fcM1OoOnzDjiRE+0n7nYa5eAnq3pn6v4449TZY +belH03e0ucFWLtrltesBmj3YdWGJqJlzQOInRhNBfXJOh8+ZynfRmP0o54udPDQV +cG3PGFd5XPTjkuvhv7sqaSGRlm/um92lWOhtFfdp+i+cuDpmByCef+7zEP19aKZx +3GkCggEAFTs7KNMfvIEaLH0yQUFeq2gLmtcMofmOmeoIECycN1rG7iJo07lJLIs0 +bVODH8Z0kX8llu3cjGMAH/6R2uugJSxkmFiZKrngTzKmxDPvTCKWR4RFwXH9j8IO +cPq7FtKN4SgrPy9ciAPdkcGmu3zz/sBKOaoPwvU2PdBRT+v/aoz+GCLXAvzFlKVe +9/7zdg87ilo8+AtV+71EJeR3kyBPKS9JrWYUKfiams12+uuH4/53rMFZfNCAaZ3Z +1sdXEO4o3Loc5TX4DbO9FVdBSBe6klEXx4T0QJboO6uBvTBnnRL2SQriJQQFwYT6 +XzVV5pwOxkIDBWDIqMUfwJDChBKfpw== +-----END PRIVATE KEY----- +"; + +pub fn dummy_tls_acceptor() -> Arc { + // Init server config builder with safe defaults + let config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth(); + + // load TLS key/cert files + let cert_file = &mut BufReader::new(CERT.as_bytes()); + let key_file = &mut BufReader::new(PK.as_bytes()); + + // convert files to key/cert objects + let cert_chain = certs(cert_file) + .unwrap() + .into_iter() + .map(Certificate) + .collect(); + let mut keys: Vec = pkcs8_private_keys(key_file) + .unwrap() + .into_iter() + .map(PrivateKey) + .collect(); + + // exit if no keys could be parsed + if keys.is_empty() { + panic!("Could not locate PKCS 8 private keys."); + } + + Arc::new(TlsAcceptor::from(Arc::new( + config.with_single_cert(cert_chain, keys.remove(0)).unwrap(), + ))) +} + +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum Item { + IsAccount(String), + Authenticate(Credentials), + Verify(String), + Expand(String), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LookupResult { + True, + False, + Values(Vec), +} + +impl Item { + pub fn append(&self, append: usize) -> Self { + match self { + Item::IsAccount(str) => Item::IsAccount(format!("{append}{str}")), + Item::Authenticate(str) => Item::Authenticate(match str { + Credentials::Plain { username, secret } => Credentials::Plain { + username: username.to_string(), + secret: format!("{append}{secret}"), + }, + Credentials::OAuthBearer { token } => Credentials::OAuthBearer { + token: format!("{append}{token}"), + }, + Credentials::XOauth2 { username, secret } => Credentials::XOauth2 { + username: username.to_string(), + secret: format!("{append}{secret}"), + }, + }), + Item::Verify(str) => Item::Verify(format!("{append}{str}")), + Item::Expand(str) => Item::Expand(format!("{append}{str}")), + } + } + + pub fn as_credentials(&self) -> &Credentials { + match self { + Item::Authenticate(c) => c, + _ => panic!("Item is not a Credentials"), + } + } +} + +impl LookupResult { + fn append(&self, append: usize) -> Self { + match self { + LookupResult::True => LookupResult::True, + LookupResult::False => LookupResult::False, + LookupResult::Values(v) => { + let mut r = Vec::with_capacity(v.len()); + for (pos, val) in v.iter().enumerate() { + r.push(if pos == 0 { + format!("{append}{val}") + } else { + val.to_string() + }); + } + LookupResult::Values(r) + } + } + } +} + +impl From for LookupResult { + fn from(b: bool) -> Self { + if b { + LookupResult::True + } else { + LookupResult::False + } + } +} + +impl From> for LookupResult { + fn from(v: Vec) -> Self { + LookupResult::Values(v) + } +} + +impl core::fmt::Debug for Item { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::IsAccount(arg0) => f.debug_tuple("Rcpt").field(arg0).finish(), + Self::Authenticate(_) => f.debug_tuple("Auth").finish(), + Self::Expand(arg0) => f.debug_tuple("Expn").field(arg0).finish(), + Self::Verify(arg0) => f.debug_tuple("Vrfy").field(arg0).finish(), + } + } +} diff --git a/tests/src/directory/smtp.rs b/tests/src/directory/smtp.rs new file mode 100644 index 00000000..b6d3a0dd --- /dev/null +++ b/tests/src/directory/smtp.rs @@ -0,0 +1,258 @@ +use std::sync::Arc; + +use directory::DirectoryError; +use mail_parser::decoders::base64::base64_decode; +use mail_send::Credentials; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + sync::watch, +}; +use tokio_rustls::TlsAcceptor; + +use utils::listener::limiter::{ConcurrencyLimiter, InFlight}; + +use crate::directory::{parse_config, Item, LookupResult}; + +use super::dummy_tls_acceptor; + +#[tokio::test] +async fn smtp_directory() { + // Spawn mock LMTP server + let shutdown = spawn_mock_lmtp_server(5); + + // Obtain directory handle + let handle = parse_config().remove("smtp").unwrap(); + + // Basic lookup + let tests = vec![ + ( + Item::IsAccount("john-ok@domain".to_string()), + LookupResult::True, + ), + ( + Item::IsAccount("john-bad@domain".to_string()), + LookupResult::False, + ), + ( + Item::Verify("john-ok@domain".to_string()), + LookupResult::Values(vec!["john-ok@domain".to_string()]), + ), + ( + Item::Verify("doesnot@exist.org".to_string()), + LookupResult::False, + ), + ( + Item::Expand("sales-ok,item1,item2,item3".to_string()), + LookupResult::Values(vec![ + "sales-ok".to_string(), + "item1".to_string(), + "item2".to_string(), + "item3".to_string(), + ]), + ), + (Item::Expand("other".to_string()), LookupResult::False), + ( + Item::Authenticate(Credentials::Plain { + username: "john".to_string(), + secret: "ok".to_string(), + }), + LookupResult::True, + ), + ( + Item::Authenticate(Credentials::Plain { + username: "john".to_string(), + secret: "bad".to_string(), + }), + LookupResult::False, + ), + ]; + + for (item, expected) in &tests { + let result: LookupResult = match item { + Item::IsAccount(v) => handle.rcpt(v).await.unwrap().into(), + Item::Authenticate(v) => handle.authenticate(v).await.unwrap().is_some().into(), + Item::Verify(v) => match handle.vrfy(v).await { + Ok(v) => v.into(), + Err(DirectoryError::Unsupported) => LookupResult::False, + Err(e) => panic!("Unexpected error: {e:?}"), + }, + Item::Expand(v) => match handle.expn(v).await { + Ok(v) => v.into(), + Err(DirectoryError::Unsupported) => LookupResult::False, + Err(e) => panic!("Unexpected error: {e:?}"), + }, + }; + + assert_eq!(&result, expected); + } + + // Concurrent requests + let mut requests = Vec::new(); + for n in 0..100 { + let (item, expected) = &tests[n % tests.len()]; + let item = item.append(n); + let item_clone = item.clone(); + let handle = handle.clone(); + requests.push(( + tokio::spawn(async move { + let result: LookupResult = match &item { + Item::IsAccount(v) => handle.rcpt(v).await.unwrap().into(), + Item::Authenticate(v) => handle.authenticate(v).await.unwrap().is_some().into(), + Item::Verify(v) => match handle.vrfy(v).await { + Ok(v) => v.into(), + Err(DirectoryError::Unsupported) => LookupResult::False, + Err(e) => panic!("Unexpected error: {e:?}"), + }, + Item::Expand(v) => match handle.expn(v).await { + Ok(v) => v.into(), + Err(DirectoryError::Unsupported) => LookupResult::False, + Err(e) => panic!("Unexpected error: {e:?}"), + }, + }; + + result + }), + item_clone, + expected.append(n), + )); + } + for (result, item, expected_result) in requests { + assert_eq!( + result.await.unwrap(), + expected_result, + "Failed for {item:?}" + ); + } + + // Shutdown + shutdown.send(false).ok(); +} + +pub fn spawn_mock_lmtp_server(max_concurrency: u64) -> watch::Sender { + let (tx, mut rx) = watch::channel(true); + + tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:9199") + .await + .unwrap_or_else(|e| { + panic!("Failed to bind mock SMTP server to 127.0.0.1:9199: {e}"); + }); + let acceptor = dummy_tls_acceptor(); + let limited = ConcurrencyLimiter::new(max_concurrency); + loop { + tokio::select! { + stream = listener.accept() => { + match stream { + Ok((stream, _)) => { + let acceptor = acceptor.clone(); + let in_flight = limited.is_allowed(); + tokio::spawn(accept_smtp(stream, acceptor, in_flight)); + } + Err(err) => { + panic!("Something went wrong: {err}" ); + } + } + }, + _ = rx.changed() => { + break; + } + }; + } + }); + + tx +} + +async fn accept_smtp(stream: TcpStream, acceptor: Arc, in_flight: Option) { + let mut stream = acceptor.accept(stream).await.unwrap(); + stream + .write_all(b"220 [127.0.0.1] Clueless host service ready\r\n") + .await + .unwrap(); + + if in_flight.is_none() { + eprintln!("WARNING: Concurrency exceeded!"); + } + + let mut buf_u8 = vec![0u8; 1024]; + + loop { + let br = if let Ok(br) = stream.read(&mut buf_u8).await { + br + } else { + break; + }; + let buf = std::str::from_utf8(&buf_u8[0..br]).unwrap(); + //print!("-> {}", buf); + let response = if buf.starts_with("LHLO") { + "250-mx.foobar.org\r\n250 AUTH PLAIN\r\n".to_string() + } else if buf.starts_with("MAIL FROM") { + if buf.contains("<>") || buf.contains("ok@") { + "250 OK\r\n".to_string() + } else { + "552-I do not\r\n552 like that MAIL FROM.\r\n".to_string() + } + } else if buf.starts_with("RCPT TO") { + if buf.contains("ok") { + "250 OK\r\n".to_string() + } else { + "550-I refuse to\r\n550 accept that recipient.\r\n".to_string() + } + } else if buf.starts_with("VRFY") { + if buf.contains("ok") { + format!("250 {}\r\n", buf.split_once(' ').unwrap().1) + } else { + "550-I refuse to\r\n550 verify that recipient.\r\n".to_string() + } + } else if buf.starts_with("EXPN") { + if buf.contains("ok") { + let parts = buf + .split_once(' ') + .unwrap() + .1 + .split(',') + .filter_map(|s| { + if !s.is_empty() { + s.to_string().into() + } else { + None + } + }) + .collect::>(); + let mut buf = String::with_capacity(16); + for (pos, part) in parts.iter().enumerate() { + buf.push_str("250"); + buf.push(if pos == parts.len() - 1 { ' ' } else { '-' }); + buf.push_str(part); + buf.push_str("\r\n"); + } + + buf + } else { + "550-I refuse to\r\n550 accept that recipient.\r\n".to_string() + } + } else if buf.starts_with("AUTH PLAIN") { + let buf = base64_decode(buf.rsplit_once(' ').unwrap().1.as_bytes()).unwrap(); + if String::from_utf8_lossy(&buf).contains("ok") { + "235 Great success!\r\n".to_string() + } else { + "535 No soup for you\r\n".to_string() + } + } else if buf.starts_with("NOOP") { + "250 Siesta time\r\n".to_string() + } else if buf.starts_with("QUIT") { + "250 Arrivederci!\r\n".to_string() + } else if buf.starts_with("RSET") { + "250 Your wish is my command.\r\n".to_string() + } else { + panic!("Unknown command: {}", buf.trim()); + }; + //print!("<- {}", response); + stream.write_all(response.as_bytes()).await.unwrap(); + + if buf.contains("bye") || buf.starts_with("QUIT") { + return; + } + } +} diff --git a/tests/src/directory/sql.rs b/tests/src/directory/sql.rs new file mode 100644 index 00000000..34b4b801 --- /dev/null +++ b/tests/src/directory/sql.rs @@ -0,0 +1,110 @@ +use directory::Directory; +use jmap_proto::types::id::Id; + +pub async fn create_test_directory(handle: &dyn Directory) { + // Create tables + for query in [ + "CREATE TABLE accounts (name TEXT, id INTEGER PRIMARY KEY, secret TEXT, description TEXT, type TEXT NOT NULL, quota INTEGER, active BOOLEAN DEFAULT 1)", + "CREATE TABLE group_members (uid INTEGER, gid INTEGER, PRIMARY KEY (uid, gid))", + "CREATE TABLE emails (id INTEGER NOT NULL, email TEXT NOT NULL, type TEXT, PRIMARY KEY (id, email))", + "INSERT INTO accounts (name, secret, type) VALUES ('admin', 'secret', 'individual')", + ] { + handle.query(query, &[]).await.unwrap_or_else(|_| panic!("failed for {query}")); + } +} + +pub async fn create_test_user(handle: &dyn Directory, login: &str, secret: &str, name: &str) -> Id { + handle + .query( + "INSERT OR IGNORE INTO users (name, secret, description, type, is_active) VALUES (?, ?, ?, 'individual', true)", + &[login, secret, name], + ) + .await + .unwrap(); + + Id::from(handle.principal_by_name(login).await.unwrap().unwrap().id) +} + +pub async fn create_test_user_with_email( + handle: &dyn Directory, + login: &str, + secret: &str, + name: &str, +) -> Id { + let id = create_test_user(handle, login, secret, name).await; + link_test_address(handle, login, login, "primary").await; + id +} + +pub async fn create_test_group(handle: &dyn Directory, login: &str, name: &str) -> Id { + handle + .query( + "INSERT OR IGNORE INTO users (name, description, type, is_active) VALUES (?, ?, 'group', true)", + &[login, name], + ) + .await + .unwrap(); + + let id = handle.principal_by_name(login).await.unwrap().unwrap().id; + + handle + .query( + &format!( + "INSERT OR IGNORE INTO emails (id, email, type) VALUES ({}, ?, 'primary')", + id + ), + &[login], + ) + .await + .unwrap(); + + Id::from(id) +} + +pub async fn link_test_address(handle: &dyn Directory, login: &str, address: &str, typ: &str) { + let id = handle.principal_by_name(login).await.unwrap().unwrap().id; + handle + .query( + &format!( + "INSERT OR IGNORE INTO emails (id, email, type) VALUES ({}, ?, ?)", + id, + ), + &[address, typ], + ) + .await + .unwrap(); +} + +pub async fn add_to_group(handle: &dyn Directory, uid: u32, gid: u32) { + handle + .query( + &format!( + "INSERT INTO group_members (uid, gid) VALUES ({}, {})", + uid, gid + ), + &[], + ) + .await + .unwrap(); +} + +pub async fn remove_from_group(handle: &dyn Directory, uid: u32, gid: u32) { + handle + .query( + &format!("DELETE FROM groups WHERE uid = {} AND gid = {}", uid, gid), + &[], + ) + .await + .unwrap(); +} + +pub async fn remove_test_alias(handle: &dyn Directory, login: &str, alias: &str) { + let id = handle.principal_by_name(login).await.unwrap().unwrap().id; + handle + .query( + &format!("DELETE FROM emails WHERE id = {} AND email = ?", id), + &[alias], + ) + .await + .unwrap(); +}