Webhook dispatch reimplementation as a tracer

This commit is contained in:
mdecimus 2024-07-30 18:53:05 +02:00
parent d29d21692e
commit 9840494094
17 changed files with 359 additions and 212 deletions

1
Cargo.lock generated
View file

@ -6615,6 +6615,7 @@ dependencies = [
"parking_lot", "parking_lot",
"reqwest 0.12.5", "reqwest 0.12.5",
"rtrb", "rtrb",
"serde",
"serde_json", "serde_json",
"tokio", "tokio",
] ]

View file

@ -12,8 +12,8 @@ use hyper::{
header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
HeaderMap, HeaderMap,
}; };
use trc::{subscriber::Interests, EventType, Level}; use trc::{subscriber::Interests, EventType, Level, TracingEvent};
use utils::config::Config; use utils::config::{utils::ParseValue, Config};
#[derive(Debug)] #[derive(Debug)]
pub struct Tracer { pub struct Tracer {
@ -61,6 +61,7 @@ pub struct WebhookTracer {
pub key: String, pub key: String,
pub timeout: Duration, pub timeout: Duration,
pub throttle: Duration, pub throttle: Duration,
pub discard_after: Duration,
pub tls_allow_invalid_certs: bool, pub tls_allow_invalid_certs: bool,
pub headers: HeaderMap, pub headers: HeaderMap,
} }
@ -244,7 +245,7 @@ impl Tracers {
// Create tracer // Create tracer
let mut tracer = Tracer { let mut tracer = Tracer {
id: id.to_string(), id: format!("t_{id}"),
interests: Default::default(), interests: Default::default(),
lossy: config lossy: config
.property_or_default(("tracer", id, "lossy"), "false") .property_or_default(("tracer", id, "lossy"), "false")
@ -264,6 +265,21 @@ impl Tracers {
// Parse disabled events // Parse disabled events
let mut disabled_events = AHashSet::new(); let mut disabled_events = AHashSet::new();
match &tracer.typ {
TracerType::Console(_) => (),
TracerType::Log(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::LogError));
}
TracerType::Otel(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::OtelError));
}
TracerType::Webhook(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::WebhookError));
}
TracerType::Journal => {
disabled_events.insert(EventType::Tracing(TracingEvent::JournalError));
}
}
for (_, event_type) in config.properties::<EventType>(("tracer", id, "disabled-events")) for (_, event_type) in config.properties::<EventType>(("tracer", id, "disabled-events"))
{ {
disabled_events.insert(event_type); disabled_events.insert(event_type);
@ -379,7 +395,7 @@ fn parse_webhook(
// Build tracer // Build tracer
let mut tracer = Tracer { let mut tracer = Tracer {
id: id.to_string(), id: format!("w_{id}"),
interests: Default::default(), interests: Default::default(),
lossy: config lossy: config
.property_or_default(("webhook", id, "lossy"), "false") .property_or_default(("webhook", id, "lossy"), "false")
@ -400,14 +416,56 @@ fn parse_webhook(
throttle: config throttle: config
.property_or_default(("webhook", id, "throttle"), "1s") .property_or_default(("webhook", id, "throttle"), "1s")
.unwrap_or_else(|| Duration::from_secs(1)), .unwrap_or_else(|| Duration::from_secs(1)),
discard_after: config
.property_or_default(("webhook", id, "discard-after"), "5m")
.unwrap_or_else(|| Duration::from_secs(300)),
}), }),
}; };
// Parse webhook events // Parse webhook events
for (_, event_type) in config.properties::<EventType>(("webhook", id, "events")) { let event_names = EventType::variants()
.into_iter()
.filter_map(|e| {
if e != EventType::Tracing(TracingEvent::WebhookError) {
Some((e, e.name()))
} else {
None
}
})
.collect::<Vec<_>>();
for (_, event_type) in config.properties::<EventOrMany>(("webhook", id, "events")) {
match event_type {
EventOrMany::Event(event_type) => {
if event_type != EventType::Tracing(TracingEvent::WebhookError) {
tracer.interests.set(event_type); tracer.interests.set(event_type);
global_interests.set(event_type); global_interests.set(event_type);
} }
}
EventOrMany::StartsWith(value) => {
for (event_type, name) in event_names.iter() {
if name.starts_with(&value) {
tracer.interests.set(*event_type);
global_interests.set(*event_type);
}
}
}
EventOrMany::EndsWith(value) => {
for (event_type, name) in event_names.iter() {
if name.ends_with(&value) {
tracer.interests.set(*event_type);
global_interests.set(*event_type);
}
}
}
EventOrMany::All => {
for (event_type, _) in event_names.iter() {
tracer.interests.set(*event_type);
global_interests.set(*event_type);
}
break;
}
}
}
if !tracer.interests.is_empty() { if !tracer.interests.is_empty() {
Some(tracer) Some(tracer)
@ -416,3 +474,25 @@ fn parse_webhook(
None None
} }
} }
/// A single event type or a wildcard pattern matching several event
/// names, as written in the `webhook.<id>.events` configuration list.
enum EventOrMany {
    /// An exact event type.
    Event(EventType),
    /// Matches event names beginning with the given prefix (`prefix*`).
    StartsWith(String),
    /// Matches event names ending with the given suffix (`*suffix`).
    EndsWith(String),
    /// Matches every event (`*`).
    All,
}
impl ParseValue for EventOrMany {
    /// Parses a configuration value into an event selector.
    ///
    /// Accepted forms:
    /// * `"*"`       — all events
    /// * `"*suffix"` — events whose name ends with `suffix`
    /// * `"prefix*"` — events whose name starts with `prefix`
    /// * anything else is parsed as a single `EventType` name
    fn parse_value(value: &str) -> Result<Self, String> {
        let value = value.trim();
        if value == "*" {
            Ok(EventOrMany::All)
        } else if let Some(suffix) = value.strip_prefix('*') {
            // Char patterns instead of one-char strings: identical
            // semantics, skips the substring-search machinery.
            // NOTE(review): a value starred on both ends (e.g. "*auth*")
            // lands here and keeps the trailing '*', so it never matches
            // any event name — confirm whether "contains" matching
            // should be supported.
            Ok(EventOrMany::EndsWith(suffix.to_string()))
        } else if let Some(prefix) = value.strip_suffix('*') {
            Ok(EventOrMany::StartsWith(prefix.to_string()))
        } else {
            EventType::parse_value(value).map(EventOrMany::Event)
        }
    }
}

