From 9c23774aa5a096d7bfbd772b18505de949095a1a Mon Sep 17 00:00:00 2001 From: mdecimus Date: Mon, 22 Jul 2024 17:56:24 +0200 Subject: [PATCH] Lock-free fast tracing (closes #180) --- Cargo.lock | 11 ++ crates/common/src/listener/listen.rs | 2 +- crates/common/src/manager/config.rs | 5 +- crates/common/src/manager/webadmin.rs | 2 +- crates/common/src/webhooks/manager.rs | 2 +- crates/directory/src/backend/ldap/lookup.rs | 2 +- crates/jmap/src/api/management/reload.rs | 13 +- crates/jmap/src/lib.rs | 2 +- crates/jmap/src/push/manager.rs | 2 +- crates/jmap/src/services/housekeeper.rs | 6 +- crates/jmap/src/websocket/stream.rs | 24 +-- crates/smtp/src/core/mod.rs | 2 +- crates/smtp/src/reporting/scheduler.rs | 2 +- crates/store/src/dispatch/blob.rs | 7 +- crates/store/src/dispatch/lookup.rs | 2 +- crates/store/src/write/purge.rs | 17 +- crates/trc/Cargo.toml | 5 + crates/trc/src/channel.rs | 99 +++++++++++ crates/trc/src/collector.rs | 128 ++++++++++++++ crates/trc/src/conv.rs | 4 +- crates/trc/src/imple.rs | 183 +++++++++++++------ crates/trc/src/lib.rs | 187 +++++++++++--------- crates/trc/src/macros.rs | 64 ++++--- crates/trc/src/subscriber.rs | 110 ++++++++++++ 24 files changed, 685 insertions(+), 196 deletions(-) create mode 100644 crates/trc/src/channel.rs create mode 100644 crates/trc/src/collector.rs create mode 100644 crates/trc/src/subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index 4cc29e42..76358398 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5284,6 +5284,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rtrb" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f94e84c073f3b85d4012b44722fa8842b9986d741590d4f2636ad0a5b14143" + [[package]] name = "rusqlite" version = "0.31.0" @@ -6873,10 +6879,15 @@ dependencies = [ name = "trc" version = "0.8.5" dependencies = [ + "ahash 0.8.11", + "arc-swap", "base64 0.22.1", "bincode", + "parking_lot", "reqwest 0.12.5", + "rtrb", "serde_json", + "tokio", ] [[package]] diff --git a/crates/common/src/listener/listen.rs b/crates/common/src/listener/listen.rs index 27645125..db4c92f8 100644 --- a/crates/common/src/listener/listen.rs +++ b/crates/common/src/listener/listen.rs @@ -97,7 +97,7 @@ impl Server { stream = listener.accept() => { match stream { Ok((stream, remote_addr)) => { - let core = core.as_ref().load(); + let core = core.as_ref().load_full(); let enable_acme = (is_https && core.has_acme_tls_providers()).then_some(core.clone()); if has_proxies && instance.proxy_networks.iter().any(|network| network.matches(&remote_addr.ip())) { diff --git a/crates/common/src/manager/config.rs b/crates/common/src/manager/config.rs index dcd9e4cb..4dc0c05c 100644 --- a/crates/common/src/manager/config.rs +++ b/crates/common/src/manager/config.rs @@ -327,7 +327,7 @@ impl ConfigManager { .fetch_config_resource(resource_id) .await .map_err(|reason| { - trc::Cause::Fetch + trc::Cause::Configuration .caused_by(trc::location!()) .ctx(trc::Key::Reason, reason) })?; @@ -492,7 +492,8 @@ impl Patterns { Pattern::Include(MatchType::StartsWith( "authentication.fallback-admin.".to_string(), )), - Pattern::Include(MatchType::Equal("cluster.node-id".to_string())), + Pattern::Exclude(MatchType::Equal("cluster.key".to_string())), + Pattern::Include(MatchType::StartsWith("cluster.".to_string())), Pattern::Include(MatchType::Equal("storage.data".to_string())), Pattern::Include(MatchType::Equal("storage.blob".to_string())), Pattern::Include(MatchType::Equal("storage.lookup".to_string())), diff --git a/crates/common/src/manager/webadmin.rs b/crates/common/src/manager/webadmin.rs index 6571e4fc..2e59e68c 100644 --- a/crates/common/src/manager/webadmin.rs +++ b/crates/common/src/manager/webadmin.rs @@ -139,7 +139,7 @@ impl WebAdminManager { .fetch_resource("webadmin") .await .map_err(|err| { - trc::Cause::Fetch + trc::ResourceCause::Error .caused_by(trc::location!()) .reason(err) .details("Failed to download webadmin") diff --git a/crates/common/src/webhooks/manager.rs b/crates/common/src/webhooks/manager.rs index 756474ea..06d27d9d 100644 --- a/crates/common/src/webhooks/manager.rs +++ b/crates/common/src/webhooks/manager.rs @@ -58,7 +58,7 @@ pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender { let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await; // Load settings - let core = core.load(); + let core = core.load_full(); match event_or_timeout { Ok(Some(event)) => match event { diff --git a/crates/directory/src/backend/ldap/lookup.rs b/crates/directory/src/backend/ldap/lookup.rs index 7191735f..845cc357 100644 --- a/crates/directory/src/backend/ldap/lookup.rs +++ b/crates/directory/src/backend/ldap/lookup.rs @@ -332,7 +332,7 @@ impl LdapMappings { fn entry_to_principal(&self, entry: SearchEntry) -> Principal { let mut principal = Principal::default(); - trc::trace!(LdapQuery, Value = format!("{entry:?}")); + trc::event!(LdapQuery, Value = format!("{entry:?}")); for (attr, value) in entry.attrs { if self.attr_name.contains(&attr) { diff --git a/crates/jmap/src/api/management/reload.rs b/crates/jmap/src/api/management/reload.rs index bca1d9ec..01be5e90 100644 --- a/crates/jmap/src/api/management/reload.rs +++ b/crates/jmap/src/api/management/reload.rs @@ -59,9 +59,16 @@ impl JMAP { } // Reload ACME - if let Err(err) = self.inner.housekeeper_tx.send(Event::AcmeReload).await { - tracing::warn!("Failed to send ACME reload event to housekeeper: {}", err); - } + self.inner + .housekeeper_tx + .send(Event::AcmeReload) + .await + .map_err(|err| { + trc::Cause::Thread + .reason(err) + .details("Failed to send ACME reload event to housekeeper") + .caused_by(trc::location!()) + })?; } Ok(JsonResponse::new(json!({ diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index a14779d9..0caa6a15 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -510,7 +510,7 @@ impl Inner { impl From for JMAP { fn from(value: JmapInstance) -> Self { let shared_core = value.core.clone(); - let core = value.core.load().clone(); + let core = value.core.load_full(); JMAP { smtp: SMTP { core: core.clone(), diff --git a/crates/jmap/src/push/manager.rs b/crates/jmap/src/push/manager.rs index de8d4398..b3e43d29 100644 --- a/crates/jmap/src/push/manager.rs +++ b/crates/jmap/src/push/manager.rs @@ -36,7 +36,7 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender { let event_or_timeout = tokio::time::timeout(retry_timeout, push_rx.recv()).await; // Load settings - let core_ = core.core.load(); + let core_ = core.core.load_full(); let push_attempt_interval = core_.jmap.push_attempt_interval; let push_attempts_max = core_.jmap.push_attempts_max; let push_retry_interval = core_.jmap.push_retry_interval; diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs index 2bcdb936..84d410fe 100644 --- a/crates/jmap/src/services/housekeeper.rs +++ b/crates/jmap/src/services/housekeeper.rs @@ -74,7 +74,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { // Add all events to queue let mut queue = Queue::default(); { - let core_ = core.core.load(); + let core_ = core.core.load_full(); queue.schedule( Instant::now() + core_.jmap.session_purge_frequency.time_to_next(), ActionClass::Session, @@ -128,7 +128,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await { Ok(Some(event)) => match event { Event::AcmeReload => { - let core_ = core.core.load().clone(); + let core_ = core.core.load_full(); let inner = core.jmap_inner.clone(); tokio::spawn(async move { @@ -233,7 +233,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { return; } Err(_) => { - let core_ = core.core.load(); + let core_ = core.core.load_full(); while let Some(event) = queue.pop() { match event.event { ActionClass::Acme(provider_id) => { diff --git a/crates/jmap/src/websocket/stream.rs b/crates/jmap/src/websocket/stream.rs index d836b241..32e21fc2 100644 --- a/crates/jmap/src/websocket/stream.rs +++ b/crates/jmap/src/websocket/stream.rs @@ -45,20 +45,24 @@ impl JMAP { let mut next_event = heartbeat; // Register with state manager - let mut change_rx = if let Ok(change_rx) = self + let mut change_rx = match self .subscribe_state_manager(access_token.primary_id(), Bitmap::all()) .await { - change_rx - } else { - let todo = "log error"; - let _ = stream - .send(Message::Text( - WebSocketRequestError::from(RequestError::internal_server_error()).to_json(), - )) - .await; - return; + Ok(change_rx) => change_rx, + Err(err) => { + tracing::debug!(parent: &span, error = ?err, "Failed to subscribe to state manager"); + + let _ = stream + .send(Message::Text( + WebSocketRequestError::from(RequestError::internal_server_error()) + .to_json(), + )) + .await; + return; + } }; + let mut changes = WebSocketStateChange::new(None); let mut change_types: Bitmap = Bitmap::new(); diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 4fa124ca..03b07eb8 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -256,7 +256,7 @@ impl PartialOrd for SessionAddress { impl From for SMTP { fn from(value: SmtpInstance) -> Self { SMTP { - core: value.core.load().clone(), + core: value.core.load_full(), inner: value.inner, } } diff --git a/crates/smtp/src/reporting/scheduler.rs b/crates/smtp/src/reporting/scheduler.rs index 7000f376..d1ba4874 100644 --- a/crates/smtp/src/reporting/scheduler.rs +++ b/crates/smtp/src/reporting/scheduler.rs @@ -31,7 +31,7 @@ impl SpawnReport for mpsc::Receiver { loop { // Read events let now = now(); - let events = next_report_event(&core.core.load()).await; + let events = next_report_event(&core.core.load_full()).await; next_wake_up = events .last() .and_then(|e| match e { diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs index 68303596..146e35c9 100644 --- a/crates/store/src/dispatch/blob.rs +++ b/crates/store/src/dispatch/blob.rs @@ -6,7 +6,7 @@ use std::{borrow::Cow, ops::Range}; -use trc::AddContext; +use trc::{AddContext, Cause, StoreCause}; use utils::config::utils::ParseValue; use crate::{BlobBackend, BlobStore, CompressionAlgo, Store}; @@ -54,7 +54,10 @@ impl BlobStore { })? } Some(data) => { - let todo = "log"; + trc::event!( + Error(Cause::Store(StoreCause::BlobMissingMarker)), + Key = key, + ); data } None => return Ok(None), diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs index afe02835..a00e6879 100644 --- a/crates/store/src/dispatch/lookup.rs +++ b/crates/store/src/dispatch/lookup.rs @@ -35,7 +35,7 @@ impl LookupStore { _ => Err(trc::StoreCause::NotSupported.into_err()), }; - trc::trace!( + trc::event!( SqlQuery, Query = query.to_string(), Parameters = params.as_slice(), diff --git a/crates/store/src/write/purge.rs b/crates/store/src/write/purge.rs index 76a1a192..79886b2e 100644 --- a/crates/store/src/write/purge.rs +++ b/crates/store/src/write/purge.rs @@ -7,6 +7,7 @@ use std::fmt::Display; use tokio::sync::watch; +use trc::PurgeEvent; use utils::config::cron::SimpleCron; use crate::{BlobStore, LookupStore, Store}; @@ -27,8 +28,8 @@ pub struct PurgeSchedule { impl PurgeSchedule { pub fn spawn(self, mut shutdown_rx: watch::Receiver) { - trc::trace!( - PurgeTaskStarted, + trc::event!( + Purge(PurgeEvent::Started), Type = self.store.as_str(), Id = self.store_id.to_string() ); @@ -39,16 +40,16 @@ impl PurgeSchedule { .await .is_ok() { - trc::trace!( - PurgeTaskFinished, + trc::event!( + Purge(PurgeEvent::Finished), Type = self.store.as_str(), Id = self.store_id.to_string() ); return; } - trc::trace!( - PurgeTaskRunning, + trc::event!( + Purge(PurgeEvent::Running), Type = self.store.as_str(), Id = self.store_id.to_string() ); @@ -62,8 +63,8 @@ impl PurgeSchedule { }; if let Err(err) = result { - trc::error!( - Purge, + trc::event!( + Purge(PurgeEvent::Error), Type = self.store.as_str(), Id = self.store_id.to_string(), CausedBy = err diff --git a/crates/trc/Cargo.toml b/crates/trc/Cargo.toml index cb49c806..fbe69793 100644 --- a/crates/trc/Cargo.toml +++ b/crates/trc/Cargo.toml @@ -9,6 +9,11 @@ base64 = "0.22.1" serde_json = "1.0.120" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} bincode = "1.3.3" +rtrb = "0.3.1" +parking_lot = "0.12.3" +tokio = { version = "1.23", features = ["net", "macros"] } +ahash = "0.8.11" +arc-swap = "1.7.1" [features] test_mode = [] diff --git a/crates/trc/src/channel.rs b/crates/trc/src/channel.rs new file mode 100644 index 00000000..91e2f371 --- /dev/null +++ b/crates/trc/src/channel.rs @@ -0,0 +1,99 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::{ + cell::UnsafeCell, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +use parking_lot::Mutex; +use rtrb::{Consumer, Producer, PushError, RingBuffer}; + +use crate::{ + collector::{spawn_collector, CollectorThread}, + Event, +}; + +pub(crate) static EVENT_RXS: Mutex> = Mutex::new(Vec::new()); +pub(crate) static EVENT_COUNT: AtomicUsize = AtomicUsize::new(0); +pub(crate) const CHANNEL_SIZE: usize = 10240; + +thread_local! { + static EVENT_TX: UnsafeCell = { + let (tx, rx) = RingBuffer::new(CHANNEL_SIZE); + EVENT_RXS.lock().push(Receiver { rx }); + UnsafeCell::new(Sender { + tx, + collector: spawn_collector().clone(), + overflow: Vec::with_capacity(0), + }) + }; +} + +pub struct Sender { + tx: Producer>, + collector: Arc, + overflow: Vec>, +} + +pub struct Receiver { + rx: Consumer>, +} + +#[derive(Debug)] +pub struct ChannelError; + +impl Sender { + pub fn send(&mut self, event: Arc) -> Result<(), ChannelError> { + while let Some(event) = self.overflow.pop() { + if let Err(PushError::Full(event)) = self.tx.push(event) { + self.overflow.push(event); + break; + } + } + + if let Err(PushError::Full(event)) = self.tx.push(event) { + if self.overflow.len() <= CHANNEL_SIZE * 2 { + self.overflow.push(event); + } else { + return Err(ChannelError); + } + } + + Ok(()) + } +} + +impl Receiver { + pub fn try_recv(&mut self) -> Result>, ChannelError> { + match self.rx.pop() { + Ok(event) => Ok(Some(event)), + Err(_) => { + if !self.rx.is_abandoned() { + Ok(None) + } else { + Err(ChannelError) + } + } + } + } +} + +impl Event { + pub fn send(self) { + // SAFETY: EVENT_TX is thread-local. + let _ = EVENT_TX.try_with(|tx| unsafe { + let tx = &mut *tx.get(); + if tx.send(Arc::new(self)).is_ok() { + EVENT_COUNT.fetch_add(1, Ordering::Relaxed); + tx.collector.thread().unpark(); + } + }); + } +} diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs new file mode 100644 index 00000000..b3eb03e4 --- /dev/null +++ b/crates/trc/src/collector.rs @@ -0,0 +1,128 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, OnceLock, + }, + thread::{park, Builder, JoinHandle}, +}; + +use ahash::AHashMap; +use arc_swap::ArcSwap; + +use crate::{ + channel::{EVENT_COUNT, EVENT_RXS}, + subscriber::{Subscriber, SUBSCRIBER_UPDATE}, + Event, EventType, Level, +}; + +pub(crate) static TRACING_LEVEL: AtomicUsize = AtomicUsize::new(Level::Info as usize); + +pub(crate) type CollectorThread = JoinHandle<()>; + +#[derive(Default)] +pub struct Collector { + subscribers: Vec, +} + +impl Collector { + fn collect(&mut self) -> bool { + if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 { + park(); + } + + // Collect all events + let mut do_continue = true; + EVENT_RXS.lock().retain_mut(|rx| { + loop { + match rx.try_recv() { + Ok(Some(event)) => { + if !event.keys.is_empty() { + // Process events + for subscriber in self.subscribers.iter_mut() { + subscriber.push_event(event.clone()); + } + } else { + // Register subscriber + let subscribers = { std::mem::take(&mut (*SUBSCRIBER_UPDATE.lock())) }; + if !subscribers.is_empty() { + self.subscribers.extend(subscribers); + } else if event.level == Level::Disable { + do_continue = false; + } + } + } + Ok(None) => { + return true; + } + Err(_) => { + return false; // Channel is closed, remove. + } + } + } + }); + + if !self.subscribers.is_empty() { + self.subscribers + .retain_mut(|subscriber| subscriber.send_batch().is_ok()); + } + + do_continue + } + + pub fn set_level(level: Level) { + TRACING_LEVEL.store(level as usize, Ordering::Relaxed); + } + + pub fn update_custom_levels(levels: AHashMap) { + custom_levels().store(Arc::new(levels)); + } + + pub fn shutdown() { + Event::new(EventType::Error(crate::Cause::Thread), Level::Disable, 0).send() + } +} + +pub(crate) fn spawn_collector() -> &'static Arc { + static COLLECTOR: OnceLock> = OnceLock::new(); + COLLECTOR.get_or_init(|| { + Arc::new( + Builder::new() + .name("stalwart-collector".to_string()) + .spawn(move || { + let mut collector = Collector::default(); + + while collector.collect() {} + }) + .expect("Failed to start event collector"), + ) + }) +} + +fn custom_levels() -> &'static ArcSwap> { + static CUSTOM_LEVELS: OnceLock>> = OnceLock::new(); + CUSTOM_LEVELS.get_or_init(|| ArcSwap::from_pointee(Default::default())) +} + +impl EventType { + #[inline(always)] + pub fn effective_level(&self) -> Level { + custom_levels() + .load() + .get(self) + .copied() + .unwrap_or_else(|| self.level()) + } +} + +impl Level { + #[inline(always)] + pub fn is_enabled(&self) -> bool { + *self as usize >= TRACING_LEVEL.load(Ordering::Relaxed) + } +} diff --git a/crates/trc/src/conv.rs b/crates/trc/src/conv.rs index f65eadd8..fe3946c7 100644 --- a/crates/trc/src/conv.rs +++ b/crates/trc/src/conv.rs @@ -8,8 +8,8 @@ use std::{borrow::Cow, fmt::Debug}; use crate::*; -impl AsRef for Context { - fn as_ref(&self) -> &T { +impl AsRef for Error { + fn as_ref(&self) -> &Cause { &self.inner } } diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index e4a448ef..257d6e4e 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -4,36 +4,44 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, fmt::Display}; +use std::{borrow::Cow, cmp::Ordering, fmt::Display}; use crate::*; -impl Context -where - [(Key, Value); N]: Default, - T: Eq, -{ - pub fn new(inner: T) -> Self { +impl Event { + pub fn new(inner: EventType, level: Level, capacity: usize) -> Self { Self { inner, - keys: Default::default(), - keys_size: 0, + level, + keys: Vec::with_capacity(capacity), } } #[inline(always)] pub fn ctx(mut self, key: Key, value: impl Into) -> Self { - if self.keys_size < N { - self.keys[self.keys_size] = (key, value.into()); - self.keys_size += 1; - } else { - #[cfg(debug_assertions)] - panic!( - "Context is full while inserting {:?}: {:?}", - key, - value.into() - ); + self.keys.push((key, value.into())); + self + } + + pub fn ctx_opt(self, key: Key, value: Option>) -> Self { + match value { + Some(value) => self.ctx(key, value), + None => self, } + } +} + +impl Error { + pub fn new(inner: Cause) -> Self { + Self { + inner, + keys: Vec::with_capacity(5), + } + } + + #[inline(always)] + pub fn ctx(mut self, key: Key, value: impl Into) -> Self { + self.keys.push((key, value.into())); self } @@ -45,20 +53,14 @@ where } #[inline(always)] - pub fn matches(&self, inner: T) -> bool { + pub fn matches(&self, inner: Cause) -> bool { self.inner == inner } pub fn value(&self, key: Key) -> Option<&Value> { - self.keys.iter().take(self.keys_size).find_map( - |(k, v)| { - if *k == key { - Some(v) - } else { - None - } - }, - ) + self.keys + .iter() + .find_map(|(k, v)| if *k == key { Some(v) } else { None }) } pub fn value_as_str(&self, key: Key) -> Option<&str> { @@ -66,16 +68,13 @@ where } pub fn take_value(&mut self, key: Key) -> Option { - self.keys - .iter_mut() - .take(self.keys_size) - .find_map(|(k, v)| { - if *k == key { - Some(std::mem::take(v)) - } else { - None - } - }) + self.keys.iter_mut().find_map(|(k, v)| { + if *k == key { + Some(std::mem::take(v)) + } else { + None + } + }) } #[inline(always)] @@ -166,7 +165,6 @@ impl Cause { Self::Pop3 => "POP3 error", Self::Smtp => "SMTP error", Self::Thread => "Thread error", - Self::Fetch => "Fetch error", Self::Acme => "ACME error", Self::Dns => "DNS error", Self::Ingest => "Message Ingest error", @@ -174,7 +172,6 @@ impl Cause { Self::Limit(cause) => cause.message(), Self::Manage(cause) => cause.message(), Self::Auth(cause) => cause.message(), - Self::Purge => "Purge error", Self::Configuration => "Configuration error", Self::Resource(cause) => cause.message(), } @@ -489,10 +486,10 @@ impl AddContext for Result { } } -impl Display for Context { +impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self.inner)?; - for (key, value) in self.keys.iter().take(self.keys_size) { + for (key, value) in self.keys.iter() { write!(f, "\n {:?} = {:?}", key, value)?; } Ok(()) @@ -501,6 +498,40 @@ impl Display for Context { impl std::error::Error for Error {} +impl PartialOrd for Level { + #[inline(always)] + fn partial_cmp(&self, other: &Level) -> Option { + Some(self.cmp(other)) + } + + #[inline(always)] + fn lt(&self, other: &Level) -> bool { + (*other as usize) < (*self as usize) + } + + #[inline(always)] + fn le(&self, other: &Level) -> bool { + (*other as usize) <= (*self as usize) + } + + #[inline(always)] + fn gt(&self, other: &Level) -> bool { + (*other as usize) > (*self as usize) + } + + #[inline(always)] + fn ge(&self, other: &Level) -> bool { + (*other as usize) >= (*self as usize) + } +} + +impl Ord for Level { + #[inline(always)] + fn cmp(&self, other: &Self) -> Ordering { + (*other as usize).cmp(&(*self as usize)) + } +} + impl PartialEq for Value { fn eq(&self, other: &Self) -> bool { match (self, other) { @@ -525,14 +556,11 @@ impl PartialEq for Value { impl Eq for Value {} -impl PartialEq for Context -where - T: Eq, -{ +impl PartialEq for Error { fn eq(&self, other: &Self) -> bool { - if self.inner == other.inner && self.keys_size == other.keys_size { - for kv in self.keys.iter().take(self.keys_size) { - if !other.keys.iter().take(other.keys_size).any(|okv| kv == okv) { + if self.inner == other.inner && self.keys.len() == other.keys.len() { + for kv in self.keys.iter() { + if !other.keys.iter().any(|okv| kv == okv) { return false; } } @@ -544,4 +572,57 @@ where } } -impl Eq for Context where T: Eq {} +impl Eq for Error {} + +impl EventType { + pub fn level(&self) -> Level { + match self { + EventType::Error(error) => match error { + Cause::Store(_) => Level::Error, + Cause::Jmap(_) => Level::Debug, + Cause::Imap => Level::Debug, + Cause::ManageSieve => Level::Debug, + Cause::Pop3 => Level::Debug, + Cause::Smtp => Level::Debug, + Cause::Thread => Level::Error, + Cause::Acme => Level::Error, + Cause::Dns => Level::Error, + Cause::Ingest => Level::Error, + Cause::Network => Level::Debug, + Cause::Limit(cause) => match cause { + LimitCause::SizeRequest => Level::Debug, + LimitCause::SizeUpload => Level::Debug, + LimitCause::CallsIn => Level::Debug, + LimitCause::ConcurrentRequest => Level::Debug, + LimitCause::ConcurrentUpload => Level::Debug, + LimitCause::Quota => Level::Debug, + LimitCause::BlobQuota => Level::Debug, + LimitCause::TooManyRequests => Level::Warn, + }, + Cause::Manage(_) => Level::Debug, + Cause::Auth(cause) => match cause { + AuthCause::Failed => Level::Debug, + AuthCause::MissingTotp => Level::Trace, + AuthCause::TooManyAttempts => Level::Warn, + AuthCause::Banned => Level::Warn, + AuthCause::Error => Level::Error, + }, + Cause::Configuration => Level::Error, + Cause::Resource(cause) => match cause { + ResourceCause::NotFound => Level::Debug, + ResourceCause::BadParameters => Level::Error, + ResourceCause::Error => Level::Error, + }, + }, + EventType::NewConnection => Level::Info, + EventType::SqlQuery => Level::Trace, + EventType::LdapQuery => Level::Trace, + EventType::Purge(event) => match event { + PurgeEvent::Started => Level::Debug, + PurgeEvent::Finished => Level::Debug, + PurgeEvent::Running => Level::Info, + PurgeEvent::Error => Level::Error, + }, + } + } +} diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 1a5979b5..1f8976fe 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -4,18 +4,27 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +pub mod channel; +pub mod collector; pub mod conv; pub mod imple; pub mod macros; +pub mod subscriber; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; pub type Result = std::result::Result; -pub type Error = Context; -pub type Trace = Context; -const ERROR_CONTEXT_SIZE: usize = 5; -const TRACE_CONTEXT_SIZE: usize = 10; +#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] +#[repr(usize)] +pub enum Level { + Disable = 0, + Trace = 1, + Debug = 2, + Info = 3, + Warn = 4, + Error = 5, +} #[derive(Debug, Default, Clone)] pub enum Value { @@ -62,18 +71,24 @@ pub enum Key { AccountId, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Event { +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum EventType { NewConnection, Error(Cause), SqlQuery, LdapQuery, - PurgeTaskStarted, - PurgeTaskRunning, - PurgeTaskFinished, + Purge(PurgeEvent), } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum PurgeEvent { + Started, + Finished, + Running, + Error, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Cause { Store(StoreCause), Jmap(JmapCause), @@ -82,7 +97,6 @@ pub enum Cause { Pop3, Smtp, Thread, - Fetch, Acme, Dns, Ingest, @@ -90,100 +104,99 @@ pub enum Cause { Limit(LimitCause), Manage(ManageCause), Auth(AuthCause), - Purge, Configuration, Resource(ResourceCause), } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum StoreCause { - AssertValue, - BlobMissingMarker, - FoundationDB, - MySQL, - PostgreSQL, - RocksDB, - SQLite, - Ldap, - ElasticSearch, - Redis, - S3, - Filesystem, - Pool, - DataCorruption, - Decompress, - Deserialize, - NotFound, - NotConfigured, - NotSupported, - Unexpected, - Crypto, + AssertValue = 0, + BlobMissingMarker = 1, + FoundationDB = 2, + MySQL = 3, + PostgreSQL = 4, + RocksDB = 5, + SQLite = 6, + Ldap = 7, + ElasticSearch = 8, + Redis = 9, + S3 = 10, + Filesystem = 11, + Pool = 12, + DataCorruption = 13, + Decompress = 14, + Deserialize = 15, + NotFound = 16, + NotConfigured = 17, + NotSupported = 18, + Unexpected = 19, + Crypto = 20, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum JmapCause { // Method errors - InvalidArguments, - RequestTooLarge, - StateMismatch, - AnchorNotFound, - UnsupportedFilter, - UnsupportedSort, - UnknownMethod, - InvalidResultReference, - Forbidden, - AccountNotFound, - AccountNotSupportedByMethod, - AccountReadOnly, - NotFound, - CannotCalculateChanges, - UnknownDataType, + InvalidArguments = 0, + RequestTooLarge = 1, + StateMismatch = 2, + AnchorNotFound = 3, + UnsupportedFilter = 4, + UnsupportedSort = 5, + UnknownMethod = 6, + InvalidResultReference = 7, + Forbidden = 8, + AccountNotFound = 9, + AccountNotSupportedByMethod = 10, + AccountReadOnly = 11, + NotFound = 12, + CannotCalculateChanges = 13, + UnknownDataType = 14, // Request errors - UnknownCapability, - NotJSON, - NotRequest, + UnknownCapability = 15, + NotJSON = 16, + NotRequest = 17, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum LimitCause { - SizeRequest, - SizeUpload, - CallsIn, - ConcurrentRequest, - ConcurrentUpload, - Quota, - BlobQuota, - TooManyRequests, + SizeRequest = 0, + SizeUpload = 1, + CallsIn = 2, + ConcurrentRequest = 3, + ConcurrentUpload = 4, + Quota = 5, + BlobQuota = 6, + TooManyRequests = 7, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ManageCause { - MissingParameter, - AlreadyExists, - AssertFailed, - NotFound, - NotSupported, - Error, + MissingParameter = 0, + AlreadyExists = 1, + AssertFailed = 2, + NotFound = 3, + NotSupported = 4, + Error = 5, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum AuthCause { - Failed, - MissingTotp, - TooManyAttempts, - Banned, - Error, + Failed = 0, + MissingTotp = 1, + TooManyAttempts = 2, + Banned = 3, + Error = 4, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ResourceCause { - NotFound, - BadParameters, - Error, + NotFound = 0, + BadParameters = 1, + Error = 2, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Protocol { Jmap, Imap, @@ -194,10 +207,16 @@ pub enum Protocol { } #[derive(Debug, Clone)] -pub struct Context { - inner: T, - keys: [(Key, Value); N], - keys_size: usize, +pub struct Error { + inner: Cause, + keys: Vec<(Key, Value)>, +} + +#[derive(Debug, Clone)] +pub struct Event { + inner: EventType, + level: Level, + keys: Vec<(Key, Value)>, } pub trait AddContext { diff --git a/crates/trc/src/macros.rs b/crates/trc/src/macros.rs index b49f4e30..435c7f43 100644 --- a/crates/trc/src/macros.rs +++ b/crates/trc/src/macros.rs @@ -4,31 +4,51 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -#[macro_export] -macro_rules! trace { - ($event:ident $(, $key:ident = $value:expr)* $(,)?) => { - { - let event = $crate::Trace::new($crate::Event::$event) - $( - .ctx($crate::Key::$key, $crate::Value::from($value)) - )* ; - - //eprintln!("{}", event); - } - }; - } +// Helper macro to count the number of arguments #[macro_export] -macro_rules! error { - ($cause:ident $(, $key:ident = $value:expr)* $(,)?) => {{ - let event = $crate::Trace::new($crate::Event::Error($crate::Cause::$cause)) - .ctx($crate::Key::CausedBy, $crate::location!()) - $( - .ctx($crate::Key::$key, $crate::Value::from($value)) - )* ; +macro_rules! event { + ($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => { + { + let et = $crate::EventType::$event($($param),*); + let level = et.effective_level(); + if level.is_enabled() { + $crate::Event::new( + et, + level, + trc::__count!($($key)*) + ) + $( + .ctx($crate::Key::$key, $crate::Value::from($value)) + )* + .send(); + } + } + }; - //eprintln!("{}", event); - }}; + ($event:ident $(, $key:ident = $value:expr)* $(,)?) => { + { + let et = $crate::EventType::$event; + let level = et.effective_level(); + if level.is_enabled() { + $crate::Event::new( + et, + level, + trc::__count!($($key)*) + ) + $( + .ctx($crate::Key::$key, $crate::Value::from($value)) + )* + .send(); + } + } + }; +} + +#[macro_export] +macro_rules! __count { + () => (0usize); + ($head:tt $($tail:tt)*) => (1usize + trc::__count!($($tail)*)); } #[macro_export] diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs new file mode 100644 index 00000000..6442ae7e --- /dev/null +++ b/crates/trc/src/subscriber.rs @@ -0,0 +1,110 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::sync::Arc; + +use ahash::AHashSet; +use parking_lot::Mutex; +use tokio::sync::mpsc::{self, error::TrySendError}; + +use crate::{channel::ChannelError, Event, EventType, Level}; + +const MAX_BATCH_SIZE: usize = 32768; + +pub(crate) static SUBSCRIBER_UPDATE: Mutex> = Mutex::new(Vec::new()); + +#[derive(Debug)] +pub(crate) struct Subscriber { + pub level: Level, + pub disabled: AHashSet, + pub tx: mpsc::Sender>>, + pub lossy: bool, + pub batch: Vec>, +} + +pub struct SubscriberBuilder { + pub level: Level, + pub disabled: AHashSet, + pub lossy: bool, +} + +impl Subscriber { + #[inline(always)] + pub fn push_event(&mut self, trace: Arc) { + if trace.level >= self.level && !self.disabled.contains(&trace.inner) { + self.batch.push(trace); + } + } + + pub fn send_batch(&mut self) -> Result<(), ChannelError> { + if !self.batch.is_empty() { + match self.tx.try_send(std::mem::take(&mut self.batch)) { + Ok(_) => Ok(()), + Err(TrySendError::Full(mut events)) => { + if self.lossy && events.len() > MAX_BATCH_SIZE { + events.retain(|e| e.level == Level::Error); + if events.len() > MAX_BATCH_SIZE { + events.truncate(MAX_BATCH_SIZE); + } + } + self.batch = events; + Ok(()) + } + Err(TrySendError::Closed(_)) => Err(ChannelError), + } + } else { + Ok(()) + } + } +} + +impl SubscriberBuilder { + pub fn new() -> Self { + Default::default() + } + + pub fn with_level(mut self, level: Level) -> Self { + self.level = level; + self + } + + pub fn with_disabled(mut self, disabled: impl IntoIterator) -> Self { + self.disabled.extend(disabled); + self + } + + pub fn with_lossy(mut self, lossy: bool) -> Self { + self.lossy = lossy; + self + } + + pub fn register(self) -> mpsc::Receiver>> { + let (tx, rx) = mpsc::channel(8192); + + SUBSCRIBER_UPDATE.lock().push(Subscriber { + level: self.level, + disabled: self.disabled, + tx, + lossy: self.lossy, + batch: Vec::new(), + }); + + // Notify collector + Event::new(EventType::Error(crate::Cause::Thread), Level::Info, 0).send(); + + rx + } +} + +impl Default for SubscriberBuilder { + fn default() -> Self { + Self { + level: Level::Info, + disabled: AHashSet::new(), + lossy: true, + } + } +}