Avoid using mutex on the event receiver list

This commit is contained in:
mdecimus 2024-08-09 12:55:50 +02:00
parent 35ccf92163
commit c6a61d1f45
5 changed files with 84 additions and 56 deletions

View file

@ -7,30 +7,39 @@
use std::{
cell::UnsafeCell,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
};
use parking_lot::Mutex;
use rtrb::{Consumer, Producer, PushError, RingBuffer};
use crate::{
collector::{spawn_collector, CollectorThread},
collector::{spawn_collector, CollectorThread, Update, COLLECTOR_UPDATES},
Event, EventType,
};
pub(crate) static EVENT_RXS: Mutex<Vec<Receiver>> = Mutex::new(Vec::new());
pub(crate) static EVENT_COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) static CHANNEL_FLAGS: AtomicU64 = AtomicU64::new(0);
pub(crate) const CHANNEL_SIZE: usize = 10240;
pub(crate) const CHANNEL_UPDATE_MARKER: u64 = 1 << 63;
thread_local! {
static EVENT_TX: UnsafeCell<Sender> = {
// Create channel.
let (tx, rx) = RingBuffer::new(CHANNEL_SIZE);
EVENT_RXS.lock().push(Receiver { rx });
// Register receiver with collector.
COLLECTOR_UPDATES.lock().push(Update::RegisterReceiver { receiver: Receiver { rx } });
// Spawn collector thread.
let collector = spawn_collector().clone();
CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed);
collector.thread().unpark();
// Return sender.
UnsafeCell::new(Sender {
tx,
collector: spawn_collector().clone(),
collector,
overflow: Vec::with_capacity(0),
})
};
@ -91,7 +100,7 @@ impl Event<EventType> {
let _ = EVENT_TX.try_with(|tx| unsafe {
let tx = &mut *tx.get();
if tx.send(self).is_ok() {
EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
CHANNEL_FLAGS.fetch_add(1, Ordering::Relaxed);
tx.collector.thread().unpark();
}
});

View file

@ -15,10 +15,9 @@ use parking_lot::Mutex;
use crate::{
bitset::{AtomicBitset, USIZE_BITS},
channel::{EVENT_COUNT, EVENT_RXS},
channel::{Receiver, CHANNEL_FLAGS, CHANNEL_UPDATE_MARKER},
subscriber::{Interests, Subscriber},
DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TelemetryEvent,
TOTAL_EVENT_COUNT,
DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TOTAL_EVENT_COUNT,
};
pub(crate) type GlobalInterests =
@ -33,10 +32,13 @@ pub(crate) const EVENT_TYPES: [EventType; TOTAL_EVENT_COUNT] = EventType::varian
#[allow(clippy::enum_variant_names)]
pub(crate) enum Update {
Register {
RegisterReceiver {
receiver: Receiver,
},
RegisterSubscriber {
subscriber: Subscriber,
},
Unregister {
UnregisterSubscriber {
id: String,
},
UpdateSubscriber {
@ -51,6 +53,7 @@ pub(crate) enum Update {
}
pub struct Collector {
receivers: Vec<Receiver>,
subscribers: Vec<Subscriber>,
levels: [Level; TOTAL_EVENT_COUNT],
active_spans: AHashMap<u64, Arc<Event<EventDetails>>>,
@ -60,25 +63,31 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i
const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id();
const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id();
const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id();
const EV_COLLECTOR_UPDATE: usize = EventType::Telemetry(TelemetryEvent::Update).id();
const STALE_SPAN_CHECK_WATERMARK: usize = 8000;
const SPAN_MAX_HOLD: u64 = 86400;
impl Collector {
fn collect(&mut self) -> bool {
if EVENT_COUNT.swap(0, Ordering::Relaxed) == 0 {
park();
let mut do_continue = true;
match CHANNEL_FLAGS.swap(0, Ordering::Relaxed) {
0 => {
park();
}
CHANNEL_UPDATE_MARKER..=u64::MAX => {
do_continue = self.update();
}
_ => {}
}
// Collect all events
let mut do_continue = true;
EVENT_RXS.lock().retain_mut(|rx| {
let mut closed_rxs = Vec::new();
for (rx_idx, rx) in self.receivers.iter_mut().enumerate() {
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
while do_continue {
loop {
match rx.try_recv() {
Ok(Some(event)) => {
// Build event
@ -127,14 +136,6 @@ impl Collector {
}
Arc::new(event)
}
EV_COLLECTOR_UPDATE => {
if self.update() {
continue;
} else {
do_continue = false;
return false;
}
}
_ => {
if let Some(span_id) = event.span_id() {
if let Some(span) = self.active_spans.get(&span_id) {
@ -159,41 +160,55 @@ impl Collector {
}
}
Ok(None) => {
return true;
break;
}
Err(_) => {
return false; // Channel is closed, remove.
closed_rxs.push(rx_idx); // Channel is closed, remove.
break;
}
}
}
false
});
if !self.subscribers.is_empty() {
if do_continue {
// Send batched events
self.subscribers
.retain_mut(|subscriber| subscriber.send_batch().is_ok());
} else {
// Send remaining events
for mut subscriber in self.subscribers.drain(..) {
let _ = subscriber.send_batch();
}
}
}
do_continue
if do_continue {
// Remove closed receivers (should be rare in Tokio)
if !closed_rxs.is_empty() {
let mut receivers = Vec::with_capacity(self.receivers.len() - closed_rxs.len());
for (rx_idx, rx) in self.receivers.drain(..).enumerate() {
if !closed_rxs.contains(&rx_idx) {
receivers.push(rx);
}
}
self.receivers = receivers;
}
// Send batched events
if !self.subscribers.is_empty() {
self.subscribers
.retain_mut(|subscriber| subscriber.send_batch().is_ok());
}
true
} else {
// Send remaining events
for mut subscriber in self.subscribers.drain(..) {
let _ = subscriber.send_batch();
}
false
}
}
fn update(&mut self) -> bool {
for update in COLLECTOR_UPDATES.lock().drain(..) {
match update {
Update::Register { subscriber } => {
Update::RegisterReceiver { receiver } => {
self.receivers.push(receiver);
}
Update::RegisterSubscriber { subscriber } => {
ACTIVE_SUBSCRIBERS.lock().push(subscriber.id.clone());
self.subscribers.push(subscriber);
}
Update::Unregister { id } => {
Update::UnregisterSubscriber { id } => {
ACTIVE_SUBSCRIBERS.lock().retain(|s| s != &id);
self.subscribers.retain(|s| s.id != id);
}
@ -270,7 +285,9 @@ impl Collector {
}
pub fn remove_subscriber(id: String) {
COLLECTOR_UPDATES.lock().push(Update::Unregister { id });
COLLECTOR_UPDATES
.lock()
.push(Update::UnregisterSubscriber { id });
}
pub fn shutdown() {
@ -283,7 +300,8 @@ impl Collector {
}
pub fn reload() {
Event::new(EventType::Telemetry(TelemetryEvent::Update)).send()
CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed);
spawn_collector().thread().unpark();
}
}
@ -294,8 +312,13 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> {
Builder::new()
.name("stalwart-collector".to_string())
.spawn(move || {
// Create collector
let mut collector = Collector::default();
// Update
collector.update();
// Collect events
while collector.collect() {}
})
.expect("Failed to start event collector"),
@ -309,6 +332,7 @@ impl Default for Collector {
subscribers: Vec::new(),
levels: [Level::Disable; TOTAL_EVENT_COUNT],
active_spans: AHashMap::new(),
receivers: Vec::new(),
};
for event in EVENT_TYPES.iter() {

View file

@ -1364,10 +1364,7 @@ impl EventType {
| OutgoingReportEvent::SubmissionError
| OutgoingReportEvent::NoRecipientsFound => Level::Info,
},
EventType::Telemetry(event) => match event {
TelemetryEvent::Update => Level::Disable,
_ => Level::Warn,
},
EventType::Telemetry(_) => Level::Warn,
EventType::MessageIngest(event) => match event {
MessageIngestEvent::Ham
| MessageIngestEvent::Spam
@ -1906,7 +1903,6 @@ impl ServerEvent {
impl TelemetryEvent {
pub fn description(&self) -> &'static str {
match self {
TelemetryEvent::Update => "Tracing update",
TelemetryEvent::LogError => "Log collector error",
TelemetryEvent::WebhookError => "Webhook collector error",
TelemetryEvent::JournalError => "Journal collector error",

View file

@ -649,7 +649,6 @@ pub enum ServerEvent {
#[event_type]
pub enum TelemetryEvent {
Update,
LogError,
WebhookError,
OtelExpoterError,

View file

@ -106,7 +106,7 @@ impl SubscriberBuilder {
pub fn register(self) -> (mpsc::Sender<EventBatch>, mpsc::Receiver<EventBatch>) {
let (tx, rx) = mpsc::channel(8192);
COLLECTOR_UPDATES.lock().push(Update::Register {
COLLECTOR_UPDATES.lock().push(Update::RegisterSubscriber {
subscriber: Subscriber {
id: self.id,
interests: self.interests,