Directory implementation - part 3

This commit is contained in:
Mauro D 2023-06-01 17:09:32 +00:00
parent 93e925a635
commit 1e792b234f
35 changed files with 234 additions and 2570 deletions

View file

@ -23,155 +23,63 @@
use crate::JMAP;
use super::{AclToken, AuthDatabase, SqlDatabase};
use super::AclToken;
impl JMAP {
pub async fn authenticate(&self, account: &str, secret: &str) -> Option<u32> {
let account_id = self.get_account_id(account).await?;
let account_secret = self.get_account_secret(account_id).await?;
if secret == account_secret {
account_id.into()
} else {
tracing::debug!(context = "auth", event = "failed", account = account);
None
}
todo!()
}
pub async fn authenticate_with_token(&self, account: &str, secret: &str) -> Option<AclToken> {
self.get_acl_token(self.authenticate(account, secret).await?)
.await
todo!()
}
pub async fn get_acl_token(&self, account_id: u32) -> Option<AclToken> {
self.update_acl_token(AclToken {
primary_id: account_id,
member_of: self.get_account_gids(account_id).await,
access_to: Vec::new(),
})
.await
todo!()
}
pub async fn get_account_secret(&self, account_id: u32) -> Option<String> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_secret_by_uid,
..
} => {
db.fetch_uid_to_string(query_secret_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => None,
}
todo!()
}
pub async fn get_account_name(&self, account_id: u32) -> Option<String> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_name_by_uid,
..
} => {
db.fetch_uid_to_string(query_name_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => None,
}
todo!()
}
pub async fn get_account_id(&self, account: &str) -> Option<u32> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_uid_by_login,
..
} => db
.fetch_string_to_id(query_uid_by_login, account)
.await
.map(|id| id as u32),
AuthDatabase::Ldap => None,
}
todo!()
}
pub async fn get_account_gids(&self, account_id: u32) -> Vec<u32> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_gids_by_uid,
..
} => db
.fetch_uid_to_uids(query_gids_by_uid, account_id as i64)
.await
.into_iter()
.map(|id| id as u32)
.collect(),
AuthDatabase::Ldap => vec![],
}
todo!()
}
pub async fn get_account_login(&self, account_id: u32) -> Option<String> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_login_by_uid,
..
} => {
db.fetch_uid_to_string(query_login_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => None,
}
todo!()
}
pub async fn get_uids_by_address(&self, address: &str) -> Vec<u32> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_uids_by_address,
..
} => db
.fetch_string_to_uids(query_uids_by_address, address)
.await
.into_iter()
.map(|id| id as u32)
.collect(),
AuthDatabase::Ldap => vec![],
}
todo!()
}
pub async fn get_addresses_by_uid(&self, account_id: u32) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_addresses_by_uid,
..
} => {
db.fetch_uid_to_strings(query_addresses_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => vec![],
}
todo!()
}
pub async fn vrfy_address(&self, address: &str) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql { db, query_vrfy, .. } => {
db.fetch_string_to_strings(query_vrfy, address).await
}
AuthDatabase::Ldap => vec![],
}
todo!()
}
pub async fn expn_address(&self, address: &str) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql { db, query_expn, .. } => {
db.fetch_string_to_strings(query_expn, address).await
}
AuthDatabase::Ldap => vec![],
}
todo!()
}
}
struct Remove {
remove: bool,
}
/*
// TODO abstract this
impl SqlDatabase {
pub async fn fetch_uid_to_string(&self, query: &str, uid: i64) -> Option<String> {
@ -447,3 +355,4 @@ impl AuthDatabase {
}
}
}
*/

View file

