diff --git a/crates/trc/src/channel.rs b/crates/trc/src/channel.rs index 35c0ca6a..2192269c 100644 --- a/crates/trc/src/channel.rs +++ b/crates/trc/src/channel.rs @@ -7,30 +7,39 @@ use std::{ cell::UnsafeCell, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, }; -use parking_lot::Mutex; use rtrb::{Consumer, Producer, PushError, RingBuffer}; use crate::{ - collector::{spawn_collector, CollectorThread}, + collector::{spawn_collector, CollectorThread, Update, COLLECTOR_UPDATES}, Event, EventType, }; -pub(crate) static EVENT_RXS: Mutex> = Mutex::new(Vec::new()); -pub(crate) static EVENT_COUNT: AtomicUsize = AtomicUsize::new(0); +pub(crate) static CHANNEL_FLAGS: AtomicU64 = AtomicU64::new(0); pub(crate) const CHANNEL_SIZE: usize = 10240; +pub(crate) const CHANNEL_UPDATE_MARKER: u64 = 1 << 63; thread_local! { static EVENT_TX: UnsafeCell = { + // Create channel. let (tx, rx) = RingBuffer::new(CHANNEL_SIZE); - EVENT_RXS.lock().push(Receiver { rx }); + + // Register receiver with collector. + COLLECTOR_UPDATES.lock().push(Update::RegisterReceiver { receiver: Receiver { rx } }); + + // Spawn collector thread. + let collector = spawn_collector().clone(); + CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed); + collector.thread().unpark(); + + // Return sender. UnsafeCell::new(Sender { tx, - collector: spawn_collector().clone(), + collector, overflow: Vec::with_capacity(0), }) }; @@ -91,7 +100,7 @@ impl Event { let _ = EVENT_TX.try_with(|tx| unsafe { let tx = &mut *tx.get(); if tx.send(self).is_ok() { - EVENT_COUNT.fetch_add(1, Ordering::Relaxed); + CHANNEL_FLAGS.fetch_add(1, Ordering::Relaxed); tx.collector.thread().unpark(); } }); diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index c27c9400..ed4c36f8 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -15,10 +15,9 @@ use parking_lot::Mutex; use crate::{ bitset::{AtomicBitset, USIZE_BITS}, - channel::{EVENT_COUNT, EVENT_RXS}, + channel::{Receiver, CHANNEL_FLAGS, CHANNEL_UPDATE_MARKER}, subscriber::{Interests, Subscriber}, - DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TelemetryEvent, - TOTAL_EVENT_COUNT, + DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TOTAL_EVENT_COUNT, }; pub(crate) type GlobalInterests = @@ -33,10 +32,13 @@ pub(crate) const EVENT_TYPES: [EventType; TOTAL_EVENT_COUNT] = EventType::varian #[allow(clippy::enum_variant_names)] pub(crate) enum Update { - Register { + RegisterReceiver { + receiver: Receiver, + }, + RegisterSubscriber { subscriber: Subscriber, }, - Unregister { + UnregisterSubscriber { id: String, }, UpdateSubscriber { @@ -51,6 +53,7 @@ pub(crate) enum Update { } pub struct Collector { + receivers: Vec, subscribers: Vec, levels: [Level; TOTAL_EVENT_COUNT], active_spans: AHashMap>>, @@ -60,25 +63,31 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id(); const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id(); const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id(); -const EV_COLLECTOR_UPDATE: usize = EventType::Telemetry(TelemetryEvent::Update).id(); const STALE_SPAN_CHECK_WATERMARK: usize = 8000; const SPAN_MAX_HOLD: u64 = 86400; impl Collector { fn collect(&mut self) -> bool { - if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 { - park(); + let mut do_continue = true; + match CHANNEL_FLAGS.swap(0, Ordering::Relaxed) { + 0 => { + park(); + } + CHANNEL_UPDATE_MARKER..=u64::MAX => { + do_continue = self.update(); + } + _ => {} } // Collect all events - let mut do_continue = true; - EVENT_RXS.lock().retain_mut(|rx| { + let mut closed_rxs = Vec::new(); + for (rx_idx, rx) in self.receivers.iter_mut().enumerate() { let timestamp = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .map_or(0, |d| d.as_secs()); - while do_continue { + loop { match rx.try_recv() { Ok(Some(event)) => { // Build event @@ -127,14 +136,6 @@ impl Collector { } Arc::new(event) } - EV_COLLECTOR_UPDATE => { - if self.update() { - continue; - } else { - do_continue = false; - return false; - } - } _ => { if let Some(span_id) = event.span_id() { if let Some(span) = self.active_spans.get(&span_id) { @@ -159,41 +160,55 @@ impl Collector { } } Ok(None) => { - return true; + break; } Err(_) => { - return false; // Channel is closed, remove. + closed_rxs.push(rx_idx); // Channel is closed, remove. + break; } } } - - false - }); - - if !self.subscribers.is_empty() { - if do_continue { - // Send batched events - self.subscribers - .retain_mut(|subscriber| subscriber.send_batch().is_ok()); - } else { - // Send remaining events - for mut subscriber in self.subscribers.drain(..) { - let _ = subscriber.send_batch(); - } - } } - do_continue + if do_continue { + // Remove closed receivers (should be rare in Tokio) + if !closed_rxs.is_empty() { + let mut receivers = Vec::with_capacity(self.receivers.len() - closed_rxs.len()); + for (rx_idx, rx) in self.receivers.drain(..).enumerate() { + if !closed_rxs.contains(&rx_idx) { + receivers.push(rx); + } + } + self.receivers = receivers; + } + + // Send batched events + if !self.subscribers.is_empty() { + self.subscribers + .retain_mut(|subscriber| subscriber.send_batch().is_ok()); + } + true + } else { + // Send remaining events + for mut subscriber in self.subscribers.drain(..) { + let _ = subscriber.send_batch(); + } + + false + } } fn update(&mut self) -> bool { for update in COLLECTOR_UPDATES.lock().drain(..) { match update { - Update::Register { subscriber } => { + Update::RegisterReceiver { receiver } => { + self.receivers.push(receiver); + } + Update::RegisterSubscriber { subscriber } => { ACTIVE_SUBSCRIBERS.lock().push(subscriber.id.clone()); self.subscribers.push(subscriber); } - Update::Unregister { id } => { + Update::UnregisterSubscriber { id } => { ACTIVE_SUBSCRIBERS.lock().retain(|s| s != &id); self.subscribers.retain(|s| s.id != id); } @@ -270,7 +285,9 @@ impl Collector { } pub fn remove_subscriber(id: String) { - COLLECTOR_UPDATES.lock().push(Update::Unregister { id }); + COLLECTOR_UPDATES + .lock() + .push(Update::UnregisterSubscriber { id }); } pub fn shutdown() { @@ -283,7 +300,8 @@ impl Collector { } pub fn reload() { - Event::new(EventType::Telemetry(TelemetryEvent::Update)).send() + CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed); + spawn_collector().thread().unpark(); } } @@ -294,8 +312,13 @@ pub(crate) fn spawn_collector() -> &'static Arc { Builder::new() .name("stalwart-collector".to_string()) .spawn(move || { + // Create collector let mut collector = Collector::default(); + // Update + collector.update(); + + // Collect events while collector.collect() {} }) .expect("Failed to start event collector"), @@ -309,6 +332,7 @@ impl Default for Collector { subscribers: Vec::new(), levels: [Level::Disable; TOTAL_EVENT_COUNT], active_spans: AHashMap::new(), + receivers: Vec::new(), }; for event in EVENT_TYPES.iter() { diff --git a/crates/trc/src/imple.rs b/crates/trc/src/imple.rs index e21034f0..4a00940a 100644 --- a/crates/trc/src/imple.rs +++ b/crates/trc/src/imple.rs @@ -1364,10 +1364,7 @@ impl EventType { | OutgoingReportEvent::SubmissionError | OutgoingReportEvent::NoRecipientsFound => Level::Info, }, - EventType::Telemetry(event) => match event { - TelemetryEvent::Update => Level::Disable, - _ => Level::Warn, - }, + EventType::Telemetry(_) => Level::Warn, EventType::MessageIngest(event) => match event { MessageIngestEvent::Ham | MessageIngestEvent::Spam @@ -1906,7 +1903,6 @@ impl ServerEvent { impl TelemetryEvent { pub fn description(&self) -> &'static str { match self { - TelemetryEvent::Update => "Tracing update", TelemetryEvent::LogError => "Log collector error", TelemetryEvent::WebhookError => "Webhook collector error", TelemetryEvent::JournalError => "Journal collector error", diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 0a43838c..6d1664b4 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -649,7 +649,6 @@ pub enum ServerEvent { #[event_type] pub enum TelemetryEvent { - Update, LogError, WebhookError, OtelExpoterError, diff --git a/crates/trc/src/subscriber.rs b/crates/trc/src/subscriber.rs index 405b1167..9122f7b5 100644 --- a/crates/trc/src/subscriber.rs +++ b/crates/trc/src/subscriber.rs @@ -106,7 +106,7 @@ impl SubscriberBuilder { pub fn register(self) -> (mpsc::Sender, mpsc::Receiver) { let (tx, rx) = mpsc::channel(8192); - COLLECTOR_UPDATES.lock().push(Update::Register { + COLLECTOR_UPDATES.lock().push(Update::RegisterSubscriber { subscriber: Subscriber { id: self.id, interests: self.interests,