JSON mail filtering and manipulation protocol implementation

This commit is contained in:
mdecimus 2024-06-18 19:18:54 +02:00
parent 41115e0b34
commit 5ff6bc895c
12 changed files with 1011 additions and 92 deletions

View file

@ -1,9 +1,12 @@
use std::{
net::{SocketAddr, ToSocketAddrs},
str::FromStr,
time::Duration,
};
use ahash::AHashSet;
use base64::{engine::general_purpose::STANDARD, Engine};
use hyper::{header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap};
use smtp_proto::*;
use utils::config::{utils::ParseValue, Config};
@ -171,6 +174,7 @@ pub struct JMilter {
pub enable: IfBlock,
pub url: String,
pub timeout: Duration,
pub headers: HeaderMap,
pub tls_allow_invalid_certs: bool,
pub tempfail_on_error: bool,
pub run_on_stage: AHashSet<Stage>,
@ -575,13 +579,52 @@ fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<M
}
fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<JMilter> {
let mut headers = HeaderMap::new();
for (header, value) in config
.values(("session.jmilter", id, "headers"))
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
)
})?,
HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {v}",
))
}
})
.collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
.map_err(|e| config.new_parse_error(("session.jmilter", id, "headers"), e))
.unwrap_or_default()
{
headers.insert(header, value);
}
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let (Some(name), Some(secret)) = (config.value(("session.jmilter", id, "user")), config.value(("session.jmilter", id, "secret"))) {
headers.insert(AUTHORIZATION, format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))).parse().unwrap());
}
Some(JMilter {
enable: IfBlock::try_parse(config, ("session.jmilter", id, "enable"), token_map)
.unwrap_or_else(|| {
IfBlock::new::<()>(format!("session.jmilter.{id}.enable"), [], "false")
}),
url: config
.value_require(("session.jmilter", id, "hostname"))?
.value_require(("session.jmilter", id, "url"))?
.to_string(),
timeout: config
.property_or_default(("session.jmilter", id, "timeout"), "30s")
@ -593,6 +636,7 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
.property_or_default(("session.jmilter", id, "options.tempfail-on-error"), "true")
.unwrap_or(true),
run_on_stage: parse_stages(config, "session.jmilter", id),
headers,
})
}

View file

