diff --git a/Cargo.lock b/Cargo.lock index 86936629..9952666b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6615,6 +6615,7 @@ dependencies = [ "parking_lot", "reqwest 0.12.5", "rtrb", + "serde", "serde_json", "tokio", ] diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/tracers.rs index 4453ecbc..6c378fd3 100644 --- a/crates/common/src/config/tracers.rs +++ b/crates/common/src/config/tracers.rs @@ -12,8 +12,8 @@ use hyper::{ header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap, }; -use trc::{subscriber::Interests, EventType, Level}; -use utils::config::Config; +use trc::{subscriber::Interests, EventType, Level, TracingEvent}; +use utils::config::{utils::ParseValue, Config}; #[derive(Debug)] pub struct Tracer { @@ -61,6 +61,7 @@ pub struct WebhookTracer { pub key: String, pub timeout: Duration, pub throttle: Duration, + pub discard_after: Duration, pub tls_allow_invalid_certs: bool, pub headers: HeaderMap, } @@ -244,7 +245,7 @@ impl Tracers { // Create tracer let mut tracer = Tracer { - id: id.to_string(), + id: format!("t_{id}"), interests: Default::default(), lossy: config .property_or_default(("tracer", id, "lossy"), "false") @@ -264,6 +265,21 @@ impl Tracers { // Parse disabled events let mut disabled_events = AHashSet::new(); + match &tracer.typ { + TracerType::Console(_) => (), + TracerType::Log(_) => { + disabled_events.insert(EventType::Tracing(TracingEvent::LogError)); + } + TracerType::Otel(_) => { + disabled_events.insert(EventType::Tracing(TracingEvent::OtelError)); + } + TracerType::Webhook(_) => { + disabled_events.insert(EventType::Tracing(TracingEvent::WebhookError)); + } + TracerType::Journal => { + disabled_events.insert(EventType::Tracing(TracingEvent::JournalError)); + } + } for (_, event_type) in config.properties::(("tracer", id, "disabled-events")) { disabled_events.insert(event_type); @@ -379,7 +395,7 @@ fn parse_webhook( // Build tracer let mut tracer = Tracer { - id: id.to_string(), + id: format!("w_{id}"), interests: Default::default(), lossy: config .property_or_default(("webhook", id, "lossy"), "false") @@ -400,13 +416,55 @@ fn parse_webhook( throttle: config .property_or_default(("webhook", id, "throttle"), "1s") .unwrap_or_else(|| Duration::from_secs(1)), + discard_after: config + .property_or_default(("webhook", id, "discard-after"), "5m") + .unwrap_or_else(|| Duration::from_secs(300)), }), }; // Parse webhook events - for (_, event_type) in config.properties::(("webhook", id, "events")) { - tracer.interests.set(event_type); - global_interests.set(event_type); + let event_names = EventType::variants() + .into_iter() + .filter_map(|e| { + if e != EventType::Tracing(TracingEvent::WebhookError) { + Some((e, e.name())) + } else { + None + } + }) + .collect::>(); + for (_, event_type) in config.properties::(("webhook", id, "events")) { + match event_type { + EventOrMany::Event(event_type) => { + if event_type != EventType::Tracing(TracingEvent::WebhookError) { + tracer.interests.set(event_type); + global_interests.set(event_type); + } + } + EventOrMany::StartsWith(value) => { + for (event_type, name) in event_names.iter() { + if name.starts_with(&value) { + tracer.interests.set(*event_type); + global_interests.set(*event_type); + } + } + } + EventOrMany::EndsWith(value) => { + for (event_type, name) in event_names.iter() { + if name.ends_with(&value) { + tracer.interests.set(*event_type); + global_interests.set(*event_type); + } + } + } + EventOrMany::All => { + for (event_type, _) in event_names.iter() { + tracer.interests.set(*event_type); + global_interests.set(*event_type); + } + break; + } + } } if !tracer.interests.is_empty() { @@ -416,3 +474,25 @@ fn parse_webhook( None } } + +enum EventOrMany { + Event(EventType), + StartsWith(String), + EndsWith(String), + All, +} + +impl ParseValue for EventOrMany { + fn parse_value(value: &str) -> Result { + let value = value.trim(); + if value == "*" { + Ok(EventOrMany::All) + } else if let Some(suffix) = value.strip_prefix("*") { + Ok(EventOrMany::EndsWith(suffix.to_string())) + } else if let Some(prefix) = value.strip_suffix("*") { + Ok(EventOrMany::StartsWith(prefix.to_string())) + } else { + EventType::parse_value(value).map(EventOrMany::Event) + } + } +} diff --git a/crates/common/src/tracing/log.rs b/crates/common/src/tracing/log.rs index dd0eef7b..11fb1741 100644 --- a/crates/common/src/tracing/log.rs +++ b/crates/common/src/tracing/log.rs @@ -13,10 +13,10 @@ use tokio::{ fs::{File, OpenOptions}, io::BufWriter, }; -use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, ServerEvent}; +use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TracingEvent}; pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) { - let mut tx = builder.register(); + let (_, mut rx) = builder.register(); tokio::spawn(async move { if let Some(writer) = settings.build_writer().await { let mut buf = FmtWriter::new(writer) @@ -24,13 +24,13 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) .with_multiline(settings.multiline); let mut roatation_timestamp = settings.next_rotation(); - while let Some(events) = tx.recv().await { + while let Some(events) = rx.recv().await { for event in events { // Check if we need to rotate the log file if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp { if let Err(err) = buf.flush().await { trc::event!( - Server(ServerEvent::TracingError), + Tracing(TracingEvent::LogError), Reason = err.to_string(), Details = "Failed to flush log buffer" ); @@ -46,7 +46,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) if let Err(err) = buf.write(&event).await { trc::event!( - Server(ServerEvent::TracingError), + Tracing(TracingEvent::LogError), Reason = err.to_string(), Details = "Failed to write event to log" ); @@ -56,7 +56,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) if let Err(err) = buf.flush().await { trc::event!( - Server(ServerEvent::TracingError), + Tracing(TracingEvent::LogError), Reason = err.to_string(), Details = "Failed to flush log buffer" ); @@ -105,7 +105,7 @@ impl LogTracer { Ok(writer) => Some(BufWriter::new(writer)), Err(err) => { trc::event!( - Server(ServerEvent::TracingError), + Tracing(TracingEvent::LogError), Details = "Failed to create log file", Path = path.to_string_lossy().into_owned(), Reason = err.to_string(), diff --git a/crates/common/src/tracing/mod.rs b/crates/common/src/tracing/mod.rs index fccb9fd8..6652806f 100644 --- a/crates/common/src/tracing/mod.rs +++ b/crates/common/src/tracing/mod.rs @@ -6,10 +6,12 @@ pub mod log; pub mod stdout; +pub mod webhook; use log::spawn_log_tracer; use stdout::spawn_console_tracer; use trc::{collector::Collector, subscriber::SubscriberBuilder}; +use webhook::spawn_webhook_tracer; use crate::config::tracers::{ConsoleTracer, TracerType, Tracers}; @@ -82,7 +84,7 @@ impl Tracers { }, ); - Collector::set_interests(interests); + Collector::union_interests(interests); Collector::reload(); } } @@ -92,8 +94,8 @@ impl TracerType { match self { TracerType::Console(settings) => spawn_console_tracer(builder, settings), TracerType::Log(settings) => spawn_log_tracer(builder, settings), + TracerType::Webhook(settings) => spawn_webhook_tracer(builder, settings), TracerType::Otel(_) => todo!(), - TracerType::Webhook(_) => todo!(), TracerType::Journal => todo!(), } } diff --git a/crates/common/src/tracing/stdout.rs b/crates/common/src/tracing/stdout.rs index 2cab1fdd..22ec5c7f 100644 --- a/crates/common/src/tracing/stdout.rs +++ b/crates/common/src/tracing/stdout.rs @@ -16,13 +16,13 @@ use tokio::io::AsyncWrite; use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder}; pub(crate) fn spawn_console_tracer(builder: SubscriberBuilder, settings: ConsoleTracer) { - let mut tx = builder.register(); + let (_, mut rx) = builder.register(); tokio::spawn(async move { let mut buf = FmtWriter::new(StdErrWriter::default()) .with_ansi(settings.ansi) .with_multiline(settings.multiline); - while let Some(events) = tx.recv().await { + while let Some(events) = rx.recv().await { for event in events { let _ = buf.write(&event).await; diff --git a/crates/common/src/tracing/webhook.rs b/crates/common/src/tracing/webhook.rs index c7e0450f..e45024e7 100644 --- a/crates/common/src/tracing/webhook.rs +++ b/crates/common/src/tracing/webhook.rs @@ -5,85 +5,60 @@ */ use std::{ - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::{Duration, Instant}, }; -use crate::{SharedCore, IPC_CHANNEL_BUFFER}; -use ahash::AHashMap; +use crate::config::tracers::WebhookTracer; use base64::{engine::general_purpose::STANDARD, Engine}; -use chrono::Utc; use ring::hmac; +use serde::Serialize; +use store::write::now; use tokio::sync::mpsc; -use utils::snowflake::SnowflakeIdGenerator; - -pub enum WebhookEvent { - Send { - typ: WebhookType, - payload: Arc, - }, - Success { - webhook_id: u64, - }, - Retry { - webhook_id: u64, - events: WebhookEvents, - }, - Stop, -} +use trc::{ + subscriber::{EventBatch, SubscriberBuilder}, + ServerEvent, TracingEvent, +}; pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); -struct PendingEvents { - next_delivery: Instant, - pending_events: WebhookEvents, - retry_num: u32, - in_flight: bool, -} - -pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender { - let (webhook_tx, mut webhook_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); - - let webhook_tx_ = webhook_tx.clone(); - +pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: WebhookTracer) { + let (tx, mut rx) = builder.register(); tokio::spawn(async move { + let settings = Arc::new(settings); let mut wakeup_time = LONG_SLUMBER; - let mut pending_events: AHashMap = AHashMap::new(); - let id_generator = SnowflakeIdGenerator::new(); + let discard_after = settings.discard_after.as_secs(); + let mut pending_events = Vec::new(); + let mut next_delivery = Instant::now(); + let in_flight = Arc::new(AtomicBool::new(false)); loop { // Wait for the next event or timeout - let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await; - - // Load settings - let core = core.load_full(); + let event_or_timeout = tokio::time::timeout(wakeup_time, rx.recv()).await; + let now = now(); match event_or_timeout { - Ok(Some(event)) => match event { - WebhookEvent::Send { typ, payload } => { - for (webhook_id, webhook) in &core.web_hooks.hooks { - if webhook.events.contains(&typ) { - pending_events.entry(*webhook_id).or_default().push( - super::WebhookEvent { - id: id_generator.generate().unwrap_or_default(), - created_at: Utc::now(), - typ, - data: payload.clone(), - }, - ); - } + Ok(Some(events)) => { + let mut discard_count = 0; + for event in events { + if now.saturating_sub(event.inner.timestamp) < discard_after { + pending_events.push(event) + } else { + discard_count += 1; } } - WebhookEvent::Success { webhook_id } => { - if let Some(pending_events) = pending_events.get_mut(&webhook_id) { - pending_events.success(); - } + + if discard_count > 0 { + trc::event!( + Tracing(TracingEvent::WebhookError), + Details = "Discarded stale events", + Count = discard_count + ); } - WebhookEvent::Retry { webhook_id, events } => { - pending_events.entry(webhook_id).or_default().retry(events); - } - WebhookEvent::Stop => break, - }, + } Ok(None) => { break; } @@ -91,86 +66,78 @@ pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender { } // Process events - let mut delete_ids = Vec::new(); let mut next_retry = None; - for (webhook_id, events) in &mut pending_events { - if let Some(webhook) = core.web_hooks.hooks.get(webhook_id) { - if events.next_delivery <= Instant::now() { - if !events.is_empty() { - events.next_delivery = Instant::now() + webhook.throttle; - if !events.in_flight { - events.in_flight = true; - spawn_webhook_handler( - webhook.clone(), - events.take_events(), - webhook_tx.clone(), - ); - } - } else { - // No more events for webhook - delete_ids.push(*webhook_id); - } - } else if !events.is_empty() { - // Retry later - let this_retry = events.next_delivery - Instant::now(); - match next_retry { - Some(next_retry) if this_retry >= next_retry => {} - _ => { - next_retry = Some(this_retry); - } - } + let now = Instant::now(); + if next_delivery <= now { + if !pending_events.is_empty() { + next_delivery = now + settings.throttle; + if !in_flight.load(Ordering::Relaxed) { + spawn_webhook_handler( + settings.clone(), + in_flight.clone(), + std::mem::take(&mut pending_events), + tx.clone(), + ); + } + } + } else if !pending_events.is_empty() { + // Retry later + let this_retry = next_delivery - now; + match next_retry { + Some(next_retry) if this_retry >= next_retry => {} + _ => { + next_retry = Some(this_retry); } - } else { - delete_ids.push(*webhook_id); } } wakeup_time = next_retry.unwrap_or(LONG_SLUMBER); - - // Delete removed or empty webhooks - for webhook_id in delete_ids { - pending_events.remove(&webhook_id); - } } }); +} - webhook_tx_ +#[derive(Serialize)] +struct EventWrapper { + events: EventBatch, } fn spawn_webhook_handler( - webhook: Arc, - events: WebhookEvents, - webhook_tx: mpsc::Sender, + settings: Arc, + in_flight: Arc, + events: EventBatch, + webhook_tx: mpsc::Sender, ) { tokio::spawn(async move { - let response = match post_webhook_events(&webhook, &events).await { - Ok(_) => WebhookEvent::Success { - webhook_id: webhook.id, - }, - Err(err) => { - //trc::event!("Failed to post webhook events: {}", err); - WebhookEvent::Retry { - webhook_id: webhook.id, - events, - } - } - }; + in_flight.store(true, Ordering::Relaxed); + let wrapper = EventWrapper { events }; - // Notify manager - if let Err(err) = webhook_tx.send(response).await { - //trc::event!("Failed to send webhook event: {}", err); + if let Err(err) = post_webhook_events(&settings, &wrapper).await { + trc::event!(Tracing(TracingEvent::WebhookError), Details = err); + + if webhook_tx.send(wrapper.events).await.is_err() { + trc::event!( + Server(ServerEvent::ThreadError), + Details = "Failed to send failed webhook events back to main thread", + CausedBy = trc::location!() + ); + } } + + in_flight.store(false, Ordering::Relaxed); }); } -async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Result<(), String> { +async fn post_webhook_events( + settings: &WebhookTracer, + events: &EventWrapper, +) -> Result<(), String> { // Serialize body let body = serde_json::to_string(events) .map_err(|err| format!("Failed to serialize events: {}", err))?; // Add HMAC-SHA256 signature - let mut headers = webhook.headers.clone(); - if !webhook.key.is_empty() { - let key = hmac::Key::new(hmac::HMAC_SHA256, webhook.key.as_bytes()); + let mut headers = settings.headers.clone(); + if !settings.key.is_empty() { + let key = hmac::Key::new(hmac::HMAC_SHA256, settings.key.as_bytes()); let tag = hmac::sign(&key, body.as_bytes()); headers.insert( @@ -181,69 +148,25 @@ async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Resul // Send request let response = reqwest::Client::builder() - .timeout(webhook.timeout) - .danger_accept_invalid_certs(webhook.tls_allow_invalid_certs) + .timeout(settings.timeout) + .danger_accept_invalid_certs(settings.tls_allow_invalid_certs) .build() .map_err(|err| format!("Failed to create HTTP client: {}", err))? - .post(&webhook.url) + .post(&settings.url) .headers(headers) .body(body) .send() .await - .map_err(|err| format!("Webhook request to {} failed: {err}", webhook.url))?; + .map_err(|err| format!("Webhook request to {} failed: {err}", settings.url))?; if response.status().is_success() { Ok(()) } else { Err(format!( "Webhook request to {} failed with code {}: {}", - webhook.url, + settings.url, response.status().as_u16(), response.status().canonical_reason().unwrap_or("Unknown") )) } } - -impl Default for PendingEvents { - fn default() -> Self { - Self { - next_delivery: Instant::now(), - pending_events: WebhookEvents::default(), - retry_num: 0, - in_flight: false, - } - } -} - -impl PendingEvents { - pub fn push(&mut self, event: super::WebhookEvent) { - self.pending_events.events.push(event); - } - - pub fn success(&mut self) { - self.in_flight = false; - self.retry_num = 0; - } - - pub fn retry(&mut self, events: WebhookEvents) { - // Backoff - self.next_delivery = Instant::now() + Duration::from_secs(2u64.pow(self.retry_num)); - self.retry_num += 1; - self.in_flight = false; - - for event in events.events { - // Drop failed events older than 5 minutes - if event.created_at + Duration::from_secs(5 * 60) >= Utc::now() { - self.pending_events.events.push(event); - } - } - } - - pub fn is_empty(&self) -> bool { - self.pending_events.events.is_empty() - } - - pub fn take_events(&mut self) -> WebhookEvents { - std::mem::take(&mut self.pending_events) - } -} diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index c6f12906..17b16a57 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -140,7 +140,7 @@ impl JMAP { if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await { trc::event!( Resource(trc::ResourceEvent::Error), - Reason = err.to_string(), + Reason = err, Details = "Failed to unpack webadmin bundle" ); } diff --git a/crates/trc/Cargo.toml b/crates/trc/Cargo.toml index ddc609f4..c40ad144 100644 --- a/crates/trc/Cargo.toml +++ b/crates/trc/Cargo.toml @@ -9,6 +9,7 @@ event_macro = { path = "./event-macro" } mail-auth = { version = "0.4" } mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] } base64 = "0.22.1" +serde = "1.0" serde_json = "1.0.120" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} bincode = "1.3.3" diff --git a/crates/trc/src/bitset.rs b/crates/trc/src/bitset.rs index 5b83b406..30890277 100644 --- a/crates/trc/src/bitset.rs +++ b/crates/trc/src/bitset.rs @@ -54,6 +54,13 @@ impl AtomicBitset { } } + pub fn union(&self, bitset: impl AsRef>) { + let bitset = bitset.as_ref(); + for i in 0..N { + self.0[i].fetch_or(bitset.0[i], Ordering::Relaxed); + } + } + pub fn clear_all(&self) { for i in 0..N { self.0[i].store(0, Ordering::Relaxed); diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index 6002a8a8..85172cc7 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -17,7 +17,7 @@ use crate::{ bitset::{AtomicBitset, USIZE_BITS}, channel::{EVENT_COUNT, EVENT_RXS}, subscriber::{Interests, Subscriber}, - DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, ServerEvent, + DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TracingEvent, TOTAL_EVENT_COUNT, }; @@ -58,7 +58,7 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id(); const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id(); const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id(); -const EV_COLLECTOR_UPDATE: usize = EventType::Server(ServerEvent::CollectorUpdate).id(); +const EV_COLLECTOR_UPDATE: usize = EventType::Tracing(TracingEvent::Update).id(); const STALE_SPAN_CHECK_WATERMARK: usize = 8000; const SPAN_MAX_HOLD: u64 = 86400; @@ -119,7 +119,12 @@ impl Collector { .remove(&event.span_id().expect("Missing span ID")) .is_none() { - debug_assert!(false, "Unregistered span ID: {event:?}"); + #[cfg(debug_assertions)] + { + if event.span_id().unwrap() != 0 { + panic!("Unregistered span ID: {event:?}"); + } + } } Arc::new(event) } @@ -136,7 +141,12 @@ impl Collector { if let Some(span) = self.active_spans.get(&span_id) { event.inner.span = Some(span.clone()); } else { - debug_assert!(false, "Unregistered span ID: {event:?}"); + #[cfg(debug_assertions)] + { + if span_id != 0 { + panic!("Unregistered span ID: {event:?}"); + } + } } } @@ -226,6 +236,10 @@ impl Collector { INTERESTS.update(interests); } + pub fn union_interests(interests: Interests) { + INTERESTS.union(interests); + } + pub fn enable_event(event: impl Into) { INTERESTS.set(event); } @@ -271,7 +285,7 @@ impl Collector { } pub fn reload() { - Event::new(EventType::Server(ServerEvent::CollectorUpdate)).send() + Event::new(EventType::Tracing(TracingEvent::Update)).send() } } diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index c89f7d49..791e2bbf 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -1003,10 +1003,7 @@ impl EventType { ServerEvent::Startup | ServerEvent::Shutdown | ServerEvent::Licensing => { Level::Info } - ServerEvent::StartupError - | ServerEvent::ThreadError - | ServerEvent::TracingError => Level::Error, - ServerEvent::CollectorUpdate => Level::Disable, + ServerEvent::StartupError | ServerEvent::ThreadError => Level::Error, }, EventType::Acme(event) => match event { AcmeEvent::DnsRecordCreated @@ -1245,6 +1242,10 @@ impl EventType { | OutgoingReportEvent::SubmissionError | OutgoingReportEvent::NoRecipientsFound => Level::Info, }, + EventType::Tracing(event) => match event { + TracingEvent::Update => Level::Disable, + _ => Level::Error, + }, } } } diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index bb56816a..e012bce1 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -11,6 +11,7 @@ pub mod conv; pub mod fmt; pub mod imple; pub mod macros; +pub mod serializer; pub mod subscriber; use std::{ @@ -68,7 +69,7 @@ pub enum Value { None, } -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] #[camel_names] pub enum Key { #[default] @@ -205,6 +206,7 @@ pub enum EventType { MtaSts(MtaStsEvent), IncomingReport(IncomingReportEvent), OutgoingReport(OutgoingReportEvent), + Tracing(TracingEvent), } #[event_type] @@ -656,9 +658,16 @@ pub enum ServerEvent { Shutdown, StartupError, ThreadError, - TracingError, Licensing, - CollectorUpdate, +} + +#[event_type] +pub enum TracingEvent { + Update, + LogError, + WebhookError, + OtelError, + JournalError, } #[event_type] diff --git a/crates/trc/src/serializer.rs b/crates/trc/src/serializer.rs new file mode 100644 index 00000000..4b4d3e90 --- /dev/null +++ b/crates/trc/src/serializer.rs @@ -0,0 +1,107 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use ahash::AHashSet; +use mail_parser::DateTime; +use serde::{ser::SerializeMap, Serialize, Serializer}; + +use crate::{Event, EventDetails, EventType, Key, Value}; + +struct Keys<'x> { + keys: &'x [(Key, Value)], + span_keys: &'x [(Key, Value)], +} + +impl Serialize for Event { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(4))?; + map.serialize_entry( + "id", + &format!("{}{}", self.inner.timestamp, self.inner.typ.id()), + )?; + map.serialize_entry( + "createdAt", + &DateTime::from_timestamp(self.inner.timestamp as i64).to_rfc3339(), + )?; + map.serialize_entry("type", self.inner.typ.name())?; + map.serialize_entry( + "data", + &Keys { + keys: self.keys.as_slice(), + span_keys: self.inner.span.as_ref().map(|s| &s.keys[..]).unwrap_or(&[]), + }, + )?; + map.end() + } +} + +impl<'x> Serialize for Keys<'x> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let keys_len = self.keys.len() + self.span_keys.len(); + let mut seen_keys = AHashSet::with_capacity(keys_len); + let mut keys = serializer.serialize_map(Some(keys_len))?; + for (key, value) in self.span_keys.iter().chain(self.keys.iter()) { + if !matches!(value, Value::None) + && !matches!(key, Key::SpanId) + && seen_keys.insert(*key) + { + keys.serialize_entry(key.name(), value)?; + } + } + keys.end() + } +} + +impl Serialize for Event { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(4))?; + map.serialize_entry("type", self.inner.name())?; + map.serialize_entry( + "data", + &Keys { + keys: self.keys.as_slice(), + span_keys: &[], + }, + )?; + map.end() + } +} + +impl Serialize for Value { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + Value::Static(value) => value.serialize(serializer), + Value::String(value) => value.serialize(serializer), + Value::UInt(value) => value.serialize(serializer), + Value::Int(value) => value.serialize(serializer), + Value::Float(value) => value.serialize(serializer), + Value::Timestamp(value) => DateTime::from_timestamp(*value as i64) + .to_rfc3339() + .serialize(serializer), + Value::Duration(value) => value.serialize(serializer), + Value::Bytes(value) => value.serialize(serializer), + Value::Bool(value) => value.serialize(serializer), + Value::Ipv4(value) => value.serialize(serializer), + Value::Ipv6(value) => value.serialize(serializer), + Value::Protocol(value) => value.name().serialize(serializer), + Value::Event(value) => value.serialize(serializer), + Value::Array(value) => value.serialize(serializer), + Value::None => unreachable!(), + } + } +} diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs index 670996f8..c30a8ba6 100644 --- a/crates/trc/src/subscriber.rs +++ b/crates/trc/src/subscriber.rs @@ -18,14 +18,15 @@ use crate::{ const MAX_BATCH_SIZE: usize = 32768; pub type Interests = Box>; +pub type EventBatch = Vec>>; #[derive(Debug)] pub(crate) struct Subscriber { pub id: String, pub interests: Interests, - pub tx: mpsc::Sender>>>, + pub tx: mpsc::Sender, pub lossy: bool, - pub batch: Vec>>, + pub batch: EventBatch, } pub struct SubscriberBuilder { @@ -99,14 +100,14 @@ impl SubscriberBuilder { self } - pub fn register(self) -> mpsc::Receiver>>> { + pub fn register(self) -> (mpsc::Sender, mpsc::Receiver) { let (tx, rx) = mpsc::channel(8192); COLLECTOR_UPDATES.lock().push(Update::Register { subscriber: Subscriber { id: self.id, interests: self.interests, - tx, + tx: tx.clone(), lossy: self.lossy, batch: Vec::new(), }, @@ -115,6 +116,6 @@ impl SubscriberBuilder { // Notify collector Collector::reload(); - rx + (tx, rx) } } diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs index d96231d9..1de19ae7 100644 --- a/tests/src/jmap/auth_limits.rs +++ b/tests/src/jmap/auth_limits.rs @@ -267,10 +267,10 @@ pub async fn test(params: &mut JMAPTest) { // Check webhook events params.webhook.assert_contains(&[ - "auth.failure", - "auth.success", - "auth.banned", - "\"login\": \"jdoe@example.com\"", - "\"accountType\": \"individual\"", + "authFailure", + "authSuccess", + "authBanned", + "\"name\": \"jdoe@example.com\"", + "\"type\": \"individual\"", ]); } diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index bfea6d79..94e9fb65 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -276,10 +276,7 @@ vrfy = true [webhook."test"] url = "http://127.0.0.1:8821/hook" -events = ["auth.success", "auth.failure", "auth.banned", "auth.error", - "message.accepted", "message.rejected", "message.appended", - "account.over-quota", "dsn", "double-bounce", "report.incoming.dmarc", - "report.incoming.tls", "report.incoming.arf", "report.outgoing"] +events = ["*"] signature-key = "ovos-moles" throttle = "100ms" @@ -454,6 +451,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { .cloned() .unwrap_or_default(), }; + let tracers = Tracers::parse(&mut config); let core = Core::parse(&mut config, stores, config_manager).await; let store = core.storage.data.clone(); let shared_core = core.into_shared(); @@ -461,6 +459,9 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { // Parse acceptors servers.parse_tcp_acceptors(&mut config, shared_core.clone()); + // Enable tracing + tracers.enable(); + // Setup IPC channels let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); let ipc = Ipc { delivery_tx }; diff --git a/tests/src/jmap/webhooks.rs b/tests/src/jmap/webhooks.rs index 8119bb29..7dd9b595 100644 --- a/tests/src/jmap/webhooks.rs +++ b/tests/src/jmap/webhooks.rs @@ -42,7 +42,7 @@ pub async fn test(params: &mut JMAPTest) { tokio::time::sleep(Duration::from_millis(1000)).await; // Check for events - params.webhook.assert_contains(&["auth.success"]); + params.webhook.assert_contains(&["authSuccess"]); } impl MockWebhookEndpoint {