Lock-free fast tracing (closes #180)

This commit is contained in:
mdecimus 2024-07-22 17:56:24 +02:00
parent f6ac35fd70
commit 9c23774aa5
24 changed files with 685 additions and 196 deletions

11
Cargo.lock generated
View file

@ -5284,6 +5284,12 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "rtrb"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3f94e84c073f3b85d4012b44722fa8842b9986d741590d4f2636ad0a5b14143"
[[package]] [[package]]
name = "rusqlite" name = "rusqlite"
version = "0.31.0" version = "0.31.0"
@ -6873,10 +6879,15 @@ dependencies = [
name = "trc" name = "trc"
version = "0.8.5" version = "0.8.5"
dependencies = [ dependencies = [
"ahash 0.8.11",
"arc-swap",
"base64 0.22.1", "base64 0.22.1",
"bincode", "bincode",
"parking_lot",
"reqwest 0.12.5", "reqwest 0.12.5",
"rtrb",
"serde_json", "serde_json",
"tokio",
] ]
[[package]] [[package]]

View file

@ -97,7 +97,7 @@ impl Server {
stream = listener.accept() => { stream = listener.accept() => {
match stream { match stream {
Ok((stream, remote_addr)) => { Ok((stream, remote_addr)) => {
let core = core.as_ref().load(); let core = core.as_ref().load_full();
let enable_acme = (is_https && core.has_acme_tls_providers()).then_some(core.clone()); let enable_acme = (is_https && core.has_acme_tls_providers()).then_some(core.clone());
if has_proxies && instance.proxy_networks.iter().any(|network| network.matches(&remote_addr.ip())) { if has_proxies && instance.proxy_networks.iter().any(|network| network.matches(&remote_addr.ip())) {

View file

@ -327,7 +327,7 @@ impl ConfigManager {
.fetch_config_resource(resource_id) .fetch_config_resource(resource_id)
.await .await
.map_err(|reason| { .map_err(|reason| {
trc::Cause::Fetch trc::Cause::Configuration
.caused_by(trc::location!()) .caused_by(trc::location!())
.ctx(trc::Key::Reason, reason) .ctx(trc::Key::Reason, reason)
})?; })?;
@ -492,7 +492,8 @@ impl Patterns {
Pattern::Include(MatchType::StartsWith( Pattern::Include(MatchType::StartsWith(
"authentication.fallback-admin.".to_string(), "authentication.fallback-admin.".to_string(),
)), )),
Pattern::Include(MatchType::Equal("cluster.node-id".to_string())), Pattern::Exclude(MatchType::Equal("cluster.key".to_string())),
Pattern::Include(MatchType::StartsWith("cluster.".to_string())),
Pattern::Include(MatchType::Equal("storage.data".to_string())), Pattern::Include(MatchType::Equal("storage.data".to_string())),
Pattern::Include(MatchType::Equal("storage.blob".to_string())), Pattern::Include(MatchType::Equal("storage.blob".to_string())),
Pattern::Include(MatchType::Equal("storage.lookup".to_string())), Pattern::Include(MatchType::Equal("storage.lookup".to_string())),

View file

@ -139,7 +139,7 @@ impl WebAdminManager {
.fetch_resource("webadmin") .fetch_resource("webadmin")
.await .await
.map_err(|err| { .map_err(|err| {
trc::Cause::Fetch trc::ResourceCause::Error
.caused_by(trc::location!()) .caused_by(trc::location!())
.reason(err) .reason(err)
.details("Failed to download webadmin") .details("Failed to download webadmin")

View file

@ -58,7 +58,7 @@ pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender<WebhookEvent> {
let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await; let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await;
// Load settings // Load settings
let core = core.load(); let core = core.load_full();
match event_or_timeout { match event_or_timeout {
Ok(Some(event)) => match event { Ok(Some(event)) => match event {

View file

@ -332,7 +332,7 @@ impl LdapMappings {
fn entry_to_principal(&self, entry: SearchEntry) -> Principal<String> { fn entry_to_principal(&self, entry: SearchEntry) -> Principal<String> {
let mut principal = Principal::default(); let mut principal = Principal::default();
trc::trace!(LdapQuery, Value = format!("{entry:?}")); trc::event!(LdapQuery, Value = format!("{entry:?}"));
for (attr, value) in entry.attrs { for (attr, value) in entry.attrs {
if self.attr_name.contains(&attr) { if self.attr_name.contains(&attr) {

View file

@ -59,9 +59,16 @@ impl JMAP {
} }
// Reload ACME // Reload ACME
if let Err(err) = self.inner.housekeeper_tx.send(Event::AcmeReload).await { self.inner
tracing::warn!("Failed to send ACME reload event to housekeeper: {}", err); .housekeeper_tx
} .send(Event::AcmeReload)
.await
.map_err(|err| {
trc::Cause::Thread
.reason(err)
.details("Failed to send ACME reload event to housekeeper")
.caused_by(trc::location!())
})?;
} }
Ok(JsonResponse::new(json!({ Ok(JsonResponse::new(json!({

View file

@ -510,7 +510,7 @@ impl Inner {
impl From<JmapInstance> for JMAP { impl From<JmapInstance> for JMAP {
fn from(value: JmapInstance) -> Self { fn from(value: JmapInstance) -> Self {
let shared_core = value.core.clone(); let shared_core = value.core.clone();
let core = value.core.load().clone(); let core = value.core.load_full();
JMAP { JMAP {
smtp: SMTP { smtp: SMTP {
core: core.clone(), core: core.clone(),

View file

@ -36,7 +36,7 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> {
let event_or_timeout = tokio::time::timeout(retry_timeout, push_rx.recv()).await; let event_or_timeout = tokio::time::timeout(retry_timeout, push_rx.recv()).await;
// Load settings // Load settings
let core_ = core.core.load(); let core_ = core.core.load_full();
let push_attempt_interval = core_.jmap.push_attempt_interval; let push_attempt_interval = core_.jmap.push_attempt_interval;
let push_attempts_max = core_.jmap.push_attempts_max; let push_attempts_max = core_.jmap.push_attempts_max;
let push_retry_interval = core_.jmap.push_retry_interval; let push_retry_interval = core_.jmap.push_retry_interval;

View file

@ -74,7 +74,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
// Add all events to queue // Add all events to queue
let mut queue = Queue::default(); let mut queue = Queue::default();
{ {
let core_ = core.core.load(); let core_ = core.core.load_full();
queue.schedule( queue.schedule(
Instant::now() + core_.jmap.session_purge_frequency.time_to_next(), Instant::now() + core_.jmap.session_purge_frequency.time_to_next(),
ActionClass::Session, ActionClass::Session,
@ -128,7 +128,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await { match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await {
Ok(Some(event)) => match event { Ok(Some(event)) => match event {
Event::AcmeReload => { Event::AcmeReload => {
let core_ = core.core.load().clone(); let core_ = core.core.load_full();
let inner = core.jmap_inner.clone(); let inner = core.jmap_inner.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -233,7 +233,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
return; return;
} }
Err(_) => { Err(_) => {
let core_ = core.core.load(); let core_ = core.core.load_full();
while let Some(event) = queue.pop() { while let Some(event) = queue.pop() {
match event.event { match event.event {
ActionClass::Acme(provider_id) => { ActionClass::Acme(provider_id) => {

View file

@ -45,20 +45,24 @@ impl JMAP {
let mut next_event = heartbeat; let mut next_event = heartbeat;
// Register with state manager // Register with state manager
let mut change_rx = if let Ok(change_rx) = self let mut change_rx = match self
.subscribe_state_manager(access_token.primary_id(), Bitmap::all()) .subscribe_state_manager(access_token.primary_id(), Bitmap::all())
.await .await
{ {
change_rx Ok(change_rx) => change_rx,
} else { Err(err) => {
let todo = "log error"; tracing::debug!(parent: &span, error = ?err, "Failed to subscribe to state manager");
let _ = stream
.send(Message::Text( let _ = stream
WebSocketRequestError::from(RequestError::internal_server_error()).to_json(), .send(Message::Text(
)) WebSocketRequestError::from(RequestError::internal_server_error())
.await; .to_json(),
return; ))
.await;
return;
}
}; };
let mut changes = WebSocketStateChange::new(None); let mut changes = WebSocketStateChange::new(None);
let mut change_types: Bitmap<DataType> = Bitmap::new(); let mut change_types: Bitmap<DataType> = Bitmap::new();

View file

@ -256,7 +256,7 @@ impl PartialOrd for SessionAddress {
impl From<SmtpInstance> for SMTP { impl From<SmtpInstance> for SMTP {
fn from(value: SmtpInstance) -> Self { fn from(value: SmtpInstance) -> Self {
SMTP { SMTP {
core: value.core.load().clone(), core: value.core.load_full(),
inner: value.inner, inner: value.inner,
} }
} }

View file

@ -31,7 +31,7 @@ impl SpawnReport for mpsc::Receiver<Event> {
loop { loop {
// Read events // Read events
let now = now(); let now = now();
let events = next_report_event(&core.core.load()).await; let events = next_report_event(&core.core.load_full()).await;
next_wake_up = events next_wake_up = events
.last() .last()
.and_then(|e| match e { .and_then(|e| match e {

View file

@ -6,7 +6,7 @@
use std::{borrow::Cow, ops::Range}; use std::{borrow::Cow, ops::Range};
use trc::AddContext; use trc::{AddContext, Cause, StoreCause};
use utils::config::utils::ParseValue; use utils::config::utils::ParseValue;
use crate::{BlobBackend, BlobStore, CompressionAlgo, Store}; use crate::{BlobBackend, BlobStore, CompressionAlgo, Store};
@ -54,7 +54,10 @@ impl BlobStore {
})? })?
} }
Some(data) => { Some(data) => {
let todo = "log"; trc::event!(
Error(Cause::Store(StoreCause::BlobMissingMarker)),
Key = key,
);
data data
} }
None => return Ok(None), None => return Ok(None),

View file

@ -35,7 +35,7 @@ impl LookupStore {
_ => Err(trc::StoreCause::NotSupported.into_err()), _ => Err(trc::StoreCause::NotSupported.into_err()),
}; };
trc::trace!( trc::event!(
SqlQuery, SqlQuery,
Query = query.to_string(), Query = query.to_string(),
Parameters = params.as_slice(), Parameters = params.as_slice(),

View file

@ -7,6 +7,7 @@
use std::fmt::Display; use std::fmt::Display;
use tokio::sync::watch; use tokio::sync::watch;
use trc::PurgeEvent;
use utils::config::cron::SimpleCron; use utils::config::cron::SimpleCron;
use crate::{BlobStore, LookupStore, Store}; use crate::{BlobStore, LookupStore, Store};
@ -27,8 +28,8 @@ pub struct PurgeSchedule {
impl PurgeSchedule { impl PurgeSchedule {
pub fn spawn(self, mut shutdown_rx: watch::Receiver<bool>) { pub fn spawn(self, mut shutdown_rx: watch::Receiver<bool>) {
trc::trace!( trc::event!(
PurgeTaskStarted, Purge(PurgeEvent::Started),
Type = self.store.as_str(), Type = self.store.as_str(),
Id = self.store_id.to_string() Id = self.store_id.to_string()
); );
@ -39,16 +40,16 @@ impl PurgeSchedule {
.await .await
.is_ok() .is_ok()
{ {
trc::trace!( trc::event!(
PurgeTaskFinished, Purge(PurgeEvent::Finished),
Type = self.store.as_str(), Type = self.store.as_str(),
Id = self.store_id.to_string() Id = self.store_id.to_string()
); );
return; return;
} }
trc::trace!( trc::event!(
PurgeTaskRunning, Purge(PurgeEvent::Running),
Type = self.store.as_str(), Type = self.store.as_str(),
Id = self.store_id.to_string() Id = self.store_id.to_string()
); );
@ -62,8 +63,8 @@ impl PurgeSchedule {
}; };
if let Err(err) = result { if let Err(err) = result {
trc::error!( trc::event!(
Purge, Purge(PurgeEvent::Error),
Type = self.store.as_str(), Type = self.store.as_str(),
Id = self.store_id.to_string(), Id = self.store_id.to_string(),
CausedBy = err CausedBy = err

View file

@ -9,6 +9,11 @@ base64 = "0.22.1"
serde_json = "1.0.120" serde_json = "1.0.120"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]}
bincode = "1.3.3" bincode = "1.3.3"
rtrb = "0.3.1"
parking_lot = "0.12.3"
tokio = { version = "1.23", features = ["net", "macros"] }
ahash = "0.8.11"
arc-swap = "1.7.1"
[features] [features]
test_mode = [] test_mode = []

99
crates/trc/src/channel.rs Normal file
View file

@ -0,0 +1,99 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
cell::UnsafeCell,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use parking_lot::Mutex;
use rtrb::{Consumer, Producer, PushError, RingBuffer};
use crate::{
collector::{spawn_collector, CollectorThread},
Event,
};
// One ring-buffer consumer per producing thread; the collector thread
// drains all of them (see `Collector::collect`, which also removes
// entries whose channel has closed).
pub(crate) static EVENT_RXS: Mutex<Vec<Receiver>> = Mutex::new(Vec::new());
// Approximate count of events sent since the collector last woke up;
// the collector swaps it to zero and parks when nothing arrived.
pub(crate) static EVENT_COUNT: AtomicUsize = AtomicUsize::new(0);
// Capacity of each per-thread ring buffer.
pub(crate) const CHANNEL_SIZE: usize = 10240;
thread_local! {
    // Per-thread producer half of the SPSC ring buffer. Wrapped in an
    // UnsafeCell because `Producer::push` needs `&mut` and the value is
    // only ever touched from its owning thread (see `Event::send`).
    static EVENT_TX: UnsafeCell<Sender> = {
        let (tx, rx) = RingBuffer::new(CHANNEL_SIZE);
        // Hand the consumer half to the global collector.
        EVENT_RXS.lock().push(Receiver { rx });
        UnsafeCell::new(Sender {
            tx,
            collector: spawn_collector().clone(),
            overflow: Vec::with_capacity(0),
        })
    };
}
/// Producer half of a per-thread event channel.
pub struct Sender {
    // Lock-free SPSC producer, owned by a single thread.
    tx: Producer<Arc<Event>>,
    // Handle to the collector thread, used to unpark it after a send.
    collector: Arc<CollectorThread>,
    // Events that did not fit in the ring buffer; retried on later sends.
    overflow: Vec<Arc<Event>>,
}
/// Consumer half of a per-thread event channel, drained by the collector.
pub struct Receiver {
    rx: Consumer<Arc<Event>>,
}
/// Error returned when a channel is closed or completely full.
#[derive(Debug)]
pub struct ChannelError;
impl Sender {
    /// Queues `event` for the collector thread.
    ///
    /// Events that did not fit in the ring buffer on a previous call are
    /// parked in the local `overflow` vector and retried first, in FIFO
    /// order (the previous implementation popped from the tail of the
    /// vector, delivering overflowed events in reverse order). Returns
    /// `Err(ChannelError)` — dropping `event` — only when the overflow
    /// buffer is itself at capacity.
    pub fn send(&mut self, event: Arc<Event>) -> Result<(), ChannelError> {
        // Flush as many previously overflowed events as possible, oldest
        // first; `Arc` clones are cheap refcount bumps.
        if !self.overflow.is_empty() {
            let mut flushed = 0;
            for pending in self.overflow.iter() {
                if self.tx.push(pending.clone()).is_ok() {
                    flushed += 1;
                } else {
                    break;
                }
            }
            self.overflow.drain(..flushed);
        }

        // Enqueue the new event, falling back to the overflow buffer,
        // which is bounded at twice the ring-buffer capacity.
        if let Err(PushError::Full(event)) = self.tx.push(event) {
            if self.overflow.len() < CHANNEL_SIZE * 2 {
                self.overflow.push(event);
            } else {
                return Err(ChannelError);
            }
        }

        Ok(())
    }
}
impl Receiver {
    /// Pops the next event, if any.
    ///
    /// Returns `Ok(None)` when the channel is merely empty and
    /// `Err(ChannelError)` once the producing thread has gone away.
    pub fn try_recv(&mut self) -> Result<Option<Arc<Event>>, ChannelError> {
        if let Ok(event) = self.rx.pop() {
            return Ok(Some(event));
        }
        if self.rx.is_abandoned() {
            Err(ChannelError)
        } else {
            Ok(None)
        }
    }
}
impl Event {
    /// Queues this event on the current thread's channel and wakes the
    /// collector thread. The event is silently dropped if the thread-local
    /// channel has been destroyed (thread teardown, hence `try_with`) or
    /// the channel and its overflow buffer are both full.
    pub fn send(self) {
        // SAFETY: EVENT_TX is thread-local, so no other thread can hold a
        // reference to the Sender behind this UnsafeCell.
        let _ = EVENT_TX.try_with(|tx| unsafe {
            let tx = &mut *tx.get();
            if tx.send(Arc::new(self)).is_ok() {
                // Record the send and wake the (possibly parked) collector.
                EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
                tx.collector.thread().unpark();
            }
        });
    }
}

128
crates/trc/src/collector.rs Normal file
View file

@ -0,0 +1,128 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, OnceLock,
},
thread::{park, Builder, JoinHandle},
};
use ahash::AHashMap;
use arc_swap::ArcSwap;
use crate::{
channel::{EVENT_COUNT, EVENT_RXS},
subscriber::{Subscriber, SUBSCRIBER_UPDATE},
Event, EventType, Level,
};
// Global minimum level, stored as `Level as usize`; read by
// `Level::is_enabled`.
pub(crate) static TRACING_LEVEL: AtomicUsize = AtomicUsize::new(Level::Info as usize);
pub(crate) type CollectorThread = JoinHandle<()>;
/// Single background thread state: drains every per-thread event channel
/// and fans events out to the registered subscribers.
#[derive(Default)]
pub struct Collector {
    subscribers: Vec<Subscriber>,
}
impl Collector {
    /// Drains all per-thread channels once, dispatching events to the
    /// subscribers. Returns `false` when a shutdown control event was
    /// received, which ends the collector loop (see `spawn_collector`).
    fn collect(&mut self) -> bool {
        // Park until a producer unparks us; the swapped counter avoids
        // parking when events arrived since the previous pass (an unpark
        // issued before park() makes park() return immediately anyway).
        if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 {
            park();
        }
        // Collect all events
        let mut do_continue = true;
        EVENT_RXS.lock().retain_mut(|rx| {
            loop {
                match rx.try_recv() {
                    Ok(Some(event)) => {
                        if !event.keys.is_empty() {
                            // Process events
                            for subscriber in self.subscribers.iter_mut() {
                                subscriber.push_event(event.clone());
                            }
                        } else {
                            // Key-less events are control messages: either a
                            // subscriber-registration ping (see
                            // SubscriberBuilder::register) or, at Disable
                            // level, a shutdown request.
                            // Register subscriber
                            let subscribers = { std::mem::take(&mut (*SUBSCRIBER_UPDATE.lock())) };
                            if !subscribers.is_empty() {
                                self.subscribers.extend(subscribers);
                            } else if event.level == Level::Disable {
                                do_continue = false;
                            }
                        }
                    }
                    Ok(None) => {
                        // Channel drained; keep it for the next pass.
                        return true;
                    }
                    Err(_) => {
                        return false; // Channel is closed, remove.
                    }
                }
            }
        });
        // Flush batches; drop subscribers whose receiving end is gone.
        if !self.subscribers.is_empty() {
            self.subscribers
                .retain_mut(|subscriber| subscriber.send_batch().is_ok());
        }
        do_continue
    }
    /// Sets the global minimum tracing level.
    pub fn set_level(level: Level) {
        TRACING_LEVEL.store(level as usize, Ordering::Relaxed);
    }
    /// Replaces the per-event-type level overrides.
    pub fn update_custom_levels(levels: AHashMap<EventType, Level>) {
        custom_levels().store(Arc::new(levels));
    }
    /// Asks the collector thread to exit by sending a key-less control
    /// event at `Level::Disable`.
    pub fn shutdown() {
        Event::new(EventType::Error(crate::Cause::Thread), Level::Disable, 0).send()
    }
}
/// Returns the global collector thread handle, spawning the thread on
/// first use.
pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> {
    static COLLECTOR: OnceLock<Arc<CollectorThread>> = OnceLock::new();
    COLLECTOR.get_or_init(|| {
        // Run the collect loop until it signals shutdown.
        let handle = Builder::new()
            .name("stalwart-collector".to_string())
            .spawn(|| {
                let mut collector = Collector::default();
                while collector.collect() {}
            })
            .expect("Failed to start event collector");
        Arc::new(handle)
    })
}
fn custom_levels() -> &'static ArcSwap<AHashMap<EventType, Level>> {
static CUSTOM_LEVELS: OnceLock<ArcSwap<AHashMap<EventType, Level>>> = OnceLock::new();
CUSTOM_LEVELS.get_or_init(|| ArcSwap::from_pointee(Default::default()))
}
impl EventType {
    /// Returns the level for this event type, honoring any custom
    /// override installed via `Collector::update_custom_levels`.
    #[inline(always)]
    pub fn effective_level(&self) -> Level {
        match custom_levels().load().get(self) {
            Some(level) => *level,
            None => self.level(),
        }
    }
}
impl Level {
    /// Whether events at this level pass the global `TRACING_LEVEL`
    /// threshold.
    #[inline(always)]
    pub fn is_enabled(&self) -> bool {
        let threshold = TRACING_LEVEL.load(Ordering::Relaxed);
        (*self as usize) >= threshold
    }
}

View file

@ -8,8 +8,8 @@ use std::{borrow::Cow, fmt::Debug};
use crate::*; use crate::*;
impl<T, const N: usize> AsRef<T> for Context<T, N> { impl AsRef<Cause> for Error {
fn as_ref(&self) -> &T { fn as_ref(&self) -> &Cause {
&self.inner &self.inner
} }
} }

View file

@ -4,36 +4,44 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
use std::{borrow::Cow, fmt::Display}; use std::{borrow::Cow, cmp::Ordering, fmt::Display};
use crate::*; use crate::*;
impl<T, const N: usize> Context<T, N> impl Event {
where pub fn new(inner: EventType, level: Level, capacity: usize) -> Self {
[(Key, Value); N]: Default,
T: Eq,
{
pub fn new(inner: T) -> Self {
Self { Self {
inner, inner,
keys: Default::default(), level,
keys_size: 0, keys: Vec::with_capacity(capacity),
} }
} }
#[inline(always)] #[inline(always)]
pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self { pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self {
if self.keys_size < N { self.keys.push((key, value.into()));
self.keys[self.keys_size] = (key, value.into()); self
self.keys_size += 1; }
} else {
#[cfg(debug_assertions)] pub fn ctx_opt(self, key: Key, value: Option<impl Into<Value>>) -> Self {
panic!( match value {
"Context is full while inserting {:?}: {:?}", Some(value) => self.ctx(key, value),
key, None => self,
value.into()
);
} }
}
}
impl Error {
pub fn new(inner: Cause) -> Self {
Self {
inner,
keys: Vec::with_capacity(5),
}
}
#[inline(always)]
pub fn ctx(mut self, key: Key, value: impl Into<Value>) -> Self {
self.keys.push((key, value.into()));
self self
} }
@ -45,20 +53,14 @@ where
} }
#[inline(always)] #[inline(always)]
pub fn matches(&self, inner: T) -> bool { pub fn matches(&self, inner: Cause) -> bool {
self.inner == inner self.inner == inner
} }
pub fn value(&self, key: Key) -> Option<&Value> { pub fn value(&self, key: Key) -> Option<&Value> {
self.keys.iter().take(self.keys_size).find_map( self.keys
|(k, v)| { .iter()
if *k == key { .find_map(|(k, v)| if *k == key { Some(v) } else { None })
Some(v)
} else {
None
}
},
)
} }
pub fn value_as_str(&self, key: Key) -> Option<&str> { pub fn value_as_str(&self, key: Key) -> Option<&str> {
@ -66,16 +68,13 @@ where
} }
pub fn take_value(&mut self, key: Key) -> Option<Value> { pub fn take_value(&mut self, key: Key) -> Option<Value> {
self.keys self.keys.iter_mut().find_map(|(k, v)| {
.iter_mut() if *k == key {
.take(self.keys_size) Some(std::mem::take(v))
.find_map(|(k, v)| { } else {
if *k == key { None
Some(std::mem::take(v)) }
} else { })
None
}
})
} }
#[inline(always)] #[inline(always)]
@ -166,7 +165,6 @@ impl Cause {
Self::Pop3 => "POP3 error", Self::Pop3 => "POP3 error",
Self::Smtp => "SMTP error", Self::Smtp => "SMTP error",
Self::Thread => "Thread error", Self::Thread => "Thread error",
Self::Fetch => "Fetch error",
Self::Acme => "ACME error", Self::Acme => "ACME error",
Self::Dns => "DNS error", Self::Dns => "DNS error",
Self::Ingest => "Message Ingest error", Self::Ingest => "Message Ingest error",
@ -174,7 +172,6 @@ impl Cause {
Self::Limit(cause) => cause.message(), Self::Limit(cause) => cause.message(),
Self::Manage(cause) => cause.message(), Self::Manage(cause) => cause.message(),
Self::Auth(cause) => cause.message(), Self::Auth(cause) => cause.message(),
Self::Purge => "Purge error",
Self::Configuration => "Configuration error", Self::Configuration => "Configuration error",
Self::Resource(cause) => cause.message(), Self::Resource(cause) => cause.message(),
} }
@ -489,10 +486,10 @@ impl<T> AddContext<T> for Result<T> {
} }
} }
impl<T: std::fmt::Debug, const N: usize> Display for Context<T, N> { impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.inner)?; write!(f, "{:?}", self.inner)?;
for (key, value) in self.keys.iter().take(self.keys_size) { for (key, value) in self.keys.iter() {
write!(f, "\n {:?} = {:?}", key, value)?; write!(f, "\n {:?} = {:?}", key, value)?;
} }
Ok(()) Ok(())
@ -501,6 +498,40 @@ impl<T: std::fmt::Debug, const N: usize> Display for Context<T, N> {
impl std::error::Error for Error {} impl std::error::Error for Error {}
impl PartialOrd for Level {
#[inline(always)]
fn partial_cmp(&self, other: &Level) -> Option<Ordering> {
Some(self.cmp(other))
}
#[inline(always)]
fn lt(&self, other: &Level) -> bool {
(*other as usize) < (*self as usize)
}
#[inline(always)]
fn le(&self, other: &Level) -> bool {
(*other as usize) <= (*self as usize)
}
#[inline(always)]
fn gt(&self, other: &Level) -> bool {
(*other as usize) > (*self as usize)
}
#[inline(always)]
fn ge(&self, other: &Level) -> bool {
(*other as usize) >= (*self as usize)
}
}
impl Ord for Level {
#[inline(always)]
fn cmp(&self, other: &Self) -> Ordering {
(*other as usize).cmp(&(*self as usize))
}
}
impl PartialEq for Value { impl PartialEq for Value {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
match (self, other) { match (self, other) {
@ -525,14 +556,11 @@ impl PartialEq for Value {
impl Eq for Value {} impl Eq for Value {}
impl<T, const N: usize> PartialEq for Context<T, N> impl PartialEq for Error {
where
T: Eq,
{
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
if self.inner == other.inner && self.keys_size == other.keys_size { if self.inner == other.inner && self.keys.len() == other.keys.len() {
for kv in self.keys.iter().take(self.keys_size) { for kv in self.keys.iter() {
if !other.keys.iter().take(other.keys_size).any(|okv| kv == okv) { if !other.keys.iter().any(|okv| kv == okv) {
return false; return false;
} }
} }
@ -544,4 +572,57 @@ where
} }
} }
impl<T, const N: usize> Eq for Context<T, N> where T: Eq {} impl Eq for Error {}
impl EventType {
pub fn level(&self) -> Level {
match self {
EventType::Error(error) => match error {
Cause::Store(_) => Level::Error,
Cause::Jmap(_) => Level::Debug,
Cause::Imap => Level::Debug,
Cause::ManageSieve => Level::Debug,
Cause::Pop3 => Level::Debug,
Cause::Smtp => Level::Debug,
Cause::Thread => Level::Error,
Cause::Acme => Level::Error,
Cause::Dns => Level::Error,
Cause::Ingest => Level::Error,
Cause::Network => Level::Debug,
Cause::Limit(cause) => match cause {
LimitCause::SizeRequest => Level::Debug,
LimitCause::SizeUpload => Level::Debug,
LimitCause::CallsIn => Level::Debug,
LimitCause::ConcurrentRequest => Level::Debug,
LimitCause::ConcurrentUpload => Level::Debug,
LimitCause::Quota => Level::Debug,
LimitCause::BlobQuota => Level::Debug,
LimitCause::TooManyRequests => Level::Warn,
},
Cause::Manage(_) => Level::Debug,
Cause::Auth(cause) => match cause {
AuthCause::Failed => Level::Debug,
AuthCause::MissingTotp => Level::Trace,
AuthCause::TooManyAttempts => Level::Warn,
AuthCause::Banned => Level::Warn,
AuthCause::Error => Level::Error,
},
Cause::Configuration => Level::Error,
Cause::Resource(cause) => match cause {
ResourceCause::NotFound => Level::Debug,
ResourceCause::BadParameters => Level::Error,
ResourceCause::Error => Level::Error,
},
},
EventType::NewConnection => Level::Info,
EventType::SqlQuery => Level::Trace,
EventType::LdapQuery => Level::Trace,
EventType::Purge(event) => match event {
PurgeEvent::Started => Level::Debug,
PurgeEvent::Finished => Level::Debug,
PurgeEvent::Running => Level::Info,
PurgeEvent::Error => Level::Error,
},
}
}
}

View file

@ -4,18 +4,27 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
pub mod channel;
pub mod collector;
pub mod conv; pub mod conv;
pub mod imple; pub mod imple;
pub mod macros; pub mod macros;
pub mod subscriber;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type Error = Context<Cause, ERROR_CONTEXT_SIZE>;
pub type Trace = Context<Event, TRACE_CONTEXT_SIZE>;
const ERROR_CONTEXT_SIZE: usize = 5; #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
const TRACE_CONTEXT_SIZE: usize = 10; #[repr(usize)]
pub enum Level {
Disable = 0,
Trace = 1,
Debug = 2,
Info = 3,
Warn = 4,
Error = 5,
}
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub enum Value { pub enum Value {
@ -62,18 +71,24 @@ pub enum Key {
AccountId, AccountId,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Event { pub enum EventType {
NewConnection, NewConnection,
Error(Cause), Error(Cause),
SqlQuery, SqlQuery,
LdapQuery, LdapQuery,
PurgeTaskStarted, Purge(PurgeEvent),
PurgeTaskRunning,
PurgeTaskFinished,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PurgeEvent {
Started,
Finished,
Running,
Error,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Cause { pub enum Cause {
Store(StoreCause), Store(StoreCause),
Jmap(JmapCause), Jmap(JmapCause),
@ -82,7 +97,6 @@ pub enum Cause {
Pop3, Pop3,
Smtp, Smtp,
Thread, Thread,
Fetch,
Acme, Acme,
Dns, Dns,
Ingest, Ingest,
@ -90,100 +104,99 @@ pub enum Cause {
Limit(LimitCause), Limit(LimitCause),
Manage(ManageCause), Manage(ManageCause),
Auth(AuthCause), Auth(AuthCause),
Purge,
Configuration, Configuration,
Resource(ResourceCause), Resource(ResourceCause),
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StoreCause { pub enum StoreCause {
AssertValue, AssertValue = 0,
BlobMissingMarker, BlobMissingMarker = 1,
FoundationDB, FoundationDB = 2,
MySQL, MySQL = 3,
PostgreSQL, PostgreSQL = 4,
RocksDB, RocksDB = 5,
SQLite, SQLite = 6,
Ldap, Ldap = 7,
ElasticSearch, ElasticSearch = 8,
Redis, Redis = 9,
S3, S3 = 10,
Filesystem, Filesystem = 11,
Pool, Pool = 12,
DataCorruption, DataCorruption = 13,
Decompress, Decompress = 14,
Deserialize, Deserialize = 15,
NotFound, NotFound = 16,
NotConfigured, NotConfigured = 17,
NotSupported, NotSupported = 18,
Unexpected, Unexpected = 19,
Crypto, Crypto = 20,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JmapCause { pub enum JmapCause {
// Method errors // Method errors
InvalidArguments, InvalidArguments = 0,
RequestTooLarge, RequestTooLarge = 1,
StateMismatch, StateMismatch = 2,
AnchorNotFound, AnchorNotFound = 3,
UnsupportedFilter, UnsupportedFilter = 4,
UnsupportedSort, UnsupportedSort = 5,
UnknownMethod, UnknownMethod = 6,
InvalidResultReference, InvalidResultReference = 7,
Forbidden, Forbidden = 8,
AccountNotFound, AccountNotFound = 9,
AccountNotSupportedByMethod, AccountNotSupportedByMethod = 10,
AccountReadOnly, AccountReadOnly = 11,
NotFound, NotFound = 12,
CannotCalculateChanges, CannotCalculateChanges = 13,
UnknownDataType, UnknownDataType = 14,
// Request errors // Request errors
UnknownCapability, UnknownCapability = 15,
NotJSON, NotJSON = 16,
NotRequest, NotRequest = 17,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LimitCause { pub enum LimitCause {
SizeRequest, SizeRequest = 0,
SizeUpload, SizeUpload = 1,
CallsIn, CallsIn = 2,
ConcurrentRequest, ConcurrentRequest = 3,
ConcurrentUpload, ConcurrentUpload = 4,
Quota, Quota = 5,
BlobQuota, BlobQuota = 6,
TooManyRequests, TooManyRequests = 7,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ManageCause { pub enum ManageCause {
MissingParameter, MissingParameter = 0,
AlreadyExists, AlreadyExists = 1,
AssertFailed, AssertFailed = 2,
NotFound, NotFound = 3,
NotSupported, NotSupported = 4,
Error, Error = 5,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AuthCause { pub enum AuthCause {
Failed, Failed = 0,
MissingTotp, MissingTotp = 1,
TooManyAttempts, TooManyAttempts = 2,
Banned, Banned = 3,
Error, Error = 4,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceCause { pub enum ResourceCause {
NotFound, NotFound = 0,
BadParameters, BadParameters = 1,
Error, Error = 2,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Protocol { pub enum Protocol {
Jmap, Jmap,
Imap, Imap,
@ -194,10 +207,16 @@ pub enum Protocol {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Context<T, const N: usize> { pub struct Error {
inner: T, inner: Cause,
keys: [(Key, Value); N], keys: Vec<(Key, Value)>,
keys_size: usize, }
#[derive(Debug, Clone)]
pub struct Event {
inner: EventType,
level: Level,
keys: Vec<(Key, Value)>,
} }
pub trait AddContext<T> { pub trait AddContext<T> {

View file

@ -4,31 +4,51 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
#[macro_export] // Helper macro to count the number of arguments
macro_rules! trace {
($event:ident $(, $key:ident = $value:expr)* $(,)?) => {
{
let event = $crate::Trace::new($crate::Event::$event)
$(
.ctx($crate::Key::$key, $crate::Value::from($value))
)* ;
//eprintln!("{}", event);
}
};
}
#[macro_export] #[macro_export]
macro_rules! error { macro_rules! event {
($cause:ident $(, $key:ident = $value:expr)* $(,)?) => {{ ($event:ident($($param:expr),* $(,)?) $(, $key:ident = $value:expr)* $(,)?) => {
let event = $crate::Trace::new($crate::Event::Error($crate::Cause::$cause)) {
.ctx($crate::Key::CausedBy, $crate::location!()) let et = $crate::EventType::$event($($param),*);
$( let level = et.effective_level();
.ctx($crate::Key::$key, $crate::Value::from($value)) if level.is_enabled() {
)* ; $crate::Event::new(
et,
level,
trc::__count!($($key)*)
)
$(
.ctx($crate::Key::$key, $crate::Value::from($value))
)*
.send();
}
}
};
//eprintln!("{}", event); ($event:ident $(, $key:ident = $value:expr)* $(,)?) => {
}}; {
let et = $crate::EventType::$event;
let level = et.effective_level();
if level.is_enabled() {
$crate::Event::new(
et,
level,
trc::__count!($($key)*)
)
$(
.ctx($crate::Key::$key, $crate::Value::from($value))
)*
.send();
}
}
};
}
#[macro_export]
macro_rules! __count {
() => (0usize);
($head:tt $($tail:tt)*) => (1usize + trc::__count!($($tail)*));
} }
#[macro_export] #[macro_export]

View file

@ -0,0 +1,110 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use ahash::AHashSet;
use parking_lot::Mutex;
use tokio::sync::mpsc::{self, error::TrySendError};
use crate::{channel::ChannelError, Event, EventType, Level};
// Largest number of events kept in a pending batch when the subscriber
// channel is full and the subscriber is lossy.
const MAX_BATCH_SIZE: usize = 32768;
// Subscribers waiting to be picked up by the collector thread; drained
// in `Collector::collect` when a registration ping arrives.
pub(crate) static SUBSCRIBER_UPDATE: Mutex<Vec<Subscriber>> = Mutex::new(Vec::new());
/// A registered event consumer, owned by the collector thread.
#[derive(Debug)]
pub(crate) struct Subscriber {
    // Minimum level this subscriber receives.
    pub level: Level,
    // Event types this subscriber ignores.
    pub disabled: AHashSet<EventType>,
    // Channel to the subscriber task; whole batches are sent at once.
    pub tx: mpsc::Sender<Vec<Arc<Event>>>,
    // When true, batches may be trimmed under backpressure.
    pub lossy: bool,
    // Events accumulated since the last successful send.
    pub batch: Vec<Arc<Event>>,
}
/// Builder used to configure and register a new [`Subscriber`].
pub struct SubscriberBuilder {
    pub level: Level,
    pub disabled: AHashSet<EventType>,
    pub lossy: bool,
}
impl Subscriber {
    /// Appends `trace` to the pending batch if it passes the level and
    /// event-type filters.
    // NOTE(review): `Level` carries a custom, numerically inverted `Ord`
    // impl elsewhere in this crate — confirm that `>=` here admits
    // exactly the intended severities.
    #[inline(always)]
    pub fn push_event(&mut self, trace: Arc<Event>) {
        if trace.level >= self.level && !self.disabled.contains(&trace.inner) {
            self.batch.push(trace);
        }
    }
    /// Attempts to hand the pending batch to the subscriber task.
    ///
    /// If the channel is full the batch is kept for a later retry; lossy
    /// subscribers additionally shed non-error events (then truncate)
    /// once the batch exceeds `MAX_BATCH_SIZE`. Returns
    /// `Err(ChannelError)` only when the receiving end was dropped,
    /// which makes the collector discard this subscriber.
    pub fn send_batch(&mut self) -> Result<(), ChannelError> {
        if !self.batch.is_empty() {
            match self.tx.try_send(std::mem::take(&mut self.batch)) {
                Ok(_) => Ok(()),
                Err(TrySendError::Full(mut events)) => {
                    // Backpressure: retain the batch, trimming when lossy.
                    if self.lossy && events.len() > MAX_BATCH_SIZE {
                        events.retain(|e| e.level == Level::Error);
                        if events.len() > MAX_BATCH_SIZE {
                            events.truncate(MAX_BATCH_SIZE);
                        }
                    }
                    self.batch = events;
                    Ok(())
                }
                Err(TrySendError::Closed(_)) => Err(ChannelError),
            }
        } else {
            Ok(())
        }
    }
}
impl SubscriberBuilder {
pub fn new() -> Self {
Default::default()
}
pub fn with_level(mut self, level: Level) -> Self {
self.level = level;
self
}
pub fn with_disabled(mut self, disabled: impl IntoIterator<Item = EventType>) -> Self {
self.disabled.extend(disabled);
self
}
pub fn with_lossy(mut self, lossy: bool) -> Self {
self.lossy = lossy;
self
}
pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event>>> {
let (tx, rx) = mpsc::channel(8192);
SUBSCRIBER_UPDATE.lock().push(Subscriber {
level: self.level,
disabled: self.disabled,
tx,
lossy: self.lossy,
batch: Vec::new(),
});
// Notify collector
Event::new(EventType::Error(crate::Cause::Thread), Level::Info, 0).send();
rx
}
}
impl Default for SubscriberBuilder {
fn default() -> Self {
Self {
level: Level::Info,
disabled: AHashSet::new(),
lossy: true,
}
}
}