Automatic spam filter and webadmin downloading + quickstart

This commit is contained in:
mdecimus 2024-04-05 19:00:55 +02:00
parent 89433f3f06
commit ab47eab1d9
32 changed files with 1308 additions and 599 deletions

1
.gitignore vendored
View file

@ -2,7 +2,6 @@
.vscode
*.failed
*_failed
/resources/config/config.toml
run.sh
_ignore
.DS_Store

26
Cargo.lock generated
View file

@ -995,6 +995,7 @@ dependencies = [
"decancer",
"directory",
"futures",
"hostname 0.4.0",
"hyper 1.2.0",
"idna 0.5.0",
"imagesize",
@ -1013,6 +1014,7 @@ dependencies = [
"pem",
"privdrop",
"proxy-header",
"pwhash",
"rcgen 0.12.1",
"regex",
"reqwest 0.12.2",
@ -1038,6 +1040,7 @@ dependencies = [
"utils",
"whatlang",
"x509-parser 0.16.0",
"zip",
]
[[package]]
@ -2426,6 +2429,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "hostname"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
dependencies = [
"cfg-if",
"libc",
"windows",
]
[[package]]
name = "http"
version = "0.2.12"
@ -4748,7 +4762,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00"
dependencies = [
"hostname",
"hostname 0.3.1",
"quick-error",
]
@ -6923,6 +6937,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-targets 0.52.4",
]
[[package]]
name = "windows-core"
version = "0.52.0"

View file

@ -54,7 +54,9 @@ decancer = "3.0.1"
unicode-security = "0.1.0"
infer = "0.15.0"
bincode = "1.3.1"
hostname = "0.4.0"
zip = "0.6.6"
pwhash = "1.0.0"
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"

View file

@ -61,6 +61,7 @@ pub struct JmapConfig {
pub oauth_expiry_refresh_token: u64,
pub oauth_expiry_refresh_token_renew: u64,
pub oauth_max_auth_attempts: u32,
pub fallback_admin: Option<(String, String)>,
pub spam_header: Option<(HeaderName<'static>, String)>,
@ -78,6 +79,59 @@ pub struct JmapConfig {
impl JmapConfig {
pub fn parse(config: &mut Config) -> Self {
// Parse HTTP headers
let mut http_headers = config
.values("server.http.headers")
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
hyper::header::HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
})?,
hyper::header::HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"server.http.headers\": {}",
v
))
}
})
.collect::<Result<Vec<_>, String>>()
.map_err(|e| config.new_parse_error("server.http.headers", e))
.unwrap_or_default();
// Add permissive CORS headers
if config
.property::<bool>("server.http.permissive-cors")
.unwrap_or(false)
{
http_headers.push((
hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN,
hyper::header::HeaderValue::from_static("*"),
));
http_headers.push((
hyper::header::ACCESS_CONTROL_ALLOW_HEADERS,
hyper::header::HeaderValue::from_static(
"Authorization, Content-Type, Accept, X-Requested-With",
),
));
http_headers.push((
hyper::header::ACCESS_CONTROL_ALLOW_METHODS,
hyper::header::HeaderValue::from_static(
"POST, GET, PATCH, PUT, DELETE, HEAD, OPTIONS",
),
));
}
let mut jmap = JmapConfig {
default_language: Language::from_iso_639(
config
@ -210,45 +264,21 @@ impl JmapConfig {
encrypt_append: config
.property_or_default("storage.encryption.append", "false")
.unwrap_or(false),
spam_header: config.value("spam.header.is-spam").and_then(|v| {
v.split_once(':').map(|(k, v)| {
(
mail_parser::HeaderName::parse(k.trim().to_string()).unwrap(),
v.trim().to_string(),
)
})
}),
spam_header: config
.property_or_default::<Option<String>>("spam.header.is-spam", "X-Spam-Status: Yes")
.unwrap_or_default()
.and_then(|v| {
v.split_once(':').map(|(k, v)| {
(
mail_parser::HeaderName::parse(k.trim().to_string()).unwrap(),
v.trim().to_string(),
)
})
}),
http_use_forwarded: config
.property("server.http.use-x-forwarded")
.unwrap_or(false),
http_headers: config
.values("server.http.headers")
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
hyper::header::HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
})?,
hyper::header::HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"server.http.headers\": {}",
v
))
}
})
.collect::<Result<Vec<_>, String>>()
.map_err(|e| config.new_parse_error("server.http.headers", e))
.unwrap_or_default(),
http_headers,
push_attempt_interval: config
.property_or_default("jmap.push.attempts.interval", "1m")
.unwrap_or_else(|| Duration::from_secs(60)),
@ -270,6 +300,13 @@ impl JmapConfig {
session_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.session.purge.frequency", "15 * *")
.unwrap_or_else(|| SimpleCron::parse_value("15 * *").unwrap()),
fallback_admin: config
.value("authentication.fallback-admin.user")
.and_then(|u| {
config
.value("authentication.fallback-admin.secret")
.map(|p| (u.to_string(), p.to_string()))
}),
};
// Add capabilities

View file

@ -5,16 +5,15 @@ use directory::{Directories, Directory};
use store::{BlobBackend, BlobStore, FtsStore, LookupStore, Store, Stores};
use utils::config::Config;
use crate::{expr::*, listener::tls::TlsManager, Core, Network};
use crate::{expr::*, listener::tls::TlsManager, manager::config::ConfigManager, Core, Network};
use self::{
imap::ImapConfig, jmap::settings::JmapConfig, manager::ConfigManager, scripts::Scripting,
smtp::SmtpConfig, storage::Storage,
imap::ImapConfig, jmap::settings::JmapConfig, scripts::Scripting, smtp::SmtpConfig,
storage::Storage,
};
pub mod imap;
pub mod jmap;
pub mod manager;
pub mod network;
pub mod scripts;
pub mod server;

View file

