Add elapsed times to message filtering events

This commit is contained in:
mdecimus 2024-08-23 17:09:26 +02:00
parent dcc31e8b3e
commit e4cd866be4
11 changed files with 113 additions and 48 deletions

View file

@ -421,6 +421,9 @@ impl StoreTracer {
| AuthEvent::Banned | AuthEvent::Banned
| AuthEvent::Error | AuthEvent::Error
) )
| EventType::Sieve(_)
| EventType::Milter(_)
| EventType::MtaHook(_)
) )
}) })
} }

View file

@ -42,10 +42,14 @@ impl JMAP {
path: Vec<&str>, path: Vec<&str>,
body: Option<Vec<u8>>, body: Option<Vec<u8>>,
) -> trc::Result<HttpResponse> { ) -> trc::Result<HttpResponse> {
let script = match ( let (script, script_id) = match (
path.get(1) path.get(1).and_then(|name| {
.and_then(|name| self.core.sieve.scripts.get(*name)) self.core
.cloned(), .sieve
.scripts
.get(*name)
.map(|s| (s.clone(), name.to_string()))
}),
req.method(), req.method(),
) { ) {
(Some(script), &Method::POST) => script, (Some(script), &Method::POST) => script,
@ -94,7 +98,7 @@ impl JMAP {
} }
// Run script // Run script
let result = match self.smtp.run_script(script, params, 0).await { let result = match self.smtp.run_script(script_id, script, params, 0).await {
ScriptResult::Accept { modifications } => Response::Accept { modifications }, ScriptResult::Accept { modifications } => Response::Accept { modifications },
ScriptResult::Replace { ScriptResult::Replace {
message, message,

View file

@ -446,6 +446,7 @@ impl<T: SessionStream> Session<T> {
{ {
command.arg(argument); command.arg(argument);
} }
let time = Instant::now();
match command match command
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -475,6 +476,7 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id, SpanId = self.data.session_id,
Path = command_, Path = command_,
Result = output.status.to_string(), Result = output.status.to_string(),
Elapsed = time.elapsed(),
); );
} }
Ok(Err(err)) => { Ok(Err(err)) => {
@ -482,6 +484,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError), Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Reason = err.to_string(), Reason = err.to_string(),
Elapsed = time.elapsed(),
); );
} }
Err(_) => { Err(_) => {
@ -489,6 +492,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError), Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Reason = "Timeout", Reason = "Timeout",
Elapsed = time.elapsed(),
); );
} }
} }
@ -498,6 +502,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError), Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Reason = err.to_string(), Reason = err.to_string(),
Elapsed = time.elapsed(),
); );
} }
Err(_) => { Err(_) => {
@ -505,6 +510,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError), Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Reason = "Stdin timeout", Reason = "Stdin timeout",
Elapsed = time.elapsed(),
); );
} }
} }
@ -513,6 +519,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError), Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Reason = "Stdin not available", Reason = "Stdin not available",
Elapsed = time.elapsed(),
); );
} }
} }
@ -528,12 +535,17 @@ impl<T: SessionStream> Session<T> {
} }
// Sieve filtering // Sieve filtering
if let Some(script) = self if let Some((script, script_id)) = self
.core .core
.core .core
.eval_if::<String, _>(&dc.script, self, self.data.session_id) .eval_if::<String, _>(&dc.script, self, self.data.session_id)
.await .await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{ {
let params = self let params = self
.build_script_parameters("data") .build_script_parameters("data")
@ -584,7 +596,7 @@ impl<T: SessionStream> Session<T> {
.unwrap_or_default(), .unwrap_or_default(),
); );
let modifications = match self.run_script(script.clone(), params).await { let modifications = match self.run_script(script_id, script.clone(), params).await {
ScriptResult::Accept { modifications } => modifications, ScriptResult::Accept { modifications } => modifications,
ScriptResult::Replace { ScriptResult::Replace {
message, message,

View file

@ -75,7 +75,7 @@ impl<T: SessionStream> Session<T> {
} }
// Sieve filtering // Sieve filtering
if let Some(script) = self if let Some((script, script_id)) = self
.core .core
.core .core
.eval_if::<String, _>( .eval_if::<String, _>(
@ -84,10 +84,19 @@ impl<T: SessionStream> Session<T> {
self.data.session_id, self.data.session_id,
) )
.await .await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{ {
if let ScriptResult::Reject(message) = self if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("ehlo")) .run_script(
script_id,
script.clone(),
self.build_script_parameters("ehlo"),
)
.await .await
{ {
self.data.mail_from = None; self.data.mail_from = None;

View file

@ -4,6 +4,8 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
use std::time::Instant;
use ahash::AHashMap; use ahash::AHashMap;
use common::{ use common::{
config::smtp::session::{MTAHook, Stage}, config::smtp::session::{MTAHook, Stage},
@ -50,6 +52,7 @@ impl<T: SessionStream> Session<T> {
continue; continue;
} }
let time = Instant::now();
match self.run_mta_hook(stage, mta_hook, message).await { match self.run_mta_hook(stage, mta_hook, message).await {
Ok(response) => { Ok(response) => {
trc::event!( trc::event!(
@ -61,11 +64,7 @@ impl<T: SessionStream> Session<T> {
}), }),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Id = mta_hook.id.clone(), Id = mta_hook.id.clone(),
Contents = response Elapsed = time.elapsed(),
.modifications
.iter()
.map(|m| format!("{m:?}"))
.collect::<Vec<_>>()
); );
let mut new_modifications = Vec::with_capacity(response.modifications.len()); let mut new_modifications = Vec::with_capacity(response.modifications.len());
@ -157,6 +156,7 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id, SpanId = self.data.session_id,
Id = mta_hook.id.clone(), Id = mta_hook.id.clone(),
Reason = err, Reason = err,
Elapsed = time.elapsed(),
); );
if mta_hook.tempfail_on_error { if mta_hook.tempfail_on_error {

View file

@ -121,7 +121,7 @@ impl<T: SessionStream> Session<T> {
.into(); .into();
// Sieve filtering // Sieve filtering
if let Some(script) = self if let Some((script, script_id)) = self
.core .core
.core .core
.eval_if::<String, _>( .eval_if::<String, _>(
@ -130,10 +130,19 @@ impl<T: SessionStream> Session<T> {
self.data.session_id, self.data.session_id,
) )
.await .await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{ {
match self match self
.run_script(script.clone(), self.build_script_parameters("mail")) .run_script(
script_id,
script.clone(),
self.build_script_parameters("mail"),
)
.await .await
{ {
ScriptResult::Accept { modifications } => { ScriptResult::Accept { modifications } => {

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
use std::borrow::Cow; use std::{borrow::Cow, time::Instant};
use common::{ use common::{
config::smtp::session::{Milter, Stage}, config::smtp::session::{Milter, Stage},
@ -53,16 +53,14 @@ impl<T: SessionStream> Session<T> {
continue; continue;
} }
let time = Instant::now();
match self.connect_and_run(milter, message).await { match self.connect_and_run(milter, message).await {
Ok(new_modifications) => { Ok(new_modifications) => {
trc::event!( trc::event!(
Milter(MilterEvent::ActionAccept), Milter(MilterEvent::ActionAccept),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Id = milter.id.to_string(), Id = milter.id.to_string(),
Contents = new_modifications Elapsed = time.elapsed(),
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>(),
); );
if !modifications.is_empty() { if !modifications.is_empty() {
@ -95,6 +93,7 @@ impl<T: SessionStream> Session<T> {
}), }),
SpanId = self.data.session_id, SpanId = self.data.session_id,
Id = milter.id.to_string(), Id = milter.id.to_string(),
Elapsed = time.elapsed(),
); );
return Err(match action { return Err(match action {
@ -144,6 +143,7 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id, SpanId = self.data.session_id,
Id = milter.id.to_string(), Id = milter.id.to_string(),
Details = details, Details = details,
Elapsed = time.elapsed(),
); );
if milter.tempfail_on_error { if milter.tempfail_on_error {

View file

@ -85,8 +85,12 @@ impl<T: SessionStream> Session<T> {
self.data.session_id, self.data.session_id,
) )
.await .await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .and_then(|name| {
.cloned(); self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s.clone(), name))
});
if rcpt_script.is_some() if rcpt_script.is_some()
|| !self.core.core.smtp.session.rcpt.rewrite.is_empty() || !self.core.core.smtp.session.rcpt.rewrite.is_empty()
@ -100,9 +104,13 @@ impl<T: SessionStream> Session<T> {
.any(|m| m.run_on_stage.contains(&Stage::Rcpt)) .any(|m| m.run_on_stage.contains(&Stage::Rcpt))
{ {
// Sieve filtering // Sieve filtering
if let Some(script) = rcpt_script { if let Some((script, script_id)) = rcpt_script {
match self match self
.run_script(script.clone(), self.build_script_parameters("rcpt")) .run_script(
script_id,
script.clone(),
self.build_script_parameters("rcpt"),
)
.await .await
{ {
ScriptResult::Accept { modifications } => { ScriptResult::Accept { modifications } => {

View file

@ -84,15 +84,24 @@ impl<T: SessionStream> Session<T> {
let config = &self.core.core.smtp.session.connect; let config = &self.core.core.smtp.session.connect;
// Sieve filtering // Sieve filtering
if let Some(script) = self if let Some((script, script_id)) = self
.core .core
.core .core
.eval_if::<String, _>(&config.script, self, self.data.session_id) .eval_if::<String, _>(&config.script, self, self.data.session_id)
.await .await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id)) .and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{ {
if let ScriptResult::Reject(message) = self if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("connect")) .run_script(
script_id,
script.clone(),
self.build_script_parameters("connect"),
)
.await .await
{ {
let _ = self.write(message.as_bytes()).await; let _ = self.write(message.as_bytes()).await;

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
use std::{borrow::Cow, sync::Arc}; use std::{borrow::Cow, sync::Arc, time::Instant};
use common::scripts::plugins::PluginContext; use common::scripts::plugins::PluginContext;
use mail_auth::common::headers::HeaderWriter; use mail_auth::common::headers::HeaderWriter;
@ -29,11 +29,13 @@ use super::{ScriptModification, ScriptParameters, ScriptResult};
impl SMTP { impl SMTP {
pub async fn run_script( pub async fn run_script(
&self, &self,
script_id: String,
script: Arc<Sieve>, script: Arc<Sieve>,
params: ScriptParameters<'_>, params: ScriptParameters<'_>,
session_id: u64, session_id: u64,
) -> ScriptResult { ) -> ScriptResult {
// Create filter instance // Create filter instance
let time = Instant::now();
let mut instance = self let mut instance = self
.core .core
.sieve .sieve
@ -62,8 +64,9 @@ impl SMTP {
} else { } else {
trc::event!( trc::event!(
Sieve(SieveEvent::ScriptNotFound), Sieve(SieveEvent::ScriptNotFound),
Id = script_id.clone(),
SpanId = session_id, SpanId = session_id,
Id = name.as_str().to_string(), Details = name.as_str().to_string(),
); );
break; break;
} }
@ -95,8 +98,9 @@ impl SMTP {
} else { } else {
trc::event!( trc::event!(
Sieve(SieveEvent::ListNotFound), Sieve(SieveEvent::ListNotFound),
Id = script_id.clone(),
SpanId = session_id, SpanId = session_id,
Id = list, Details = list,
); );
} }
} }
@ -157,8 +161,9 @@ impl SMTP {
Recipient::List(list) => { Recipient::List(list) => {
trc::event!( trc::event!(
Sieve(SieveEvent::NotSupported), Sieve(SieveEvent::NotSupported),
Id = script_id.clone(),
SpanId = session_id, SpanId = session_id,
Id = list, Details = list,
Reason = "Sending to lists is not supported.", Reason = "Sending to lists is not supported.",
); );
} }
@ -300,6 +305,7 @@ impl SMTP {
trc::event!( trc::event!(
Sieve(SieveEvent::QuotaExceeded), Sieve(SieveEvent::QuotaExceeded),
SpanId = session_id, SpanId = session_id,
Id = script_id.clone(),
From = message.return_path_lcase, From = message.return_path_lcase,
To = message To = message
.recipients .recipients
@ -326,6 +332,7 @@ impl SMTP {
unsupported => { unsupported => {
trc::event!( trc::event!(
Sieve(SieveEvent::NotSupported), Sieve(SieveEvent::NotSupported),
Id = script_id.clone(),
SpanId = session_id, SpanId = session_id,
Reason = "Unsupported event", Reason = "Unsupported event",
Details = format!("{unsupported:?}"), Details = format!("{unsupported:?}"),
@ -336,6 +343,7 @@ impl SMTP {
Err(err) => { Err(err) => {
trc::event!( trc::event!(
Sieve(SieveEvent::RuntimeError), Sieve(SieveEvent::RuntimeError),
Id = script_id.clone(),
SpanId = session_id, SpanId = session_id,
Reason = err.to_string(), Reason = err.to_string(),
); );
@ -380,18 +388,18 @@ impl SMTP {
trc::event!( trc::event!(
Sieve(SieveEvent::ActionAccept), Sieve(SieveEvent::ActionAccept),
SpanId = session_id, SpanId = session_id,
Details = modifications Id = script_id,
.iter() Elapsed = time.elapsed(),
.map(|m| trc::Value::from(format!("{m:?}")))
.collect::<Vec<_>>(),
); );
ScriptResult::Accept { modifications } ScriptResult::Accept { modifications }
} else if let Some(mut reject_reason) = reject_reason { } else if let Some(mut reject_reason) = reject_reason {
trc::event!( trc::event!(
Sieve(SieveEvent::ActionReject), Sieve(SieveEvent::ActionReject),
Id = script_id,
SpanId = session_id, SpanId = session_id,
Details = reject_reason.clone(), Details = reject_reason.clone(),
Elapsed = time.elapsed(),
); );
if !reject_reason.ends_with('\n') { if !reject_reason.ends_with('\n') {
@ -412,10 +420,8 @@ impl SMTP {
trc::event!( trc::event!(
Sieve(SieveEvent::ActionAccept), Sieve(SieveEvent::ActionAccept),
SpanId = session_id, SpanId = session_id,
Details = modifications Id = script_id,
.iter() Elapsed = time.elapsed(),
.map(|m| trc::Value::from(format!("{m:?}")))
.collect::<Vec<_>>(),
); );
ScriptResult::Replace { ScriptResult::Replace {
@ -426,16 +432,19 @@ impl SMTP {
trc::event!( trc::event!(
Sieve(SieveEvent::ActionAcceptReplace), Sieve(SieveEvent::ActionAcceptReplace),
SpanId = session_id, SpanId = session_id,
Details = modifications Id = script_id,
.iter() Elapsed = time.elapsed(),
.map(|m| trc::Value::from(format!("{m:?}")))
.collect::<Vec<_>>(),
); );
ScriptResult::Accept { modifications } ScriptResult::Accept { modifications }
} }
} else { } else {
trc::event!(Sieve(SieveEvent::ActionDiscard), SpanId = session_id,); trc::event!(
Sieve(SieveEvent::ActionDiscard),
SpanId = session_id,
Id = script_id,
Elapsed = time.elapsed()
);
ScriptResult::Discard ScriptResult::Discard
} }

View file

@ -120,11 +120,13 @@ impl<T: SessionStream> Session<T> {
pub async fn run_script( pub async fn run_script(
&self, &self,
script_id: String,
script: Arc<Sieve>, script: Arc<Sieve>,
params: ScriptParameters<'_>, params: ScriptParameters<'_>,
) -> ScriptResult { ) -> ScriptResult {
self.core self.core
.run_script( .run_script(
script_id,
script, script,
params params
.with_envelope(&self.core.core, self, self.data.session_id) .with_envelope(&self.core.core, self, self.data.session_id)