mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-09-16 17:04:43 +08:00
Purge task locking and spam rule fixes]
This commit is contained in:
parent
1726d68018
commit
2cfe467b20
17 changed files with 178 additions and 214 deletions
|
@ -8,7 +8,7 @@ use std::sync::Arc;
|
|||
|
||||
use ahash::AHashMap;
|
||||
use directory::Directory;
|
||||
use store::{write::purge::PurgeSchedule, BlobStore, FtsStore, InMemoryStore, Store};
|
||||
use store::{BlobStore, FtsStore, InMemoryStore, PurgeSchedule, Store};
|
||||
|
||||
use crate::manager::config::ConfigManager;
|
||||
|
||||
|
|
|
@ -260,7 +260,7 @@ impl EmailDeletion for Server {
|
|||
{
|
||||
Ok(true) => (),
|
||||
Ok(false) => {
|
||||
trc::event!(Purge(trc::PurgeEvent::PurgeActive), AccountId = account_id,);
|
||||
trc::event!(Purge(trc::PurgeEvent::InProgress), AccountId = account_id,);
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
|
|
|
@ -25,9 +25,9 @@ use common::telemetry::{
|
|||
};
|
||||
|
||||
use smtp::reporting::SmtpReporting;
|
||||
use store::write::{now, purge::PurgeStore};
|
||||
use store::{write::now, PurgeStore};
|
||||
use tokio::sync::mpsc;
|
||||
use trc::{Collector, MetricType};
|
||||
use trc::{Collector, MetricType, PurgeEvent};
|
||||
use utils::map::ttl_dashmap::TtlMap;
|
||||
|
||||
use crate::{email::delete::EmailDeletion, JmapMethods, LONG_SLUMBER};
|
||||
|
@ -233,7 +233,7 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
HousekeeperEvent::Purge(purge) => {
|
||||
let server = inner.build_server();
|
||||
tokio::spawn(async move {
|
||||
server.purge(purge).await;
|
||||
server.purge(purge, 0).await;
|
||||
});
|
||||
}
|
||||
HousekeeperEvent::Exit => {
|
||||
|
@ -251,6 +251,8 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
while let Some(event) = queue.pop() {
|
||||
match event.event {
|
||||
ActionClass::Acme(provider_id) => {
|
||||
trc::event!(Housekeeper(trc::HousekeeperEvent::Run), Type = "acme");
|
||||
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Some(provider) =
|
||||
|
@ -298,6 +300,11 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
});
|
||||
}
|
||||
ActionClass::Account => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "purge_account"
|
||||
);
|
||||
|
||||
let server = server.clone();
|
||||
queue.schedule(
|
||||
Instant::now()
|
||||
|
@ -305,11 +312,15 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
ActionClass::Account,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
trc::event!(Housekeeper(trc::HousekeeperEvent::PurgeAccounts));
|
||||
server.purge_accounts().await;
|
||||
server.purge(PurgeType::Account(None), 0).await;
|
||||
});
|
||||
}
|
||||
ActionClass::Session => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "purge_session"
|
||||
);
|
||||
|
||||
let server = server.clone();
|
||||
queue.schedule(
|
||||
Instant::now()
|
||||
|
@ -318,7 +329,7 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
trc::event!(Housekeeper(trc::HousekeeperEvent::PurgeSessions));
|
||||
trc::event!(Purge(PurgeEvent::Started), Type = "session");
|
||||
server.inner.data.http_auth_cache.cleanup();
|
||||
server
|
||||
.inner
|
||||
|
@ -341,44 +352,45 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
if let Some(schedule) =
|
||||
server.core.storage.purge_schedules.get(idx).cloned()
|
||||
{
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "purge_store",
|
||||
Id = idx
|
||||
);
|
||||
|
||||
queue.schedule(
|
||||
Instant::now() + schedule.cron.time_to_next(),
|
||||
ActionClass::Store(idx),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
let (class, result) = match schedule.store {
|
||||
PurgeStore::Data(store) => {
|
||||
("data", store.purge_store().await)
|
||||
}
|
||||
PurgeStore::Blobs { store, blob_store } => {
|
||||
("blob", store.purge_blobs(blob_store).await)
|
||||
}
|
||||
PurgeStore::Lookup(in_memory_store) => (
|
||||
"lookup",
|
||||
in_memory_store.purge_in_memory_store().await,
|
||||
),
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::PurgeStore),
|
||||
Id = schedule.store_id
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
trc::error!(err
|
||||
.details(format!(
|
||||
"Failed to purge {class} store."
|
||||
))
|
||||
.id(schedule.store_id));
|
||||
}
|
||||
}
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
server
|
||||
.purge(
|
||||
match schedule.store {
|
||||
PurgeStore::Data(store) => {
|
||||
PurgeType::Data(store)
|
||||
}
|
||||
PurgeStore::Blobs { store, blob_store } => {
|
||||
PurgeType::Blobs { store, blob_store }
|
||||
}
|
||||
PurgeStore::Lookup(in_memory_store) => {
|
||||
PurgeType::Lookup(in_memory_store)
|
||||
}
|
||||
},
|
||||
idx as u32,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
ActionClass::OtelMetrics => {
|
||||
if let Some(otel) = &server.core.metrics.otel {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "metrics_report"
|
||||
);
|
||||
|
||||
queue.schedule(
|
||||
Instant::now() + otel.interval,
|
||||
ActionClass::OtelMetrics,
|
||||
|
@ -398,6 +410,11 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
}
|
||||
}
|
||||
ActionClass::CalculateMetrics => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "metrics_calculate"
|
||||
);
|
||||
|
||||
// Calculate expensive metrics every 5 minutes
|
||||
queue.schedule(
|
||||
Instant::now() + Duration::from_secs(5 * 60),
|
||||
|
@ -495,6 +512,11 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
.as_ref()
|
||||
.and_then(|e| e.metrics_store.as_ref())
|
||||
{
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "metrics_internal"
|
||||
);
|
||||
|
||||
queue.schedule(
|
||||
Instant::now() + metrics_store.interval.time_to_next(),
|
||||
ActionClass::InternalMetrics,
|
||||
|
@ -516,6 +538,11 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
|
||||
#[cfg(feature = "enterprise")]
|
||||
ActionClass::AlertMetrics => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "metrics_alert"
|
||||
);
|
||||
|
||||
let server = server.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
@ -537,6 +564,11 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
|
||||
#[cfg(feature = "enterprise")]
|
||||
ActionClass::RenewLicense => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::Run),
|
||||
Type = "renew_license"
|
||||
);
|
||||
|
||||
match server.reload().await {
|
||||
Ok(result) => {
|
||||
if let Some(new_core) = result.new_core {
|
||||
|
@ -583,38 +615,62 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
|
|||
}
|
||||
|
||||
pub trait Purge: Sync + Send {
|
||||
fn purge(&self, purge: PurgeType) -> impl Future<Output = ()> + Send;
|
||||
fn purge(&self, purge: PurgeType, store_idx: u32) -> impl Future<Output = ()> + Send;
|
||||
}
|
||||
|
||||
impl Purge for Server {
|
||||
async fn purge(&self, purge: PurgeType) {
|
||||
async fn purge(&self, purge: PurgeType, store_idx: u32) {
|
||||
// Lock task
|
||||
let lock_name = match &purge {
|
||||
PurgeType::Data(_) => "data".into(),
|
||||
PurgeType::Blobs { .. } => "blob".into(),
|
||||
PurgeType::Lookup(_) => "lookup".into(),
|
||||
PurgeType::Account(_) => None,
|
||||
let (lock_type, lock_name) = match &purge {
|
||||
PurgeType::Data(_) => (
|
||||
"data",
|
||||
[0u8]
|
||||
.into_iter()
|
||||
.chain(store_idx.to_be_bytes().into_iter())
|
||||
.collect::<Vec<_>>()
|
||||
.into(),
|
||||
),
|
||||
PurgeType::Blobs { .. } => (
|
||||
"blob",
|
||||
[1u8]
|
||||
.into_iter()
|
||||
.chain(store_idx.to_be_bytes().into_iter())
|
||||
.collect::<Vec<_>>()
|
||||
.into(),
|
||||
),
|
||||
PurgeType::Lookup(_) => (
|
||||
"in-memory",
|
||||
[2u8]
|
||||
.into_iter()
|
||||
.chain(store_idx.to_be_bytes().into_iter())
|
||||
.collect::<Vec<_>>()
|
||||
.into(),
|
||||
),
|
||||
PurgeType::Account(_) => ("account", None),
|
||||
};
|
||||
if let Some(lock_name) = lock_name {
|
||||
if let Some(lock_name) = &lock_name {
|
||||
match self
|
||||
.core
|
||||
.storage
|
||||
.lookup
|
||||
.try_lock(KV_LOCK_HOUSEKEEPER, lock_name.as_bytes(), 3600)
|
||||
.try_lock(KV_LOCK_HOUSEKEEPER, lock_name, 3600)
|
||||
.await
|
||||
{
|
||||
Ok(true) => (),
|
||||
Ok(false) => {
|
||||
trc::event!(Purge(trc::PurgeEvent::PurgeActive), Details = lock_name);
|
||||
trc::event!(Purge(PurgeEvent::InProgress), Details = lock_type);
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
trc::error!(err.details("Failed to lock task.").details(lock_name));
|
||||
trc::error!(err.details("Failed to lock task.").details(lock_type));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trc::event!(Purge(PurgeEvent::Started), Type = lock_type, Id = store_idx);
|
||||
let time = Instant::now();
|
||||
|
||||
match purge {
|
||||
PurgeType::Data(store) => {
|
||||
// SPDX-SnippetBegin
|
||||
|
@ -636,11 +692,6 @@ impl Purge for Server {
|
|||
.and_then(|m| m.retention);
|
||||
// SPDX-SnippetEnd
|
||||
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::PurgeStore),
|
||||
Type = "data"
|
||||
);
|
||||
|
||||
if let Err(err) = store.purge_store().await {
|
||||
trc::error!(err.details("Failed to purge data store"));
|
||||
}
|
||||
|
@ -664,28 +715,16 @@ impl Purge for Server {
|
|||
// SPDX-SnippetEnd
|
||||
}
|
||||
PurgeType::Blobs { store, blob_store } => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::PurgeStore),
|
||||
Type = "blob"
|
||||
);
|
||||
|
||||
if let Err(err) = store.purge_blobs(blob_store).await {
|
||||
trc::error!(err.details("Failed to purge blob store"));
|
||||
}
|
||||
}
|
||||
PurgeType::Lookup(store) => {
|
||||
trc::event!(
|
||||
Housekeeper(trc::HousekeeperEvent::PurgeStore),
|
||||
Type = "lookup"
|
||||
);
|
||||
|
||||
if let Err(err) = store.purge_in_memory_store().await {
|
||||
trc::error!(err.details("Failed to purge lookup store"));
|
||||
}
|
||||
}
|
||||
PurgeType::Account(account_id) => {
|
||||
trc::event!(Housekeeper(trc::HousekeeperEvent::PurgeAccounts));
|
||||
|
||||
if let Some(account_id) = account_id {
|
||||
self.purge_account(account_id).await;
|
||||
} else {
|
||||
|
@ -694,16 +733,23 @@ impl Purge for Server {
|
|||
}
|
||||
}
|
||||
|
||||
trc::event!(
|
||||
Purge(PurgeEvent::Finished),
|
||||
Type = lock_type,
|
||||
Id = store_idx,
|
||||
Elapsed = time.elapsed()
|
||||
);
|
||||
|
||||
// Remove lock
|
||||
if let Some(lock_name) = lock_name {
|
||||
if let Some(lock_name) = &lock_name {
|
||||
if let Err(err) = self
|
||||
.in_memory_store()
|
||||
.remove_lock(KV_LOCK_HOUSEKEEPER, lock_name.as_bytes())
|
||||
.remove_lock(KV_LOCK_HOUSEKEEPER, lock_name)
|
||||
.await
|
||||
{
|
||||
trc::error!(err
|
||||
.details("Failed to delete task lock.")
|
||||
.details(lock_name));
|
||||
.details(lock_type));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ impl<'x> TypesTokenizer<'x> {
|
|||
has_number = true;
|
||||
} else {
|
||||
let last_was_space = self.last_ch_is_space;
|
||||
self.last_ch_is_space = ch.is_ascii_whitespace();
|
||||
self.last_ch_is_space = ch.is_whitespace();
|
||||
stop_char = Token {
|
||||
word: if self.last_ch_is_space {
|
||||
if last_was_space {
|
||||
|
@ -1034,7 +1034,7 @@ mod test {
|
|||
"http://example.org/\u{b}bar",
|
||||
vec![
|
||||
TokenType::Url("http://example.org/"),
|
||||
TokenType::Punctuation('\u{b}'),
|
||||
TokenType::Space,
|
||||
TokenType::Alphabetic("bar"),
|
||||
],
|
||||
),
|
||||
|
@ -1090,7 +1090,7 @@ mod test {
|
|||
"example.org/\u{b}bar",
|
||||
vec![
|
||||
TokenType::UrlNoScheme("example.org/"),
|
||||
TokenType::Punctuation('\u{b}'),
|
||||
TokenType::Space,
|
||||
TokenType::Alphabetic("bar"),
|
||||
],
|
||||
),
|
||||
|
@ -2875,7 +2875,7 @@ mod test {
|
|||
.map(|t| t.word)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(result, expected);
|
||||
assert_eq!(result, expected, "text: {:?}", text);
|
||||
|
||||
/*print!("({text:?}, ");
|
||||
print!("vec![");
|
||||
|
|
|
@ -135,7 +135,7 @@ impl SpamFilterInit for Server {
|
|||
HtmlToken::Text { text } if !in_head => {
|
||||
if !text_body.is_empty()
|
||||
&& !text_body.ends_with(' ')
|
||||
&& text.starts_with(' ')
|
||||
&& !text.starts_with(' ')
|
||||
{
|
||||
text_body.push(' ');
|
||||
}
|
||||
|
|
|
@ -285,17 +285,22 @@ impl SpamFilterAnalyzeMime for Server {
|
|||
ctx.result.add_tag("R_MISSING_CHARSET");
|
||||
}
|
||||
|
||||
match &part.body {
|
||||
PartType::Text(text) | PartType::Html(text)
|
||||
if ctx.input.message.text_body.contains(&part_id)
|
||||
|| ctx.input.message.html_body.contains(&part_id) =>
|
||||
{
|
||||
if !text.as_ref().is_single_script() {
|
||||
// Text part contains multiple scripts
|
||||
ctx.result.add_tag("R_MIXED_CHARSET");
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
if ctx
|
||||
.output
|
||||
.text_parts
|
||||
.get(part_id)
|
||||
.filter(|_| {
|
||||
ctx.input.message.text_body.contains(&part_id)
|
||||
|| ctx.input.message.html_body.contains(&part_id)
|
||||
})
|
||||
.map_or(false, |p| match p {
|
||||
TextPart::Plain { text_body, .. } => !text_body.is_single_script(),
|
||||
TextPart::Html { text_body, .. } => !text_body.is_single_script(),
|
||||
TextPart::None => false,
|
||||
})
|
||||
{
|
||||
// Text part contains multiple scripts
|
||||
ctx.result.add_tag("R_MIXED_CHARSET");
|
||||
}
|
||||
|
||||
has_text_part = true;
|
||||
|
|
|
@ -167,7 +167,7 @@ impl SpamFilterAnalyzeUrl for Server {
|
|||
UrlParts::new(url.element),
|
||||
url.location,
|
||||
));
|
||||
ctx.result.add_tag("R_SUSPICIOUS_URL");
|
||||
ctx.result.add_tag("R_UNPARSABLE_URL");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -54,6 +54,7 @@ pub(crate) async fn is_dnsbl(
|
|||
Ok(result) => {
|
||||
trc::event!(
|
||||
Spam(SpamEvent::Dnsbl),
|
||||
Hostname = zone,
|
||||
Result = result
|
||||
.iter()
|
||||
.map(|ip| trc::Value::from(ip.to_string()))
|
||||
|
@ -83,6 +84,7 @@ pub(crate) async fn is_dnsbl(
|
|||
Err(Error::DnsRecordNotFound(_)) => {
|
||||
trc::event!(
|
||||
Spam(SpamEvent::Dnsbl),
|
||||
Hostname = zone,
|
||||
Result = trc::Value::None,
|
||||
Elapsed = time.elapsed()
|
||||
);
|
||||
|
@ -92,6 +94,7 @@ pub(crate) async fn is_dnsbl(
|
|||
Err(err) => {
|
||||
trc::event!(
|
||||
Spam(SpamEvent::DnsblError),
|
||||
Hostname = zone,
|
||||
Elapsed = time.elapsed(),
|
||||
CausedBy = err.to_string()
|
||||
);
|
||||
|
|
|
@ -7,9 +7,8 @@
|
|||
use utils::config::{cron::SimpleCron, utils::ParseValue, Config};
|
||||
|
||||
use crate::{
|
||||
backend::fs::FsStore,
|
||||
write::purge::{PurgeSchedule, PurgeStore},
|
||||
BlobStore, CompressionAlgo, InMemoryStore, Store, Stores,
|
||||
backend::fs::FsStore, BlobStore, CompressionAlgo, InMemoryStore, PurgeSchedule, PurgeStore,
|
||||
Store, Stores,
|
||||
};
|
||||
|
||||
#[cfg(feature = "s3")]
|
||||
|
|
|
@ -20,7 +20,8 @@ pub use blake3;
|
|||
pub use parking_lot;
|
||||
pub use rand;
|
||||
pub use roaring;
|
||||
use write::{purge::PurgeSchedule, BitmapClass, ValueClass};
|
||||
use utils::config::cron::SimpleCron;
|
||||
use write::{BitmapClass, ValueClass};
|
||||
|
||||
#[cfg(feature = "s3")]
|
||||
use backend::s3::S3Store;
|
||||
|
@ -353,6 +354,20 @@ impl Default for FtsStore {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum PurgeStore {
|
||||
Data(Store),
|
||||
Blobs { store: Store, blob_store: BlobStore },
|
||||
Lookup(InMemoryStore),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PurgeSchedule {
|
||||
pub cron: SimpleCron,
|
||||
pub store_id: String,
|
||||
pub store: PurgeStore,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum Value<'x> {
|
||||
Integer(i64),
|
||||
|
|
|
@ -31,7 +31,6 @@ pub mod blob;
|
|||
pub mod hash;
|
||||
pub mod key;
|
||||
pub mod log;
|
||||
pub mod purge;
|
||||
|
||||
pub trait SerializeWithId: Send + Sync {
|
||||
fn serialize_with_id(&self, ids: &AssignedIds) -> trc::Result<Vec<u8>>;
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use tokio::sync::watch;
|
||||
use trc::PurgeEvent;
|
||||
use utils::config::cron::SimpleCron;
|
||||
|
||||
use crate::{BlobStore, InMemoryStore, Store};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum PurgeStore {
|
||||
Data(Store),
|
||||
Blobs { store: Store, blob_store: BlobStore },
|
||||
Lookup(InMemoryStore),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PurgeSchedule {
|
||||
pub cron: SimpleCron,
|
||||
pub store_id: String,
|
||||
pub store: PurgeStore,
|
||||
}
|
||||
|
||||
impl PurgeSchedule {
|
||||
pub fn spawn(self, mut shutdown_rx: watch::Receiver<bool>) {
|
||||
trc::event!(
|
||||
Purge(PurgeEvent::Started),
|
||||
Type = self.store.as_str(),
|
||||
Id = self.store_id.to_string()
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if tokio::time::timeout(self.cron.time_to_next(), shutdown_rx.changed())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
trc::event!(
|
||||
Purge(PurgeEvent::Finished),
|
||||
Type = self.store.as_str(),
|
||||
Id = self.store_id.to_string()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
trc::event!(
|
||||
Purge(PurgeEvent::Running),
|
||||
Type = self.store.as_str(),
|
||||
Id = self.store_id.to_string()
|
||||
);
|
||||
|
||||
let result = match &self.store {
|
||||
PurgeStore::Data(store) => store.purge_store().await,
|
||||
PurgeStore::Blobs { store, blob_store } => {
|
||||
store.purge_blobs(blob_store.clone()).await
|
||||
}
|
||||
PurgeStore::Lookup(store) => store.purge_in_memory_store().await,
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
trc::event!(
|
||||
Purge(PurgeEvent::Error),
|
||||
Type = self.store.as_str(),
|
||||
Id = self.store_id.to_string(),
|
||||
CausedBy = err
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl PurgeStore {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
PurgeStore::Data(_) => "data",
|
||||
PurgeStore::Blobs { .. } => "blobs",
|
||||
PurgeStore::Lookup(_) => "lookup",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for PurgeStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
PurgeStore::Data(_) => write!(f, "bitmaps"),
|
||||
PurgeStore::Blobs { .. } => write!(f, "blobs"),
|
||||
PurgeStore::Lookup(_) => write!(f, "expired keys"),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -174,9 +174,7 @@ impl HousekeeperEvent {
|
|||
HousekeeperEvent::Start => "Housekeeper process started",
|
||||
HousekeeperEvent::Stop => "Housekeeper process stopped",
|
||||
HousekeeperEvent::Schedule => "Housekeeper task scheduled",
|
||||
HousekeeperEvent::PurgeAccounts => "Purging accounts",
|
||||
HousekeeperEvent::PurgeSessions => "Purging sessions",
|
||||
HousekeeperEvent::PurgeStore => "Purging store",
|
||||
HousekeeperEvent::Run => "Housekeeper task run",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,9 +183,7 @@ impl HousekeeperEvent {
|
|||
HousekeeperEvent::Start => "The housekeeper process has started",
|
||||
HousekeeperEvent::Stop => "The housekeeper process has stopped",
|
||||
HousekeeperEvent::Schedule => "A housekeeper task has been scheduled",
|
||||
HousekeeperEvent::PurgeAccounts => "Purging accounts",
|
||||
HousekeeperEvent::PurgeSessions => "Purging sessions",
|
||||
HousekeeperEvent::PurgeStore => "Purging store",
|
||||
HousekeeperEvent::Run => "A housekeeper task is running",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1266,7 +1262,7 @@ impl PurgeEvent {
|
|||
PurgeEvent::Finished => "Purge finished",
|
||||
PurgeEvent::Running => "Purge running",
|
||||
PurgeEvent::Error => "Purge error",
|
||||
PurgeEvent::PurgeActive => "Active purge in progress",
|
||||
PurgeEvent::InProgress => "Active purge in progress",
|
||||
PurgeEvent::AutoExpunge => "Auto-expunge executed",
|
||||
PurgeEvent::TombstoneCleanup => "Tombstone cleanup executed",
|
||||
}
|
||||
|
@ -1278,7 +1274,7 @@ impl PurgeEvent {
|
|||
PurgeEvent::Finished => "The purge has finished",
|
||||
PurgeEvent::Running => "The purge is running",
|
||||
PurgeEvent::Error => "An error occurred with the purge",
|
||||
PurgeEvent::PurgeActive => "An active purge is in progress",
|
||||
PurgeEvent::InProgress => "An active purge is in progress",
|
||||
PurgeEvent::AutoExpunge => "Auto-expunge has been executed",
|
||||
PurgeEvent::TombstoneCleanup => "Tombstone cleanup has been executed",
|
||||
}
|
||||
|
|
|
@ -268,9 +268,9 @@ impl EventType {
|
|||
PurgeEvent::Finished => Level::Debug,
|
||||
PurgeEvent::Running => Level::Info,
|
||||
PurgeEvent::Error => Level::Error,
|
||||
PurgeEvent::PurgeActive
|
||||
| PurgeEvent::AutoExpunge
|
||||
| PurgeEvent::TombstoneCleanup => Level::Debug,
|
||||
PurgeEvent::InProgress | PurgeEvent::AutoExpunge | PurgeEvent::TombstoneCleanup => {
|
||||
Level::Debug
|
||||
}
|
||||
},
|
||||
EventType::Eval(event) => match event {
|
||||
EvalEvent::Error | EvalEvent::StoreNotFound => Level::Debug,
|
||||
|
@ -370,12 +370,8 @@ impl EventType {
|
|||
| ClusterEvent::InvalidPacket => Level::Warn,
|
||||
},
|
||||
EventType::Housekeeper(event) => match event {
|
||||
HousekeeperEvent::Start
|
||||
| HousekeeperEvent::PurgeAccounts
|
||||
| HousekeeperEvent::PurgeSessions
|
||||
| HousekeeperEvent::PurgeStore
|
||||
| HousekeeperEvent::Stop => Level::Info,
|
||||
HousekeeperEvent::Schedule => Level::Debug,
|
||||
HousekeeperEvent::Start | HousekeeperEvent::Stop => Level::Info,
|
||||
HousekeeperEvent::Run | HousekeeperEvent::Schedule => Level::Debug,
|
||||
},
|
||||
EventType::FtsIndex(event) => match event {
|
||||
FtsIndexEvent::Index => Level::Info,
|
||||
|
|
|
@ -229,9 +229,7 @@ pub enum HousekeeperEvent {
|
|||
Start,
|
||||
Stop,
|
||||
Schedule,
|
||||
PurgeAccounts,
|
||||
PurgeSessions,
|
||||
PurgeStore,
|
||||
Run,
|
||||
}
|
||||
|
||||
#[event_type]
|
||||
|
@ -707,7 +705,7 @@ pub enum PurgeEvent {
|
|||
Finished,
|
||||
Running,
|
||||
Error,
|
||||
PurgeActive,
|
||||
InProgress,
|
||||
AutoExpunge,
|
||||
TombstoneCleanup,
|
||||
}
|
||||
|
|
|
@ -450,9 +450,7 @@ impl EventType {
|
|||
EventType::FtsIndex(FtsIndexEvent::Index) => 142,
|
||||
EventType::FtsIndex(FtsIndexEvent::Locked) => 144,
|
||||
EventType::FtsIndex(FtsIndexEvent::MetadataNotFound) => 145,
|
||||
EventType::Housekeeper(HousekeeperEvent::PurgeAccounts) => 146,
|
||||
EventType::Housekeeper(HousekeeperEvent::PurgeSessions) => 147,
|
||||
EventType::Housekeeper(HousekeeperEvent::PurgeStore) => 148,
|
||||
EventType::Housekeeper(HousekeeperEvent::Run) => 146,
|
||||
EventType::Housekeeper(HousekeeperEvent::Schedule) => 149,
|
||||
EventType::Housekeeper(HousekeeperEvent::Start) => 150,
|
||||
EventType::Housekeeper(HousekeeperEvent::Stop) => 151,
|
||||
|
@ -669,7 +667,7 @@ impl EventType {
|
|||
EventType::Purge(PurgeEvent::AutoExpunge) => 364,
|
||||
EventType::Purge(PurgeEvent::Error) => 365,
|
||||
EventType::Purge(PurgeEvent::Finished) => 366,
|
||||
EventType::Purge(PurgeEvent::PurgeActive) => 367,
|
||||
EventType::Purge(PurgeEvent::InProgress) => 367,
|
||||
EventType::Purge(PurgeEvent::Running) => 368,
|
||||
EventType::Purge(PurgeEvent::Started) => 369,
|
||||
EventType::Purge(PurgeEvent::TombstoneCleanup) => 370,
|
||||
|
@ -1014,9 +1012,7 @@ impl EventType {
|
|||
142 => Some(EventType::FtsIndex(FtsIndexEvent::Index)),
|
||||
144 => Some(EventType::FtsIndex(FtsIndexEvent::Locked)),
|
||||
145 => Some(EventType::FtsIndex(FtsIndexEvent::MetadataNotFound)),
|
||||
146 => Some(EventType::Housekeeper(HousekeeperEvent::PurgeAccounts)),
|
||||
147 => Some(EventType::Housekeeper(HousekeeperEvent::PurgeSessions)),
|
||||
148 => Some(EventType::Housekeeper(HousekeeperEvent::PurgeStore)),
|
||||
146 => Some(EventType::Housekeeper(HousekeeperEvent::Run)),
|
||||
149 => Some(EventType::Housekeeper(HousekeeperEvent::Schedule)),
|
||||
150 => Some(EventType::Housekeeper(HousekeeperEvent::Start)),
|
||||
151 => Some(EventType::Housekeeper(HousekeeperEvent::Stop)),
|
||||
|
@ -1269,7 +1265,7 @@ impl EventType {
|
|||
364 => Some(EventType::Purge(PurgeEvent::AutoExpunge)),
|
||||
365 => Some(EventType::Purge(PurgeEvent::Error)),
|
||||
366 => Some(EventType::Purge(PurgeEvent::Finished)),
|
||||
367 => Some(EventType::Purge(PurgeEvent::PurgeActive)),
|
||||
367 => Some(EventType::Purge(PurgeEvent::InProgress)),
|
||||
368 => Some(EventType::Purge(PurgeEvent::Running)),
|
||||
369 => Some(EventType::Purge(PurgeEvent::Started)),
|
||||
370 => Some(EventType::Purge(PurgeEvent::TombstoneCleanup)),
|
||||
|
|
|
@ -189,6 +189,13 @@ Test
|
|||
envelope_from baz@domain.org
|
||||
expect SPOOF_DISPLAY_NAME FROM_HAS_DN FROM_EQ_ENVFROM
|
||||
|
||||
From: Foo (foo@bar.com) <baz@domain.org>
|
||||
|
||||
Test
|
||||
<!-- NEXT TEST -->
|
||||
envelope_from baz@domain.org
|
||||
expect SPOOF_DISPLAY_NAME FROM_HAS_DN FROM_EQ_ENVFROM
|
||||
|
||||
From: "Foo foo@bar.com" <baz@domain.org>
|
||||
|
||||
Test
|
||||
|
|
Loading…
Add table
Reference in a new issue