@ -41,29 +41,6 @@ pub mod authenticate;
pub mod oauth;
pub mod rate_limit;
pub enum AuthDatabase {
Sql {
db: SqlDatabase,
query_uid_by_login: String,
query_login_by_uid: String,
query_secret_by_uid: String,
query_name_by_uid: String,
query_gids_by_uid: String,
query_uids_by_address: String,
query_addresses_by_uid: String,
query_vrfy: String,
query_expn: String,
},
Ldap,
}
pub enum SqlDatabase {
Postgres(sqlx::Pool<sqlx::Postgres>),
MySql(sqlx::Pool<sqlx::MySql>),
//MsSql(sqlx::Pool<sqlx::Mssql>),
SqlLite(sqlx::Pool<sqlx::Sqlite>),
}
#[derive(Debug, Clone)]
pub struct AclToken {
pub primary_id: u32,

View file

@ -28,7 +28,7 @@ use api::session::BaseCapabilities;
use auth::{
oauth::OAuthCode,
rate_limit::{AnonymousLimiter, AuthenticatedLimiter, RemoteAddress},
AclToken, AuthDatabase, SqlDatabase,
AclToken,
};
use jmap_proto::{
error::method::MethodError,
@ -45,7 +45,6 @@ use services::{
state::{self, init_state_manager, spawn_state_manager},
};
use smtp::core::SMTP;
use sqlx::{mysql::MySqlPoolOptions, postgres::PgPoolOptions, sqlite::SqlitePoolOptions};
use store::{
fts::Language,
parking_lot::Mutex,
@ -55,7 +54,7 @@ use store::{
BitmapKey, Deserialize, Serialize, Store, ValueKey,
};
use tokio::sync::mpsc;
use utils::{config::Rate, failed, ipc::DeliveryEvent, UnwrapFailure};
use utils::{config::Rate, ipc::DeliveryEvent, UnwrapFailure};
pub mod api;
pub mod auth;
@ -86,7 +85,6 @@ pub struct JMAP {
pub rate_limit_unauth: LruCache<RemoteAddress, Arc<Mutex<AnonymousLimiter>>>,
pub oauth_codes: LruCache<String, Arc<OAuthCode>>,
pub auth_db: AuthDatabase,
pub state_tx: mpsc::Sender<state::Event>,
pub housekeeper_tx: mpsc::Sender<housekeeper::Event>,
@ -159,6 +157,8 @@ impl JMAP {
delivery_rx: mpsc::Receiver<DeliveryEvent>,
smtp: Arc<SMTP>,
) -> Result<Arc<Self>, String> {
let remove = "true";
/*
let auth_db = match config.value_require("jmap.auth.database.type")? {
"ldap" => AuthDatabase::Ldap,
"sql" => {
@ -243,7 +243,7 @@ impl JMAP {
}
}
_ => failed("Invalid auth database type"),
};
};*/
// Init state manager and housekeeper
let (state_tx, state_rx) = init_state_manager();
@ -271,7 +271,6 @@ impl JMAP {
oauth_codes: LruCache::with_capacity(
config.property("oauth.code.cache-size")?.unwrap_or(128),
),
auth_db,
state_tx,
housekeeper_tx,
smtp,

View file

@ -23,6 +23,7 @@
use std::time::Duration;
use directory::config::ConfigDirectory;
use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
use smtp::core::{SmtpAdminSessionManager, SmtpSessionManager, SMTP};
use tokio::sync::mpsc;
@ -35,6 +36,7 @@ use utils::{
async fn main() -> std::io::Result<()> {
let config = Config::init();
let servers = config.parse_servers().failed("Invalid configuration");
let directory = config.parse_directory().failed("Invalid configuration");
// Bind ports and drop privileges
servers.bind(&config);
@ -42,13 +44,13 @@ async fn main() -> std::io::Result<()> {
// Enable tracing
let _tracer = enable_tracing(&config).failed("Failed to enable tracing");
tracing::info!(
"Starting Stalwart mail server v{}...",
"Starting Stalwart Mail Server v{}...",
env!("CARGO_PKG_VERSION")
);
// Init servers
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&config, &servers, delivery_tx)
let smtp = SMTP::init(&config, &servers, &directory, delivery_tx)
.await
.failed("Invalid configuration file");
let jmap = JMAP::init(&config, delivery_rx, smtp.clone())
@ -74,7 +76,7 @@ async fn main() -> std::io::Result<()> {
// Wait for shutdown signal
wait_for_shutdown().await;
tracing::info!(
"Shutting down Stalwart mail server v{}...",
"Shutting down Stalwart Mail Server v{}...",
env!("CARGO_PKG_VERSION")
);

View file

@ -13,6 +13,7 @@ resolver = "2"
[dependencies]
utils = { path = "../utils" }
directory = { path = "../directory" }
mail-auth = { git = "https://github.com/stalwartlabs/mail-auth" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "ludicrous_mode"] }

View file

@ -215,8 +215,8 @@ impl ConfigCondition for Config {
})?)
}
MatchType::Lookup => {
if let Some(list) = ctx.lookup.get(value_str) {
ConditionMatch::Lookup(list.clone())
if let Some(lookup) = ctx.directory.lookups.get(value_str) {
ConditionMatch::Lookup(lookup.clone())
} else {
return Err(format!(
"Lookup {:?} not found for property {:?}.",
@ -359,139 +359,3 @@ impl ParseValue for IpAddrMask {
))
}
}
#[cfg(test)]
mod tests {
use std::{fs, path::PathBuf, sync::Arc};
use ahash::AHashMap;
use utils::config::{Config, Server};
use crate::{
config::{
Condition, ConditionMatch, Conditions, ConfigContext, EnvelopeKey, IpAddrMask,
StringMatch,
},
lookup::Lookup,
};
use super::ConfigCondition;
#[test]
fn parse_conditions() {
let mut file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
file.push("resources");
file.push("smtp");
file.push("config");
file.push("rules.toml");
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
let servers = vec![Server {
id: "smtp".to_string(),
internal_id: 123,
..Default::default()
}];
let mut context = ConfigContext::new(&servers);
let list = Arc::new(Lookup::default());
context.lookup.insert("test-list".to_string(), list.clone());
let mut conditions = config.parse_conditions(&context).unwrap();
let expected_rules = AHashMap::from_iter([
(
"simple".to_string(),
Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::Listener,
value: ConditionMatch::UInt(123),
not: false,
}],
},
),
(
"is-authenticated".to_string(),
Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::AuthenticatedAs,
value: ConditionMatch::String(StringMatch::Equal("".to_string())),
not: true,
}],
},
),
(
"expanded".to_string(),
Conditions {
conditions: vec![
Condition::Match {
key: EnvelopeKey::SenderDomain,
value: ConditionMatch::String(StringMatch::StartsWith(
"example".to_string(),
)),
not: false,
},
Condition::JumpIfFalse { positions: 1 },
Condition::Match {
key: EnvelopeKey::Sender,
value: ConditionMatch::Lookup(list),
not: false,
},
],
},
),
(
"my-nested-rule".to_string(),
Conditions {
conditions: vec![
Condition::Match {
key: EnvelopeKey::RecipientDomain,
value: ConditionMatch::String(StringMatch::Equal(
"example.org".to_string(),
)),
not: false,
},
Condition::JumpIfTrue { positions: 9 },
Condition::Match {
key: EnvelopeKey::RemoteIp,
value: ConditionMatch::IpAddrMask(IpAddrMask::V4 {
addr: "192.168.0.0".parse().unwrap(),
mask: u32::MAX << (32 - 24),
}),
not: false,
},
Condition::JumpIfTrue { positions: 7 },
Condition::Match {
key: EnvelopeKey::Recipient,
value: ConditionMatch::String(StringMatch::StartsWith(
"no-reply@".to_string(),
)),
not: false,
},
Condition::JumpIfFalse { positions: 5 },
Condition::Match {
key: EnvelopeKey::Sender,
value: ConditionMatch::String(StringMatch::EndsWith(
"@domain.org".to_string(),
)),
not: false,
},
Condition::JumpIfFalse { positions: 3 },
Condition::Match {
key: EnvelopeKey::Priority,
value: ConditionMatch::Int(1),
not: true,
},
Condition::JumpIfTrue { positions: 1 },
Condition::Match {
key: EnvelopeKey::Priority,
value: ConditionMatch::Int(-2),
not: false,
},
],
},
),
]);
for (key, rule) in expected_rules {
assert_eq!(Some(rule), conditions.remove(&key), "failed for {key}");
}
}
}

View file

@ -1,170 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{sync::Arc, time::Duration};
use parking_lot::Mutex;
use sqlx::{mysql::MySqlPoolOptions, postgres::PgPoolOptions, sqlite::SqlitePoolOptions};
use crate::lookup::{cache::LookupCache, Lookup, SqlDatabase, SqlQuery};
use utils::config::{utils::AsKey, Config};
use super::ConfigContext;
pub trait ConfigDatabase {
fn parse_databases(&self, ctx: &mut ConfigContext) -> super::Result<()>;
fn parse_database(&self, id: &str, ctx: &mut ConfigContext) -> super::Result<()>;
}
impl ConfigDatabase for Config {
fn parse_databases(&self, ctx: &mut ConfigContext) -> super::Result<()> {
for id in self.sub_keys("database") {
self.parse_database(id, ctx)?;
}
Ok(())
}
fn parse_database(&self, id: &str, ctx: &mut ConfigContext) -> super::Result<()> {
let address = self.value_require(("database", id, "address"))?;
let pool = if address.starts_with("postgres:") {
SqlDatabase::Postgres(
PgPoolOptions::new()
.max_connections(
self.property(("database", id, "max-connections"))?
.unwrap_or(10),
)
.min_connections(
self.property(("database", id, "min-connections"))?
.unwrap_or(0),
)
.idle_timeout(self.property(("database", id, "idle-timeout"))?)
.connect_lazy(address)
.map_err(|err| {
format!("Failed to create connection pool for {address:?}: {err}")
})?,
)
} else if address.starts_with("mysql:") {
SqlDatabase::MySql(
MySqlPoolOptions::new()
.max_connections(
self.property(("database", id, "max-connections"))?
.unwrap_or(10),
)
.min_connections(
self.property(("database", id, "min-connections"))?
.unwrap_or(0),
)
.idle_timeout(self.property(("database", id, "idle-timeout"))?)
.connect_lazy(address)
.map_err(|err| {
format!("Failed to create connection pool for {address:?}: {err}")
})?,
)
} else if address.starts_with("mssql:") {
unimplemented!("MSSQL support is not yet implemented")
/*SqlDatabase::MsSql(
MssqlPoolOptions::new()
.max_connections(
self.property(("database", id, "max-connections"))?
.unwrap_or(10),
)
.min_connections(
self.property(("database", id, "min-connections"))?
.unwrap_or(0),
)
.idle_timeout(self.property(("database", id, "idle-timeout"))?)
.connect_lazy(address)
.map_err(|err| {
format!("Failed to create connection pool for {address:?}: {err}")
})?,
)*/
} else if address.starts_with("sqlite:") {
SqlDatabase::SqlLite(
SqlitePoolOptions::new()
.max_connections(
self.property(("database", id, "max-connections"))?
.unwrap_or(10),
)
.min_connections(
self.property(("database", id, "min-connections"))?
.unwrap_or(0),
)
.idle_timeout(self.property(("database", id, "idle-timeout"))?)
.connect_lazy(address)
.map_err(|err| {
format!("Failed to create connection pool for {address:?}: {err}")
})?,
)
} else {
return Err(format!(
"Invalid database address {:?} for key {:?}",
address,
("database", id, "address").as_key()
));
};
// Add database
ctx.databases.insert(id.to_string(), pool.clone());
// Parse cache
let cache_entries = self
.property(("database", id, "cache.entries"))?
.unwrap_or(1024);
let cache_ttl_positive = self
.property(("database", id, "cache.ttl.positive"))?
.unwrap_or(Duration::from_secs(86400));
let cache_ttl_negative = self
.property(("database", id, "cache.ttl.positive"))?
.unwrap_or(Duration::from_secs(3600));
let cache_enable = self
.values(("database", id, "cache.enable"))
.map(|(_, v)| v)
.collect::<Vec<_>>();
// Parse lookups
for lookup_id in self.sub_keys(("database", id, "lookup")) {
ctx.lookup.insert(
format!("db/{id}/{lookup_id}"),
Arc::new(Lookup::Sql(SqlQuery {
query: self
.value_require(("database", id, "lookup", lookup_id))?
.to_string(),
db: pool.clone(),
cache: if cache_enable.contains(&lookup_id) {
Mutex::new(LookupCache::new(
cache_entries,
cache_ttl_positive,
cache_ttl_negative,
))
.into()
} else {
None
},
})),
);
}
Ok(())
}
}

View file

@ -188,7 +188,7 @@ impl<T: Default> IfBlock<Option<T>> {
}
impl IfBlock<Option<String>> {
pub fn map_if_block<T>(
pub fn map_if_block<T: ?Sized>(
self,
map: &AHashMap<String, Arc<T>>,
key_name: impl AsKey,
@ -209,7 +209,7 @@ impl IfBlock<Option<String>> {
})
}
fn map_value<T>(
fn map_value<T: ?Sized>(
map: &AHashMap<String, Arc<T>>,
value: Option<String>,
object_name: &str,
@ -230,7 +230,7 @@ impl IfBlock<Option<String>> {
}
impl IfBlock<Vec<String>> {
pub fn map_if_block<T>(
pub fn map_if_block<T: ?Sized>(
self,
map: &AHashMap<String, Arc<T>>,
key_name: &str,
@ -250,7 +250,7 @@ impl IfBlock<Vec<String>> {
})
}
fn map_value<T>(
fn map_value<T: ?Sized>(
map: &AHashMap<String, Arc<T>>,
values: Vec<String>,
object_name: &str,
@ -275,194 +275,3 @@ impl<T> IfBlock<Vec<T>> {
self.default.is_empty() || self.if_then.iter().any(|v| v.then.is_empty())
}
}
#[cfg(test)]
mod tests {
use std::{fs, path::PathBuf, time::Duration};
use utils::config::Config;
use crate::config::{
if_block::ConfigIf, Condition, ConditionMatch, Conditions, ConfigContext, EnvelopeKey,
IfBlock, IfThen, StringMatch,
};
#[test]
fn parse_if_blocks() {
let mut file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
file.push("resources");
file.push("smtp");
file.push("config");
file.push("if-blocks.toml");
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
// Create context and add some conditions
let context = ConfigContext::new(&[]);
let available_keys = vec![
EnvelopeKey::Recipient,
EnvelopeKey::RecipientDomain,
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::AuthenticatedAs,
EnvelopeKey::Listener,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,
];
assert_eq!(
config
.parse_if_block::<Option<Duration>>("durations", &context, &available_keys)
.unwrap()
.unwrap(),
IfBlock {
if_then: vec![
IfThen {
conditions: Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::Sender,
value: ConditionMatch::String(StringMatch::Equal(
"jdoe".to_string()
)),
not: false
}]
},
then: Duration::from_secs(5 * 86400).into()
},
IfThen {
conditions: Conditions {
conditions: vec![
Condition::Match {
key: EnvelopeKey::Priority,
value: ConditionMatch::Int(-1),
not: false
},
Condition::JumpIfTrue { positions: 1 },
Condition::Match {
key: EnvelopeKey::Recipient,
value: ConditionMatch::String(StringMatch::StartsWith(
"jane".to_string()
)),
not: false
}
]
},
then: Duration::from_secs(3600).into()
}
],
default: None
}
);
assert_eq!(
config
.parse_if_block::<Vec<String>>("string-list", &context, &available_keys)
.unwrap()
.unwrap(),
IfBlock {
if_then: vec![
IfThen {
conditions: Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::Sender,
value: ConditionMatch::String(StringMatch::Equal(
"jdoe".to_string()
)),
not: false
}]
},
then: vec!["From".to_string(), "To".to_string(), "Date".to_string()]
},
IfThen {
conditions: Conditions {
conditions: vec![
Condition::Match {
key: EnvelopeKey::Priority,
value: ConditionMatch::Int(-1),
not: false
},
Condition::JumpIfTrue { positions: 1 },
Condition::Match {
key: EnvelopeKey::Recipient,
value: ConditionMatch::String(StringMatch::StartsWith(
"jane".to_string()
)),
not: false
}
]
},
then: vec!["Other-ID".to_string()]
}
],
default: vec![]
}
);
assert_eq!(
config
.parse_if_block::<Vec<String>>("string-list-bis", &context, &available_keys)
.unwrap()
.unwrap(),
IfBlock {
if_then: vec![
IfThen {
conditions: Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::Sender,
value: ConditionMatch::String(StringMatch::Equal(
"jdoe".to_string()
)),
not: false
}]
},
then: vec!["From".to_string(), "To".to_string(), "Date".to_string()]
},
IfThen {
conditions: Conditions {
conditions: vec![
Condition::Match {
key: EnvelopeKey::Priority,
value: ConditionMatch::Int(-1),
not: false
},
Condition::JumpIfTrue { positions: 1 },
Condition::Match {
key: EnvelopeKey::Recipient,
value: ConditionMatch::String(StringMatch::StartsWith(
"jane".to_string()
)),
not: false
}
]
},
then: vec![]
}
],
default: vec!["ID-Bis".to_string()]
}
);
assert_eq!(
config
.parse_if_block::<String>("single-value", &context, &available_keys)
.unwrap()
.unwrap(),
IfBlock {
if_then: vec![],
default: "hello world".to_string()
}
);
for bad_rule in [
"bad-multi-value",
"bad-if-without-then",
"bad-if-without-else",
"bad-multiple-else",
] {
if let Ok(value) = config.parse_if_block::<u32>(bad_rule, &context, &available_keys) {
panic!("Condition {bad_rule:?} had unexpected result {value:?}");
}
}
}
}

View file