View file

@ -13,10 +13,10 @@ use tokio::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
io::BufWriter, io::BufWriter,
}; };
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, ServerEvent}; use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TracingEvent};
pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) { pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) {
let mut tx = builder.register(); let (_, mut rx) = builder.register();
tokio::spawn(async move { tokio::spawn(async move {
if let Some(writer) = settings.build_writer().await { if let Some(writer) = settings.build_writer().await {
let mut buf = FmtWriter::new(writer) let mut buf = FmtWriter::new(writer)
@ -24,13 +24,13 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
.with_multiline(settings.multiline); .with_multiline(settings.multiline);
let mut roatation_timestamp = settings.next_rotation(); let mut roatation_timestamp = settings.next_rotation();
while let Some(events) = tx.recv().await { while let Some(events) = rx.recv().await {
for event in events { for event in events {
// Check if we need to rotate the log file // Check if we need to rotate the log file
if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp { if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp {
if let Err(err) = buf.flush().await { if let Err(err) = buf.flush().await {
trc::event!( trc::event!(
Server(ServerEvent::TracingError), Tracing(TracingEvent::LogError),
Reason = err.to_string(), Reason = err.to_string(),
Details = "Failed to flush log buffer" Details = "Failed to flush log buffer"
); );
@ -46,7 +46,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.write(&event).await { if let Err(err) = buf.write(&event).await {
trc::event!( trc::event!(
Server(ServerEvent::TracingError), Tracing(TracingEvent::LogError),
Reason = err.to_string(), Reason = err.to_string(),
Details = "Failed to write event to log" Details = "Failed to write event to log"
); );
@ -56,7 +56,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.flush().await { if let Err(err) = buf.flush().await {
trc::event!( trc::event!(
Server(ServerEvent::TracingError), Tracing(TracingEvent::LogError),
Reason = err.to_string(), Reason = err.to_string(),
Details = "Failed to flush log buffer" Details = "Failed to flush log buffer"
); );
@ -105,7 +105,7 @@ impl LogTracer {
Ok(writer) => Some(BufWriter::new(writer)), Ok(writer) => Some(BufWriter::new(writer)),
Err(err) => { Err(err) => {
trc::event!( trc::event!(
Server(ServerEvent::TracingError), Tracing(TracingEvent::LogError),
Details = "Failed to create log file", Details = "Failed to create log file",
Path = path.to_string_lossy().into_owned(), Path = path.to_string_lossy().into_owned(),
Reason = err.to_string(), Reason = err.to_string(),

View file

@ -6,10 +6,12 @@
pub mod log; pub mod log;
pub mod stdout; pub mod stdout;
pub mod webhook;
use log::spawn_log_tracer; use log::spawn_log_tracer;
use stdout::spawn_console_tracer; use stdout::spawn_console_tracer;
use trc::{collector::Collector, subscriber::SubscriberBuilder}; use trc::{collector::Collector, subscriber::SubscriberBuilder};
use webhook::spawn_webhook_tracer;
use crate::config::tracers::{ConsoleTracer, TracerType, Tracers}; use crate::config::tracers::{ConsoleTracer, TracerType, Tracers};
@ -82,7 +84,7 @@ impl Tracers {
}, },
); );
Collector::set_interests(interests); Collector::union_interests(interests);
Collector::reload(); Collector::reload();
} }
} }
@ -92,8 +94,8 @@ impl TracerType {
match self { match self {
TracerType::Console(settings) => spawn_console_tracer(builder, settings), TracerType::Console(settings) => spawn_console_tracer(builder, settings),
TracerType::Log(settings) => spawn_log_tracer(builder, settings), TracerType::Log(settings) => spawn_log_tracer(builder, settings),
TracerType::Webhook(settings) => spawn_webhook_tracer(builder, settings),
TracerType::Otel(_) => todo!(), TracerType::Otel(_) => todo!(),
TracerType::Webhook(_) => todo!(),
TracerType::Journal => todo!(), TracerType::Journal => todo!(),
} }
} }

View file

@ -16,13 +16,13 @@ use tokio::io::AsyncWrite;
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder}; use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder};
pub(crate) fn spawn_console_tracer(builder: SubscriberBuilder, settings: ConsoleTracer) { pub(crate) fn spawn_console_tracer(builder: SubscriberBuilder, settings: ConsoleTracer) {
let mut tx = builder.register(); let (_, mut rx) = builder.register();
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = FmtWriter::new(StdErrWriter::default()) let mut buf = FmtWriter::new(StdErrWriter::default())
.with_ansi(settings.ansi) .with_ansi(settings.ansi)
.with_multiline(settings.multiline); .with_multiline(settings.multiline);
while let Some(events) = tx.recv().await { while let Some(events) = rx.recv().await {
for event in events { for event in events {
let _ = buf.write(&event).await; let _ = buf.write(&event).await;

View file

@ -5,85 +5,60 @@
*/ */
use std::{ use std::{
sync::Arc, sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{SharedCore, IPC_CHANNEL_BUFFER}; use crate::config::tracers::WebhookTracer;
use ahash::AHashMap;
use base64::{engine::general_purpose::STANDARD, Engine}; use base64::{engine::general_purpose::STANDARD, Engine};
use chrono::Utc;
use ring::hmac; use ring::hmac;
use serde::Serialize;
use store::write::now;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::snowflake::SnowflakeIdGenerator; use trc::{
subscriber::{EventBatch, SubscriberBuilder},
pub enum WebhookEvent { ServerEvent, TracingEvent,
Send { };
typ: WebhookType,
payload: Arc<WebhookPayload>,
},
Success {
webhook_id: u64,
},
Retry {
webhook_id: u64,
events: WebhookEvents,
},
Stop,
}
pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
struct PendingEvents { pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: WebhookTracer) {
next_delivery: Instant, let (tx, mut rx) = builder.register();
pending_events: WebhookEvents,
retry_num: u32,
in_flight: bool,
}
pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender<WebhookEvent> {
let (webhook_tx, mut webhook_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let webhook_tx_ = webhook_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let settings = Arc::new(settings);
let mut wakeup_time = LONG_SLUMBER; let mut wakeup_time = LONG_SLUMBER;
let mut pending_events: AHashMap<u64, PendingEvents> = AHashMap::new(); let discard_after = settings.discard_after.as_secs();
let id_generator = SnowflakeIdGenerator::new(); let mut pending_events = Vec::new();
let mut next_delivery = Instant::now();
let in_flight = Arc::new(AtomicBool::new(false));
loop { loop {
// Wait for the next event or timeout // Wait for the next event or timeout
let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await; let event_or_timeout = tokio::time::timeout(wakeup_time, rx.recv()).await;
let now = now();
// Load settings
let core = core.load_full();
match event_or_timeout { match event_or_timeout {
Ok(Some(event)) => match event { Ok(Some(events)) => {
WebhookEvent::Send { typ, payload } => { let mut discard_count = 0;
for (webhook_id, webhook) in &core.web_hooks.hooks { for event in events {
if webhook.events.contains(&typ) { if now.saturating_sub(event.inner.timestamp) < discard_after {
pending_events.entry(*webhook_id).or_default().push( pending_events.push(event)
super::WebhookEvent { } else {
id: id_generator.generate().unwrap_or_default(), discard_count += 1;
created_at: Utc::now(), }
typ, }
data: payload.clone(),
}, if discard_count > 0 {
trc::event!(
Tracing(TracingEvent::WebhookError),
Details = "Discarded stale events",
Count = discard_count
); );
} }
} }
}
WebhookEvent::Success { webhook_id } => {
if let Some(pending_events) = pending_events.get_mut(&webhook_id) {
pending_events.success();
}
}
WebhookEvent::Retry { webhook_id, events } => {
pending_events.entry(webhook_id).or_default().retry(events);
}
WebhookEvent::Stop => break,
},
Ok(None) => { Ok(None) => {
break; break;
} }
@ -91,28 +66,23 @@ pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender<WebhookEvent> {
} }
// Process events // Process events
let mut delete_ids = Vec::new();
let mut next_retry = None; let mut next_retry = None;
for (webhook_id, events) in &mut pending_events { let now = Instant::now();
if let Some(webhook) = core.web_hooks.hooks.get(webhook_id) { if next_delivery <= now {
if events.next_delivery <= Instant::now() { if !pending_events.is_empty() {
if !events.is_empty() { next_delivery = now + settings.throttle;
events.next_delivery = Instant::now() + webhook.throttle; if !in_flight.load(Ordering::Relaxed) {
if !events.in_flight {
events.in_flight = true;
spawn_webhook_handler( spawn_webhook_handler(
webhook.clone(), settings.clone(),
events.take_events(), in_flight.clone(),
webhook_tx.clone(), std::mem::take(&mut pending_events),
tx.clone(),
); );
} }
} else {
// No more events for webhook
delete_ids.push(*webhook_id);
} }
} else if !events.is_empty() { } else if !pending_events.is_empty() {
// Retry later // Retry later
let this_retry = events.next_delivery - Instant::now(); let this_retry = next_delivery - now;
match next_retry { match next_retry {
Some(next_retry) if this_retry >= next_retry => {} Some(next_retry) if this_retry >= next_retry => {}
_ => { _ => {
@ -120,57 +90,54 @@ pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender<WebhookEvent> {
} }
} }
} }
} else {
delete_ids.push(*webhook_id);
}
}
wakeup_time = next_retry.unwrap_or(LONG_SLUMBER); wakeup_time = next_retry.unwrap_or(LONG_SLUMBER);
// Delete removed or empty webhooks
for webhook_id in delete_ids {
pending_events.remove(&webhook_id);
}
} }
}); });
}
webhook_tx_ #[derive(Serialize)]
struct EventWrapper {
events: EventBatch,
} }
fn spawn_webhook_handler( fn spawn_webhook_handler(
webhook: Arc<Webhook>, settings: Arc<WebhookTracer>,
events: WebhookEvents, in_flight: Arc<AtomicBool>,
webhook_tx: mpsc::Sender<WebhookEvent>, events: EventBatch,
webhook_tx: mpsc::Sender<EventBatch>,
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
let response = match post_webhook_events(&webhook, &events).await { in_flight.store(true, Ordering::Relaxed);
Ok(_) => WebhookEvent::Success { let wrapper = EventWrapper { events };
webhook_id: webhook.id,
},
Err(err) => {
//trc::event!("Failed to post webhook events: {}", err);
WebhookEvent::Retry {
webhook_id: webhook.id,
events,
}
}
};
// Notify manager if let Err(err) = post_webhook_events(&settings, &wrapper).await {
if let Err(err) = webhook_tx.send(response).await { trc::event!(Tracing(TracingEvent::WebhookError), Details = err);
//trc::event!("Failed to send webhook event: {}", err);
if webhook_tx.send(wrapper.events).await.is_err() {
trc::event!(
Server(ServerEvent::ThreadError),
Details = "Failed to send failed webhook events back to main thread",
CausedBy = trc::location!()
);
} }
}
in_flight.store(false, Ordering::Relaxed);
}); });
} }
async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Result<(), String> { async fn post_webhook_events(
settings: &WebhookTracer,
events: &EventWrapper,
) -> Result<(), String> {
// Serialize body // Serialize body
let body = serde_json::to_string(events) let body = serde_json::to_string(events)
.map_err(|err| format!("Failed to serialize events: {}", err))?; .map_err(|err| format!("Failed to serialize events: {}", err))?;
// Add HMAC-SHA256 signature // Add HMAC-SHA256 signature
let mut headers = webhook.headers.clone(); let mut headers = settings.headers.clone();
if !webhook.key.is_empty() { if !settings.key.is_empty() {
let key = hmac::Key::new(hmac::HMAC_SHA256, webhook.key.as_bytes()); let key = hmac::Key::new(hmac::HMAC_SHA256, settings.key.as_bytes());
let tag = hmac::sign(&key, body.as_bytes()); let tag = hmac::sign(&key, body.as_bytes());
headers.insert( headers.insert(
@ -181,69 +148,25 @@ async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Resul
// Send request // Send request
let response = reqwest::Client::builder() let response = reqwest::Client::builder()
.timeout(webhook.timeout) .timeout(settings.timeout)
.danger_accept_invalid_certs(webhook.tls_allow_invalid_certs) .danger_accept_invalid_certs(settings.tls_allow_invalid_certs)
.build() .build()
.map_err(|err| format!("Failed to create HTTP client: {}", err))? .map_err(|err| format!("Failed to create HTTP client: {}", err))?
.post(&webhook.url) .post(&settings.url)
.headers(headers) .headers(headers)
.body(body) .body(body)
.send() .send()
.await .await
.map_err(|err| format!("Webhook request to {} failed: {err}", webhook.url))?; .map_err(|err| format!("Webhook request to {} failed: {err}", settings.url))?;
if response.status().is_success() { if response.status().is_success() {
Ok(()) Ok(())
} else { } else {
Err(format!( Err(format!(
"Webhook request to {} failed with code {}: {}", "Webhook request to {} failed with code {}: {}",
webhook.url, settings.url,
response.status().as_u16(), response.status().as_u16(),
response.status().canonical_reason().unwrap_or("Unknown") response.status().canonical_reason().unwrap_or("Unknown")
)) ))
} }
} }
impl Default for PendingEvents {
fn default() -> Self {
Self {
next_delivery: Instant::now(),
pending_events: WebhookEvents::default(),
retry_num: 0,
in_flight: false,
}
}
}
impl PendingEvents {
pub fn push(&mut self, event: super::WebhookEvent) {
self.pending_events.events.push(event);
}
pub fn success(&mut self) {
self.in_flight = false;
self.retry_num = 0;
}
pub fn retry(&mut self, events: WebhookEvents) {
// Backoff
self.next_delivery = Instant::now() + Duration::from_secs(2u64.pow(self.retry_num));
self.retry_num += 1;
self.in_flight = false;
for event in events.events {
// Drop failed events older than 5 minutes
if event.created_at + Duration::from_secs(5 * 60) >= Utc::now() {
self.pending_events.events.push(event);
}
}
}
pub fn is_empty(&self) -> bool {
self.pending_events.events.is_empty()
}
pub fn take_events(&mut self) -> WebhookEvents {
std::mem::take(&mut self.pending_events)
}
}

View file

@ -140,7 +140,7 @@ impl JMAP {
if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await { if let Err(err) = inner.webadmin.unpack(&core.load().storage.blob).await {
trc::event!( trc::event!(
Resource(trc::ResourceEvent::Error), Resource(trc::ResourceEvent::Error),
Reason = err.to_string(), Reason = err,
Details = "Failed to unpack webadmin bundle" Details = "Failed to unpack webadmin bundle"
); );
} }

View file

@ -9,6 +9,7 @@ event_macro = { path = "./event-macro" }
mail-auth = { version = "0.4" } mail-auth = { version = "0.4" }
mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] } mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] }
base64 = "0.22.1" base64 = "0.22.1"
serde = "1.0"
serde_json = "1.0.120" serde_json = "1.0.120"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]}
bincode = "1.3.3" bincode = "1.3.3"

View file

@ -54,6 +54,13 @@ impl<const N: usize> AtomicBitset<N> {
} }
} }
/// Bitwise-ORs `bitset` into this atomic bitset in place.
///
/// Merging is atomic per word (`fetch_or`), not across the whole set:
/// a concurrent reader may observe some words merged and others not.
pub fn union(&self, bitset: impl AsRef<Bitset<N>>) {
    let bitset = bitset.as_ref();
    for i in 0..N {
        // Relaxed ordering: each word is independent and no
        // happens-before relationship with other data is needed here.
        self.0[i].fetch_or(bitset.0[i], Ordering::Relaxed);
    }
}
pub fn clear_all(&self) { pub fn clear_all(&self) {
for i in 0..N { for i in 0..N {
self.0[i].store(0, Ordering::Relaxed); self.0[i].store(0, Ordering::Relaxed);

View file

@ -17,7 +17,7 @@ use crate::{
bitset::{AtomicBitset, USIZE_BITS}, bitset::{AtomicBitset, USIZE_BITS},
channel::{EVENT_COUNT, EVENT_RXS}, channel::{EVENT_COUNT, EVENT_RXS},
subscriber::{Interests, Subscriber}, subscriber::{Interests, Subscriber},
DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, ServerEvent, DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TracingEvent,
TOTAL_EVENT_COUNT, TOTAL_EVENT_COUNT,
}; };
@ -58,7 +58,7 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i
const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id(); const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id();
const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id(); const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id();
const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id(); const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id();
const EV_COLLECTOR_UPDATE: usize = EventType::Server(ServerEvent::CollectorUpdate).id(); const EV_COLLECTOR_UPDATE: usize = EventType::Tracing(TracingEvent::Update).id();
const STALE_SPAN_CHECK_WATERMARK: usize = 8000; const STALE_SPAN_CHECK_WATERMARK: usize = 8000;
const SPAN_MAX_HOLD: u64 = 86400; const SPAN_MAX_HOLD: u64 = 86400;
@ -119,7 +119,12 @@ impl Collector {
.remove(&event.span_id().expect("Missing span ID")) .remove(&event.span_id().expect("Missing span ID"))
.is_none() .is_none()
{ {
debug_assert!(false, "Unregistered span ID: {event:?}"); #[cfg(debug_assertions)]
{
if event.span_id().unwrap() != 0 {
panic!("Unregistered span ID: {event:?}");
}
}
} }
Arc::new(event) Arc::new(event)
} }
@ -136,7 +141,12 @@ impl Collector {
if let Some(span) = self.active_spans.get(&span_id) { if let Some(span) = self.active_spans.get(&span_id) {
event.inner.span = Some(span.clone()); event.inner.span = Some(span.clone());
} else { } else {
debug_assert!(false, "Unregistered span ID: {event:?}"); #[cfg(debug_assertions)]
{
if span_id != 0 {
panic!("Unregistered span ID: {event:?}");
}
}
} }
} }
@ -226,6 +236,10 @@ impl Collector {
INTERESTS.update(interests); INTERESTS.update(interests);
} }
/// Merges `interests` into the global interest mask (bitwise OR),
/// keeping any bits already enabled by other subscribers rather than
/// replacing the mask.
pub fn union_interests(interests: Interests) {
    INTERESTS.union(interests);
}
pub fn enable_event(event: impl Into<usize>) { pub fn enable_event(event: impl Into<usize>) {
INTERESTS.set(event); INTERESTS.set(event);
} }
@ -271,7 +285,7 @@ impl Collector {
} }
pub fn reload() { pub fn reload() {
Event::new(EventType::Server(ServerEvent::CollectorUpdate)).send() Event::new(EventType::Tracing(TracingEvent::Update)).send()
} }
} }

View file

@ -1003,10 +1003,7 @@ impl EventType {
ServerEvent::Startup | ServerEvent::Shutdown | ServerEvent::Licensing => { ServerEvent::Startup | ServerEvent::Shutdown | ServerEvent::Licensing => {
Level::Info Level::Info
} }
ServerEvent::StartupError ServerEvent::StartupError | ServerEvent::ThreadError => Level::Error,
| ServerEvent::ThreadError
| ServerEvent::TracingError => Level::Error,
ServerEvent::CollectorUpdate => Level::Disable,
}, },
EventType::Acme(event) => match event { EventType::Acme(event) => match event {
AcmeEvent::DnsRecordCreated AcmeEvent::DnsRecordCreated
@ -1245,6 +1242,10 @@ impl EventType {
| OutgoingReportEvent::SubmissionError | OutgoingReportEvent::SubmissionError
| OutgoingReportEvent::NoRecipientsFound => Level::Info, | OutgoingReportEvent::NoRecipientsFound => Level::Info,
}, },
EventType::Tracing(event) => match event {
TracingEvent::Update => Level::Disable,
_ => Level::Error,
},
} }
} }
} }

View file

@ -11,6 +11,7 @@ pub mod conv;
pub mod fmt; pub mod fmt;
pub mod imple; pub mod imple;
pub mod macros; pub mod macros;
pub mod serializer;
pub mod subscriber; pub mod subscriber;
use std::{ use std::{
@ -68,7 +69,7 @@ pub enum Value {
None, None,
} }
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[camel_names] #[camel_names]
pub enum Key { pub enum Key {
#[default] #[default]
@ -205,6 +206,7 @@ pub enum EventType {
MtaSts(MtaStsEvent), MtaSts(MtaStsEvent),
IncomingReport(IncomingReportEvent), IncomingReport(IncomingReportEvent),
OutgoingReport(OutgoingReportEvent), OutgoingReport(OutgoingReportEvent),
Tracing(TracingEvent),
} }
#[event_type] #[event_type]
@ -656,9 +658,16 @@ pub enum ServerEvent {
Shutdown, Shutdown,
StartupError, StartupError,
ThreadError, ThreadError,
TracingError,
Licensing, Licensing,
CollectorUpdate, }
/// Events emitted by the tracing subsystem itself: collector updates
/// and per-backend delivery failures. Each tracer backend disables its
/// own error event so a failing backend cannot feed itself.
#[event_type]
pub enum TracingEvent {
    /// Collector configuration changed; triggers a subscriber reload.
    Update,
    /// A log-file tracer failed to write or flush.
    LogError,
    /// A webhook tracer failed to deliver an event batch.
    WebhookError,
    /// An OpenTelemetry exporter error.
    OtelError,
    /// A journald tracer error.
    JournalError,
}
#[event_type] #[event_type]

View file

@ -0,0 +1,107 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashSet;
use mail_parser::DateTime;
use serde::{ser::SerializeMap, Serialize, Serializer};
use crate::{Event, EventDetails, EventType, Key, Value};
/// Serialization helper that flattens an event's key/value pairs and
/// those of its parent span into a single serde map.
struct Keys<'x> {
    // Keys recorded directly on the event.
    keys: &'x [(Key, Value)],
    // Keys inherited from the enclosing span (empty when no span).
    span_keys: &'x [(Key, Value)],
}
impl Serialize for Event<EventDetails> {
    /// Serializes a collected event as a map with exactly four entries:
    /// `id`, `createdAt` (RFC 3339), `type` (event name) and `data`
    /// (flattened event + span keys).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = serializer.serialize_map(Some(4))?;
        // NOTE(review): the id is the timestamp concatenated with the
        // numeric event-type id, so two events of the same type in the
        // same timestamp unit collide — confirm consumers do not rely
        // on id uniqueness.
        map.serialize_entry(
            "id",
            &format!("{}{}", self.inner.timestamp, self.inner.typ.id()),
        )?;
        map.serialize_entry(
            "createdAt",
            &DateTime::from_timestamp(self.inner.timestamp as i64).to_rfc3339(),
        )?;
        map.serialize_entry("type", self.inner.typ.name())?;
        // Span keys (when a parent span exists) are merged with the
        // event's own keys by the `Keys` serializer.
        map.serialize_entry(
            "data",
            &Keys {
                keys: self.keys.as_slice(),
                span_keys: self.inner.span.as_ref().map(|s| &s.keys[..]).unwrap_or(&[]),
            },
        )?;
        map.end()
    }
}
impl<'x> Serialize for Keys<'x> {
    /// Serializes the combined key set as a map, skipping `Value::None`
    /// entries and the internal `SpanId` key. Span keys are iterated
    /// first, so on a duplicate key the span's value wins and the
    /// event-level value is dropped.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let keys_len = self.keys.len() + self.span_keys.len();
        let mut seen_keys = AHashSet::with_capacity(keys_len);
        // The exact entry count is unknown until filtering/deduplication
        // has run, so pass `None` rather than an upper bound: serde's
        // length hint must be exact when given, and strict (non-JSON)
        // serializers reject a mismatch.
        let mut keys = serializer.serialize_map(None)?;
        for (key, value) in self.span_keys.iter().chain(self.keys.iter()) {
            if !matches!(value, Value::None)
                && !matches!(key, Key::SpanId)
                && seen_keys.insert(*key)
            {
                keys.serialize_entry(key.name(), value)?;
            }
        }
        keys.end()
    }
}
impl Serialize for Event<EventType> {
    /// Serializes a bare (detail-less) event as a map with exactly two
    /// entries: `type` (event name) and `data` (the event's keys; there
    /// is no span to merge here).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Fix: only two entries are emitted, but the original declared
        // `Some(4)`. serde's map length hint must match the number of
        // entries actually serialized; strict serializers fail on the
        // mismatch (serde_json happens to ignore it).
        let mut map = serializer.serialize_map(Some(2))?;
        map.serialize_entry("type", self.inner.name())?;
        map.serialize_entry(
            "data",
            &Keys {
                keys: self.keys.as_slice(),
                span_keys: &[],
            },
        )?;
        map.end()
    }
}
impl Serialize for Value {
    /// Serializes a dynamic `Value` using the natural serde form of the
    /// wrapped type; timestamps are rendered as RFC 3339 strings and
    /// protocols by their name.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Value::Static(value) => value.serialize(serializer),
            Value::String(value) => value.serialize(serializer),
            Value::UInt(value) => value.serialize(serializer),
            Value::Int(value) => value.serialize(serializer),
            Value::Float(value) => value.serialize(serializer),
            Value::Timestamp(value) => DateTime::from_timestamp(*value as i64)
                .to_rfc3339()
                .serialize(serializer),
            Value::Duration(value) => value.serialize(serializer),
            Value::Bytes(value) => value.serialize(serializer),
            Value::Bool(value) => value.serialize(serializer),
            Value::Ipv4(value) => value.serialize(serializer),
            Value::Ipv6(value) => value.serialize(serializer),
            Value::Protocol(value) => value.name().serialize(serializer),
            Value::Event(value) => value.serialize(serializer),
            Value::Array(value) => value.serialize(serializer),
            // `Keys::serialize` filters out `None` before values reach
            // this impl, hence the panic on the impossible branch.
            // NOTE(review): a `Value::None` nested inside `Value::Array`
            // would still hit this and panic — confirm arrays can never
            // contain `None`.
            Value::None => unreachable!(),
        }
    }
}

