Renamed jmilters to hooks

This commit is contained in:
mdecimus 2024-06-21 17:05:14 +02:00
parent 68a189ed9f
commit 0b46ec5808
34 changed files with 249 additions and 214 deletions

50
Cargo.lock generated
View file

@ -1010,7 +1010,7 @@ dependencies = [
[[package]]
name = "common"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"arc-swap",
@ -1024,8 +1024,8 @@ dependencies = [
"hostname 0.4.0",
"hyper 1.3.1",
"idna 1.0.1",
"imagesize 0.13.0",
"infer 0.16.0",
"imagesize",
"infer",
"jmap_proto",
"mail-auth",
"mail-parser",
@ -1612,7 +1612,7 @@ dependencies = [
[[package]]
name = "directory"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"argon2",
@ -2941,12 +2941,6 @@ dependencies = [
"utf8_iter",
]
[[package]]
name = "imagesize"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d73f573d8e8d63e6d5020011d3255b28c3ba85d6cf870a07184ed23de9284"
[[package]]
name = "imagesize"
version = "0.13.0"
@ -2955,7 +2949,7 @@ checksum = "edcd27d72f2f071c64249075f42e205ff93c9a4c5f6c6da53e79ed9f9832c285"
[[package]]
name = "imap"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"common",
@ -3024,15 +3018,6 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "infer"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb33622da908807a06f9513c19b3c1ad50fab3e4137d82a78107d502075aa199"
dependencies = [
"cfb",
]
[[package]]
name = "infer"
version = "0.16.0"
@ -3166,7 +3151,7 @@ dependencies = [
[[package]]
name = "jmap"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"aes",
"aes-gcm",
@ -3604,7 +3589,7 @@ dependencies = [
[[package]]
name = "mail-server"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"common",
"directory",
@ -3623,7 +3608,7 @@ dependencies = [
[[package]]
name = "managesieve"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"bincode",
@ -3917,7 +3902,7 @@ dependencies = [
[[package]]
name = "nlp"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"bincode",
@ -4473,7 +4458,7 @@ dependencies = [
[[package]]
name = "pop3"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"common",
"imap",
@ -6021,22 +6006,19 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "smtp"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"bincode",
"blake3",
"chrono",
"common",
"dashmap",
"decancer",
"directory",
"form_urlencoded",
"http-body-util",
"hyper 1.3.1",
"hyper-util",
"idna 0.5.0",
"imagesize 0.12.0",
"infer 0.15.0",
"lazy_static",
"lru-cache",
"mail-auth",
@ -6064,10 +6046,8 @@ dependencies = [
"tokio",
"tokio-rustls 0.25.0",
"tracing",
"unicode-security",
"utils",
"webpki-roots 0.26.3",
"whatlang",
"x509-parser 0.16.0",
]
@ -6143,7 +6123,7 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "stalwart-cli"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"clap",
"console",
@ -6174,7 +6154,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"arc-swap",
@ -7085,7 +7065,7 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "utils"
version = "0.8.1"
version = "0.8.2"
dependencies = [
"ahash 0.8.11",
"base64 0.22.1",

View file

@ -5,7 +5,7 @@ authors = ["Stalwart Labs Ltd. <hello@stalw.art>"]
license = "AGPL-3.0-only"
repository = "https://github.com/stalwartlabs/cli"
homepage = "https://github.com/stalwartlabs/cli"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
readme = "README.md"
resolver = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "common"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"
@ -16,7 +16,7 @@ mail-auth = { version = "0.4" }
mail-send = { version = "0.4", default-features = false, features = ["cram-md5"] }
smtp-proto = { version = "0.1", features = ["serde_support"] }
dns-update = { version = "0.1" }
ahash = { version = "0.8.1", features = ["serde"] }
ahash = { version = "0.8.2", features = ["serde"] }
parking_lot = "0.12.1"
regex = "1.7.0"
tracing = "0.1"
@ -36,7 +36,7 @@ serde_json = "1.0"
base64 = "0.22"
x509-parser = "0.16.0"
pem = "3.0"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"

View file

@ -445,7 +445,7 @@ impl ParseValue for SpecialUse {
"archive" => Ok(SpecialUse::Archive),
"sent" => Ok(SpecialUse::Sent),
"shared" => Ok(SpecialUse::Shared),
"none" => Ok(SpecialUse::None),
//"none" => Ok(SpecialUse::None),
other => Err(format!("Unknown folder role {other:?}")),
}
}

View file

@ -124,8 +124,8 @@ fn parse_webhook(config: &mut Config, id: &str) -> Option<Webhook> {
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let (Some(name), Some(secret)) = (
config.value(("webhook", id, "user")),
config.value(("webhook", id, "secret")),
config.value(("webhook", id, "auth.username")),
config.value(("webhook", id, "auth.secret")),
) {
headers.insert(
AUTHORIZATION,

View file

@ -39,7 +39,7 @@ pub struct SessionConfig {
pub mta_sts_policy: Option<Policy>,
pub milters: Vec<Milter>,
pub jmilters: Vec<JMilter>,
pub hooks: Vec<FilterHook>,
}
#[derive(Default, Debug, Clone)]
@ -173,7 +173,7 @@ pub enum MilterVersion {
}
#[derive(Clone)]
pub struct JMilter {
pub struct FilterHook {
pub enable: IfBlock,
pub url: String,
pub timeout: Duration,
@ -181,6 +181,7 @@ pub struct JMilter {
pub tls_allow_invalid_certs: bool,
pub tempfail_on_error: bool,
pub run_on_stage: AHashSet<Stage>,
pub max_response_size: usize,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
@ -212,12 +213,12 @@ impl SessionConfig {
.into_iter()
.filter_map(|id| parse_milter(config, &id, &has_rcpt_vars))
.collect();
session.jmilters = config
.sub_keys("session.jmilter", ".url")
session.hooks = config
.sub_keys("session.hook", ".url")
.map(|s| s.to_string())
.collect::<Vec<_>>()
.into_iter()
.filter_map(|id| parse_jmilter(config, &id, &has_rcpt_vars))
.filter_map(|id| parse_hooks(config, &id, &has_rcpt_vars))
.collect();
session.data.pipe_commands = config
.sub_keys("session.data.pipe", "")
@ -581,33 +582,33 @@ fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<M
})
}
fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<JMilter> {
fn parse_hooks(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<FilterHook> {
let mut headers = HeaderMap::new();
for (header, value) in config
.values(("session.jmilter", id, "headers"))
.values(("session.hook", id, "headers"))
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
"Invalid header found in property \"session.hook.{id}.headers\": {err}",
)
})?,
HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
"Invalid header found in property \"session.hook.{id}.headers\": {err}",
)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {v}",
"Invalid header found in property \"session.hook.{id}.headers\": {v}",
))
}
})
.collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
.map_err(|e| config.new_parse_error(("session.jmilter", id, "headers"), e))
.map_err(|e| config.new_parse_error(("session.hook", id, "headers"), e))
.unwrap_or_default()
{
headers.insert(header, value);
@ -615,8 +616,8 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let (Some(name), Some(secret)) = (
config.value(("session.jmilter", id, "user")),
config.value(("session.jmilter", id, "secret")),
config.value(("session.hook", id, "auth.username")),
config.value(("session.hook", id, "auth.secret")),
) {
headers.insert(
AUTHORIZATION,
@ -626,24 +627,30 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
);
}
Some(JMilter {
enable: IfBlock::try_parse(config, ("session.jmilter", id, "enable"), token_map)
Some(FilterHook {
enable: IfBlock::try_parse(config, ("session.hook", id, "enable"), token_map)
.unwrap_or_else(|| {
IfBlock::new::<()>(format!("session.jmilter.{id}.enable"), [], "false")
IfBlock::new::<()>(format!("session.hook.{id}.enable"), [], "false")
}),
url: config
.value_require(("session.jmilter", id, "url"))?
.value_require(("session.hook", id, "url"))?
.to_string(),
timeout: config
.property_or_default(("session.jmilter", id, "timeout"), "30s")
.property_or_default(("session.hook", id, "timeout"), "30s")
.unwrap_or_else(|| Duration::from_secs(30)),
tls_allow_invalid_certs: config
.property_or_default(("session.jmilter", id, "allow-invalid-certs"), "false")
.property_or_default(("session.hook", id, "allow-invalid-certs"), "false")
.unwrap_or_default(),
tempfail_on_error: config
.property_or_default(("session.jmilter", id, "options.tempfail-on-error"), "true")
.property_or_default(("session.hook", id, "options.tempfail-on-error"), "true")
.unwrap_or(true),
run_on_stage: parse_stages(config, "session.jmilter", id),
run_on_stage: parse_stages(config, "session.hook", id),
max_response_size: config
.property_or_default(
("session.hook", id, "options.max-response-size"),
"52428800",
)
.unwrap_or(52428800),
headers,
})
}
@ -853,7 +860,7 @@ impl Default for SessionConfig {
},
mta_sts_policy: None,
milters: Default::default(),
jmilters: Default::default(),
hooks: Default::default(),
}
}
}

View file

@ -113,7 +113,7 @@ impl AllowedIps {
{
// Add loopback addresses
ip_addresses.insert(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
ip_addresses.insert(IpAddr::V6(std::net::IIpv6Addr::LOCALHOST));
ip_addresses.insert(IpAddr::V6(std::net::Ipv6Addr::LOCALHOST));
}
AllowedIps {

View file

@ -9,6 +9,8 @@ pub mod functions;
pub mod plugins;
#[derive(Debug, serde::Serialize)]
#[serde(tag = "action")]
#[serde(rename_all = "camelCase")]
pub enum ScriptModification {
SetEnvelope {
name: Envelope,

View file

@ -136,10 +136,10 @@ pub enum WebhookPayload {
return_path: String,
recipients: Vec<String>,
#[serde(rename = "nextRetry")]
next_retry: String,
next_retry: DateTime<Utc>,
#[serde(rename = "nextDSN")]
next_dsn: String,
expires: String,
next_dsn: DateTime<Utc>,
expires: DateTime<Utc>,
size: usize,
},
MessageRejected {
@ -172,7 +172,7 @@ pub enum WebhookPayload {
sender: String,
status: Vec<WebhookDSN>,
#[serde(rename = "createdAt")]
created: String,
created: DateTime<Utc>,
},
IncomingDmarcReport {
#[serde(rename = "rangeFrom")]
@ -296,9 +296,9 @@ pub struct WebhookDSN {
pub message: String,
#[serde(rename = "nextRetry")]
#[serde(skip_serializing_if = "Option::is_none")]
pub next_retry: Option<String>,
pub next_retry: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires: Option<String>,
pub expires: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "retryCount")]
pub retry_count: Option<u32>,

View file

@ -1,6 +1,6 @@
[package]
name = "directory"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "imap"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -10,7 +10,7 @@ utils = { path = "../utils" }
mail-parser = { version = "0.9", features = ["full_encoding", "serde_support", "ludicrous_mode"] }
fast-float = "0.2.0"
serde = { version = "1.0", features = ["derive"]}
ahash = { version = "0.8.1", features = ["serde"] }
ahash = { version = "0.8.2", features = ["serde"] }
serde_json = { version = "1.0", features = ["raw_value"] }
tracing = "0.1"

View file

@ -1,6 +1,6 @@
[package]
name = "jmap"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -23,11 +23,12 @@
use std::time::SystemTime;
use common::{scripts::ScriptModification, IntoString};
use hyper::Method;
use jmap_proto::error::request::RequestError;
use serde_json::json;
use sieve::{runtime::Variable, Envelope};
use smtp::scripts::ScriptParameters;
use smtp::scripts::{ScriptParameters, ScriptResult};
use utils::url_params::UrlParams;
use crate::{
@ -35,6 +36,23 @@ use crate::{
JMAP,
};
#[derive(Debug, serde::Serialize)]
#[serde(tag = "action")]
#[serde(rename_all = "lowercase")]
pub enum Response {
Accept {
modifications: Vec<ScriptModification>,
},
Replace {
message: String,
modifications: Vec<ScriptModification>,
},
Reject {
reason: String,
},
Discard,
}
impl JMAP {
pub async fn handle_run_sieve(
&self,
@ -66,11 +84,11 @@ impl JMAP {
let mut envelope_to = Vec::new();
for (key, value) in UrlParams::new(req.uri().query()).into_inner() {
if key.starts_with("env_to") {
envelope_to.push(Variable::from(value.to_lowercase()));
continue;
}
let env = match key.as_ref() {
"env_to" => {
envelope_to.push(Variable::from(value.to_lowercase()));
continue;
}
"env_from" => Envelope::From,
"env_orcpt" => Envelope::Orcpt,
"env_ret" => Envelope::Ret,
@ -94,10 +112,22 @@ impl JMAP {
}
// Run script
let result = self
let result = match self
.smtp
.run_script(script, params, tracing::debug_span!("sieve_manual_run"))
.await;
.await
{
ScriptResult::Accept { modifications } => Response::Accept { modifications },
ScriptResult::Replace {
message,
modifications,
} => Response::Replace {
message: message.into_string(),
modifications,
},
ScriptResult::Reject(reason) => Response::Reject { reason },
ScriptResult::Discard => Response::Discard,
};
JsonResponse::new(json!({
"data": result,

View file

@ -7,7 +7,7 @@ homepage = "https://stalw.art"
keywords = ["imap", "jmap", "smtp", "email", "mail", "server"]
categories = ["email"]
license = "AGPL-3.0-only"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "managesieve"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "nlp"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -1,6 +1,6 @@
[package]
name = "pop3"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -7,7 +7,7 @@ homepage = "https://stalw.art/smtp"
keywords = ["smtp", "email", "mail", "server"]
categories = ["email"]
license = "AGPL-3.0-only"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"
@ -51,13 +51,8 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
num_cpus = "1.15.0"
lazy_static = "1.4"
whatlang = "0.16"
imagesize = "0.12"
idna = "0.5"
decancer = "3.0.1"
unicode-security = "0.1.0"
infer = "0.15.0"
bincode = "1.3.1"
chrono = "0.4"
[features]
test_mode = []

View file

@ -28,6 +28,7 @@ use std::{
time::{Duration, SystemTime},
};
use chrono::{TimeZone, Utc};
use common::{
config::smtp::{auth::VerifyStrategy, session::Stage},
listener::SessionStream,
@ -39,7 +40,6 @@ use mail_auth::{
dmarc, AuthenticatedMessage, AuthenticationResults, DkimResult, DmarcResult, ReceivedSpf,
};
use mail_builder::headers::{date::Date, message_id::generate_message_id_header};
use mail_parser::DateTime;
use sieve::runtime::Variable;
use smtp_proto::{
MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS,
@ -447,15 +447,18 @@ impl<T: SessionStream> Session<T> {
}
};
// Run JMilter filters
match self.run_jmilters(Stage::Data, (&auth_message).into()).await {
// Run filter hooks
match self
.run_filter_hooks(Stage::Data, (&auth_message).into())
.await
{
Ok(modifications_) => {
if !modifications_.is_empty() {
tracing::debug!(
parent: &self.span,
context = "jmilter",
context = "filter_hook",
event = "accept",
"JMilter filter(s) accepted message.");
"FilterHook filter(s) accepted message.");
modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
modifications.extend(modifications_);
@ -777,10 +780,18 @@ impl<T: SessionStream> Session<T> {
.iter()
.map(|r| r.address_lcase.clone())
.collect(),
next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64)
.to_rfc3339(),
next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(),
expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(),
next_retry: Utc
.timestamp_opt(message.next_delivery_event() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
next_dsn: Utc
.timestamp_opt(message.next_dsn() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
expires: Utc
.timestamp_opt(message.expires() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
size: message.size,
});

View file

@ -119,10 +119,10 @@ impl<T: SessionStream> Session<T> {
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Ehlo, None).await {
// FilterHook filtering
if let Err(message) = self.run_filter_hooks(Stage::Ehlo, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
context = "filter_hook",
event = "reject",
domain = &self.data.helo_domain,
reason = message.message.as_ref());

View file

@ -21,41 +21,52 @@
* for more details.
*/
use common::config::smtp::session::JMilter;
use common::config::smtp::session::FilterHook;
use super::{Request, Response};
pub(super) async fn send_jmilter_request(
jmilter: &JMilter,
pub(super) async fn send_filter_hook_request(
filter_hook: &FilterHook,
request: Request,
) -> Result<Response, String> {
let response = reqwest::Client::builder()
.timeout(jmilter.timeout)
.danger_accept_invalid_certs(jmilter.tls_allow_invalid_certs)
.timeout(filter_hook.timeout)
.danger_accept_invalid_certs(filter_hook.tls_allow_invalid_certs)
.build()
.map_err(|err| format!("Failed to create HTTP client: {}", err))?
.post(&jmilter.url)
.headers(jmilter.headers.clone())
.post(&filter_hook.url)
.headers(filter_hook.headers.clone())
.body(
serde_json::to_string(&request)
.map_err(|err| format!("Failed to serialize jMilter request: {}", err))?,
.map_err(|err| format!("Failed to serialize Hook request: {}", err))?,
)
.send()
.await
.map_err(|err| format!("jMilter request failed: {err}"))?;
.map_err(|err| format!("Hook request failed: {err}"))?;
if response.status().is_success() {
if response
.content_length()
.map_or(false, |len| len as usize > filter_hook.max_response_size)
{
return Err(format!(
"Hook response too large ({} bytes)",
response.content_length().unwrap()
));
}
// TODO: Stream response body to limit response size
serde_json::from_slice(
response
.bytes()
.await
.map_err(|err| format!("Failed to parse jMilter response: {}", err))?
.map_err(|err| format!("Failed to parse Hook response: {}", err))?
.as_ref(),
)
.map_err(|err| format!("Failed to parse jMilter response: {}", err))
.map_err(|err| format!("Failed to parse Hook response: {}", err))
} else {
Err(format!(
"jMilter request failed with code {}: {}",
"Hook request failed with code {}: {}",
response.status().as_u16(),
response.status().canonical_reason().unwrap_or("Unknown")
))

View file

@ -23,7 +23,7 @@
use ahash::AHashMap;
use common::{
config::smtp::session::{JMilter, Stage},
config::smtp::session::{FilterHook, Stage},
listener::SessionStream,
DAEMON_NAME,
};
@ -32,7 +32,7 @@ use mail_auth::AuthenticatedMessage;
use crate::{
core::Session,
inbound::{
jmilter::{
hooks::{
Address, Client, Context, Envelope, Message, Protocol, Request, Sasl, Server, Tls,
},
milter::Modification,
@ -40,33 +40,33 @@ use crate::{
},
};
use super::{client::send_jmilter_request, Action, Response};
use super::{client::send_filter_hook_request, Action, Response};
impl<T: SessionStream> Session<T> {
pub async fn run_jmilters(
pub async fn run_filter_hooks(
&self,
stage: Stage,
message: Option<&AuthenticatedMessage<'_>>,
) -> Result<Vec<Modification>, FilterResponse> {
let jmilters = &self.core.core.smtp.session.jmilters;
if jmilters.is_empty() {
let filter_hooks = &self.core.core.smtp.session.hooks;
if filter_hooks.is_empty() {
return Ok(Vec::new());
}
let mut modifications = Vec::new();
for jmilter in jmilters {
if !jmilter.run_on_stage.contains(&stage)
for filter_hook in filter_hooks {
if !filter_hook.run_on_stage.contains(&stage)
|| !self
.core
.core
.eval_if(&jmilter.enable, self)
.eval_if(&filter_hook.enable, self)
.await
.unwrap_or(false)
{
continue;
}
match self.run_jmilter(stage, jmilter, message).await {
match self.run_filter_hook(stage, filter_hook, message).await {
Ok(response) => {
let mut new_modifications = Vec::with_capacity(response.modifications.len());
for modification in response.modifications {
@ -154,12 +154,12 @@ impl<T: SessionStream> Session<T> {
Err(err) => {
tracing::warn!(
parent: &self.span,
jmilter.url = &jmilter.url,
context = "jmilter",
filter_hook.url = &filter_hook.url,
context = "filter_hook",
event = "error",
reason = ?err,
"JMilter filter failed");
if jmilter.tempfail_on_error {
"FilterHook filter failed");
if filter_hook.tempfail_on_error {
return Err(FilterResponse::server_failure());
}
}
@ -169,10 +169,10 @@ impl<T: SessionStream> Session<T> {
Ok(modifications)
}
pub async fn run_jmilter(
pub async fn run_filter_hook(
&self,
stage: Stage,
jmilter: &JMilter,
filter_hook: &FilterHook,
message: Option<&AuthenticatedMessage<'_>>,
) -> Result<Response, String> {
// Build request
@ -245,7 +245,7 @@ impl<T: SessionStream> Session<T> {
}),
};
send_jmilter_request(jmilter, request).await
send_filter_hook_request(filter_hook, request).await
}
}

View file

@ -179,10 +179,10 @@ impl<T: SessionStream> Session<T> {
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Mail, None).await {
// FilterHook filtering
if let Err(message) = self.run_filter_hooks(Stage::Mail, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
context = "filter_hook",
event = "reject",
address = &self.data.mail_from.as_ref().unwrap().address,
reason = message.message.as_ref());

View file

@ -32,7 +32,7 @@ use mail_auth::{
pub mod auth;
pub mod data;
pub mod ehlo;
pub mod jmilter;
pub mod hooks;
pub mod mail;
pub mod milter;
pub mod rcpt;

View file

@ -143,10 +143,10 @@ impl<T: SessionStream> Session<T> {
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Rcpt, None).await {
// FilterHook filtering
if let Err(message) = self.run_filter_hooks(Stage::Rcpt, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
context = "filter_hook",
event = "reject",
address = self.data.rcpt_to.last().unwrap().address,
reason = message.message.as_ref());

View file

@ -131,11 +131,11 @@ impl<T: SessionStream> Session<T> {
return false;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Connect, None).await {
// FilterHook filtering
if let Err(message) = self.run_filter_hooks(Stage::Connect, None).await {
tracing::debug!(parent: &self.span,
context = "connect",
event = "jmilter-reject",
event = "filter_hook-reject",
reason = message.message.as_ref());
let _ = self.write(message.message.as_bytes()).await;
return false;

View file

@ -74,7 +74,7 @@ impl SMTP {
// Fetch policy
#[cfg(not(feature = "test_mode"))]
let bytes = reqwest::Client::builder()
.user_agent(crate::USER_AGENT)
.user_agent(common::USER_AGENT)
.timeout(timeout)
.redirect(reqwest::redirect::Policy::none())
.build()?

View file

@ -21,6 +21,7 @@
* for more details.
*/
use chrono::{TimeZone, Utc};
use common::webhooks::{WebhookDSN, WebhookDSNType, WebhookPayload, WebhookType};
use mail_builder::headers::content_type::ContentType;
use mail_builder::headers::HeaderType;
@ -112,12 +113,8 @@ impl SMTP {
typ: WebhookDSNType::TemporaryFailure,
remote_host: response.hostname.entity.clone().into(),
message: response.response.to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
next_retry: Utc.timestamp_opt(domain.retry.due as i64, 0).single(),
expires: Utc.timestamp_opt(domain.expires as i64, 0).single(),
retry_count: domain.retry.inner.into(),
});
}
@ -152,12 +149,8 @@ impl SMTP {
typ: WebhookDSNType::TemporaryFailure,
remote_host: None,
message: err.to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
next_retry: Utc.timestamp_opt(domain.retry.due as i64, 0).single(),
expires: Utc.timestamp_opt(domain.expires as i64, 0).single(),
retry_count: domain.retry.inner.into(),
});
}
@ -167,12 +160,8 @@ impl SMTP {
typ: WebhookDSNType::TemporaryFailure,
remote_host: None,
message: "Concurrency limited".to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
next_retry: Utc.timestamp_opt(domain.retry.due as i64, 0).single(),
expires: Utc.timestamp_opt(domain.expires as i64, 0).single(),
retry_count: domain.retry.inner.into(),
});
}
@ -193,7 +182,10 @@ impl SMTP {
id: message.id,
sender: message.return_path_lcase.clone(),
status: webhook_data,
created: DateTime::from_timestamp(message.created as i64).to_rfc3339(),
created: Utc
.timestamp_opt(message.created as i64, 0)
.single()
.unwrap_or_else(Utc::now),
},
)
.await;

View file

@ -23,6 +23,7 @@
use std::{io, sync::Arc, time::SystemTime};
use chrono::{TimeZone, Utc};
use common::{
config::smtp::{
report::{AddressMatch, AggregateFrequency},
@ -186,10 +187,18 @@ impl SMTP {
.iter()
.map(|r| r.address_lcase.clone())
.collect(),
next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64)
.to_rfc3339(),
next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(),
expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(),
next_retry: Utc
.timestamp_opt(message.next_delivery_event() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
next_dsn: Utc
.timestamp_opt(message.next_dsn() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
expires: Utc
.timestamp_opt(message.expires() as i64, 0)
.single()
.unwrap_or_else(Utc::now),
size: message.size,
},
)

View file

@ -1,6 +1,6 @@
[package]
name = "store"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"
@ -18,7 +18,7 @@ rand = "0.8.5"
roaring = "0.10.1"
rayon = { version = "1.5.1", optional = true }
serde = { version = "1.0", features = ["derive"]}
ahash = { version = "0.8.1", features = ["serde"] }
ahash = { version = "0.8.2", features = ["serde"] }
lazy_static = "1.4"
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
farmhash = "1.1.5"

View file

@ -1,6 +1,6 @@
[package]
name = "utils"
version = "0.8.1"
version = "0.8.2"
edition = "2021"
resolver = "2"

View file

@ -39,7 +39,7 @@ use serde::Deserialize;
use smtp::{
core::{Inner, Session, SessionData},
inbound::{
jmilter::{self, Request, SmtpResponse},
hooks::{self, Request, SmtpResponse},
milter::{
receiver::{FrameResult, Receiver},
Action, Command, Macros, MilterClient, Modification, Options, Response,
@ -107,7 +107,7 @@ path = "{TMP}/queue.db"
[session.rcpt]
relay = true
[[session.jmilter]]
[[session.hook]]
url = "http://127.0.0.1:9333"
enable = true
stages = ["data"]
@ -247,7 +247,7 @@ async fn milter_session() {
}
#[tokio::test]
async fn jmilter_session() {
async fn filter_hook_session() {
// Enable logging
/*let disable = "true";
tracing::subscriber::set_global_default(
@ -258,11 +258,11 @@ async fn jmilter_session() {
.unwrap();*/
// Configure tests
let tmp_dir = TempDir::new("smtp_jmilter_test", true);
let tmp_dir = TempDir::new("smtp_filter_hook_test", true);
let mut config = Config::new(tmp_dir.update_config(CONFIG_JMILTER)).unwrap();
let stores = Stores::parse_all(&mut config).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
let _rx = spawn_mock_jmilter_server();
let _rx = spawn_mock_filter_hook_server();
tokio::time::sleep(Duration::from_millis(100)).await;
let mut inner = Inner::default();
let mut qr = inner.init_test_queue(&core);
@ -785,7 +785,7 @@ async fn accept_milter(
}
}
pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> {
pub fn spawn_mock_filter_hook_server() -> watch::Sender<bool> {
let (tx, rx) = watch::channel(true);
let tests = Arc::new(
serde_json::from_str::<Vec<HeaderTest>>(
@ -826,7 +826,7 @@ pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> {
let request = serde_json::from_slice::<Request>(&fetch_body(&mut req, 1024 * 1024).await.unwrap())
.unwrap();
let response = handle_jmilter(request, tests);
let response = handle_filter_hook(request, tests);
Ok::<_, hyper::Error>(
Resource {
@ -856,7 +856,7 @@ pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> {
tx
}
fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Response {
fn handle_filter_hook(request: Request, tests: Arc<Vec<HeaderTest>>) -> hooks::Response {
match request
.envelope
.unwrap()
@ -866,23 +866,23 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
.unwrap()
.0
{
"accept" => jmilter::Response {
action: jmilter::Action::Accept,
"accept" => hooks::Response {
action: hooks::Action::Accept,
response: None,
modifications: vec![],
},
"reject" => jmilter::Response {
action: jmilter::Action::Reject,
"reject" => hooks::Response {
action: hooks::Action::Reject,
response: None,
modifications: vec![],
},
"discard" => jmilter::Response {
action: jmilter::Action::Discard,
"discard" => hooks::Response {
action: hooks::Action::Discard,
response: None,
modifications: vec![],
},
"temp_fail" => jmilter::Response {
action: jmilter::Action::Reject,
"temp_fail" => hooks::Response {
action: hooks::Action::Reject,
response: SmtpResponse {
status: 451.into(),
enhanced_status: "4.3.5".to_string().into(),
@ -892,8 +892,8 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
.into(),
modifications: vec![],
},
"shutdown" => jmilter::Response {
action: jmilter::Action::Reject,
"shutdown" => hooks::Response {
action: hooks::Action::Reject,
response: SmtpResponse {
status: 421.into(),
enhanced_status: "4.3.0".to_string().into(),
@ -903,8 +903,8 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
.into(),
modifications: vec![],
},
"conn_fail" => jmilter::Response {
action: jmilter::Action::Accept,
"conn_fail" => hooks::Response {
action: hooks::Action::Accept,
response: SmtpResponse {
disconnect: true,
..Default::default()
@ -912,8 +912,8 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
.into(),
modifications: vec![],
},
"reply_code" => jmilter::Response {
action: jmilter::Action::Reject,
"reply_code" => hooks::Response {
action: hooks::Action::Reject,
response: SmtpResponse {
status: 321.into(),
enhanced_status: "3.1.1".to_string().into(),
@ -923,27 +923,25 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
.into(),
modifications: vec![],
},
test_num => jmilter::Response {
action: jmilter::Action::Accept,
test_num => hooks::Response {
action: hooks::Action::Accept,
response: None,
modifications: tests[test_num.parse::<usize>().unwrap()]
.modifications
.iter()
.map(|m| match m {
Modification::ChangeFrom { sender, args } => {
jmilter::Modification::ChangeFrom {
value: sender.clone(),
parameters: args
.split_whitespace()
.map(|arg| {
let (key, value) = arg.split_once('=').unwrap();
(key.to_string(), Some(value.to_string()))
})
.collect(),
}
}
Modification::ChangeFrom { sender, args } => hooks::Modification::ChangeFrom {
value: sender.clone(),
parameters: args
.split_whitespace()
.map(|arg| {
let (key, value) = arg.split_once('=').unwrap();
(key.to_string(), Some(value.to_string()))
})
.collect(),
},
Modification::AddRcpt { recipient, args } => {
jmilter::Modification::AddRecipient {
hooks::Modification::AddRecipient {
value: recipient.clone(),
parameters: args
.split_whitespace()
@ -955,32 +953,32 @@ fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Res
}
}
Modification::DeleteRcpt { recipient } => {
jmilter::Modification::DeleteRecipient {
hooks::Modification::DeleteRecipient {
value: recipient.clone(),
}
}
Modification::ReplaceBody { value } => jmilter::Modification::ReplaceContents {
Modification::ReplaceBody { value } => hooks::Modification::ReplaceContents {
value: String::from_utf8(value.clone()).unwrap(),
},
Modification::AddHeader { name, value } => jmilter::Modification::AddHeader {
Modification::AddHeader { name, value } => hooks::Modification::AddHeader {
name: name.clone(),
value: value.clone(),
},
Modification::InsertHeader { index, name, value } => {
jmilter::Modification::InsertHeader {
hooks::Modification::InsertHeader {
index: *index,
name: name.clone(),
value: value.clone(),
}
}
Modification::ChangeHeader { index, name, value } => {
jmilter::Modification::ChangeHeader {
hooks::Modification::ChangeHeader {
index: *index,
name: name.clone(),
value: value.clone(),
}
}
Modification::Quarantine { reason } => jmilter::Modification::AddHeader {
Modification::Quarantine { reason } => hooks::Modification::AddHeader {
name: "X-Quarantine".to_string(),
value: reason.to_string(),
},