@ -1,159 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
fs::File,
io::{BufRead, BufReader},
sync::Arc,
};
use ahash::AHashSet;
use utils::config::Config;
use crate::lookup::Lookup;
use super::ConfigContext;
pub trait ConfigList {
fn parse_lists(&self, ctx: &mut ConfigContext) -> super::Result<()>;
fn parse_list(&self, id: &str) -> super::Result<Lookup>;
}
impl ConfigList for Config {
fn parse_lists(&self, ctx: &mut ConfigContext) -> super::Result<()> {
for id in self.sub_keys("list") {
ctx.lookup
.insert(format!("list/{id}"), Arc::new(self.parse_list(id)?));
}
Ok(())
}
fn parse_list(&self, id: &str) -> super::Result<Lookup> {
let mut entries = AHashSet::new();
for (_, value) in self.values(("list", id)) {
if let Some(path) = value.strip_prefix("file://") {
for line in BufReader::new(File::open(path).map_err(|err| {
format!("Failed to read file {path:?} for list {id:?}: {err}")
})?)
.lines()
{
let line_ = line.map_err(|err| {
format!("Failed to read file {path:?} for list {id:?}: {err}")
})?;
let line = line_.trim();
if !line.is_empty() {
entries.insert(line.to_string());
}
}
} else {
entries.insert(value.to_string());
}
}
Ok(Lookup::List(entries))
}
}
#[cfg(test)]
mod tests {
use std::{fs, path::PathBuf, sync::Arc};
use ahash::{AHashMap, AHashSet};
use utils::config::Config;
use crate::{
config::{remote::ConfigHost, ConfigContext},
lookup::Lookup,
};
use super::ConfigList;
#[test]
fn parse_lists() {
let mut file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
file.push("resources");
file.push("smtp");
file.push("config");
file.push("lists.toml");
let mut list_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
list_path.push("resources");
list_path.push("smtp");
list_path.push("lists");
let mut list1 = list_path.clone();
list1.push("test-list1.txt");
let mut list2 = list_path.clone();
list2.push("test-list2.txt");
let toml = fs::read_to_string(file)
.unwrap()
.replace("{LIST1}", list1.as_path().to_str().unwrap())
.replace("{LIST2}", list2.as_path().to_str().unwrap());
let config = Config::parse(&toml).unwrap();
let mut context = ConfigContext::new(&[]);
config.parse_remote_hosts(&mut context).unwrap();
config.parse_lists(&mut context).unwrap();
let mut expected_lists = AHashMap::from_iter([
(
"list/local-domains".to_string(),
Arc::new(Lookup::List(AHashSet::from_iter([
"example.org".to_string(),
"example.net".to_string(),
]))),
),
(
"list/spammer-domains".to_string(),
Arc::new(Lookup::List(AHashSet::from_iter([
"thatdomain.net".to_string()
]))),
),
(
"list/local-users".to_string(),
Arc::new(Lookup::List(AHashSet::from_iter([
"user1@domain.org".to_string(),
"user2@domain.org".to_string(),
]))),
),
(
"list/power-users".to_string(),
Arc::new(Lookup::List(AHashSet::from_iter([
"user1@domain.org".to_string(),
"user2@domain.org".to_string(),
"user3@example.net".to_string(),
"user4@example.net".to_string(),
"user5@example.net".to_string(),
]))),
),
(
"remote/lmtp".to_string(),
context.lookup.get("remote/lmtp").unwrap().clone(),
),
]);
for (key, list) in context.lookup {
assert_eq!(Some(list), expected_lists.remove(&key), "failed for {key}");
}
}
}

View file