@ -629,7 +629,14 @@ impl Default for SessionConfig {
subaddressing: AddressMapping::Enable,
},
data: Data {
#[cfg(not(feature = "test_mode"))]
script: IfBlock::empty("session.data.script"),
#[cfg(feature = "test_mode")]
script: IfBlock::new::<()>(
"session.data.script",
[("is_empty(authenticated_as)", "'spam-filter'")],
"'track-replies'",
),
pipe_commands: Default::default(),
milters: Default::default(),
max_messages: IfBlock::new::<()>("session.data.limits.messages", [], "10"),

View file

@ -4,7 +4,7 @@ use ahash::AHashMap;
use directory::Directory;
use store::{write::purge::PurgeSchedule, BlobStore, FtsStore, LookupStore, Store};
use super::manager::ConfigManager;
use crate::manager::config::ConfigManager;
#[derive(Default, Clone)]
pub struct Storage {

View file

@ -13,7 +13,7 @@ use config::{
storage::Storage,
tracers::{OtelTracer, Tracer, Tracers},
};
use directory::{Directory, Principal, QueryBy};
use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy};
use expr::if_block::IfBlock;
use listener::{blocked::BlockedIps, tls::TlsManager};
use mail_send::Credentials;
@ -26,15 +26,17 @@ use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION
use sieve::Sieve;
use store::LookupStore;
use tokio::sync::oneshot;
use tracing::{level_filters::LevelFilter, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use tracing_subscriber::{
layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
};
use utils::{config::Config, BlobHash};
pub mod addresses;
pub mod config;
pub mod expr;
pub mod listener;
pub mod manager;
pub mod scripts;
pub static USER_AGENT: &str = concat!("StalwartMail/", env!("CARGO_PKG_VERSION"),);
@ -205,11 +207,29 @@ impl Core {
remote_ip: IpAddr,
return_member_of: bool,
) -> directory::Result<AuthResult<Principal<u32>>> {
if let Some(principal) = directory
// First try to authenticate the user against the default directory
let result = match directory
.query(QueryBy::Credentials(credentials), return_member_of)
.await?
.await
{
Ok(AuthResult::Success(principal))
Ok(Some(principal)) => return Ok(AuthResult::Success(principal)),
Ok(None) => Ok(()),
Err(err) => Err(err),
};
// Then check if the credentials match the fallback admin
if let (Some((fallback_admin, fallback_pass)), Credentials::Plain { username, secret }) =
(&self.jmap.fallback_admin, credentials)
{
if username == fallback_admin && verify_secret_hash(fallback_pass, secret).await {
return Ok(AuthResult::Success(Principal::fallback_admin(
fallback_pass,
)));
}
}
if let Err(err) = result {
Err(err)
} else if self.has_fail2ban() {
let login = match credentials {
Credentials::Plain { username, .. }
@ -237,60 +257,42 @@ impl Core {
impl Tracers {
pub fn enable(self, config: &mut Config) -> Option<Vec<WorkerGuard>> {
let mut layers = Vec::new();
let mut level = Level::TRACE;
for tracer in &self.tracers {
let tracer_level = *match tracer {
Tracer::Stdout { level, .. }
| Tracer::Log { level, .. }
| Tracer::Journal { level }
| Tracer::Otel { level, .. } => level,
};
if tracer_level < level {
level = tracer_level;
}
}
let mut layers: Option<Box<dyn Layer<Registry> + Sync + Send>> = None;
let mut guards = Vec::new();
match EnvFilter::builder().parse(format!(
"smtp={level},imap={level},jmap={level},store={level},common={level},utils={level},directory={level}"
)) {
Ok(layer) => {
layers.push(layer.boxed());
}
Err(err) => {
config.new_build_error("tracer", format!("Failed to set env filter: {err}"));
}
}
for tracer in self.tracers {
match tracer {
Tracer::Stdout { level, ansi } => {
layers.push(
tracing_subscriber::fmt::layer()
.with_ansi(ansi)
.with_filter(LevelFilter::from_level(level))
.boxed(),
);
let (Tracer::Stdout { level, .. }
| Tracer::Log { level, .. }
| Tracer::Journal { level }
| Tracer::Otel { level, .. }) = tracer;
let filter = match EnvFilter::builder().parse(format!(
"smtp={level},imap={level},jmap={level},store={level},common={level},utils={level},directory={level}"
)) {
Ok(filter) => {
filter
}
Tracer::Log {
level,
appender,
ansi,
} => {
Err(err) => {
config.new_build_error("tracer", format!("Failed to set env filter: {err}"));
continue;
}
};
let layer = match tracer {
Tracer::Stdout { ansi, .. } => tracing_subscriber::fmt::layer()
.with_ansi(ansi)
.with_filter(filter)
.boxed(),
Tracer::Log { appender, ansi, .. } => {
let (non_blocking, guard) = tracing_appender::non_blocking(appender);
guards.push(guard);
layers.push(
tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_ansi(ansi)
.with_filter(LevelFilter::from_level(level))
.boxed(),
);
tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_ansi(ansi)
.with_filter(filter)
.boxed()
}
Tracer::Otel { level, tracer } => {
Tracer::Otel { tracer, .. } => {
let tracer = match tracer {
OtelTracer::Gprc(exporter) => opentelemetry_otlp::new_pipeline()
.tracing()
@ -313,36 +315,30 @@ impl Tracers {
.install_batch(opentelemetry_sdk::runtime::Tokio);
match tracer {
Ok(tracer) => {
layers.push(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(LevelFilter::from_level(level))
.boxed(),
);
}
Ok(tracer) => tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter)
.boxed(),
Err(err) => {
config.new_build_error(
"tracer",
format!("Failed to start OpenTelemetry: {err}"),
);
continue;
}
}
}
Tracer::Journal { level } => {
Tracer::Journal { .. } => {
#[cfg(unix)]
{
match tracing_journald::layer() {
Ok(layer) => {
layers.push(
layer.with_filter(LevelFilter::from_level(level)).boxed(),
);
}
Ok(layer) => layer.with_filter(filter).boxed(),
Err(err) => {
config.new_build_error(
"tracer",
format!("Failed to start Journald: {err}"),
);
continue;
}
}
}
@ -353,21 +349,23 @@ impl Tracers {
"tracer",
"Journald is only available on Unix systems.",
);
continue;
}
}
}
};
layers = Some(match layers {
Some(layers) => layers.and_then(layer).boxed(),
None => layer,
});
}
if layers.len() > 1 {
match tracing_subscriber::registry().with(layers).try_init() {
Ok(_) => Some(guards),
Err(err) => {
config.new_build_error("tracer", format!("Failed to start tracing: {err}"));
None
}
match tracing_subscriber::registry().with(layers?).try_init() {
Ok(_) => Some(guards),
Err(err) => {
config.new_build_error("tracer", format!("Failed to start tracing: {err}"));
None
}
} else {
None
}
}
}

View file

@ -0,0 +1,373 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::path::PathBuf;
use arc_swap::ArcSwap;
use pwhash::sha512_crypt;
use store::{
rand::{distributions::Alphanumeric, thread_rng, Rng},
Stores,
};
use tracing_appender::non_blocking::WorkerGuard;
use utils::{
config::{Config, ConfigKey},
failed, UnwrapFailure,
};
use crate::{
config::{server::Servers, tracers::Tracers},
manager::SPAMFILTER_URL,
Core, SharedCore,
};
use super::{
config::{ConfigManager, Patterns},
download_resource, WEBADMIN_KEY, WEBADMIN_URL,
};
pub struct BootManager {
pub config: Config,
pub core: SharedCore,
pub servers: Servers,
pub guards: Option<Vec<WorkerGuard>>,
}
impl BootManager {
pub async fn init() -> Self {
let mut config_path = std::env::var("CONFIG_PATH").ok();
if config_path.is_none() {
let mut args = std::env::args().skip(1);
if let Some(arg) = args
.next()
.and_then(|arg| arg.strip_prefix("--").map(|arg| arg.to_string()))
{
let (key, value) = if let Some((key, value)) = arg.split_once('=') {
(key.to_string(), value.trim().to_string())
} else if let Some(value) = args.next() {
(arg, value)
} else {
failed(&format!("Invalid command line argument: {arg}"));
};
match key.as_str() {
"config" => {
config_path = Some(value);
}
"init" => {
quickstart(value);
std::process::exit(0);
}
_ => {
failed(&format!("Invalid command line argument: {key}"));
}
}
}
}
// Read main configuration file
let cfg_local_path =
PathBuf::from(config_path.failed("Missing parameter --config=<path-to-config>."));
let mut config = Config::default();
match std::fs::read_to_string(&cfg_local_path) {
Ok(value) => {
config.parse(&value).failed("Invalid configuration file");
}
Err(err) => {
config.new_build_error("*", format!("Could not read configuration file: {err}"));
}
}
let cfg_local = config.keys.clone();
// Resolve macros
config.resolve_macros().await;
// Parser servers
let mut servers = Servers::parse(&mut config);
// Bind ports and drop privileges
servers.bind_and_drop_priv(&mut config);
// Load stores
let mut stores = Stores::parse(&mut config).await;
// Build manager
let manager = ConfigManager {
cfg_local: ArcSwap::from_pointee(cfg_local),
cfg_local_path,
cfg_local_patterns: Patterns::parse(&mut config).into(),
cfg_store: config
.value("storage.data")
.and_then(|id| stores.stores.get(id))
.cloned()
.unwrap_or_default(),
};
// Extend configuration with settings stored in the db
if !manager.cfg_store.is_none() {
manager
.extend_config(&mut config, "")
.await
.failed("Failed to read configuration");
}
// Enable tracing
let guards = Tracers::parse(&mut config).enable(&mut config);
// Add hostname lookup if missing
let mut insert_keys = Vec::new();
if config
.value("lookup.default.hostname")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"lookup.default.hostname",
hostname::get()
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_else(|_| "localhost".to_string()),
)));
}
// Generate an OAuth key if missing
if config
.value("oauth.key")
.filter(|v| !v.is_empty())
.is_none()
{
insert_keys.push(ConfigKey::from((
"oauth.key",
thread_rng()
.sample_iter(Alphanumeric)
.take(64)
.map(char::from)
.collect::<String>(),
)));
}
// Download SPAM filters if missing
if config
.value("version.spam-filter")
.filter(|v| !v.is_empty())
.is_none()
{
match manager.fetch_external_config(SPAMFILTER_URL).await {
Ok(external_config) => {
tracing::info!(
context = "config",
event = "import",
url = SPAMFILTER_URL,
version = external_config.version,
"Imported spam filter rules"
);
insert_keys.extend(external_config.keys);
}
Err(err) => {
config.new_build_error("*", format!("Failed to fetch spam filter: {err}"));
}
}
// Add default settings
for key in [
("queue.quota.size.messages", "100000"),
("queue.quota.size.size", "10737418240"),
("queue.quota.size.enable", "true"),
("queue.throttle.rcpt.key", "rcpt_domain"),
("queue.throttle.rcpt.concurrency", "5"),
("queue.throttle.rcpt.enable", "true"),
("session.throttle.ip.key", "remote_ip"),
("session.throttle.ip.concurrency", "5"),
("session.throttle.ip.enable", "true"),
("session.throttle.sender.key.0", "sender_domain"),
("session.throttle.sender.key.1", "rcpt"),
("session.throttle.sender.rate", "25/1h"),
("session.throttle.sender.enable", "true"),
("report.analysis.addresses", "postmaster@*"),
] {
insert_keys.push(ConfigKey::from(key));
}
}
// Download webadmin if missing
if let Some(blob_store) = config
.value("storage.blob")
.and_then(|id| stores.blob_stores.get(id))
{
match blob_store.get_blob(WEBADMIN_KEY, 0..usize::MAX).await {
Ok(Some(_)) => (),
Ok(None) => match download_resource(WEBADMIN_URL).await {
Ok(bytes) => match blob_store.put_blob(WEBADMIN_KEY, &bytes).await {
Ok(_) => {
tracing::info!(
context = "webadmin",
event = "download",
url = WEBADMIN_URL,
"Downloaded webadmin bundle"
);
}
Err(err) => {
config.new_build_error(
"*",
format!("Failed to store webadmin blob: {err}"),
);
}
},
Err(err) => {
config.new_build_error("*", format!("Failed to download webadmin: {err}"));
}
},
Err(err) => {
config.new_build_error("*", format!("Failed to access webadmin blob: {err}"))
}
}
}
// Add missing settings
if !insert_keys.is_empty() {
for item in &insert_keys {
config.keys.insert(item.key.clone(), item.value.clone());
}
if let Err(err) = manager.set(insert_keys).await {
config.new_build_error("*", format!("Failed to update configuration: {err}"));
}
}
// Parse lookup stores
stores.parse_lookups(&mut config).await;
// Parse settings and build shared core
let core = Core::parse(&mut config, stores, manager)
.await
.into_shared();
// Parse TCP acceptors
servers.parse_tcp_acceptors(&mut config, core.clone());
BootManager {
core,
guards,
config,
servers,
}
}
}
fn quickstart(path: impl Into<PathBuf>) {
let path = path.into();
if !path.exists() {
std::fs::create_dir_all(&path).failed("Failed to create directory");
}
for dir in &["etc", "data", "logs"] {
std::fs::create_dir(path.join(dir)).failed(&format!("Failed to create {dir} directory"));
}
let admin_pass = thread_rng()
.sample_iter(Alphanumeric)
.take(10)
.map(char::from)
.collect::<String>();
std::fs::write(
path.join("etc").join("config.toml"),
QUICKSTART_CONFIG
.replace("_P_", &path.to_string_lossy())
.replace("_S_", &sha512_crypt::hash(&admin_pass).unwrap()),
)
.failed("Failed to write configuration file");
eprintln!(
"✅ Configuration file written to {}/etc/config.toml",
path.to_string_lossy()
);
eprintln!("🔑 Your administrator account is 'admin' with password '{admin_pass}'.");
}
const QUICKSTART_CONFIG: &str = r#"[server.listener.smtp]
bind = "[::]:25"
protocol = "smtp"
[server.listener.submission]
bind = "[::]:587"
protocol = "smtp"
[server.listener.submissions]
bind = "[::]:465"
protocol = "smtp"
tls.implicit = true
[server.listener.imap]
bind = "[::]:143"
protocol = "imap"
[server.listener.imaptls]
bind = "[::]:993"
protocol = "imap"
tls.implicit = true
[server.listener.sieve]
bind = "[::]:4190"
protocol = "managesieve"
[server.listener.https]
protocol = "http"
bind = "[::]:443"
tls.implicit = true
[server.listener.http]
protocol = "http"
bind = "[::]:8080"
[storage]
data = "rocksdb"
fts = "rocksdb"
blob = "rocksdb"
lookup = "rocksdb"
directory = "internal"
[store.rocksdb]
type = "rocksdb"
path = "_P_/data"
compression = "lz4"
[directory.internal]
type = "internal"
store = "rocksdb"
[tracer.log]
type = "log"
level = "info"
path = "_P_/logs"
prefix = "stalwart.log"
rotate = "daily"
ansi = false
enable = true
[authentication.fallback-admin]
user = "admin"
secret = "_S_"
"#;

View file

@ -1,37 +1,50 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
collections::{btree_map::Entry, BTreeMap},
path::PathBuf,
sync::Arc,
};
use ahash::AHashSet;
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use store::{
write::{BatchBuilder, ValueClass},
Deserialize, IterateParams, Store, Stores, ValueKey,
Deserialize, IterateParams, Store, ValueKey,
};
use tracing_appender::non_blocking::WorkerGuard;
use utils::{
config::{ipmask::IpAddrOrMask, utils::ParseValue, Config, ConfigKey},
failed,
config::{Config, ConfigKey},
glob::GlobPattern,
UnwrapFailure,
};
use crate::{listener::blocked::BLOCKED_IP_KEY, Core, SharedCore};
use super::{
server::{tls::parse_certificates, Servers},
tracers::Tracers,
};
use super::download_resource;
#[derive(Default)]
pub struct ConfigManager {
cfg_local: ArcSwap<BTreeMap<String, String>>,
cfg_local_path: PathBuf,
cfg_local_patterns: Arc<Patterns>,
cfg_store: Store,
pub cfg_local: ArcSwap<BTreeMap<String, String>>,
pub cfg_local_path: PathBuf,
pub cfg_local_patterns: Arc<Patterns>,
pub cfg_store: Store,
}
#[derive(Default)]
@ -44,11 +57,6 @@ enum Pattern {
Exclude(MatchType),
}
pub struct ReloadResult {
pub config: Config,
pub new_core: Option<Core>,
}
enum MatchType {
Equal(String),
StartsWith(String),
@ -57,102 +65,10 @@ enum MatchType {
All,
}
pub struct BootManager {
pub config: Config,
pub core: SharedCore,
pub servers: Servers,
pub guards: Option<Vec<WorkerGuard>>,
}
impl BootManager {
pub async fn init() -> Self {
let mut config_path = std::env::var("CONFIG_PATH").ok();
let mut found_param = false;
if config_path.is_none() {
for arg in std::env::args().skip(1) {
if let Some((key, value)) = arg.split_once('=') {
if key.starts_with("--config") {
config_path = value.trim().to_string().into();
break;
} else {
failed(&format!("Invalid command line argument: {key}"));
}
} else if found_param {
config_path = arg.into();
break;
} else if arg.starts_with("--config") {
found_param = true;
} else {
failed(&format!("Invalid command line argument: {arg}"));
}
}
}
// Read main configuration file
let cfg_local_path =
PathBuf::from(config_path.failed("Missing parameter --config=<path-to-config>."));
let mut config = Config::default();
match std::fs::read_to_string(&cfg_local_path) {
Ok(value) => {
config.parse(&value).failed("Invalid configuration file");
}
Err(err) => {
config.new_build_error("*", format!("Could not read configuration file: {err}"));
}
}
let cfg_local = config.keys.clone();
// Resolve macros
config.resolve_macros().await;
// Parser servers
let mut servers = Servers::parse(&mut config);
// Bind ports and drop privileges
servers.bind_and_drop_priv(&mut config);
// Load stores
let mut stores = Stores::parse(&mut config).await;
// Build manager
let manager = ConfigManager {
cfg_local: ArcSwap::from_pointee(cfg_local),
cfg_local_path,
cfg_local_patterns: Patterns::parse(&mut config).into(),
cfg_store: config
.value("storage.data")
.and_then(|id| stores.stores.get(id))
.cloned()
.unwrap_or_default(),
};
// Extend configuration with settings stored in the db
if !manager.cfg_store.is_none() {
manager
.extend_config(&mut config, "")
.await
.failed("Failed to read configuration");
}
// Parse lookup stores
stores.parse_lookups(&mut config).await;
// Parse settings and build shared core
let core = Core::parse(&mut config, stores, manager)
.await
.into_shared();
// Parse TCP acceptors
servers.parse_tcp_acceptors(&mut config, core.clone());
BootManager {
core,
guards: Tracers::parse(&mut config).enable(&mut config),
config,
servers,
}
}
pub(crate) struct ExternalConfig {
pub id: String,
pub version: String,
pub keys: Vec<ConfigKey>,
}
impl ConfigManager {
@ -167,7 +83,11 @@ impl ConfigManager {
.map(|_| config)
}
async fn extend_config(&self, config: &mut Config, prefix: &str) -> store::Result<()> {
pub(crate) async fn extend_config(
&self,
config: &mut Config,
prefix: &str,
) -> store::Result<()> {
for (key, value) in self.db_list(prefix, false).await? {
config.keys.entry(key).or_insert(value);
}
@ -399,124 +319,73 @@ impl ConfigManager {
))
})
}
}
impl Core {
pub async fn reload_blocked_ips(&self) -> store::Result<ReloadResult> {
let mut ip_addresses = AHashSet::new();
let mut config = self.storage.config.build_config(BLOCKED_IP_KEY).await?;
pub async fn update_external_config(&self, url: &str) -> store::Result<Option<String>> {
let external = self
.fetch_external_config(url)
.await
.map_err(store::Error::InternalError)?;
for ip in config
.set_values(BLOCKED_IP_KEY)
.map(IpAddrOrMask::parse_value)
.collect::<Vec<_>>()
if self
.get(&external.id)
.await?
.map_or(true, |v| v != external.version)
{
match ip {
Ok(IpAddrOrMask::Ip(ip)) => {
ip_addresses.insert(ip);
}
Ok(IpAddrOrMask::Mask(_)) => {}
Err(err) => {
config.new_parse_error(BLOCKED_IP_KEY, err);
}
}
}
*self.network.blocked_ips.ip_addresses.write() = ip_addresses;
Ok(config.into())
}
pub async fn reload_certificates(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("certificate").await?;
let mut certificates = self.tls.certificates.load().as_ref().clone();
parse_certificates(&mut config, &mut certificates, &mut Default::default());
self.tls.certificates.store(certificates.into());
Ok(config.into())
}
pub async fn reload_lookups(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("certificate").await?;
let mut stores = Stores::default();
stores.parse_memory_stores(&mut config);
let mut core = self.clone();
for (id, store) in stores.lookup_stores {
core.storage.lookups.insert(id, store);
}
Ok(ReloadResult {
config,
new_core: core.into(),
})
}
pub async fn reload(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("").await?;
// Parse tracers
Tracers::parse(&mut config);
// Load stores
let mut stores = Stores {
stores: self.storage.stores.clone(),
blob_stores: self.storage.blobs.clone(),
fts_stores: self.storage.ftss.clone(),
lookup_stores: self.storage.lookups.clone(),
purge_schedules: Default::default(),
};
stores.parse_stores(&mut config).await;
stores.parse_lookups(&mut config).await;
if !config.errors.is_empty() {
return Ok(config.into());
}
// Build manager
let manager = ConfigManager {
cfg_local: ArcSwap::from_pointee(self.storage.config.cfg_local.load().as_ref().clone()),
cfg_local_path: self.storage.config.cfg_local_path.clone(),
cfg_local_patterns: Patterns::parse(&mut config).into(),
cfg_store: config
.value("storage.data")
.and_then(|id| stores.stores.get(id))
.cloned()
.unwrap_or_default(),
};
// Parse settings and build shared core
let mut core = Core::parse(&mut config, stores, manager).await;
if !config.errors.is_empty() {
return Ok(config.into());
}
// Transfer Sieve cache
core.sieve.bayes_cache = self.sieve.bayes_cache.clone();
core.sieve.remote_lists = RwLock::new(self.sieve.remote_lists.read().clone());
// Copy ACME certificates
let mut certificates = core.tls.certificates.load().as_ref().clone();
for (cert_id, cert) in self.tls.certificates.load().iter() {
certificates
.entry(cert_id.to_string())
.or_insert(cert.clone());
}
core.tls.certificates.store(certificates.into());
core.tls.self_signed_cert = self.tls.self_signed_cert.clone();
// Parser servers
let mut servers = Servers::parse(&mut config);
servers.parse_tcp_acceptors(&mut config, core.clone().into_shared());
Ok(if config.errors.is_empty() {
ReloadResult {
config,
new_core: core.into(),
}
self.set(external.keys).await?;
Ok(Some(external.version))
} else {
config.into()
})
tracing::debug!(
context = "config",
event = "update",
url = url,
version = external.version,
"Configuration version is up-to-date"
);
Ok(None)
}
}
pub(crate) async fn fetch_external_config(&self, url: &str) -> Result<ExternalConfig, String> {
let config = String::from_utf8(download_resource(url).await?)
.map_err(|err| format!("Configuration file has invalid UTF-8: {err}"))?;
let config = Config::new(config)
.map_err(|err| format!("Failed to parse external configuration: {err}"))?;
// Import configuration
let mut external = ExternalConfig {
id: String::new(),
version: String::new(),
keys: Vec::new(),
};
for (key, value) in config.keys {
if key.starts_with("version.") {
external.id = key.clone();
external.version = value.clone();
external.keys.push(ConfigKey::from((key, value)));
} else if key.starts_with("queue.quota.")
|| key.starts_with("queue.throttle.")
|| key.starts_with("session.throttle.")
|| (key.starts_with("lookup.") && !key.starts_with("lookup.default."))
|| key.starts_with("sieve.trusted.scripts.")
{
external.keys.push(ConfigKey::from((key, value)));
} else {
tracing::debug!(
context = "config",
event = "import",
key = key,
value = value,
url = url,
"Ignoring key"
);
}
}
if !external.version.is_empty() {
Ok(external)
} else {
Err("External configuration file does not contain a version key".to_string())
}
}
}
@ -560,20 +429,19 @@ impl Patterns {
if cfg_local_patterns.is_empty() {
cfg_local_patterns = vec![
Pattern::Include(MatchType::StartsWith("store.".to_string())),
Pattern::Include(MatchType::StartsWith("server.listener.".to_string())),
Pattern::Include(MatchType::StartsWith("server.socket.".to_string())),
Pattern::Include(MatchType::StartsWith("server.tls.".to_string())),
Pattern::Include(MatchType::StartsWith("directory.".to_string())),
Pattern::Include(MatchType::StartsWith("tracer.".to_string())),
Pattern::Include(MatchType::StartsWith("server.".to_string())),
Pattern::Include(MatchType::StartsWith(
"authentication.fallback-admin.".to_string(),
)),
Pattern::Include(MatchType::Equal("cluster.node-id".to_string())),
Pattern::Include(MatchType::Equal("storage.data".to_string())),
Pattern::Include(MatchType::Equal("storage.blob".to_string())),
Pattern::Include(MatchType::Equal("storage.lookup".to_string())),
Pattern::Include(MatchType::Equal("storage.fts".to_string())),
Pattern::Include(MatchType::Equal("server.run-as.user".to_string())),
Pattern::Include(MatchType::Equal("server.run-as.group".to_string())),
Pattern::Exclude(MatchType::Matches(GlobPattern::compile(
"store.*.query.*",
false,
))),
Pattern::Include(MatchType::Equal("storage.directory".to_string())),
Pattern::Include(MatchType::Equal("lookup.default.hostname".to_string())),
];
}
@ -626,12 +494,3 @@ impl Clone for ConfigManager {
}
}
}
impl From<Config> for ReloadResult {
fn from(config: Config) -> Self {
Self {
config,
new_core: None,
}
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::time::Duration;
use crate::USER_AGENT;
pub mod boot;
pub mod config;
pub mod reload;
pub mod webadmin;
pub const SPAMFILTER_URL: &str = "https://get.stalw.art/resources/config/spamfilter.toml";
pub const WEBADMIN_URL: &str = "file://get.stalw.art/resources/config/webadmin.toml";
pub const WEBADMIN_KEY: &[u8] = "STALWART_WEBADMIN".as_bytes();
async fn download_resource(url: &str) -> Result<Vec<u8>, String> {
let todo = "remove";
if url == WEBADMIN_URL {
return Ok(tokio::fs::read("/tmp/dist.zip").await.unwrap());
}
reqwest::Client::builder()
.timeout(Duration::from_secs(60))
.user_agent(USER_AGENT)
.build()
.unwrap_or_default()
.get(url)
.send()
.await
.map_err(|err| format!("Failed to fetch {url}: {err}"))?
.bytes()
.await
.map_err(|err| format!("Failed to fetch {url}: {err}"))
.map(|bytes| bytes.to_vec())
}

View file

@ -0,0 +1,172 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashSet;
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use store::Stores;
use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config};
use crate::{
config::{
server::{tls::parse_certificates, Servers},
tracers::Tracers,
},
listener::blocked::BLOCKED_IP_KEY,
Core,
};
use super::config::{ConfigManager, Patterns};
pub struct ReloadResult {
pub config: Config,
pub new_core: Option<Core>,
}
impl Core {
pub async fn reload_blocked_ips(&self) -> store::Result<ReloadResult> {
let mut ip_addresses = AHashSet::new();
let mut config = self.storage.config.build_config(BLOCKED_IP_KEY).await?;
for ip in config
.set_values(BLOCKED_IP_KEY)
.map(IpAddrOrMask::parse_value)
.collect::<Vec<_>>()
{
match ip {
Ok(IpAddrOrMask::Ip(ip)) => {
ip_addresses.insert(ip);
}
Ok(IpAddrOrMask::Mask(_)) => {}
Err(err) => {
config.new_parse_error(BLOCKED_IP_KEY, err);
}
}
}
*self.network.blocked_ips.ip_addresses.write() = ip_addresses;
Ok(config.into())
}
pub async fn reload_certificates(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("certificate").await?;
let mut certificates = self.tls.certificates.load().as_ref().clone();
parse_certificates(&mut config, &mut certificates, &mut Default::default());
self.tls.certificates.store(certificates.into());
Ok(config.into())
}
pub async fn reload_lookups(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("certificate").await?;
let mut stores = Stores::default();
stores.parse_memory_stores(&mut config);
let mut core = self.clone();
for (id, store) in stores.lookup_stores {
core.storage.lookups.insert(id, store);
}
Ok(ReloadResult {
config,
new_core: core.into(),
})
}
pub async fn reload(&self) -> store::Result<ReloadResult> {
let mut config = self.storage.config.build_config("").await?;
// Parse tracers
Tracers::parse(&mut config);
// Load stores
let mut stores = Stores {
stores: self.storage.stores.clone(),
blob_stores: self.storage.blobs.clone(),
fts_stores: self.storage.ftss.clone(),
lookup_stores: self.storage.lookups.clone(),
purge_schedules: Default::default(),
};
stores.parse_stores(&mut config).await;
stores.parse_lookups(&mut config).await;
if !config.errors.is_empty() {
return Ok(config.into());
}
// Build manager
let manager = ConfigManager {
cfg_local: ArcSwap::from_pointee(self.storage.config.cfg_local.load().as_ref().clone()),
cfg_local_path: self.storage.config.cfg_local_path.clone(),
cfg_local_patterns: Patterns::parse(&mut config).into(),
cfg_store: config
.value("storage.data")
.and_then(|id| stores.stores.get(id))
.cloned()
.unwrap_or_default(),
};
// Parse settings and build shared core
let mut core = Core::parse(&mut config, stores, manager).await;
if !config.errors.is_empty() {
return Ok(config.into());
}
// Transfer Sieve cache
core.sieve.bayes_cache = self.sieve.bayes_cache.clone();
core.sieve.remote_lists = RwLock::new(self.sieve.remote_lists.read().clone());
// Copy ACME certificates
let mut certificates = core.tls.certificates.load().as_ref().clone();
for (cert_id, cert) in self.tls.certificates.load().iter() {
certificates
.entry(cert_id.to_string())
.or_insert(cert.clone());
}
core.tls.certificates.store(certificates.into());
core.tls.self_signed_cert = self.tls.self_signed_cert.clone();
// Parser servers
let mut servers = Servers::parse(&mut config);
servers.parse_tcp_acceptors(&mut config, core.clone().into_shared());
Ok(if config.errors.is_empty() {
ReloadResult {
config,
new_core: core.into(),
}
} else {
config.into()
})
}
}
impl From<Config> for ReloadResult {
fn from(config: Config) -> Self {
Self {
config,
new_core: None,
}
}
}

View file

@ -0,0 +1,175 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
io::{self, Cursor, Read},
path::PathBuf,
};
use ahash::AHashMap;
use arc_swap::ArcSwap;
use store::BlobStore;
use super::{download_resource, WEBADMIN_KEY, WEBADMIN_URL};
pub struct WebAdminManager {
bundle_path: TempDir,
routes: ArcSwap<AHashMap<String, Resource<PathBuf>>>,
}
#[derive(Default)]
pub struct Resource<T> {
pub content_type: &'static str,
pub contents: T,
}
impl WebAdminManager {
pub fn new() -> Self {
Self {
bundle_path: TempDir::new(),
routes: ArcSwap::from_pointee(Default::default()),
}
}
pub async fn get(&self, path: &str) -> io::Result<Resource<Vec<u8>>> {
let routes = self.routes.load();
if let Some(resource) = routes.get(path).or_else(|| routes.get("index.html")) {
tokio::fs::read(&resource.contents)
.await
.map(|contents| Resource {
content_type: resource.content_type,
contents,
})
} else {
Ok(Resource::default())
}
}
pub async fn unpack(&self, blob_store: &BlobStore) -> store::Result<()> {
// Delete any existing bundles
self.bundle_path.clean().await?;
// Obtain webadmin bundle
let bundle = blob_store
.get_blob(WEBADMIN_KEY, 0..usize::MAX)
.await?
.ok_or_else(|| store::Error::InternalError("WebAdmin bundle not found".to_string()))?;
// Uncompress
let mut bundle = zip::ZipArchive::new(Cursor::new(bundle))
.map_err(|err| store::Error::InternalError(format!("Unzip error: {err}")))?;
let mut routes = AHashMap::new();
for i in 0..bundle.len() {
let (file_name, contents) = {
let mut file = bundle
.by_index(i)
.map_err(|err| store::Error::InternalError(format!("Unzip error: {err}")))?;
if file.is_dir() {
continue;
}
let mut contents = Vec::new();
file.read_to_end(&mut contents)?;
(file.name().to_string(), contents)
};
let path = self.bundle_path.path.join(format!("{i:02}"));
tokio::fs::write(&path, contents).await?;
let resource = Resource {
content_type: match file_name
.rsplit_once('.')
.map(|(_, ext)| ext)
.unwrap_or_default()
{
"html" => "text/html",
"css" => "text/css",
"js" => "application/javascript",
"json" => "application/json",
"png" => "image/png",
"svg" => "image/svg+xml",
"ico" => "image/x-icon",
_ => "application/octet-stream",
},
contents: path,
};
routes.insert(file_name, resource);
}
// Update routes
self.routes.store(routes.into());
Ok(())
}
pub async fn update_and_unpack(&self, blob_store: &BlobStore) -> store::Result<()> {
let bytes = download_resource(WEBADMIN_URL).await.map_err(|err| {
store::Error::InternalError(format!("Failed to download webadmin: {err}"))
})?;
blob_store.put_blob(WEBADMIN_KEY, &bytes).await?;
self.unpack(blob_store).await
}
}
impl Resource<Vec<u8>> {
pub fn is_empty(&self) -> bool {
self.content_type.is_empty() && self.contents.is_empty()
}
}
pub struct TempDir {
pub path: PathBuf,
}
impl TempDir {
pub fn new() -> TempDir {
TempDir {
path: std::env::temp_dir().join(std::str::from_utf8(WEBADMIN_KEY).unwrap()),
}
}
pub async fn clean(&self) -> io::Result<()> {
if tokio::fs::metadata(&self.path).await.is_ok() {
let _ = tokio::fs::remove_dir_all(&self.path).await;
}
tokio::fs::create_dir(&self.path).await
}
}
impl Default for WebAdminManager {
fn default() -> Self {
Self::new()
}
}
impl Default for TempDir {
fn default() -> Self {
Self::new()
}
}
impl Drop for TempDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}

View file

@ -22,14 +22,11 @@
*/
use jmap_proto::types::collection::Collection;
use pwhash::sha512_crypt;
use store::{
rand::{distributions::Alphanumeric, thread_rng, Rng},
write::{
assert::HashedValue, key::DeserializeBigEndian, BatchBuilder, BitmapClass, DirectoryClass,
ValueClass,
assert::HashedValue, key::DeserializeBigEndian, BatchBuilder, DirectoryClass, ValueClass,
},
BitmapKey, Deserialize, IterateParams, Serialize, Store, ValueKey, U32_LEN,
Deserialize, IterateParams, Serialize, Store, ValueKey, U32_LEN,
};
use crate::{DirectoryError, ManagementError, Principal, QueryBy, Type};
@ -76,7 +73,6 @@ pub trait ManageDirectory: Sized {
async fn create_domain(&self, domain: &str) -> crate::Result<()>;
async fn delete_domain(&self, domain: &str) -> crate::Result<()>;
async fn list_domains(&self, filter: Option<&str>) -> crate::Result<Vec<String>>;
async fn init(self) -> crate::Result<Self>;
}
impl ManageDirectory for Store {
@ -973,85 +969,6 @@ impl ManageDirectory for Store {
.await?;
Ok(results)
}
async fn init(self) -> crate::Result<Self> {
// Create admin account if requested
if let (Ok(admin_user), Ok(admin_pass)) = (
std::env::var("SET_ADMIN_USER"),
std::env::var("SET_ADMIN_PASS"),
) {
if let Some(account_id) = self.get_account_id(&admin_user).await? {
self.update_account(
QueryBy::Id(account_id),
vec![PrincipalUpdate {
action: PrincipalAction::Set,
field: PrincipalField::Secrets,
value: PrincipalValue::StringList(vec![admin_pass]),
}],
)
.await?;
eprintln!("Successfully updated password for {admin_user:?}.");
} else {
self.create_account(
Principal {
typ: Type::Superuser,
quota: 0,
name: admin_user.clone(),
secrets: vec![admin_pass],
emails: vec![],
member_of: vec![],
description: "Superuser".to_string().into(),
..Default::default()
},
vec![],
)
.await?;
eprintln!("Successfully created administrator account {admin_user:?}.");
}
std::process::exit(0);
}
// Create a default administrator account if none exists
if self
.get_bitmap(BitmapKey {
account_id: u32::MAX,
collection: Collection::Principal.into(),
class: BitmapClass::DocumentIds,
block_num: 0,
})
.await?
.unwrap_or_default()
.is_empty()
{
let secret = thread_rng()
.sample_iter(Alphanumeric)
.take(12)
.map(char::from)
.collect::<String>();
let hashed_secret = sha512_crypt::hash(&secret).unwrap();
self.create_account(
Principal {
typ: Type::Superuser,
quota: 0,
name: "admin".to_string(),
secrets: vec![hashed_secret],
emails: vec![],
member_of: vec![],
description: "Superuser".to_string().into(),
..Default::default()
},
vec![],
)
.await?;
tracing::info!(
"Created default administrator account \"admin\" with password {secret:?}."
)
}
Ok(self)
}
}
impl From<Principal<String>> for Principal<u32> {

View file

@ -155,9 +155,9 @@ pub enum PrincipalField {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PrincipalUpdate {
action: PrincipalAction,
field: PrincipalField,
value: PrincipalValue,
pub action: PrincipalAction,
pub field: PrincipalField,
pub value: PrincipalValue,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]

View file

@ -33,8 +33,8 @@ use ahash::AHashMap;
use crate::{
backend::{
imap::ImapDirectory, internal::manage::ManageDirectory, ldap::LdapDirectory,
memory::MemoryDirectory, smtp::SmtpDirectory, sql::SqlDirectory,
imap::ImapDirectory, ldap::LdapDirectory, memory::MemoryDirectory, smtp::SmtpDirectory,
sql::SqlDirectory,
},
Directories, Directory, DirectoryInner,
};
@ -68,15 +68,7 @@ impl Directories {
"internal" => Some(DirectoryInner::Internal(
if let Some(store_id) = config.value_require(("directory", id, "store")) {
if let Some(data) = stores.stores.get(store_id) {
match data.clone().init().await {
Ok(data) => data,
Err(err) => {
let err =
format!("Failed to initialize store {store_id:?}: {err:?}");
config.new_parse_error(("directory", id, "store"), err);
continue;
}
}
data.clone()
} else {
config.new_parse_error(
("directory", id, "store"),

View file

@ -109,7 +109,7 @@ async fn verify_hash_prefix(hashed_secret: &str, secret: &str) -> bool {
}
}
async fn verify_secret_hash(hashed_secret: &str, secret: &str) -> bool {
pub async fn verify_secret_hash(hashed_secret: &str, secret: &str) -> bool {
if hashed_secret.starts_with('$') {
verify_hash_prefix(hashed_secret, secret).await
} else if hashed_secret.starts_with('_') {

View file

@ -201,6 +201,19 @@ impl From<PoolError<mail_send::Error>> for DirectoryError {
}
}
impl Principal<u32> {
pub fn fallback_admin(fallback_pass: impl Into<String>) -> Self {
Principal {
id: u32::MAX,
typ: Type::Superuser,
quota: 0,
name: "Fallback Administrator".to_string(),
secrets: vec![fallback_pass.into()],
..Default::default()
}
}
}
impl From<LdapError> for DirectoryError {
fn from(error: LdapError) -> Self {
tracing::warn!(

View file

@ -26,6 +26,7 @@ use std::{net::IpAddr, sync::Arc};
use common::{
expr::{functions::ResolveVariable, *},
listener::{ServerInstance, SessionData, SessionManager, SessionStream},
manager::webadmin::Resource,
Core,
};
use http_body_util::{BodyExt, Full};
@ -270,7 +271,19 @@ impl JMAP {
Err(err) => err.into_http_response(),
};
}
_ => (),
_ => {
let path = req.uri().path();
return match self
.inner
.webadmin
.get(path.strip_prefix('/').unwrap_or(path))
.await
{
Ok(resource) if !resource.is_empty() => resource.into_http_response(),
Err(err) => err.into_http_response(),
_ => RequestError::not_found().into_http_response(),
};
}
}
RequestError::not_found().into_http_response()
}
@ -451,6 +464,14 @@ impl ToHttpResponse for store::Error {
}
}
impl ToHttpResponse for std::io::Error {
fn into_http_response(self) -> HttpResponse {
tracing::error!(context = "i/o", error = %self, "I/O error");
RequestError::internal_server_error().into_http_response()
}
}
impl ToHttpResponse for serde_json::Error {
fn into_http_response(self) -> HttpResponse {
RequestError::blank(
@ -527,6 +548,20 @@ impl ToHttpResponse for DownloadResponse {
}
}
impl ToHttpResponse for Resource<Vec<u8>> {
fn into_http_response(self) -> HttpResponse {
hyper::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, self.content_type)
.body(
Full::new(Bytes::from(self.contents))
.map_err(|never| match never {})
.boxed(),
)
.unwrap()
}
}
impl ToHttpResponse for UploadResponse {
fn into_http_response(self) -> HttpResponse {
JsonResponse::new(self).into_http_response()

View file

@ -22,8 +22,8 @@
*/
use directory::backend::internal::manage::ManageDirectory;
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -34,7 +34,7 @@ use crate::{
api::{
http::ToHttpResponse,
management::dkim::{obtain_dkim_public_key, Algorithm},
HttpRequest, JsonResponse,
HttpRequest, HttpResponse, JsonResponse,
},
JMAP,
};
@ -48,11 +48,7 @@ struct DnsRecord {
}
impl JMAP {
pub async fn handle_manage_domain(
&self,
req: &HttpRequest,
path: Vec<&str>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
pub async fn handle_manage_domain(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (path.get(1), req.method()) {
(None, &Method::GET) => {
// List domains
@ -97,10 +93,28 @@ impl JMAP {
(Some(domain), &Method::POST) => {
// Create domain
match self.core.storage.data.create_domain(domain).await {
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
.into_http_response(),
Ok(_) => {
// Set default domain name if missing
if matches!(
self.core.storage.config.get("lookup.default.domain").await,
Ok(None)
) {
if let Err(err) = self
.core
.storage
.config
.set([("lookup.default.domain", *domain)])
.await
{
tracing::error!("Failed to set default domain name: {}", err);
}
}
JsonResponse::new(json!({
"data": (),
}))
.into_http_response()
}
Err(err) => err.into_http_response(),
}
}

View file

@ -32,14 +32,13 @@ pub mod stores;
use std::{borrow::Cow, sync::Arc};
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde::Serialize;
use crate::{auth::AccessToken, JMAP};
use super::{http::ToHttpResponse, HttpRequest, JsonResponse};
use super::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse};
#[derive(Serialize)]
#[serde(tag = "error")]
@ -72,7 +71,7 @@ impl JMAP {
req: &HttpRequest,
body: Option<Vec<u8>>,
access_token: Arc<AccessToken>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
) -> HttpResponse {
let path = req.uri().path().split('/').skip(2).collect::<Vec<_>>();
let is_superuser = access_token.is_super_user();
@ -85,16 +84,16 @@ impl JMAP {
"queue" if is_superuser => self.handle_manage_queue(req, path).await,
"reports" if is_superuser => self.handle_manage_reports(req, path).await,
"dkim" if is_superuser => self.handle_manage_dkim(req, path, body).await,
"update" if is_superuser => self.handle_manage_update(req, path).await,
"oauth" => self.handle_oauth_api_request(access_token, body).await,
"crypto" => match *req.method() {
Method::POST => self.handle_crypto_post(access_token, body).await,
Method::GET => self.handle_crypto_get(access_token).await,
_ => RequestError::not_found().into_http_response(),
},
"password" => match *req.method() {
Method::POST => self.handle_change_password(req, access_token, body).await,
_ => RequestError::not_found().into_http_response(),
},
"password" if req.method() == Method::POST => {
self.handle_change_password(req, access_token, body).await
}
_ => RequestError::not_found().into_http_response(),
}
}

View file

@ -30,8 +30,8 @@ use directory::{
},
DirectoryError, DirectoryInner, ManagementError, Principal, QueryBy, Type,
};
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, header, Method, StatusCode};
use hyper::{header, Method, StatusCode};
use jmap_proto::error::request::RequestError;
use serde_json::json;
use utils::url_params::UrlParams;
@ -76,7 +76,7 @@ impl JMAP {
req: &HttpRequest,
path: Vec<&str>,
body: Option<Vec<u8>>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
) -> HttpResponse {
match (path.get(1), req.method()) {
(None, &Method::POST) => {
// Make sure the current directory supports updates
@ -251,6 +251,18 @@ impl JMAP {
body.as_deref().unwrap_or_default(),
) {
Ok(changes) => {
// Make sure the current directory supports updates
if let Some(response) = self.assert_supported_directory() {
if changes.iter().any(|change| {
!matches!(
change.field,
PrincipalField::Quota | PrincipalField::Description
)
}) {
return response;
}
}
match self
.core
.storage
@ -281,7 +293,7 @@ impl JMAP {
req: &HttpRequest,
access_token: Arc<AccessToken>,
body: Option<Vec<u8>>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
) -> HttpResponse {
// Make sure the user authenticated using Basic auth
if req
.headers()
@ -295,11 +307,7 @@ impl JMAP {
.into_http_response();
}
// Make sure the current directory supports updates
if let Some(response) = self.assert_supported_directory() {
return response;
}
// Obtain new password
let new_password = match String::from_utf8(body.unwrap_or_default()) {
Ok(new_password) if !new_password.is_empty() => new_password,
_ => {
@ -310,6 +318,28 @@ impl JMAP {
}
};
// Handle Fallback admin password changes
if access_token.is_super_user() && access_token.primary_id() == u32::MAX {
return match self
.core
.storage
.config
.set([("authentication.fallback-admin.secret", new_password)])
.await
{
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
.into_http_response(),
Err(err) => err.into_http_response(),
};
}
// Make sure the current directory supports updates
if let Some(response) = self.assert_supported_directory() {
return response;
}
// Update password
match self
.core
@ -332,9 +362,7 @@ impl JMAP {
}
}
pub fn assert_supported_directory(
&self,
) -> Option<hyper::Response<BoxBody<Bytes, hyper::Error>>> {
pub fn assert_supported_directory(&self) -> Option<HttpResponse> {
ManagementApiError::UnsupportedDirectoryOperation {
class: match &self.core.storage.directory.store {
DirectoryInner::Internal(_) => return None,

View file

@ -23,8 +23,7 @@
use std::str::FromStr;
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use mail_auth::{
dmarc::URI,
@ -42,7 +41,7 @@ use store::{
use utils::url_params::UrlParams;
use crate::{
api::{http::ToHttpResponse, HttpRequest, JsonResponse},
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
JMAP,
};
@ -118,11 +117,7 @@ pub enum Report {
}
impl JMAP {
pub async fn handle_manage_queue(
&self,
req: &HttpRequest,
path: Vec<&str>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
pub async fn handle_manage_queue(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
let params = UrlParams::new(req.uri().query());
match (

View file

@ -21,23 +21,19 @@
* for more details.
*/
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use common::manager::SPAMFILTER_URL;
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde_json::json;
use utils::url_params::UrlParams;
use crate::{
api::{http::ToHttpResponse, HttpRequest, JsonResponse},
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
JMAP,
};
impl JMAP {
pub async fn handle_manage_reload(
&self,
req: &HttpRequest,
path: Vec<&str>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
pub async fn handle_manage_reload(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (path.get(1).copied(), req.method()) {
(Some("lookup"), &Method::GET) => {
match self.core.reload_lookups().await {
@ -92,4 +88,39 @@ impl JMAP {
_ => RequestError::not_found().into_http_response(),
}
}
pub async fn handle_manage_update(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (path.get(1).copied(), req.method()) {
(Some("spam-filter"), &Method::GET) => {
match self
.core
.storage
.config
.update_external_config(SPAMFILTER_URL)
.await
{
Ok(result) => JsonResponse::new(json!({
"data": result,
}))
.into_http_response(),
Err(err) => err.into_http_response(),
}
}
(Some("webadmin"), &Method::GET) => {
match self
.inner
.webadmin
.update_and_unpack(&self.core.storage.blob)
.await
{
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
.into_http_response(),
Err(err) => err.into_http_response(),
}
}
_ => RequestError::not_found().into_http_response(),
}
}
}

View file

@ -21,8 +21,7 @@
* for more details.
*/
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use mail_auth::report::{
tlsrpt::{FailureDetails, Policy, TlsReport},
@ -37,7 +36,7 @@ use store::{
use utils::url_params::UrlParams;
use crate::{
api::{http::ToHttpResponse, HttpRequest, JsonResponse},
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
JMAP,
};
@ -48,11 +47,7 @@ enum ReportType {
}
impl JMAP {
pub async fn handle_manage_reports(
&self,
req: &HttpRequest,
path: Vec<&str>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
pub async fn handle_manage_reports(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (
path.get(1).copied().unwrap_or_default(),
path.get(2).copied(),

View file

@ -21,15 +21,14 @@
* for more details.
*/
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde_json::json;
use store::ahash::AHashMap;
use utils::{config::ConfigKey, url_params::UrlParams};
use crate::{
api::{http::ToHttpResponse, HttpRequest, JsonResponse},
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
JMAP,
};
@ -57,7 +56,7 @@ impl JMAP {
req: &HttpRequest,
path: Vec<&str>,
body: Option<Vec<u8>>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
) -> HttpResponse {
match (path.get(1).copied(), req.method()) {
(Some("group"), &Method::GET) => {
// List settings

View file

@ -21,22 +21,17 @@
* for more details.
*/
use http_body_util::combinators::BoxBody;
use hyper::{body::Bytes, Method};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde_json::json;
use crate::{
api::{http::ToHttpResponse, HttpRequest, JsonResponse},
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
JMAP,
};
impl JMAP {
pub async fn handle_manage_store(
&self,
req: &HttpRequest,
path: Vec<&str>,
) -> hyper::Response<BoxBody<Bytes, hyper::Error>> {
pub async fn handle_manage_store(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (path.get(1).copied(), req.method()) {
(Some("maintenance"), &Method::GET) => {
match self

View file

@ -24,7 +24,7 @@
use std::{net::IpAddr, sync::Arc, time::Instant};
use common::{listener::limiter::InFlight, AuthResult};
use directory::QueryBy;
use directory::{Principal, QueryBy};
use hyper::header;
use jmap_proto::error::request::RequestError;
use mail_parser::decoders::base64::base64_decode;
@ -179,15 +179,21 @@ impl JMAP {
}
pub async fn get_access_token(&self, account_id: u32) -> Option<AccessToken> {
// Create access token
self.update_access_token(AccessToken::new(
self.core
.storage
.directory
.query(QueryBy::Id(account_id), true)
.await
.ok()??,
))
.await
match self
.core
.storage
.directory
.query(QueryBy::Id(account_id), true)
.await
{
Ok(Some(principal)) => self.update_access_token(AccessToken::new(principal)).await,
_ => match &self.core.jmap.fallback_admin {
Some((_, secret)) if account_id == u32::MAX => {
self.update_access_token(AccessToken::new(Principal::fallback_admin(secret)))
.await
}
_ => None,
},
}
}
}

View file

@ -198,24 +198,33 @@ impl JMAP {
.into_http_response()
}
async fn password_hash(&self, account_id: u32) -> Result<String, &'static str> {
if account_id != u32::MAX {
self.core
.storage
.directory
.query(QueryBy::Id(account_id), false)
.await
.map_err(|_| "Temporary lookup error")?
.ok_or("Account no longer exists")?
.secrets
.into_iter()
.next()
.ok_or("Failed to obtain password hash")
} else if let Some((_, secret)) = &self.core.jmap.fallback_admin {
Ok(secret.clone())
} else {
Err("Invalid account id.")
}
}
pub async fn issue_token(
&self,
account_id: u32,
client_id: &str,
with_refresh_token: bool,
) -> Result<OAuthResponse, &'static str> {
let password_hash = self
.core
.storage
.directory
.query(QueryBy::Id(account_id), false)
.await
.map_err(|_| "Temporary lookup error")?
.ok_or("Account no longer exists")?
.secrets
.into_iter()
.next()
.ok_or("Failed to obtain password hash")?;
let password_hash = self.password_hash(account_id).await?;
Ok(OAuthResponse {
access_token: self.encode_access_token(
@ -324,18 +333,7 @@ impl JMAP {
}
// Obtain password hash
let password_hash = self
.core
.storage
.directory
.query(QueryBy::Id(account_id), false)
.await
.map_err(|_| "Temporary lookup error")?
.ok_or("Account no longer exists")?
.secrets
.into_iter()
.next()
.ok_or("Failed to obtain password hash")?;
let password_hash = self.password_hash(account_id).await?;
// Build context
let key = self.core.jmap.oauth_key.clone();

View file

@ -24,7 +24,7 @@
use std::{collections::hash_map::RandomState, fmt::Display, sync::Arc, time::Duration};
use auth::{rate_limit::ConcurrencyLimiters, AccessToken};
use common::{Core, DeliveryEvent, SharedCore};
use common::{manager::webadmin::WebAdminManager, Core, DeliveryEvent, SharedCore};
use dashmap::DashMap;
use directory::QueryBy;
use email::cache::Threads;
@ -98,6 +98,7 @@ pub struct Inner {
pub sessions: TtlDashMap<String, u32>,
pub access_tokens: TtlDashMap<u32, Arc<AccessToken>>,
pub snowflake_id: SnowflakeIdGenerator,
pub webadmin: WebAdminManager,
pub concurrency_limiter: DashMap<u32, Arc<ConcurrencyLimiters>>,
@ -131,6 +132,7 @@ impl JMAP {
let capacity = config.property("cache.capacity").unwrap_or(100);
let inner = Inner {
webadmin: WebAdminManager::new(),
sessions: TtlDashMap::with_capacity(capacity, shard_amount),
access_tokens: TtlDashMap::with_capacity(capacity, shard_amount),
snowflake_id: config
@ -149,6 +151,11 @@ impl JMAP {
),
};
// Unpack webadmin
if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await {
tracing::warn!(event = "error", error = ?err, "Failed to unpack webadmin bundle.");
}
let jmap_instance = JmapInstance {
core,
jmap_inner: Arc::new(inner),

View file

@ -23,7 +23,7 @@
use std::time::Duration;
use common::config::{manager::BootManager, server::ServerProtocol};
use common::{config::server::ServerProtocol, manager::boot::BootManager};
use imap::core::{ImapSessionManager, IMAP};
use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
use managesieve::core::ManageSieveSessionManager;

View file

@ -1,30 +1,14 @@
#!/bin/bash
BASE_DIR="/Users/me/Downloads/stalwart-test"
DOMAIN="example.org"
FEATURES="sqlite foundationdb postgres mysql rocks elastic s3 redis"
# Delete previous tests
rm -rf $BASE_DIR
# Create directories
mkdir -p $BASE_DIR $BASE_DIR/data $BASE_DIR/etc
# Copy resources
cp -r resources/config/config.toml $BASE_DIR/etc
# Replace settings
sed -i '' -e "s|%{env:STALWART_PATH}%|$BASE_DIR|g" \
-e "s|%{env:DOMAIN}%|$DOMAIN|g" \
-e "s|%{env:HOSTNAME}%|mail.$DOMAIN|g" \
-e "s|%{env:OAUTH_KEY}%|12345|g" \
-e 's/level = "info"/level = "trace"/g' "$BASE_DIR/etc/config.toml"
#sed -i '' -e 's/allow-plain-text = false/allow-plain-text = true/g' \
# -e 's/2000\/1m/9999999\/100m/g' \
# -e 's/concurrent = 4/concurrent = 90000/g' "$BASE_DIR/etc/imap/settings.toml"
# Create admin user
SET_ADMIN_USER="admin" SET_ADMIN_PASS="secret" cargo run -p mail-server --no-default-features --features "$FEATURES" -- --config=$BASE_DIR/etc/config.toml
cargo run -p mail-server --no-default-features --features "$FEATURES" -- --config=$BASE_DIR/etc/config.toml
cargo run -p mail-server --no-default-features --features "$FEATURES" -- --init=$BASE_DIR
echo "[server.http]\npermissive-cors = true\n" >> $BASE_DIR/etc/config.toml
echo "[tracer.stdout]\ntype = 'stdout'\nlevel = 'info'\nansi = true\nenable = true" >> $BASE_DIR/etc/config.toml
#cargo run -p mail-server --no-default-features --features "$FEATURES" -- --config=$BASE_DIR/etc/config.toml