@ -48,6 +48,7 @@ use utils::config::Rate;
use crate::{
core::{Session, SessionAddress, State},
inbound::milter::Modification,
queue::{self, Message, QueueEnvelope, Schedule},
scripts::ScriptResult,
};
@ -400,9 +401,10 @@ impl<T: SessionStream> Session<T> {
}
// Run Milter filters
let mut edited_message = match self.run_milters(Stage::Data, (&auth_message).into()).await {
Ok(modifications) => {
if !modifications.is_empty() {
let mut modifications = Vec::new();
match self.run_milters(Stage::Data, (&auth_message).into()).await {
Ok(modifications_) => {
if !modifications_.is_empty() {
tracing::debug!(
parent: &self.span,
context = "milter",
@ -416,14 +418,35 @@ impl<T: SessionStream> Session<T> {
s
}),
"Milter filter(s) accepted message.");
self.data
.apply_milter_modifications(modifications, &auth_message)
} else {
None
modifications = modifications_;
}
}
Err(response) => return response,
Err(response) => return response.into_bytes(),
};
// Run JMilter filters
match self.run_jmilters(Stage::Data, (&auth_message).into()).await {
Ok(modifications_) => {
if !modifications_.is_empty() {
tracing::debug!(
parent: &self.span,
context = "jmilter",
event = "accept",
"JMilter filter(s) accepted message.");
modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
modifications.extend(modifications_);
}
}
Err(response) => return response.into_bytes(),
};
// Apply modifications
let mut edited_message = if !modifications.is_empty() {
self.data
.apply_milter_modifications(modifications, &auth_message)
} else {
None
};
// Pipe message

View file

@ -111,12 +111,26 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
domain = &self.data.helo_domain,
reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
reason = message.message.as_ref());
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
return self.write(message.as_ref()).await;
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Ehlo, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
event = "reject",
domain = &self.data.helo_domain,
reason = message.message.as_ref());
self.data.mail_from = None;
self.data.helo_domain = prev_helo_domain;
self.data.spf_ehlo = None;
return self.write(message.message.as_bytes()).await;
}
tracing::debug!(parent: &self.span,

View file

@ -0,0 +1,59 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use common::config::smtp::session::JMilter;
use super::{Request, Response};
pub(super) async fn send_jmilter_request(
jmilter: &JMilter,
request: Request,
) -> Result<Response, String> {
let response = reqwest::Client::builder()
.timeout(jmilter.timeout)
.danger_accept_invalid_certs(jmilter.tls_allow_invalid_certs)
.build()
.map_err(|err| format!("Failed to create HTTP client: {}", err))?
.post(&jmilter.url)
.headers(jmilter.headers.clone())
.body(serde_json::to_string(&request).unwrap())
.send()
.await
.map_err(|err| format!("jMilter request failed: {err}"))?;
if response.status().is_success() {
serde_json::from_slice(
response
.bytes()
.await
.map_err(|err| format!("Failed to parse jMilter response: {}", err))?
.as_ref(),
)
.map_err(|err| format!("Failed to parse jMilter response: {}", err))
} else {
Err(format!(
"jMilter request failed with code {}: {}",
response.status().as_u16(),
response.status().canonical_reason().unwrap_or("Unknown")
))
}
}

View file

@ -0,0 +1,266 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashMap;
use common::{
config::smtp::session::{JMilter, Stage},
listener::SessionStream,
};
use mail_auth::AuthenticatedMessage;
use crate::{
core::Session,
inbound::{
jmilter::{
Address, Client, Context, Envelope, Message, Protocol, Request, Sasl, Server, Tls,
},
milter::Modification,
FilterResponse,
},
DAEMON_NAME,
};
use super::{client::send_jmilter_request, Action, Response};
impl<T: SessionStream> Session<T> {
pub async fn run_jmilters(
&self,
stage: Stage,
message: Option<&AuthenticatedMessage<'_>>,
) -> Result<Vec<Modification>, FilterResponse> {
let jmilters = &self.core.core.smtp.session.jmilters;
if jmilters.is_empty() {
return Ok(Vec::new());
}
let mut modifications = Vec::new();
for jmilter in jmilters {
if !jmilter.run_on_stage.contains(&stage)
|| !self
.core
.core
.eval_if(&jmilter.enable, self)
.await
.unwrap_or(false)
{
continue;
}
match self.run_jmilter(stage, jmilter, message).await {
Ok(response) => {
let mut new_modifications = Vec::with_capacity(response.modifications.len());
for modification in response.modifications {
new_modifications.push(match modification {
super::Modification::ChangeFrom { value, parameters } => {
Modification::ChangeFrom {
sender: value,
args: flatten_parameters(parameters),
}
}
super::Modification::AddRecipient { value, parameters } => {
Modification::AddRcpt {
recipient: value,
args: flatten_parameters(parameters),
}
}
super::Modification::DeleteRecipient { value } => {
Modification::DeleteRcpt { recipient: value }
}
super::Modification::ReplaceContents { value } => {
Modification::ReplaceBody {
value: value.into_bytes(),
}
}
super::Modification::AddHeader { name, value } => {
Modification::AddHeader { name, value }
}
super::Modification::InsertHeader { index, name, value } => {
Modification::InsertHeader { index, name, value }
}
super::Modification::ChangeHeader { index, name, value } => {
Modification::ChangeHeader { index, name, value }
}
super::Modification::DeleteHeader { index, name } => {
Modification::ChangeHeader {
index,
name,
value: String::new(),
}
}
});
}
if !modifications.is_empty() {
// The message body can only be replaced once, so we need to remove
// any previous replacements.
if new_modifications
.iter()
.any(|m| matches!(m, Modification::ReplaceBody { .. }))
{
modifications
.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
}
modifications.extend(new_modifications);
} else {
modifications = new_modifications;
}
let mut message = match response.action {
Action::Accept => continue,
Action::Discard => FilterResponse::accept(),
Action::Reject => FilterResponse::reject(),
Action::Quarantine => {
modifications.push(Modification::AddHeader {
name: "X-Quarantine".to_string(),
value: "true".to_string(),
});
FilterResponse::accept()
}
};
if let Some(response) = response.response {
if let (Some(status), Some(text)) = (response.status, response.message) {
if let Some(enhanced) = response.enhanced_status {
message.message = format!("{status} {enhanced} {text}\r\n").into();
} else {
message.message = format!("{status} {text}\r\n").into();
}
}
message.disconnect = response.disconnect;
}
return Err(message);
}
Err(err) => {
tracing::warn!(
parent: &self.span,
jmilter.url = &jmilter.url,
context = "jmilter",
event = "error",
reason = ?err,
"JMilter filter failed");
if jmilter.tempfail_on_error {
return Err(FilterResponse::server_failure());
}
}
}
}
Ok(modifications)
}
pub async fn run_jmilter(
&self,
stage: Stage,
jmilter: &JMilter,
message: Option<&AuthenticatedMessage<'_>>,
) -> Result<Response, String> {
// Build request
let (tls_version, tls_cipher) = self.stream.tls_version_and_cipher();
let request = Request {
context: Context {
stage: stage.into(),
client: Client {
ip: self.data.remote_ip.to_string(),
port: self.data.remote_port,
ptr: self
.data
.iprev
.as_ref()
.and_then(|ip_rev| ip_rev.ptr.as_ref())
.and_then(|ptrs| ptrs.first())
.cloned(),
helo: (!self.data.helo_domain.is_empty())
.then(|| self.data.helo_domain.clone()),
active_connections: 1,
},
sasl: (!self.data.authenticated_as.is_empty()).then(|| Sasl {
login: self.data.authenticated_as.clone(),
method: None,
}),
tls: (!tls_version.is_empty()).then(|| Tls {
version: tls_version.to_string(),
cipher: tls_cipher.to_string(),
bits: None,
issuer: None,
subject: None,
}),
server: Server {
name: DAEMON_NAME.to_string().into(),
port: self.data.local_port,
ip: self.data.local_ip.to_string().into(),
},
queue: None,
protocol: Protocol { version: 1 },
},
envelope: self.data.mail_from.as_ref().map(|from| Envelope {
from: Address {
address: from.address_lcase.clone(),
parameters: None,
},
to: self
.data
.rcpt_to
.iter()
.map(|to| Address {
address: to.address_lcase.clone(),
parameters: None,
})
.collect(),
}),
message: message.map(|message| Message {
headers: message
.raw_parsed_headers()
.iter()
.map(|(k, v)| {
(
String::from_utf8_lossy(k).into_owned(),
String::from_utf8_lossy(v).into_owned(),
)
})
.collect(),
server_headers: vec![],
contents: String::from_utf8_lossy(message.raw_body()).into_owned(),
size: message.raw_message().len(),
}),
};
send_jmilter_request(jmilter, request).await
}
}
fn flatten_parameters(parameters: AHashMap<String, Option<String>>) -> String {
let mut arguments = String::new();
for (key, value) in parameters {
if !arguments.is_empty() {
arguments.push(' ');
}
arguments.push_str(key.as_str());
if let Some(value) = value {
arguments.push('=');
arguments.push_str(value.as_str());
}
}
arguments
}

View file

@ -1,75 +1,102 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
pub mod client;
pub mod message;
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct Request {
context: Context,
pub context: Context,
#[serde(skip_serializing_if = "Option::is_none")]
envelope: Option<Envelope>,
pub envelope: Option<Envelope>,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<Message>,
pub message: Option<Message>,
}
#[derive(Serialize, Deserialize)]
pub struct Context {
stage: Stage,
client: Client,
pub stage: Stage,
pub client: Client,
#[serde(skip_serializing_if = "Option::is_none")]
sasl: Option<Sasl>,
pub sasl: Option<Sasl>,
#[serde(skip_serializing_if = "Option::is_none")]
tls: Option<Tls>,
server: Server,
pub tls: Option<Tls>,
pub server: Server,
#[serde(skip_serializing_if = "Option::is_none")]
queue: Option<Queue>,
protocol: Protocol,
pub queue: Option<Queue>,
pub protocol: Protocol,
}
#[derive(Serialize, Deserialize)]
pub struct Sasl {
login: String,
method: String,
pub login: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub method: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Client {
ip: String,
port: u16,
ptr: Option<String>,
helo: Option<String>,
pub ip: String,
pub port: u16,
pub ptr: Option<String>,
pub helo: Option<String>,
#[serde(rename = "activeConnections")]
active_connections: u32,
pub active_connections: u32,
}
#[derive(Serialize, Deserialize)]
pub struct Tls {
version: String,
cipher: String,
pub version: String,
pub cipher: String,
#[serde(rename = "cipherBits")]
#[serde(skip_serializing_if = "Option::is_none")]
bits: Option<u16>,
pub bits: Option<u16>,
#[serde(rename = "certIssuer")]
#[serde(skip_serializing_if = "Option::is_none")]
issuer: Option<String>,
pub issuer: Option<String>,
#[serde(rename = "certSubject")]
#[serde(skip_serializing_if = "Option::is_none")]
subject: Option<String>,
pub subject: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Server {
name: Option<String>,
port: u16,
ip: Option<String>,
pub name: Option<String>,
pub port: u16,
pub ip: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct Queue {
id: String,
pub id: String,
}
#[derive(Serialize, Deserialize)]
pub struct Protocol {
version: String,
pub version: u32,
}
#[derive(Serialize, Deserialize)]
@ -90,31 +117,35 @@ pub enum Stage {
#[derive(Serialize, Deserialize)]
pub struct Address {
address: String,
pub address: String,
#[serde(skip_serializing_if = "Option::is_none")]
parameters: Option<AHashMap<String, String>>,
pub parameters: Option<AHashMap<String, String>>,
}
#[derive(Serialize, Deserialize)]
pub struct Envelope {
from: Address,
to: Vec<Address>,
pub from: Address,
pub to: Vec<Address>,
}
#[derive(Serialize, Deserialize)]
pub struct Message {
headers: Vec<(String, String)>,
pub headers: Vec<(String, String)>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(rename = "serverHeaders")]
server_headers: Vec<(String, String)>,
body: String,
size: usize,
#[serde(default)]
pub server_headers: Vec<(String, String)>,
pub contents: String,
pub size: usize,
}
#[derive(Serialize, Deserialize)]
pub struct Response {
action: Action,
modifications: Vec<Modification>,
pub action: Action,
#[serde(default)]
pub response: Option<SmtpResponse>,
#[serde(default)]
pub modifications: Vec<Modification>,
}
#[derive(Serialize, Deserialize)]
@ -129,6 +160,18 @@ pub enum Action {
Quarantine,
}
#[derive(Serialize, Deserialize, Default)]
pub struct SmtpResponse {
#[serde(default)]
pub status: Option<u16>,
#[serde(default)]
pub enhanced_status: Option<String>,
#[serde(default)]
pub message: Option<String>,
#[serde(default)]
pub disconnect: bool,
}
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Modification {
@ -136,36 +179,45 @@ pub enum Modification {
ChangeFrom {
value: String,
#[serde(default)]
parameters: AHashMap<String, String>,
parameters: AHashMap<String, Option<String>>,
},
#[serde(rename = "addRecipient")]
AddRecipient {
value: String,
#[serde(default)]
parameters: AHashMap<String, String>,
parameters: AHashMap<String, Option<String>>,
},
#[serde(rename = "deleteRecipient")]
DeleteRecipient { value: String },
#[serde(rename = "replaceBody")]
ReplaceBody { value: String },
#[serde(rename = "replaceContents")]
ReplaceContents { value: String },
#[serde(rename = "addHeader")]
AddHeader { name: String, value: String },
#[serde(rename = "insertHeader")]
InsertHeader {
index: i32,
index: u32,
name: String,
value: String,
},
#[serde(rename = "changeHeader")]
ChangeHeader {
index: i32,
index: u32,
name: String,
value: String,
},
#[serde(rename = "deleteHeader")]
DeleteHeader {
#[serde(default)]
index: Option<i32>,
name: String,
},
DeleteHeader { index: u32, name: String },
}
impl From<common::config::smtp::session::Stage> for Stage {
fn from(value: common::config::smtp::session::Stage) -> Self {
match value {
common::config::smtp::session::Stage::Connect => Stage::Connect,
common::config::smtp::session::Stage::Ehlo => Stage::Ehlo,
common::config::smtp::session::Stage::Auth => Stage::Auth,
common::config::smtp::session::Stage::Mail => Stage::Mail,
common::config::smtp::session::Stage::Rcpt => Stage::Rcpt,
common::config::smtp::session::Stage::Data => Stage::Data,
}
}
}

View file

@ -173,10 +173,22 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
address = &self.data.mail_from.as_ref().unwrap().address,
reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
reason = message.message.as_ref());
self.data.mail_from = None;
return self.write(message.as_ref()).await;
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Mail, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
event = "reject",
address = &self.data.mail_from.as_ref().unwrap().address,
reason = message.message.as_ref());
self.data.mail_from = None;
return self.write(message.message.as_bytes()).await;
}
// Address rewriting

View file

@ -28,12 +28,12 @@ use common::{
listener::SessionStream,
};
use mail_auth::AuthenticatedMessage;
use smtp_proto::request::parser::Rfc5321Parser;
use smtp_proto::{request::parser::Rfc5321Parser, IntoString};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::{Session, SessionAddress, SessionData},
inbound::milter::MilterClient,
inbound::{milter::MilterClient, FilterResponse},
queue::DomainPart,
DAEMON_NAME,
};
@ -50,7 +50,7 @@ impl<T: SessionStream> Session<T> {
&self,
stage: Stage,
message: Option<&AuthenticatedMessage<'_>>,
) -> Result<Vec<Modification>, Cow<'static, [u8]>> {
) -> Result<Vec<Modification>, FilterResponse> {
let milters = &self.core.core.smtp.session.milters;
if milters.is_empty() {
return Ok(Vec::new());
@ -74,7 +74,13 @@ impl<T: SessionStream> Session<T> {
if !modifications.is_empty() {
// The message body can only be replaced once, so we need to remove
// any previous replacements.
modifications.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
if new_modifications
.iter()
.any(|m| matches!(m, Modification::ReplaceBody { .. }))
{
modifications
.retain(|m| !matches!(m, Modification::ReplaceBody { .. }));
}
modifications.extend(new_modifications);
} else {
modifications = new_modifications;
@ -91,13 +97,9 @@ impl<T: SessionStream> Session<T> {
"Milter rejected message.");
return Err(match action {
Action::Discard => {
(b"250 2.0.0 Message queued for delivery.\r\n"[..]).into()
}
Action::Reject => (b"503 5.5.3 Message rejected.\r\n"[..]).into(),
Action::TempFail => {
(b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into()
}
Action::Discard => FilterResponse::accept(),
Action::Reject => FilterResponse::reject(),
Action::TempFail => FilterResponse::temp_fail(),
Action::ReplyCode { code, text } => {
let mut response = Vec::with_capacity(text.len() + 6);
response.extend_from_slice(code.as_slice());
@ -106,10 +108,13 @@ impl<T: SessionStream> Session<T> {
if !text.ends_with('\n') {
response.extend_from_slice(b"\r\n");
}
response.into()
FilterResponse {
message: response.into_string().into(),
disconnect: false,
}
}
Action::Shutdown => (b"421 4.3.0 Server shutting down.\r\n"[..]).into(),
Action::ConnectionFailure => (b""[..]).into(), // TODO: Not very elegant design, fix.
Action::Shutdown => FilterResponse::shutdown(),
Action::ConnectionFailure => FilterResponse::default().disconnect(),
Action::Accept | Action::Continue => unreachable!(),
});
}
@ -123,9 +128,7 @@ impl<T: SessionStream> Session<T> {
reason = ?err,
"Milter filter failed");
if milter.tempfail_on_error {
return Err(
(b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into(),
);
return Err(FilterResponse::server_failure());
}
}
}

View file

@ -21,6 +21,8 @@
* for more details.
*/
use std::borrow::Cow;
use common::config::smtp::auth::{ArcSealer, DkimSigner};
use mail_auth::{
arc::ArcSet, dkim::Signature, dmarc::Policy, ArcOutput, AuthenticatedMessage,
@ -38,6 +40,12 @@ pub mod session;
pub mod spawn;
pub mod vrfy;
#[derive(Debug, Default)]
pub struct FilterResponse {
pub message: Cow<'static, str>,
pub disconnect: bool,
}
pub trait ArcSeal {
fn seal<'x>(
&self,
@ -145,3 +153,54 @@ impl AuthResult for Policy {
}
}
}
impl FilterResponse {
pub fn accept() -> Self {
Self {
message: Cow::Borrowed("250 2.0.0 Message queued for delivery.\r\n"),
disconnect: false,
}
}
pub fn reject() -> Self {
Self {
message: Cow::Borrowed("503 5.5.3 Message rejected.\r\n"),
disconnect: false,
}
}
pub fn temp_fail() -> Self {
Self {
message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"),
disconnect: false,
}
}
pub fn shutdown() -> Self {
Self {
message: Cow::Borrowed("421 4.3.0 Server shutting down.\r\n"),
disconnect: true,
}
}
pub fn server_failure() -> Self {
Self {
message: Cow::Borrowed("451 4.3.5 Unable to accept message at this time.\r\n"),
disconnect: false,
}
}
pub fn disconnect(self) -> Self {
Self {
disconnect: true,
..self
}
}
pub fn into_bytes(self) -> Cow<'static, [u8]> {
match self.message {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.into_bytes()),
}
}
}

View file

@ -137,10 +137,22 @@ impl<T: SessionStream> Session<T> {
context = "milter",
event = "reject",
address = self.data.rcpt_to.last().unwrap().address,
reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
reason = message.message.as_ref());
self.data.rcpt_to.pop();
return self.write(message.as_ref()).await;
return self.write(message.message.as_bytes()).await;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Rcpt, None).await {
tracing::info!(parent: &self.span,
context = "jmilter",
event = "reject",
address = self.data.rcpt_to.last().unwrap().address,
reason = message.message.as_ref());
self.data.rcpt_to.pop();
return self.write(message.message.as_bytes()).await;
}
// Address rewriting

View file

@ -124,10 +124,20 @@ impl<T: SessionStream> Session<T> {
// Milter filtering
if let Err(message) = self.run_milters(Stage::Connect, None).await {
tracing::debug!(parent: &self.span,
context = "connext",
context = "connect",
event = "milter-reject",
reason = std::str::from_utf8(message.as_ref()).unwrap_or_default());
let _ = self.write(message.as_ref()).await;
reason = message.message.as_ref());
let _ = self.write(message.message.as_bytes()).await;
return false;
}
// JMilter filtering
if let Err(message) = self.run_jmilters(Stage::Connect, None).await {
tracing::debug!(parent: &self.span,
context = "connect",
event = "jmilter-reject",
reason = message.message.as_ref());
let _ = self.write(message.message.as_bytes()).await;
return false;
}

View file

@ -27,16 +27,23 @@ use ahash::AHashSet;
use common::{
config::smtp::session::{Milter, MilterVersion, Stage},
expr::if_block::IfBlock,
manager::webadmin::Resource,
Core,
};
use hyper::{body, server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use jmap::api::http::{fetch_body, ToHttpResponse};
use mail_auth::AuthenticatedMessage;
use mail_parser::MessageParser;
use serde::Deserialize;
use smtp::{
core::{Inner, Session, SessionData},
inbound::milter::{
receiver::{FrameResult, Receiver},
Action, Command, Macros, MilterClient, Modification, Options, Response,
inbound::{
jmilter::{self, Request, SmtpResponse},
milter::{
receiver::{FrameResult, Receiver},
Action, Command, Macros, MilterClient, Modification, Options, Response,
},
},
};
use store::Stores;
@ -60,7 +67,7 @@ struct HeaderTest {
result: String,
}
const CONFIG: &str = r#"
const CONFIG_MILTER: &str = r#"
[storage]
data = "sqlite"
lookup = "sqlite"
@ -86,6 +93,26 @@ stages = ["data"]
"#;
const CONFIG_JMILTER: &str = r#"
[storage]
data = "sqlite"
lookup = "sqlite"
blob = "sqlite"
fts = "sqlite"
[store."sqlite"]
type = "sqlite"
path = "{TMP}/queue.db"
[session.rcpt]
relay = true
[[session.jmilter]]
url = "http://127.0.0.1:9333"
enable = true
stages = ["data"]
"#;
#[tokio::test]
async fn milter_session() {
// Enable logging
@ -99,7 +126,7 @@ async fn milter_session() {
// Configure tests
let tmp_dir = TempDir::new("smtp_milter_test", true);
let mut config = Config::new(tmp_dir.update_config(CONFIG)).unwrap();
let mut config = Config::new(tmp_dir.update_config(CONFIG_MILTER)).unwrap();
let stores = Stores::parse_all(&mut config).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
let _rx = spawn_mock_milter_server();
@ -219,6 +246,139 @@ async fn milter_session() {
.assert_contains("123456");
}
#[tokio::test]
async fn jmilter_session() {
// Enable logging
/*let disable = "true";
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();*/
// Configure tests
let tmp_dir = TempDir::new("smtp_jmilter_test", true);
let mut config = Config::new(tmp_dir.update_config(CONFIG_JMILTER)).unwrap();
let stores = Stores::parse_all(&mut config).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
let _rx = spawn_mock_jmilter_server();
tokio::time::sleep(Duration::from_millis(100)).await;
let mut inner = Inner::default();
let mut qr = inner.init_test_queue(&core);
// Build session
let mut session = Session::test(build_smtp(core, inner));
session.data.remote_ip_str = "10.0.0.1".to_string();
session.eval_session_params().await;
session.ehlo("mx.doe.org").await;
// Test reject
session
.send_message(
"reject@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"503 5.5.3",
)
.await;
qr.assert_no_events();
// Test discard
session
.send_message(
"discard@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"250 2.0.0",
)
.await;
qr.assert_no_events();
// Test temp fail
session
.send_message(
"temp_fail@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"451 4.3.5",
)
.await;
qr.assert_no_events();
// Test shutdown
session
.send_message(
"shutdown@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"421 4.3.0",
)
.await;
qr.assert_no_events();
// Test reply code
session
.send_message(
"reply_code@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"321",
)
.await;
qr.assert_no_events();
// Test accept with header addition
session
.send_message(
"0@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"250 2.0.0",
)
.await;
qr.expect_message()
.await
.read_lines(&qr)
.await
.assert_contains("X-Hello: World")
.assert_contains("Subject: Is dinner ready?")
.assert_contains("Are you hungry yet?");
// Test accept with header replacement
session
.send_message(
"3@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"250 2.0.0",
)
.await;
qr.expect_message()
.await
.read_lines(&qr)
.await
.assert_contains("Subject: [SPAM] Saying Hello")
.assert_count("References: ", 1)
.assert_contains("Are you hungry yet?");
// Test accept with body replacement
session
.send_message(
"2@doe.org",
&["bill@foobar.org"],
"test:no_dkim",
"250 2.0.0",
)
.await;
qr.expect_message()
.await
.read_lines(&qr)
.await
.assert_contains("X-Spam: Yes")
.assert_contains("123456");
}
#[test]
fn milter_address_modifications() {
let test_message = fs::read_to_string(
@ -521,7 +681,7 @@ async fn accept_milter(
let mut buf = vec![0u8; 1024];
let mut receiver = Receiver::with_max_frame_len(5000000);
let mut action = None;
let mut modidications = None;
let mut modifications = None;
'outer: loop {
let br = tokio::select! {
@ -585,7 +745,7 @@ async fn accept_milter(
text: "test".to_string(),
},
test_num => {
modidications = tests[test_num.parse::<usize>().unwrap()]
modifications = tests[test_num.parse::<usize>().unwrap()]
.modifications
.clone()
.into();
@ -597,7 +757,7 @@ async fn accept_milter(
}
Command::Quit => break 'outer,
Command::EndOfBody => {
if let Some(modifications) = modidications.take() {
if let Some(modifications) = modifications.take() {
for modification in modifications {
// Write modifications
stream
@ -624,3 +784,208 @@ async fn accept_milter(
}
}
}
pub fn spawn_mock_jmilter_server() -> watch::Sender<bool> {
let (tx, rx) = watch::channel(true);
let tests = Arc::new(
serde_json::from_str::<Vec<HeaderTest>>(
&fs::read_to_string(
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("resources")
.join("smtp")
.join("milter")
.join("message.json"),
)
.unwrap(),
)
.unwrap(),
);
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:9333")
.await
.unwrap_or_else(|e| {
panic!("Failed to bind mock Milter server to 127.0.0.1:9333: {e}");
});
let mut rx_ = rx.clone();
//println!("Mock jMilter server listening on port 9333");
loop {
tokio::select! {
stream = listener.accept() => {
match stream {
Ok((stream, _)) => {
let _ = http1::Builder::new()
.keep_alive(false)
.serve_connection(
TokioIo::new(stream),
service_fn(|mut req: hyper::Request<body::Incoming>| {
let tests = tests.clone();
async move {
let request = serde_json::from_slice::<Request>(&fetch_body(&mut req, 1024 * 1024).await.unwrap())
.unwrap();
let response = handle_jmilter(request, tests);
Ok::<_, hyper::Error>(
Resource {
content_type: "application/json",
contents: serde_json::to_string(&response).unwrap().into_bytes(),
}
.into_http_response(),
)
}
}),
)
.await;
}
Err(err) => {
panic!("Something went wrong: {err}" );
}
}
},
_ = rx_.changed() => {
//println!("Mock jMilter server stopping");
break;
}
};
}
});
tx
}
fn handle_jmilter(request: Request, tests: Arc<Vec<HeaderTest>>) -> jmilter::Response {
match request
.envelope
.unwrap()
.from
.address
.split_once('@')
.unwrap()
.0
{
"accept" => jmilter::Response {
action: jmilter::Action::Accept,
response: None,
modifications: vec![],
},
"reject" => jmilter::Response {
action: jmilter::Action::Reject,
response: None,
modifications: vec![],
},
"discard" => jmilter::Response {
action: jmilter::Action::Discard,
response: None,
modifications: vec![],
},
"temp_fail" => jmilter::Response {
action: jmilter::Action::Reject,
response: SmtpResponse {
status: 451.into(),
enhanced_status: "4.3.5".to_string().into(),
message: "Unable to accept message at this time.".to_string().into(),
disconnect: false,
}
.into(),
modifications: vec![],
},
"shutdown" => jmilter::Response {
action: jmilter::Action::Reject,
response: SmtpResponse {
status: 421.into(),
enhanced_status: "4.3.0".to_string().into(),
message: "Server shutting down".to_string().into(),
disconnect: false,
}
.into(),
modifications: vec![],
},
"conn_fail" => jmilter::Response {
action: jmilter::Action::Accept,
response: SmtpResponse {
disconnect: true,
..Default::default()
}
.into(),
modifications: vec![],
},
"reply_code" => jmilter::Response {
action: jmilter::Action::Reject,
response: SmtpResponse {
status: 321.into(),
enhanced_status: "3.1.1".to_string().into(),
message: "Test".to_string().into(),
disconnect: false,
}
.into(),
modifications: vec![],
},
test_num => jmilter::Response {
action: jmilter::Action::Accept,
response: None,
modifications: tests[test_num.parse::<usize>().unwrap()]
.modifications
.iter()
.map(|m| match m {
Modification::ChangeFrom { sender, args } => {
jmilter::Modification::ChangeFrom {
value: sender.clone(),
parameters: args
.split_whitespace()
.map(|arg| {
let (key, value) = arg.split_once('=').unwrap();
(key.to_string(), Some(value.to_string()))
})
.collect(),
}
}
Modification::AddRcpt { recipient, args } => {
jmilter::Modification::AddRecipient {
value: recipient.clone(),
parameters: args
.split_whitespace()
.map(|arg| {
let (key, value) = arg.split_once('=').unwrap();
(key.to_string(), Some(value.to_string()))
})
.collect(),
}
}
Modification::DeleteRcpt { recipient } => {
jmilter::Modification::DeleteRecipient {
value: recipient.clone(),
}
}
Modification::ReplaceBody { value } => jmilter::Modification::ReplaceContents {
value: String::from_utf8(value.clone()).unwrap(),
},
Modification::AddHeader { name, value } => jmilter::Modification::AddHeader {
name: name.clone(),
value: value.clone(),
},
Modification::InsertHeader { index, name, value } => {
jmilter::Modification::InsertHeader {
index: *index,
name: name.clone(),
value: value.clone(),
}
}
Modification::ChangeHeader { index, name, value } => {
jmilter::Modification::ChangeHeader {
index: *index,
name: name.clone(),
value: value.clone(),
}
}
Modification::Quarantine { reason } => jmilter::Modification::AddHeader {
name: "X-Quarantine".to_string(),
value: reason.to_string(),
},
})
.collect(),
},
}
}