View file

@ -18,14 +18,15 @@ use crate::{
const MAX_BATCH_SIZE: usize = 32768; const MAX_BATCH_SIZE: usize = 32768;
pub type Interests = Box<Bitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>>; pub type Interests = Box<Bitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>>;
pub type EventBatch = Vec<Arc<Event<EventDetails>>>;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Subscriber { pub(crate) struct Subscriber {
pub id: String, pub id: String,
pub interests: Interests, pub interests: Interests,
pub tx: mpsc::Sender<Vec<Arc<Event<EventDetails>>>>, pub tx: mpsc::Sender<EventBatch>,
pub lossy: bool, pub lossy: bool,
pub batch: Vec<Arc<Event<EventDetails>>>, pub batch: EventBatch,
} }
pub struct SubscriberBuilder { pub struct SubscriberBuilder {
@ -99,14 +100,14 @@ impl SubscriberBuilder {
self self
} }
pub fn register(self) -> mpsc::Receiver<Vec<Arc<Event<EventDetails>>>> { pub fn register(self) -> (mpsc::Sender<EventBatch>, mpsc::Receiver<EventBatch>) {
let (tx, rx) = mpsc::channel(8192); let (tx, rx) = mpsc::channel(8192);
COLLECTOR_UPDATES.lock().push(Update::Register { COLLECTOR_UPDATES.lock().push(Update::Register {
subscriber: Subscriber { subscriber: Subscriber {
id: self.id, id: self.id,
interests: self.interests, interests: self.interests,
tx, tx: tx.clone(),
lossy: self.lossy, lossy: self.lossy,
batch: Vec::new(), batch: Vec::new(),
}, },
@ -115,6 +116,6 @@ impl SubscriberBuilder {
// Notify collector // Notify collector
Collector::reload(); Collector::reload();
rx (tx, rx)
} }
} }

View file

@ -267,10 +267,10 @@ pub async fn test(params: &mut JMAPTest) {
// Check webhook events // Check webhook events
params.webhook.assert_contains(&[ params.webhook.assert_contains(&[
"auth.failure", "authFailure",
"auth.success", "authSuccess",
"auth.banned", "authBanned",
"\"login\": \"jdoe@example.com\"", "\"name\": \"jdoe@example.com\"",
"\"accountType\": \"individual\"", "\"type\": \"individual\"",
]); ]);
} }

View file

@ -276,10 +276,7 @@ vrfy = true
[webhook."test"] [webhook."test"]
url = "http://127.0.0.1:8821/hook" url = "http://127.0.0.1:8821/hook"
events = ["auth.success", "auth.failure", "auth.banned", "auth.error", events = ["*"]
"message.accepted", "message.rejected", "message.appended",
"account.over-quota", "dsn", "double-bounce", "report.incoming.dmarc",
"report.incoming.tls", "report.incoming.arf", "report.outgoing"]
signature-key = "ovos-moles" signature-key = "ovos-moles"
throttle = "100ms" throttle = "100ms"
@ -454,6 +451,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
.cloned() .cloned()
.unwrap_or_default(), .unwrap_or_default(),
}; };
let tracers = Tracers::parse(&mut config);
let core = Core::parse(&mut config, stores, config_manager).await; let core = Core::parse(&mut config, stores, config_manager).await;
let store = core.storage.data.clone(); let store = core.storage.data.clone();
let shared_core = core.into_shared(); let shared_core = core.into_shared();
@ -461,6 +459,9 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
// Parse acceptors // Parse acceptors
servers.parse_tcp_acceptors(&mut config, shared_core.clone()); servers.parse_tcp_acceptors(&mut config, shared_core.clone());
// Enable tracing
tracers.enable();
// Setup IPC channels // Setup IPC channels
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let ipc = Ipc { delivery_tx }; let ipc = Ipc { delivery_tx };

View file

@ -42,7 +42,7 @@ pub async fn test(params: &mut JMAPTest) {
tokio::time::sleep(Duration::from_millis(1000)).await; tokio::time::sleep(Duration::from_millis(1000)).await;
// Check for events // Check for events
params.webhook.assert_contains(&["auth.success"]); params.webhook.assert_contains(&["authSuccess"]);
} }
impl MockWebhookEndpoint { impl MockWebhookEndpoint {