@ -23,9 +23,7 @@
pub mod auth;
pub mod condition;
pub mod database;
pub mod if_block;
pub mod list;
pub mod queue;
pub mod remote;
pub mod report;
@ -42,6 +40,7 @@ use std::{
};
use ahash::AHashMap;
use directory::{Directory, DirectoryConfig, Lookup};
use mail_auth::{
common::crypto::{Ed25519Key, RsaKey, Sha256},
dkim::{Canonicalization, Done},
@ -51,11 +50,8 @@ use mail_send::Credentials;
use regex::Regex;
use sieve::Sieve;
use smtp_proto::MtPriority;
use tokio::sync::mpsc;
use utils::config::{Rate, Server, ServerProtocol};
use crate::lookup::{self, Lookup, SqlDatabase};
#[derive(Debug)]
pub struct Host {
pub address: String,
@ -67,14 +63,6 @@ pub struct Host {
pub tls_allow_invalid_certs: bool,
pub username: Option<String>,
pub secret: Option<String>,
pub max_errors: usize,
pub max_requests: usize,
pub cache_entries: usize,
pub cache_ttl_positive: Duration,
pub cache_ttl_negative: Duration,
pub channel_tx: mpsc::Sender<lookup::Event>,
pub channel_rx: mpsc::Receiver<lookup::Event>,
pub lookup: bool,
}
#[derive(Debug, Clone)]
@ -100,7 +88,7 @@ pub enum StringMatch {
EndsWith(String),
}
#[derive(Debug, Clone)]
#[derive(Clone)]
pub enum ConditionMatch {
String(StringMatch),
UInt(u16),
@ -125,20 +113,22 @@ impl PartialEq for ConditionMatch {
}
}
#[cfg(feature = "test_mode")]
impl Eq for ConditionMatch {}
#[cfg(feature = "test_mode")]
impl PartialEq for Lookup {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::List(l0), Self::List(r0)) => l0 == r0,
(Self::Remote(_), Self::Remote(_)) => true,
_ => false,
impl core::fmt::Debug for ConditionMatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::String(arg0) => f.debug_tuple("String").field(arg0).finish(),
Self::UInt(arg0) => f.debug_tuple("UInt").field(arg0).finish(),
Self::Int(arg0) => f.debug_tuple("Int").field(arg0).finish(),
Self::IpAddrMask(arg0) => f.debug_tuple("IpAddrMask").field(arg0).finish(),
Self::Lookup(_) => f.debug_tuple("Lookup").finish(),
Self::Regex(arg0) => f.debug_tuple("Regex").field(arg0).finish(),
}
}
}
#[cfg(feature = "test_mode")]
impl Eq for ConditionMatch {}
impl Default for Condition {
fn default() -> Self {
Condition::JumpIfFalse { positions: 0 }
@ -221,6 +211,8 @@ pub struct Extensions {
pub chunking: IfBlock<bool>,
pub requiretls: IfBlock<bool>,
pub dsn: IfBlock<bool>,
pub vrfy: IfBlock<bool>,
pub expn: IfBlock<bool>,
pub no_soliciting: IfBlock<Option<String>>,
pub future_release: IfBlock<Option<Duration>>,
pub deliver_by: IfBlock<Option<Duration>>,
@ -228,7 +220,7 @@ pub struct Extensions {
}
pub struct Auth {
pub lookup: IfBlock<Option<Arc<Lookup>>>,
pub directory: IfBlock<Option<Arc<dyn Directory>>>,
pub mechanisms: IfBlock<u64>,
pub require: IfBlock<bool>,
pub errors_max: IfBlock<usize>,
@ -243,9 +235,7 @@ pub struct Rcpt {
pub script: IfBlock<Option<Arc<Sieve>>>,
pub relay: IfBlock<bool>,
pub lookup_domains: IfBlock<Option<Arc<Lookup>>>,
pub lookup_addresses: IfBlock<Option<Arc<Lookup>>>,
pub lookup_expn: IfBlock<Option<Arc<Lookup>>>,
pub lookup_vrfy: IfBlock<Option<Arc<Lookup>>>,
pub directory: IfBlock<Option<Arc<dyn Directory>>>,
// Errors
pub errors_max: IfBlock<usize>,
@ -334,7 +324,7 @@ pub struct QueueConfig {
// Throttle and Quotas
pub throttle: QueueThrottle,
pub quota: QueueQuotas,
pub management_lookup: Arc<Lookup>,
pub management_lookup: Arc<dyn Directory>,
}
pub struct QueueOutboundSourceIp {
@ -525,8 +515,7 @@ pub struct ConfigContext<'x> {
pub servers: &'x [Server],
pub hosts: AHashMap<String, Host>,
pub scripts: AHashMap<String, Arc<Sieve>>,
pub lookup: AHashMap<String, Arc<Lookup>>,
pub databases: AHashMap<String, SqlDatabase>,
pub directory: DirectoryConfig,
pub signers: AHashMap<String, Arc<DkimSigner>>,
pub sealers: AHashMap<String, Arc<ArcSealer>>,
}

View file

@ -23,6 +23,7 @@
use std::time::Duration;
use directory::memory::MemoryDirectory;
use mail_send::Credentials;
use super::{
@ -192,15 +193,16 @@ impl ConfigQueue for Config {
.unwrap_or_default()
.map_if_block(&ctx.signers, "report.dsn.sign", "signature")?,
},
management_lookup: if let Some(lookup) = self.value("management.auth.lookup") {
ctx.lookup
.get(lookup)
management_lookup: if let Some(id) = self.value("management.directory") {
ctx.directory
.directories
.get(id)
.ok_or_else(|| {
format!("Lookup {lookup:?} not found for key \"management.auth.lookup\".")
format!("Directory {id:?} not found for key \"management.directory\".")
})?
.clone()
} else {
Arc::new(Lookup::default())
Arc::new(MemoryDirectory::default())
},
};

View file

@ -21,13 +21,10 @@
* for more details.
*/
use std::{sync::Arc, time::Duration};
use std::time::Duration;
use tokio::sync::mpsc;
use utils::config::Config;
use crate::lookup::Lookup;
use super::{ConfigContext, Host};
pub trait ConfigHost {
@ -38,22 +35,13 @@ pub trait ConfigHost {
impl ConfigHost for Config {
fn parse_remote_hosts(&self, ctx: &mut ConfigContext) -> super::Result<()> {
for id in self.sub_keys("remote") {
let host = self.parse_host(id)?;
if host.lookup {
ctx.lookup.insert(
format!("remote/{id}"),
Arc::new(Lookup::Remote(host.channel_tx.clone().into())),
);
}
ctx.hosts.insert(id.to_string(), host);
ctx.hosts.insert(id.to_string(), self.parse_host(id)?);
}
Ok(())
}
fn parse_host(&self, id: &str) -> super::Result<Host> {
let (channel_tx, channel_rx) = mpsc::channel(1024);
Ok(Host {
address: self.property_require(("remote", id, "address"))?,
port: self.property_require(("remote", id, "port"))?,
@ -67,25 +55,9 @@ impl ConfigHost for Config {
.unwrap_or(false),
username: self.property(("remote", id, "auth.username"))?,
secret: self.property(("remote", id, "auth.secret"))?,
cache_entries: self
.property(("remote", id, "cache.entries"))?
.unwrap_or(1024),
cache_ttl_positive: self
.property(("remote", id, "cache.ttl.positive"))?
.unwrap_or(Duration::from_secs(86400)),
cache_ttl_negative: self
.property(("remote", id, "cache.ttl.positive"))?
.unwrap_or(Duration::from_secs(3600)),
timeout: self
.property(("remote", id, "timeout"))?
.unwrap_or(Duration::from_secs(60)),
max_errors: self.property(("remote", id, "limits.errors"))?.unwrap_or(3),
max_requests: self
.property(("remote", id, "limits.requests"))?
.unwrap_or(50),
channel_tx,
channel_rx,
lookup: self.property(("remote", id, "lookup"))?.unwrap_or(false),
})
}
}

View file

@ -64,7 +64,7 @@ impl ConfigSieve for Config {
.with_max_variable_size(102400)
.with_max_header_size(10240)
.with_valid_notification_uri("mailto")
.with_valid_ext_lists(ctx.lookup.keys().map(|k| k.to_string()));
.with_valid_ext_lists(ctx.directory.lookups.keys().map(|k| k.to_string()));
if let Some(value) = self.property("sieve.limits.redirects")? {
runtime.set_max_redirects(value);
@ -120,7 +120,7 @@ impl ConfigSieve for Config {
Ok(SieveCore {
runtime,
scripts: ctx.scripts.clone(),
lookup: ctx.lookup.clone(),
lookup: ctx.directory.lookups.clone(),
config: SieveConfig {
from_addr: self
.value("sieve.from-addr")
@ -135,12 +135,12 @@ impl ConfigSieve for Config {
.unwrap_or_default()
.to_string(),
sign,
db: if let Some(db) = self.value("sieve.use-database") {
if let Some(db) = ctx.databases.get(db) {
db: if let Some(db) = self.value("sieve.use-directory") {
if let Some(db) = ctx.directory.directories.get(db) {
Some(db.clone())
} else {
return Err(format!(
"Database {db:?} not found for key \"sieve.use-database\"."
"Directory {db:?} not found for key \"sieve.use-directory\"."
));
}
} else {

View file

@ -183,6 +183,12 @@ impl ConfigSession for Config {
dsn: self
.parse_if_block("session.extensions.dsn", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
vrfy: self
.parse_if_block("session.extensions.vrfy", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
expn: self
.parse_if_block("session.extensions.expn", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
chunking: self
.parse_if_block("session.extensions.chunking", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)),
@ -238,10 +244,14 @@ impl ConfigSession for Config {
.unwrap_or_default();
Ok(Auth {
lookup: self
.parse_if_block::<Option<String>>("session.auth.lookup", ctx, &available_keys)?
directory: self
.parse_if_block::<Option<String>>("session.auth.directory", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lookup, "session.auth.lookup", "lookup list")?,
.map_if_block(
&ctx.directory.directories,
"session.auth.directory",
"lookup list",
)?,
mechanisms: IfBlock {
if_then: mechanisms
.if_then
@ -302,31 +312,26 @@ impl ConfigSession for Config {
relay: self
.parse_if_block("session.rcpt.relay", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(false)),
lookup_domains: self
.parse_if_block::<Option<String>>(
"session.rcpt.lookup.domains",
"session.rcpt.directory.domains",
ctx,
&available_keys,
)?
.unwrap_or_default()
.map_if_block(&ctx.lookup, "session.rcpt.lookup.domains", "lookup list")?,
lookup_addresses: self
.parse_if_block::<Option<String>>(
"session.rcpt.lookup.addresses",
ctx,
&available_keys,
)?
.map_if_block(
&ctx.directory.lookups,
"session.rcpt.directory.domains",
"lookup list",
)?,
directory: self
.parse_if_block::<Option<String>>("session.rcpt.directory", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lookup, "session.rcpt.lookup.addresses", "lookup list")?,
lookup_expn: self
.parse_if_block::<Option<String>>("session.rcpt.lookup.expn", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lookup, "session.rcpt.lookup.expn", "lookup list")?,
lookup_vrfy: self
.parse_if_block::<Option<String>>("session.rcpt.lookup.vrfy", ctx, &available_keys)?
.unwrap_or_default()
.map_if_block(&ctx.lookup, "session.rcpt.lookup.vrfy", "lookup list")?,
.map_if_block(
&ctx.directory.directories,
"session.rcpt.directory",
"lookup list",
)?,
errors_max: self
.parse_if_block("session.rcpt.errors.max", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(10)),

View file

@ -164,74 +164,3 @@ impl ParseTrottleKey for &str {
}
}
}
#[cfg(test)]
mod tests {
use std::{fs, path::PathBuf, time::Duration};
use utils::config::Config;
use crate::config::{
throttle::ConfigThrottle, Condition, ConditionMatch, Conditions, ConfigContext,
EnvelopeKey, IpAddrMask, Rate, Throttle, THROTTLE_AUTH_AS, THROTTLE_REMOTE_IP,
THROTTLE_SENDER_DOMAIN,
};
#[test]
fn parse_throttle() {
let mut file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
file.push("resources");
file.push("smtp");
file.push("config");
file.push("throttle.toml");
let available_keys = vec![
EnvelopeKey::Recipient,
EnvelopeKey::RecipientDomain,
EnvelopeKey::Sender,
EnvelopeKey::SenderDomain,
EnvelopeKey::AuthenticatedAs,
EnvelopeKey::Listener,
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,
];
let config = Config::parse(&fs::read_to_string(file).unwrap()).unwrap();
let context = ConfigContext::new(&[]);
let throttle = config
.parse_throttle("throttle", &context, &available_keys, u16::MAX)
.unwrap();
assert_eq!(
throttle,
vec![
Throttle {
conditions: Conditions {
conditions: vec![Condition::Match {
key: EnvelopeKey::RemoteIp,
value: ConditionMatch::IpAddrMask(IpAddrMask::V4 {
addr: "127.0.0.1".parse().unwrap(),
mask: u32::MAX
}),
not: false
}]
},
keys: THROTTLE_REMOTE_IP | THROTTLE_AUTH_AS,
concurrency: 100.into(),
rate: Rate {
requests: 50,
period: Duration::from_secs(30)
}
.into()
},
Throttle {
conditions: Conditions { conditions: vec![] },
keys: THROTTLE_SENDER_DOMAIN,
concurrency: 10000.into(),
rate: None
}
]
);
}
}

View file

@ -169,10 +169,11 @@ impl IpAddrMask {
mod tests {
use std::{fs, net::IpAddr, path::PathBuf};
use directory::config::ConfigDirectory;
use utils::config::{Config, Server};
use crate::{
config::{condition::ConfigCondition, list::ConfigList, ConfigContext, IfBlock, IfThen},
config::{condition::ConfigCondition, ConfigContext, IfBlock, IfThen},
core::Envelope,
};
@ -258,7 +259,7 @@ mod tests {
},
];
let mut context = ConfigContext::new(&servers);
config.parse_lists(&mut context).unwrap();
context.directory = config.parse_directory().unwrap();
let conditions = config.parse_conditions(&context).unwrap();
let envelope = TestEnvelope {

View file

@ -39,10 +39,7 @@ use tokio::{
sync::oneshot,
};
use utils::{
ipc::{Item, LookupResult},
listener::{limiter::InFlight, SessionManager},
};
use utils::listener::{limiter::InFlight, SessionManager};
use crate::{
queue::{self, instant_to_timestamp, InstantFromTimestamp, QueueId, Status},
@ -256,13 +253,13 @@ impl SMTP {
.queue
.config
.management_lookup
.lookup(Item::Authenticate(Credentials::Plain { username, secret }))
.authenticate(&Credentials::Plain { username, secret })
.await
{
Some(LookupResult::True) => {
Ok(Some(_)) => {
is_authenticated = true;
}
Some(LookupResult::False) => {
Ok(None) => {
tracing::debug!(
context = "management",
event = "auth-error",

View file

@ -31,6 +31,7 @@ use std::{
use ahash::AHashMap;
use dashmap::DashMap;
use directory::{Directory, Lookup};
use mail_auth::{common::lru::LruCache, IprevOutput, Resolver, SpfOutput};
use sieve::{Runtime, Sieve};
use smtp_proto::request::receiver::{
@ -53,7 +54,6 @@ use crate::{
VerifyStrategy,
},
inbound::auth::SaslToken,
lookup::{Lookup, SqlDatabase},
outbound::{
dane::{DnssecResolver, Tlsa},
mta_sts,
@ -117,7 +117,7 @@ pub struct SieveConfig {
pub from_name: String,
pub return_path: String,
pub sign: Vec<Arc<DkimSigner>>,
pub db: Option<SqlDatabase>,
pub db: Option<Arc<dyn Directory>>,
}
pub struct Resolvers {
@ -223,7 +223,7 @@ pub struct SessionParameters {
pub ehlo_reject_non_fqdn: bool,
// Auth parameters
pub auth_lookup: Option<Arc<Lookup>>,
pub auth_directory: Option<Arc<dyn Directory>>,
pub auth_require: bool,
pub auth_errors_max: usize,
pub auth_errors_wait: Duration,
@ -236,9 +236,9 @@ pub struct SessionParameters {
pub rcpt_max: usize,
pub rcpt_dsn: bool,
pub rcpt_lookup_domain: Option<Arc<Lookup>>,
pub rcpt_lookup_addresses: Option<Arc<Lookup>>,
pub rcpt_lookup_expn: Option<Arc<Lookup>>,
pub rcpt_lookup_vrfy: Option<Arc<Lookup>>,
pub rcpt_directory: Option<Arc<dyn Directory>>,
pub can_expn: bool,
pub can_vrfy: bool,
pub max_message_size: usize,
// Mail authentication parameters
@ -454,7 +454,7 @@ impl Session<NullIo> {
timeout: Default::default(),
ehlo_require: Default::default(),
ehlo_reject_non_fqdn: Default::default(),
auth_lookup: Default::default(),
auth_directory: Default::default(),
auth_require: Default::default(),
auth_errors_max: Default::default(),
auth_errors_wait: Default::default(),
@ -465,13 +465,13 @@ impl Session<NullIo> {
rcpt_max: Default::default(),
rcpt_dsn: Default::default(),
rcpt_lookup_domain: Default::default(),
rcpt_lookup_addresses: Default::default(),
rcpt_lookup_expn: Default::default(),
rcpt_lookup_vrfy: Default::default(),
rcpt_directory: Default::default(),
max_message_size: Default::default(),
iprev: crate::config::VerifyStrategy::Disable,
spf_ehlo: crate::config::VerifyStrategy::Disable,
spf_mail_from: crate::config::VerifyStrategy::Disable,
can_expn: false,
can_vrfy: false,
dnsbl_policy: 0,
},
in_flight: vec![],

View file

@ -44,22 +44,22 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
// Auth parameters
let ac = &self.core.session.config.auth;
self.params.auth_lookup = ac.lookup.eval(self).await.clone();
self.params.auth_directory = ac.directory.eval(self).await.clone();
self.params.auth_require = *ac.require.eval(self).await;
self.params.auth_errors_max = *ac.errors_max.eval(self).await;
self.params.auth_errors_wait = *ac.errors_wait.eval(self).await;
// VRFY/EXPN parameters
let rc = &self.core.session.config.rcpt;
self.params.rcpt_lookup_expn = rc.lookup_expn.eval(self).await.clone();
self.params.rcpt_lookup_vrfy = rc.lookup_vrfy.eval(self).await.clone();
let ec = &self.core.session.config.extensions;
self.params.can_expn = *ec.expn.eval(self).await;
self.params.can_vrfy = *ec.vrfy.eval(self).await;
}
pub async fn eval_post_auth_params(&mut self) {
// Refresh VRFY/EXPN parameters
let rc = &self.core.session.config.rcpt;
self.params.rcpt_lookup_expn = rc.lookup_expn.eval(self).await.clone();
self.params.rcpt_lookup_vrfy = rc.lookup_vrfy.eval(self).await.clone();
let ec = &self.core.session.config.extensions;
self.params.can_expn = *ec.expn.eval(self).await;
self.params.can_vrfy = *ec.vrfy.eval(self).await;
}
pub async fn eval_rcpt_params(&mut self) {
@ -70,7 +70,7 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
self.params.rcpt_errors_wait = *rc.errors_wait.eval(self).await;
self.params.rcpt_max = *rc.max_recipients.eval(self).await;
self.params.rcpt_lookup_domain = rc.lookup_domains.eval(self).await.clone();
self.params.rcpt_lookup_addresses = rc.lookup_addresses.eval(self).await.clone();
self.params.rcpt_directory = rc.directory.eval(self).await.clone();
self.params.rcpt_dsn = *self.core.session.config.extensions.dsn.eval(self).await;
self.params.max_message_size = *self

View file

@ -24,6 +24,7 @@
use std::{borrow::Cow, process::Command, sync::Arc, time::Duration};
use ahash::AHashMap;
use directory::Lookup;
use mail_auth::common::headers::HeaderWriter;
use sieve::{
compiler::grammar::actions::action_redirect::{ByMode, ByTime, Notify, NotifyItem, Ret},
@ -38,10 +39,7 @@ use tokio::{
runtime::Handle,
};
use crate::{
lookup::Lookup,
queue::{DomainPart, InstantFromTimestamp, Message},
};
use crate::queue::{DomainPart, InstantFromTimestamp, Message};
use super::{Session, SMTP};
@ -194,26 +192,26 @@ impl SMTP {
} => match command_type {
CommandType::Query => {
if let Some(db) = &self.sieve.config.db {
if command
let result = handle.block_on(db.query(
&command,
&arguments.iter().map(String::as_str).collect::<Vec<_>>(),
));
input = if command
.as_bytes()
.get(..6)
.map_or(false, |q| q.eq_ignore_ascii_case(b"SELECT"))
{
input = handle
.block_on(db.exists(&command, arguments.into_iter()))
.unwrap_or(false)
.into();
result.unwrap_or(false).into()
} else {
input = handle
.block_on(db.execute(&command, arguments.into_iter()))
.into();
}
result.is_ok().into()
};
} else {
tracing::warn!(
parent: &span,
context = "sieve",
event = "config-error",
reason = "No database configured",
reason = "No directory configured",
);
input = false.into();
}
@ -275,27 +273,16 @@ impl SMTP {
Recipient::List(list) => {
if let Some(list) = self.sieve.lookup.get(&list) {
match list.as_ref() {
Lookup::List(items) => {
for rcpt in items {
Lookup::List { list } => {
for rcpt in list {
handle.block_on(
message.add_recipient(rcpt, &self.queue.config),
);
}
}
Lookup::Sql(sql) => {
if let Some(items) = handle.block_on(sql.fetch_many(""))
{
for rcpt in items {
handle.block_on(
message.add_recipient(
rcpt,
&self.queue.config,
),
);
}
}
Lookup::Directory { .. } => {
// Not implemented
}
_ => (),
}
} else {
tracing::warn!(

View file

@ -25,7 +25,6 @@ use mail_parser::decoders::base64::base64_decode;
use mail_send::Credentials;
use smtp_proto::{IntoString, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::ipc::Item;
use crate::core::Session;
@ -175,16 +174,14 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
pub async fn authenticate(&mut self, credentials: Credentials<String>) -> Result<bool, ()> {
if let Some(lookup) = &self.params.auth_lookup {
if let Some(lookup) = &self.params.auth_directory {
let authenticated_as = match &credentials {
Credentials::Plain { username, .. }
| Credentials::XOauth2 { username, .. }
| Credentials::OAuthBearer { token: username } => username.to_string(),
};
if let Some(is_authenticated) = lookup
.lookup(Item::Authenticate(credentials))
.await
.map(bool::from)
if let Ok(is_authenticated) =
lookup.authenticate(&credentials).await.map(|r| r.is_some())
{
tracing::debug!(
parent: &self.span,

View file

@ -129,7 +129,6 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
response.capabilities |= EXT_START_TLS;
}
let ec = &self.core.session.config.extensions;
let rc = &self.core.session.config.rcpt;
let ac = &self.core.session.config.auth;
let dc = &self.core.session.config.data;
@ -144,12 +143,12 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
}
// Address Expansion
if rc.lookup_expn.eval(self).await.is_some() {
if *ec.expn.eval(self).await {
response.capabilities |= EXT_EXPN;
}
// Recipient Verification
if rc.lookup_vrfy.eval(self).await.is_some() {
if *ec.vrfy.eval(self).await {
response.capabilities |= EXT_VRFY;
}

View file

@ -71,15 +71,13 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
};
// Verify address
if let (Some(domain_lookup), Some(address_lookup)) = (
&self.params.rcpt_lookup_domain,
&self.params.rcpt_lookup_addresses,
) {
let cache = "true";
if let (Some(domain_lookup), Some(address_lookup)) =
(&self.params.rcpt_lookup_domain, &self.params.auth_directory)
{
if let Some(is_local_domain) = domain_lookup.contains(&rcpt.domain).await {
if is_local_domain {
if let Some(is_local_address) =
address_lookup.contains(&rcpt.address_lcase).await
{
if let Ok(is_local_address) = address_lookup.rcpt(&rcpt.address_lcase).await {
if !is_local_address {
tracing::debug!(parent: &self.span,
context = "rcpt",

View file

@ -94,7 +94,7 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
} => {
let auth =
*self.core.session.config.auth.mechanisms.eval(self).await;
if auth == 0 || self.params.auth_lookup.is_none() {
if auth == 0 || self.params.auth_directory.is_none() {
self.write(b"503 5.5.1 AUTH not allowed.\r\n").await?;
} else if !self.data.authenticated_as.is_empty() {
self.write(b"503 5.5.1 Already authenticated.\r\n").await?;

View file

@ -21,108 +21,112 @@
* for more details.
*/
use directory::DirectoryError;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::ipc::{Item, LookupResult};
use crate::core::Session;
use std::fmt::Write;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_vrfy(&mut self, address: String) -> Result<(), ()> {
if let Some(address_lookup) = &self.params.rcpt_lookup_vrfy {
if let Some(result) = address_lookup
.lookup(Item::Verify(address.to_lowercase()))
.await
{
if let LookupResult::Values(values) = result {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
result,
"250{}{}\r\n",
if pos == values.len() - 1 { " " } else { "-" },
value
);
match &self.params.rcpt_directory {
Some(address_lookup) if self.params.can_vrfy => {
match address_lookup.vrfy(&address.to_lowercase()).await {
Ok(values) if !values.is_empty() => {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
result,
"250{}{}\r\n",
if pos == values.len() - 1 { " " } else { "-" },
value
);
}
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "success",
address = &address);
self.write(result.as_bytes()).await
}
Ok(_) | Err(DirectoryError::Unsupported) => {
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "not-found",
address = &address);
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "success",
address = &address);
self.write(b"550 5.1.2 Address not found.\r\n").await
}
Err(_) => {
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "temp-fail",
address = &address);
self.write(result.as_bytes()).await
} else {
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "not-found",
address = &address);
self.write(b"550 5.1.2 Address not found.\r\n").await
self.write(b"252 2.4.3 Unable to verify address at this time.\r\n")
.await
}
}
} else {
}
_ => {
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "temp-fail",
event = "forbidden",
address = &address);
self.write(b"252 2.4.3 Unable to verify address at this time.\r\n")
.await
self.write(b"252 2.5.1 VRFY is disabled.\r\n").await
}
} else {
tracing::debug!(parent: &self.span,
context = "vrfy",
event = "forbidden",
address = &address);
self.write(b"252 2.5.1 VRFY is disabled.\r\n").await
}
}
pub async fn handle_expn(&mut self, address: String) -> Result<(), ()> {
if let Some(address_lookup) = &self.params.rcpt_lookup_expn {
if let Some(result) = address_lookup
.lookup(Item::Expand(address.to_lowercase()))
.await
{
if let LookupResult::Values(values) = result {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
result,
"250{}{}\r\n",
if pos == values.len() - 1 { " " } else { "-" },
value
);
match &self.params.rcpt_directory {
Some(address_lookup) if self.params.can_expn => {
match address_lookup.expn(&address.to_lowercase()).await {
Ok(values) if !values.is_empty() => {
let mut result = String::with_capacity(32);
for (pos, value) in values.iter().enumerate() {
let _ = write!(
result,
"250{}{}\r\n",
if pos == values.len() - 1 { " " } else { "-" },
value
);
}
tracing::debug!(parent: &self.span,
context = "expn",
event = "success",
address = &address);
self.write(result.as_bytes()).await
}
tracing::debug!(parent: &self.span,
context = "expn",
event = "success",
address = &address);
self.write(result.as_bytes()).await
} else {
tracing::debug!(parent: &self.span,
context = "expn",
event = "not-found",
address = &address);
Ok(_) | Err(DirectoryError::Unsupported) => {
tracing::debug!(parent: &self.span,
context = "expn",
event = "not-found",
address = &address);
self.write(b"550 5.1.2 Mailing list not found.\r\n").await
self.write(b"550 5.1.2 Mailing list not found.\r\n").await
}
Err(_) => {
tracing::debug!(parent: &self.span,
context = "expn",
event = "temp-fail",
address = &address);
self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n")
.await
}
}
} else {
}
_ => {
tracing::debug!(parent: &self.span,
context = "expn",
event = "temp-fail",
event = "forbidden",
address = &address);
self.write(b"252 2.4.3 Unable to expand mailing list at this time.\r\n")
.await
self.write(b"252 2.5.1 EXPN is disabled.\r\n").await
}
} else {
tracing::debug!(parent: &self.span,
context = "expn",
event = "forbidden",
address = &address);
self.write(b"252 2.5.1 EXPN is disabled.\r\n").await
}
}
}

View file

@ -27,12 +27,11 @@ use crate::core::{
use std::sync::Arc;
use config::{
auth::ConfigAuth, database::ConfigDatabase, list::ConfigList, queue::ConfigQueue,
remote::ConfigHost, report::ConfigReport, resolver::ConfigResolver, scripts::ConfigSieve,
session::ConfigSession, ConfigContext, Host,
auth::ConfigAuth, queue::ConfigQueue, remote::ConfigHost, report::ConfigReport,
resolver::ConfigResolver, scripts::ConfigSieve, session::ConfigSession, ConfigContext, Host,
};
use dashmap::DashMap;
use lookup::Lookup;
use directory::DirectoryConfig;
use mail_send::smtp::tls::build_tls_connector;
use queue::manager::SpawnQueue;
use reporting::scheduler::SpawnReport;
@ -45,7 +44,6 @@ use utils::{
pub mod config;
pub mod core;
pub mod inbound;
pub mod lookup;
pub mod outbound;
pub mod queue;
pub mod reporting;
@ -56,18 +54,15 @@ impl SMTP {
pub async fn init(
config: &Config,
servers: &Servers,
directory: &DirectoryConfig,
#[cfg(feature = "local_delivery")] delivery_tx: mpsc::Sender<utils::ipc::DeliveryEvent>,
) -> Result<Arc<Self>, String> {
// Read configuration parameters
let mut config_ctx = ConfigContext::new(&servers.inner);
config_ctx.directory = directory.clone();
#[cfg(feature = "local_delivery")]
{
config_ctx.lookup.insert(
"local".to_string(),
Arc::new(Lookup::Local(delivery_tx.clone())),
);
let (channel_tx, channel_rx) = mpsc::channel(1024);
config_ctx.hosts.insert(
"local".to_string(),
Host {
@ -80,21 +75,11 @@ impl SMTP {
tls_allow_invalid_certs: Default::default(),
username: Default::default(),
secret: Default::default(),
max_errors: Default::default(),
max_requests: Default::default(),
cache_entries: Default::default(),
cache_ttl_positive: Default::default(),
cache_ttl_negative: Default::default(),
channel_tx,
channel_rx,
lookup: false,
},
);
}
config.parse_remote_hosts(&mut config_ctx)?;
config.parse_databases(&mut config_ctx)?;
config.parse_lists(&mut config_ctx)?;
config.parse_signatures(&mut config_ctx)?;
let sieve_config = config.parse_sieve(&mut config_ctx)?;
let session_config = config.parse_session_config(&config_ctx)?;
@ -168,13 +153,6 @@ impl SMTP {
// Spawn report manager
report_rx.spawn(core.clone(), core.report.read_reports().await);
// Spawn remote hosts
for host in config_ctx.hosts.into_values() {
if host.lookup {
host.spawn(config);
}
}
Ok(core)
}
}

View file

@ -1,85 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
borrow::Borrow,
hash::Hash,
time::{Duration, Instant},
};
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct LookupCache<T: Hash + Eq> {
cache_pos: lru_cache::LruCache<T, Instant, ahash::RandomState>,
cache_neg: lru_cache::LruCache<T, Instant, ahash::RandomState>,
ttl_pos: Duration,
ttl_neg: Duration,
}
impl<T: Hash + Eq> LookupCache<T> {
pub fn new(capacity: usize, ttl_pos: Duration, ttl_neg: Duration) -> Self {
Self {
cache_pos: lru_cache::LruCache::with_hasher(capacity, ahash::RandomState::new()),
cache_neg: lru_cache::LruCache::with_hasher(capacity, ahash::RandomState::new()),
ttl_pos,
ttl_neg,
}
}
pub fn get<Q: ?Sized>(&mut self, name: &Q) -> Option<bool>
where
T: Borrow<Q>,
Q: Hash + Eq,
{
// Check positive cache
if let Some(valid_until) = self.cache_pos.get_mut(name) {
if *valid_until >= Instant::now() {
return Some(true);
} else {
self.cache_pos.remove(name);
}
}
// Check negative cache
let valid_until = self.cache_neg.get_mut(name)?;
if *valid_until >= Instant::now() {
Some(false)
} else {
self.cache_pos.remove(name);
None
}
}
pub fn insert_pos(&mut self, item: T) {
self.cache_pos.insert(item, Instant::now() + self.ttl_pos);
}
pub fn insert_neg(&mut self, item: T) {
self.cache_neg.insert(item, Instant::now() + self.ttl_neg);
}
pub fn clear(&mut self) {
self.cache_pos.clear();
self.cache_neg.clear();
}
}

View file

@ -1,118 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use mail_send::Credentials;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::{DeliveryEvent, Item, LookupItem, LookupResult};
use super::Lookup;
impl Lookup {
pub async fn contains(&self, entry: &str) -> Option<bool> {
match self {
#[cfg(feature = "local_delivery")]
Lookup::Local(tx) => lookup_local(tx, Item::IsAccount(entry.to_string()))
.await
.map(|r| r.into()),
Lookup::Remote(tx) => tx
.lookup(Item::IsAccount(entry.to_string()))
.await
.map(|r| r.into()),
Lookup::Sql(sql) => sql.exists(entry).await,
Lookup::List(entries) => Some(entries.contains(entry)),
}
}
pub async fn lookup(&self, item: Item) -> Option<LookupResult> {
match self {
#[cfg(feature = "local_delivery")]
Lookup::Local(tx) => lookup_local(tx, item).await,
Lookup::Remote(tx) => tx.lookup(item).await,
Lookup::Sql(sql) => match item {
Item::IsAccount(account) => sql.exists(&account).await.map(LookupResult::from),
Item::Authenticate(credentials) => match credentials {
Credentials::Plain { username, secret }
| Credentials::XOauth2 { username, secret } => sql
.fetch_one(&username)
.await
.map(|pwd| LookupResult::from(pwd.map_or(false, |pwd| pwd == secret))),
Credentials::OAuthBearer { token } => {
sql.exists(&token).await.map(LookupResult::from)
}
},
Item::Verify(account) => sql.fetch_many(&account).await.map(LookupResult::from),
Item::Expand(list) => sql.fetch_many(&list).await.map(LookupResult::from),
},
Lookup::List(list) => match item {
Item::IsAccount(item) => Some(list.contains(&item).into()),
Item::Verify(_item) | Item::Expand(_item) => {
#[cfg(feature = "test_mode")]
for list_item in list {
if let Some((prefix, suffix)) = list_item.split_once(':') {
if prefix == _item {
return Some(LookupResult::Values(
suffix.split(',').map(|i| i.to_string()).collect::<Vec<_>>(),
));
}
}
}
Some(LookupResult::False)
}
Item::Authenticate(credentials) => {
let entry = match credentials {
Credentials::Plain { username, secret }
| Credentials::XOauth2 { username, secret } => {
format!("{username}:{secret}")
}
Credentials::OAuthBearer { token } => token,
};
if !list.is_empty() {
Some(list.contains(&entry).into())
} else {
None
}
}
},
}
}
}
async fn lookup_local(
delivery_tx: &mpsc::Sender<DeliveryEvent>,
item: Item,
) -> Option<LookupResult> {
let (tx, rx) = oneshot::channel();
if delivery_tx
.send(DeliveryEvent::Lookup(LookupItem { item, result: tx }))
.await
.is_ok()
{
rx.await.ok()
} else {
None
}
}

View file

@ -1,486 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{fmt::Display, sync::Arc, time::Duration};
use mail_send::Credentials;
use rustls::ServerName;
use smtp_proto::{
request::{parser::Rfc5321Parser, AUTH},
response::generate::BitToString,
IntoString, AUTH_CRAM_MD5, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::{TcpStream, ToSocketAddrs},
sync::mpsc,
};
use tokio_rustls::{client::TlsStream, TlsConnector};
use utils::ipc::{Item, LookupItem};
use crate::lookup::spawn::LoggedUnwrap;
use super::{Event, RemoteLookup};
pub struct ImapAuthClient<T: AsyncRead + AsyncWrite> {
stream: T,
timeout: Duration,
}
pub struct ImapAuthClientBuilder {
pub addr: String,
timeout: Duration,
tls_connector: TlsConnector,
tls_hostname: String,
tls_implicit: bool,
mechanisms: u64,
}
impl ImapAuthClientBuilder {
pub fn new(
addr: String,
timeout: Duration,
tls_connector: TlsConnector,
tls_hostname: String,
tls_implicit: bool,
) -> Self {
Self {
addr,
timeout,
tls_connector,
tls_hostname,
tls_implicit,
mechanisms: AUTH_PLAIN,
}
}
pub async fn init(mut self) -> Self {
let err = match self.connect().await {
Ok(mut client) => match client.authentication_mechanisms().await {
Ok(mechanisms) => {
client.logout().await.ok();
self.mechanisms = mechanisms;
return self;
}
Err(err) => err,
},
Err(err) => err,
};
tracing::warn!(
context = "remote",
event = "error",
remote.addr = &self.addr,
remote.protocol = "imap",
"Could not obtain auth mechanisms: {}",
err
);
self
}
pub async fn connect(&self) -> Result<ImapAuthClient<TlsStream<TcpStream>>, Error> {
ImapAuthClient::connect(
&self.addr,
self.timeout,
&self.tls_connector,
&self.tls_hostname,
self.tls_implicit,
)
.await
}
}
#[derive(Debug)]
pub enum Error {
Io(std::io::Error),
Timeout,
InvalidResponse(String),
InvalidChallenge(String),
AuthenticationFailed,
TLSInvalidName,
Disconnected,
}
impl RemoteLookup for Arc<ImapAuthClientBuilder> {
fn spawn_lookup(&self, lookup: LookupItem, tx: mpsc::Sender<Event>) {
let builder = self.clone();
tokio::spawn(async move {
if let Err(err) = builder.lookup(lookup, &tx).await {
tracing::warn!(
context = "remote",
event = "error",
remote.addr = &builder.addr,
remote.protocol = "imap",
"Remote lookup failed: {}",
err
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
});
}
}
impl ImapAuthClientBuilder {
pub async fn lookup(&self, lookup: LookupItem, tx: &mpsc::Sender<Event>) -> Result<(), Error> {
match &lookup.item {
Item::Authenticate(credentials) => {
let mut client = self.connect().await?;
let mechanism = match credentials {
Credentials::Plain { .. }
if (self.mechanisms & (AUTH_PLAIN | AUTH_LOGIN | AUTH_CRAM_MD5)) != 0 =>
{
if self.mechanisms & AUTH_CRAM_MD5 != 0 {
AUTH_CRAM_MD5
} else if self.mechanisms & AUTH_PLAIN != 0 {
AUTH_PLAIN
} else {
AUTH_LOGIN
}
}
Credentials::OAuthBearer { .. } if self.mechanisms & AUTH_OAUTHBEARER != 0 => {
AUTH_OAUTHBEARER
}
Credentials::XOauth2 { .. } if self.mechanisms & AUTH_XOAUTH2 != 0 => {
AUTH_XOAUTH2
}
_ => {
tracing::warn!(
context = "remote",
event = "error",
remote.addr = &self.addr,
remote.protocol = "imap",
"IMAP server does not offer any supported auth mechanisms.",
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
return Ok(());
}
};
let result = match client.authenticate(mechanism, credentials).await {
Ok(_) => true,
Err(err) => match &err {
Error::AuthenticationFailed => false,
_ => return Err(err),
},
};
tx.send(Event::WorkerReady {
item: lookup.item,
result: Some(result),
next_lookup: None,
})
.await
.logged_unwrap();
lookup.result.send(result.into()).logged_unwrap();
}
_ => {
tracing::warn!(
context = "remote",
event = "error",
remote.addr = &self.addr,
remote.protocol = "imap",
"IMAP does not support validating recipients.",
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
}
Ok(())
}
}
impl ImapAuthClient<TcpStream> {
async fn start_tls(
mut self,
tls_connector: &TlsConnector,
tls_hostname: &str,
) -> Result<ImapAuthClient<TlsStream<TcpStream>>, Error> {
let line = tokio::time::timeout(self.timeout, async {
self.write(b"C7 STARTTLS\r\n").await?;
self.read_line().await
})
.await
.map_err(|_| Error::Timeout)??;
if matches!(line.get(..5), Some(b"C7 OK")) {
self.into_tls(tls_connector, tls_hostname).await
} else {
Err(Error::InvalidResponse(line.into_string()))
}
}
async fn into_tls(
self,
tls_connector: &TlsConnector,
tls_hostname: &str,
) -> Result<ImapAuthClient<TlsStream<TcpStream>>, Error> {
tokio::time::timeout(self.timeout, async {
Ok(ImapAuthClient {
stream: tls_connector
.connect(
ServerName::try_from(tls_hostname).map_err(|_| Error::TLSInvalidName)?,
self.stream,
)
.await?,
timeout: self.timeout,
})
})
.await
.map_err(|_| Error::Timeout)?
}
}
impl ImapAuthClient<TlsStream<TcpStream>> {
pub async fn connect(
addr: impl ToSocketAddrs,
timeout: Duration,
tls_connector: &TlsConnector,
tls_hostname: &str,
tls_implicit: bool,
) -> Result<Self, Error> {
let mut client: ImapAuthClient<TcpStream> = tokio::time::timeout(timeout, async {
match TcpStream::connect(addr).await {
Ok(stream) => Ok(ImapAuthClient { stream, timeout }),
Err(err) => Err(Error::Io(err)),
}
})
.await
.map_err(|_| Error::Timeout)??;
if tls_implicit {
let mut client = client.into_tls(tls_connector, tls_hostname).await?;
client.expect_greeting().await?;
Ok(client)
} else {
client.expect_greeting().await?;
client.start_tls(tls_connector, tls_hostname).await
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin> ImapAuthClient<T> {
pub async fn authenticate(
&mut self,
mechanism: u64,
credentials: &Credentials<String>,
) -> Result<(), Error> {
if (mechanism & (AUTH_PLAIN | AUTH_XOAUTH2 | AUTH_OAUTHBEARER)) != 0 {
self.write(
format!(
"C3 AUTHENTICATE {} {}\r\n",
mechanism.to_mechanism(),
credentials
.encode(mechanism, "")
.map_err(|err| Error::InvalidChallenge(err.to_string()))?
)
.as_bytes(),
)
.await?;
} else {
self.write(format!("C3 AUTHENTICATE {}\r\n", mechanism.to_mechanism()).as_bytes())
.await?;
}
let mut line = self.read_line().await?;
for _ in 0..3 {
if matches!(line.first(), Some(b'+')) {
self.write(
format!(
"{}\r\n",
credentials
.encode(
mechanism,
std::str::from_utf8(line.get(2..).unwrap_or_default())
.unwrap_or_default()
)
.map_err(|err| Error::InvalidChallenge(err.to_string()))?
)
.as_bytes(),
)
.await?;
line = self.read_line().await?;
} else if matches!(line.get(..5), Some(b"C3 OK")) {
return Ok(());
} else if matches!(line.get(..5), Some(b"C3 NO"))
|| matches!(line.get(..6), Some(b"C3 BAD"))
{
return Err(Error::AuthenticationFailed);
} else {
return Err(Error::InvalidResponse(line.into_string()));
}
}
Err(Error::InvalidResponse(line.into_string()))
}
pub async fn authentication_mechanisms(&mut self) -> Result<u64, Error> {
tokio::time::timeout(self.timeout, async {
self.write(b"C0 CAPABILITY\r\n").await?;
let line = self.read_line().await?;
if !matches!(line.get(..12), Some(b"* CAPABILITY")) {
return Err(Error::InvalidResponse(line.into_string()));
}
let mut line_iter = line.iter();
let mut parser = Rfc5321Parser::new(&mut line_iter);
let mut mechanisms = 0;
'outer: while let Ok(ch) = parser.read_char() {
if ch == b' ' {
loop {
if parser.hashed_value().unwrap_or(0) == AUTH && parser.stop_char == b'=' {
if let Ok(Some(mechanism)) = parser.mechanism() {
mechanisms |= mechanism;
}
match parser.stop_char {
b' ' => (),
b'\n' => break 'outer,
_ => break,
}
}
}
} else if ch == b'\n' {
break;
}
}
Ok(mechanisms)
})
.await
.map_err(|_| Error::Timeout)?
}
pub async fn noop(&mut self) -> Result<(), Error> {
tokio::time::timeout(self.timeout, async {
self.write(b"C8 NOOP\r\n").await?;
self.read_line().await?;
Ok(())
})
.await
.map_err(|_| Error::Timeout)?
}
pub async fn logout(&mut self) -> Result<(), Error> {
tokio::time::timeout(self.timeout, async {
self.write(b"C9 LOGOUT\r\n").await?;
Ok(())
})
.await
.map_err(|_| Error::Timeout)?
}
pub async fn expect_greeting(&mut self) -> Result<(), Error> {
tokio::time::timeout(self.timeout, async {
let line = self.read_line().await?;
if matches!(line.get(..4), Some(b"* OK")) {
Ok(())
} else {
Err(Error::InvalidResponse(line.into_string()))
}
})
.await
.map_err(|_| Error::Timeout)?
}
pub async fn read_line(&mut self) -> Result<Vec<u8>, Error> {
let mut buf = vec![0u8; 1024];
let mut buf_extended = Vec::with_capacity(0);
loop {
let br = self.stream.read(&mut buf).await?;
if br > 0 {
if matches!(buf.get(br - 1), Some(b'\n')) {
//println!("{:?}", std::str::from_utf8(&buf[..br]).unwrap());
return Ok(if buf_extended.is_empty() {
buf.truncate(br);
buf
} else {
buf_extended.extend_from_slice(&buf[..br]);
buf_extended
});
} else if buf_extended.is_empty() {
buf_extended = buf[..br].to_vec();
} else {
buf_extended.extend_from_slice(&buf[..br]);
}
} else {
return Err(Error::Disconnected);
}
}
}
async fn write(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
self.stream.write_all(bytes).await?;
self.stream.flush().await
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error::Io(error)
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(io) => write!(f, "I/O error: {io}"),
Error::Timeout => f.write_str("Connection time-out"),
Error::InvalidResponse(response) => write!(f, "Unexpected response: {response:?}"),
Error::InvalidChallenge(response) => write!(f, "Invalid auth challenge: {response}"),
Error::TLSInvalidName => f.write_str("Invalid TLS name"),
Error::Disconnected => f.write_str("Connection disconnected by peer"),
Error::AuthenticationFailed => f.write_str("Authentication failed"),
}
}
}
#[cfg(test)]
mod test {
use crate::lookup::imap::ImapAuthClient;
use mail_send::smtp::tls::build_tls_connector;
use smtp_proto::{AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH, AUTH_XOAUTH2};
use std::time::Duration;
#[ignore]
#[tokio::test]
async fn imap_auth() {
let connector = build_tls_connector(false);
let mut client = ImapAuthClient::connect(
"imap.gmail.com:993",
Duration::from_secs(5),
&connector,
"imap.gmail.com",
true,
)
.await
.unwrap();
assert_eq!(
AUTH_PLAIN | AUTH_XOAUTH | AUTH_XOAUTH2 | AUTH_OAUTHBEARER,
client.authentication_mechanisms().await.unwrap()
);
client.logout().await.unwrap();
}
}

View file

@ -1,94 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashSet;
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::DeliveryEvent;
use self::cache::LookupCache;
pub mod cache;
pub mod dispatch;
pub mod imap;
pub mod smtp;
pub mod spawn;
pub mod sql;
#[derive(Debug)]
pub enum Lookup {
List(AHashSet<String>),
Remote(LookupChannel),
Sql(SqlQuery),
#[cfg(feature = "local_delivery")]
Local(mpsc::Sender<DeliveryEvent>),
}
#[derive(Debug, Clone)]
pub enum SqlDatabase {
Postgres(sqlx::Pool<sqlx::Postgres>),
MySql(sqlx::Pool<sqlx::MySql>),
//MsSql(sqlx::Pool<sqlx::Mssql>),
SqlLite(sqlx::Pool<sqlx::Sqlite>),
}
#[derive(Debug)]
pub struct SqlQuery {
pub query: String,
pub db: SqlDatabase,
pub cache: Option<Mutex<LookupCache<String>>>,
}
impl Default for Lookup {
fn default() -> Self {
Lookup::List(AHashSet::default())
}
}
#[derive(Debug)]
pub enum Event {
Lookup(utils::ipc::LookupItem),
WorkerReady {
item: utils::ipc::Item,
result: Option<bool>,
next_lookup: Option<oneshot::Sender<Option<utils::ipc::LookupItem>>>,
},
WorkerFailed,
Reload,
Stop,
}
#[derive(Debug, Clone)]
pub struct LookupChannel {
pub tx: mpsc::Sender<Event>,
}
#[derive(Clone)]
struct NextHop<T: RemoteLookup> {
tx: mpsc::Sender<Event>,
host: T,
}
pub trait RemoteLookup: Clone {
fn spawn_lookup(&self, lookup: utils::ipc::LookupItem, tx: mpsc::Sender<Event>);
}

View file

@ -1,181 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use mail_send::smtp::AssertReply;
use smtp_proto::Severity;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::{Item, LookupItem, LookupResult};
use super::{spawn::LoggedUnwrap, Event, RemoteLookup};
pub struct SmtpClientBuilder {
pub builder: mail_send::SmtpClientBuilder<String>,
pub max_rcpt: usize,
pub max_auth_errors: usize,
}
impl SmtpClientBuilder {
pub async fn lookup_smtp(
&self,
mut lookup: LookupItem,
tx: &mpsc::Sender<Event>,
) -> Result<(), mail_send::Error> {
let mut client = self.builder.connect().await?;
let mut sent_mail_from = false;
let mut num_rcpts = 0;
let mut num_auth_failures = 0;
let capabilities = client
.capabilities(&self.builder.local_host, self.builder.is_lmtp)
.await?;
loop {
let (result, is_reusable): (LookupResult, bool) = match &lookup.item {
Item::IsAccount(rcpt_to) => {
if !sent_mail_from {
client
.cmd(b"MAIL FROM:<>\r\n")
.await?
.assert_positive_completion()?;
sent_mail_from = true;
}
let reply = client
.cmd(format!("RCPT TO:<{rcpt_to}>\r\n").as_bytes())
.await?;
let result = match reply.severity() {
Severity::PositiveCompletion => {
num_rcpts += 1;
LookupResult::True
}
Severity::PermanentNegativeCompletion => LookupResult::False,
_ => return Err(mail_send::Error::UnexpectedReply(reply)),
};
// Try to reuse the connection with any queued requests
(result, num_rcpts < self.max_rcpt)
}
Item::Authenticate(credentials) => {
let result = match client.authenticate(credentials, &capabilities).await {
Ok(_) => true,
Err(err) => match &err {
mail_send::Error::AuthenticationFailed(err) if err.code() == 535 => {
num_auth_failures += 1;
false
}
_ => {
return Err(err);
}
},
};
(
result.into(),
!result && num_auth_failures < self.max_auth_errors,
)
}
Item::Verify(address) | Item::Expand(address) => {
let reply = client
.cmd(
if matches!(&lookup.item, Item::Verify(_)) {
format!("VRFY {address}\r\n")
} else {
format!("EXPN {address}\r\n")
}
.as_bytes(),
)
.await?;
match reply.code() {
250 | 251 => (
reply
.message()
.split('\n')
.map(|p| p.to_string())
.collect::<Vec<String>>()
.into(),
true,
),
550 | 551 | 553 | 500 | 502 => (LookupResult::False, true),
_ => {
return Err(mail_send::Error::UnexpectedReply(reply));
}
}
}
};
// Try to reuse the connection with any queued requests
let cached_result = match &result {
LookupResult::True => Some(true),
LookupResult::False => Some(false),
LookupResult::Values(_) => None,
};
lookup.result.send(result).logged_unwrap();
if is_reusable {
let (next_lookup_tx, next_lookup_rx) = oneshot::channel::<Option<LookupItem>>();
if tx
.send(Event::WorkerReady {
item: lookup.item,
result: cached_result,
next_lookup: next_lookup_tx.into(),
})
.await
.logged_unwrap()
{
if let Ok(Some(next_lookup)) = next_lookup_rx.await {
lookup = next_lookup;
continue;
}
}
} else {
tx.send(Event::WorkerReady {
item: lookup.item,
result: cached_result,
next_lookup: None,
})
.await
.logged_unwrap();
}
break;
}
Ok(())
}
}
impl RemoteLookup for Arc<SmtpClientBuilder> {
fn spawn_lookup(&self, lookup: LookupItem, tx: mpsc::Sender<Event>) {
let builder = self.clone();
tokio::spawn(async move {
if let Err(err) = builder.lookup_smtp(lookup, &tx).await {
tracing::warn!(
context = "remote",
event = "lookup-failed",
remote.addr = &builder.builder.addr,
remote.protocol = "smtp",
"Remote lookup failed: {}",
err
);
tx.send(Event::WorkerFailed).await.logged_unwrap();
}
});
}
}

View file

@ -1,225 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{collections::VecDeque, sync::Arc, time::Duration};
use crate::config::Host;
use mail_send::smtp::tls::build_tls_connector;
use tokio::sync::{mpsc, oneshot};
use utils::{
config::{Config, ServerProtocol},
ipc::{Item, LookupItem, LookupResult},
};
use super::{
cache::LookupCache, imap::ImapAuthClientBuilder, smtp::SmtpClientBuilder, Event, LookupChannel,
NextHop, RemoteLookup,
};
impl Host {
pub fn spawn(self, config: &Config) -> LookupChannel {
// Create channel
let local_host = config
.value("server.hostname")
.unwrap_or("[127.0.0.1]")
.to_string();
let tx_ = self.channel_tx.clone();
tokio::spawn(async move {
// Prepare builders
match self.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
NextHop {
tx: self.channel_tx,
host: Arc::new(SmtpClientBuilder {
builder: mail_send::SmtpClientBuilder {
addr: format!("{}:{}", self.address, self.port),
timeout: self.timeout,
tls_connector: build_tls_connector(self.tls_allow_invalid_certs),
tls_hostname: self.address,
tls_implicit: self.tls_implicit,
is_lmtp: matches!(self.protocol, ServerProtocol::Lmtp),
credentials: None,
local_host,
},
max_rcpt: self.max_requests,
max_auth_errors: self.max_errors,
}),
}
.run(
self.channel_rx,
self.cache_entries,
self.cache_ttl_positive,
self.cache_ttl_negative,
self.concurrency,
)
.await;
}
ServerProtocol::Imap => {
NextHop {
tx: self.channel_tx,
host: Arc::new(
ImapAuthClientBuilder::new(
format!("{}:{}", self.address, self.port),
self.timeout,
build_tls_connector(self.tls_allow_invalid_certs),
self.address,
self.tls_implicit,
)
.init()
.await,
),
}
.run(
self.channel_rx,
self.cache_entries,
self.cache_ttl_positive,
self.cache_ttl_negative,
self.concurrency,
)
.await;
}
ServerProtocol::Http | ServerProtocol::Jmap => {
eprintln!("HTTP/JMAP lookups are not supported.");
std::process::exit(0);
}
}
});
LookupChannel { tx: tx_ }
}
}
impl<T: RemoteLookup> NextHop<T> {
pub async fn run(
&self,
mut rx: mpsc::Receiver<Event>,
entries: usize,
ttl_pos: Duration,
ttl_neg: Duration,
max_concurrent: usize,
) {
// Create caches and queue
let mut cache = LookupCache::new(entries, ttl_pos, ttl_neg);
let mut queue = VecDeque::new();
let mut active_lookups = 0;
while let Some(event) = rx.recv().await {
match event {
Event::Lookup(lookup) => {
if let Some(result) = cache.get(&lookup.item) {
lookup.result.send(result.into()).logged_unwrap();
} else if active_lookups < max_concurrent {
active_lookups += 1;
self.host.spawn_lookup(lookup, self.tx.clone());
} else {
queue.push_back(lookup);
}
}
Event::WorkerReady {
item,
result,
next_lookup,
} => {
match result {
Some(true) => cache.insert_pos(item),
Some(false) => cache.insert_neg(item),
_ => (),
}
let mut lookup = None;
while let Some(queued_lookup) = queue.pop_front() {
if let Some(result) = cache.get(&queued_lookup.item) {
queued_lookup.result.send(result.into()).logged_unwrap();
} else {
lookup = queued_lookup.into();
break;
}
}
if let Some(next_lookup) = next_lookup {
if lookup.is_none() {
active_lookups -= 1;
}
next_lookup.send(lookup).logged_unwrap();
} else if let Some(lookup) = lookup {
self.host.spawn_lookup(lookup, self.tx.clone());
} else {
active_lookups -= 1;
}
}
Event::WorkerFailed => {
if let Some(queued_lookup) = queue.pop_front() {
self.host.spawn_lookup(queued_lookup, self.tx.clone());
} else {
active_lookups -= 1;
}
}
Event::Stop => {
queue.clear();
break;
}
Event::Reload => {
cache.clear();
}
}
}
}
}
impl LookupChannel {
pub async fn lookup(&self, item: Item) -> Option<LookupResult> {
let (tx, rx) = oneshot::channel();
if self
.tx
.send(Event::Lookup(LookupItem { item, result: tx }))
.await
.is_ok()
{
rx.await.ok()
} else {
None
}
}
}
impl From<mpsc::Sender<Event>> for LookupChannel {
fn from(tx: mpsc::Sender<Event>) -> Self {
LookupChannel { tx }
}
}
pub trait LoggedUnwrap {
fn logged_unwrap(self) -> bool;
}
impl<T, E: std::fmt::Debug> LoggedUnwrap for Result<T, E> {
fn logged_unwrap(self) -> bool {
match self {
Ok(_) => true,
Err(err) => {
tracing::debug!("Failed to send message over channel: {:?}", err);
false
}
}
}
}

View file

@ -1,237 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart SMTP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use super::{SqlDatabase, SqlQuery};
impl SqlQuery {
pub async fn exists(&self, param: &str) -> Option<bool> {
if let Some(result) = self
.cache
.as_ref()
.and_then(|cache| cache.lock().get(param))
{
return Some(result);
}
let result = match &self.db {
super::SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, bool>(&self.query)
.bind(param)
.fetch_one(pool)
.await
}
super::SqlDatabase::MySql(pool) => {
sqlx::query_scalar::<_, bool>(&self.query)
.bind(param)
.fetch_one(pool)
.await
}
/*super::SqlDatabase::MsSql(pool) => {
sqlx::query_scalar::<_, bool>(&self.query)
.bind(param)
.fetch_one(pool)
.await
}*/
super::SqlDatabase::SqlLite(pool) => {
sqlx::query_scalar::<_, bool>(&self.query)
.bind(param)
.fetch_one(pool)
.await
}
};
match result {
Ok(result) => {
if let Some(cache) = &self.cache {
if result {
cache.lock().insert_pos(param.to_string());
} else {
cache.lock().insert_neg(param.to_string());
}
}
Some(result)
}
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = self.query, reason = ?err);
None
}
}
}
pub async fn fetch_one(&self, param: &str) -> Option<Option<String>> {
let result = match &self.db {
super::SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_optional(pool)
.await
}
super::SqlDatabase::MySql(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_optional(pool)
.await
}
/*super::SqlDatabase::MsSql(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_optional(pool)
.await
}*/
super::SqlDatabase::SqlLite(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_optional(pool)
.await
}
};
match result {
Ok(result) => Some(result),
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = self.query, reason = ?err);
None
}
}
}
pub async fn fetch_many(&self, param: &str) -> Option<Vec<String>> {
let result = match &self.db {
super::SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_all(pool)
.await
}
super::SqlDatabase::MySql(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_all(pool)
.await
}
/*super::SqlDatabase::MsSql(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_all(pool)
.await
}*/
super::SqlDatabase::SqlLite(pool) => {
sqlx::query_scalar::<_, String>(&self.query)
.bind(param)
.fetch_all(pool)
.await
}
};
match result {
Ok(result) => Some(result),
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = self.query, reason = ?err);
None
}
}
}
}
impl SqlDatabase {
pub async fn exists(&self, query: &str, params: impl Iterator<Item = String>) -> Option<bool> {
let result = match self {
super::SqlDatabase::Postgres(pool) => {
let mut q = sqlx::query_scalar::<_, bool>(query);
for param in params {
q = q.bind(param);
}
q.fetch_one(pool).await
}
super::SqlDatabase::MySql(pool) => {
let mut q = sqlx::query_scalar::<_, bool>(query);
for param in params {
q = q.bind(param);
}
q.fetch_one(pool).await
}
/*super::SqlDatabase::MsSql(pool) => {
let mut q = sqlx::query_scalar::<_, bool>(query);
for param in params {
q = q.bind(param);
}
q.fetch_one(pool).await
}*/
super::SqlDatabase::SqlLite(pool) => {
let mut q = sqlx::query_scalar::<_, bool>(query);
for param in params {
q = q.bind(param);
}
q.fetch_one(pool).await
}
};
match result {
Ok(result) => Some(result),
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = query, reason = ?err);
None
}
}
}
pub async fn execute(&self, query: &str, params: impl Iterator<Item = String>) -> bool {
let result = match self {
super::SqlDatabase::Postgres(pool) => {
let mut q = sqlx::query(query);
for param in params {
q = q.bind(param);
}
q.execute(pool).await.map(|_| ())
}
super::SqlDatabase::MySql(pool) => {
let mut q = sqlx::query(query);
for param in params {
q = q.bind(param);
}
q.execute(pool).await.map(|_| ())
}
/*super::SqlDatabase::MsSql(pool) => {
let mut q = sqlx::query(query);
for param in params {
q = q.bind(param);
}
q.execute(pool).await.map(|_| ())
}*/
super::SqlDatabase::SqlLite(pool) => {
let mut q = sqlx::query(query);
for param in params {
q = q.bind(param);
}
q.execute(pool).await.map(|_| ())
}
};
match result {
Ok(_) => true,
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = query, reason = ?err);
false
}
}
}
}

View file

@ -34,7 +34,7 @@ use crate::{
use super::NextHop;
impl SMTP {
pub(super) async fn resolve_host(
pub async fn resolve_host(
&self,
remote_host: &NextHop<'_>,
envelope: &impl Envelope,
@ -106,7 +106,7 @@ impl SMTP {
}
}
pub(super) trait ToNextHop {
pub trait ToNextHop {
fn to_remote_hosts<'x, 'y: 'x>(
&'x self,
domain: &'y str,

View file

@ -232,7 +232,7 @@ impl From<Box<Message>> for DeliveryAttempt {
}
#[derive(Debug)]
enum NextHop<'x> {
pub enum NextHop<'x> {
Relay(&'x RelayHost),
MX(&'x str),
}