diff --git a/Cargo.lock b/Cargo.lock index 472b7964..0f112f7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3431,9 +3431,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.28.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -3901,7 +3901,6 @@ dependencies = [ "bincode", "farmhash", "jieba-rs", - "lazy_static", "lru-cache", "nohash", "parking_lot", @@ -4926,10 +4925,11 @@ dependencies = [ [[package]] name = "redis" -version = "0.25.4" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" dependencies = [ + "arc-swap", "async-trait", "bytes", "combine", @@ -4938,10 +4938,11 @@ dependencies = [ "futures-util", "itoa", "log", + "num-bigint", "percent-encoding", "pin-project-lite", "rand", - "rustls 0.22.4", + "rustls 0.23.12", "rustls-native-certs 0.7.1", "rustls-pemfile 2.1.2", "rustls-pki-types", @@ -4949,7 +4950,7 @@ dependencies = [ "sha1_smol", "socket2", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tokio-util", "url", "webpki-roots 0.26.3", @@ -5277,9 +5278,9 @@ checksum = "f3f94e84c073f3b85d4012b44722fa8842b9986d741590d4f2636ad0a5b14143" [[package]] name = "rusqlite" -version = "0.31.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.6.0", "fallible-iterator 0.3.0", @@ -6002,7 +6003,6 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "hyper-util", - "lazy_static", "lru-cache", "mail-auth", "mail-builder", @@ -6153,7 +6153,6 @@ dependencies = [ "flate2", "foundationdb", "futures", - "lazy_static", "lru-cache", "lz4_flex", "mysql_async", diff --git a/crates/nlp/Cargo.toml b/crates/nlp/Cargo.toml index 897f9a1b..a3270d0e 100644 --- a/crates/nlp/Cargo.toml +++ b/crates/nlp/Cargo.toml @@ -13,7 +13,6 @@ serde = { version = "1.0", features = ["derive"]} bincode = "1.3.3" nohash = "0.2.0" ahash = "0.8.3" -lazy_static = "1.4" whatlang = "0.16" # Language detection rust-stemmers = "1.2" # Stemmers tinysegmenter = "0.1" # Japanese tokenizer diff --git a/crates/nlp/src/tokenizers/chinese.rs b/crates/nlp/src/tokenizers/chinese.rs index e4bba1ed..c6057f2e 100644 --- a/crates/nlp/src/tokenizers/chinese.rs +++ b/crates/nlp/src/tokenizers/chinese.rs @@ -4,16 +4,13 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{borrow::Cow, vec::IntoIter}; +use std::{borrow::Cow, sync::LazyLock, vec::IntoIter}; use jieba_rs::Jieba; use super::{InnerToken, Token}; -use lazy_static::lazy_static; -lazy_static! { - pub static ref JIEBA: Jieba = Jieba::new(); -} +pub(crate) static JIEBA: LazyLock = LazyLock::new(Jieba::new); pub struct ChineseTokenizer<'x, T, I> where diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index 19559ef8..251bb8de 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -50,7 +50,6 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls- serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" num_cpus = "1.15.0" -lazy_static = "1.4" bincode = "1.3.1" chrono = "0.4" diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index b75ac728..d6832d7a 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -7,7 +7,7 @@ use std::{ hash::Hash, net::IpAddr, - sync::Arc, + sync::{Arc, LazyLock}, time::{Duration, Instant}, }; @@ -269,17 +269,17 @@ impl From for SMTP { } } -lazy_static::lazy_static! { -static ref SIEVE: Arc = Arc::new(ServerInstance { - id: "sieve".to_string(), - protocol: common::config::server::ServerProtocol::Lmtp, - acceptor: common::listener::TcpAcceptor::Plain, - limiter: ConcurrencyLimiter::new(0), - shutdown_rx: tokio::sync::watch::channel(false).1, - proxy_networks: vec![], - span_id_gen: Arc::new(SnowflakeIdGenerator::new()), +static SIEVE: LazyLock> = LazyLock::new(|| { + Arc::new(ServerInstance { + id: "sieve".to_string(), + protocol: common::config::server::ServerProtocol::Lmtp, + acceptor: common::listener::TcpAcceptor::Plain, + limiter: ConcurrencyLimiter::new(0), + shutdown_rx: tokio::sync::watch::channel(false).1, + proxy_networks: vec![], + span_id_gen: Arc::new(SnowflakeIdGenerator::new()), + }) }); -} impl Session { pub fn local(core: SMTP, instance: std::sync::Arc, data: SessionData) -> Self { diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 46d5e1ea..4f9a5717 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -10,7 +10,7 @@ nlp = { path = "../nlp" } trc = { path = "../trc" } rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] } foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true } -rusqlite = { version = "0.31.0", features = ["bundled"], optional = true } +rusqlite = { version = "0.32", features = ["bundled"], optional = true } rust-s3 = { version = "=0.35.0-alpha.2", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true } tokio = { version = "1.23", features = ["sync", "fs", "io-util"] } r2d2 = { version = "0.8.10", optional = true } @@ -20,7 +20,6 @@ roaring = "0.10.1" rayon = { version = "1.5.1", optional = true } serde = { version = "1.0", features = ["derive"]} ahash = { version = "0.8.2", features = ["serde"] } -lazy_static = "1.4" xxhash-rust = { version = "0.8.5", features = ["xxh3"] } farmhash = "1.1.5" parking_lot = "0.12.1" @@ -41,7 +40,7 @@ serde_json = {version = "1.0.64", optional = true } regex = "1.7.0" flate2 = "1.0" async-trait = "0.1.68" -redis = { version = "0.25.2", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true } +redis = { version = "0.26", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true } deadpool = { version = "0.12", features = ["managed"], optional = true } bincode = "1.3.3" arc-swap = "1.6.0" diff --git a/crates/store/src/backend/redis/lookup.rs b/crates/store/src/backend/redis/lookup.rs index b0730f1b..c541d8bb 100644 --- a/crates/store/src/backend/redis/lookup.rs +++ b/crates/store/src/backend/redis/lookup.rs @@ -127,8 +127,9 @@ impl RedisStore { conn: &mut impl AsyncCommands, key: Vec, ) -> trc::Result> { - if let Some(value) = conn - .get::<_, Option>>(key) + if let Some(value) = redis::cmd("GET") + .arg(key) + .query_async::>>(conn) .await .map_err(into_error)? { @@ -139,7 +140,9 @@ impl RedisStore { } async fn counter_get_(&self, conn: &mut impl AsyncCommands, key: Vec) -> trc::Result { - conn.get::<_, Option>(key) + redis::cmd("GET") + .arg(key) + .query_async::>(conn) .await .map(|x| x.unwrap_or(0)) .map_err(into_error) @@ -176,7 +179,7 @@ impl RedisStore { .incr(&key, value) .expire(&key, expires as i64) .ignore() - .query_async::<_, Vec>(conn) + .query_async::>(conn) .await .map_err(into_error) .map(|v| v.first().copied().unwrap_or(0)) diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 8b45f045..39746ecc 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -25,10 +25,14 @@ use crate::{ use super::DocumentSet; #[cfg(feature = "test_mode")] -lazy_static::lazy_static! { -pub static ref BITMAPS: std::sync::Arc, std::collections::HashSet>>> = - std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())); -} +#[allow(clippy::type_complexity)] +static BITMAPS: std::sync::LazyLock< + std::sync::Arc< + parking_lot::Mutex, std::collections::HashSet>>, + >, +> = std::sync::LazyLock::new(|| { + std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())) +}); impl Store { pub async fn get_value(&self, key: impl Key) -> trc::Result> diff --git a/crates/trc/src/channel.rs b/crates/trc/src/channel.rs index 2192269c..7fbaee14 100644 --- a/crates/trc/src/channel.rs +++ b/crates/trc/src/channel.rs @@ -15,7 +15,7 @@ use std::{ use rtrb::{Consumer, Producer, PushError, RingBuffer}; use crate::{ - collector::{spawn_collector, CollectorThread, Update, COLLECTOR_UPDATES}, + collector::{CollectorThread, Update, COLLECTOR_THREAD, COLLECTOR_UPDATES}, Event, EventType, }; @@ -32,7 +32,7 @@ thread_local! { COLLECTOR_UPDATES.lock().push(Update::RegisterReceiver { receiver: Receiver { rx } }); // Spawn collector thread. - let collector = spawn_collector().clone(); + let collector = COLLECTOR_THREAD.clone(); CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed); collector.thread().unpark(); diff --git a/crates/trc/src/collector.rs b/crates/trc/src/collector.rs index ed4c36f8..5be19159 100644 --- a/crates/trc/src/collector.rs +++ b/crates/trc/src/collector.rs @@ -5,7 +5,7 @@ */ use std::{ - sync::{atomic::Ordering, Arc, OnceLock}, + sync::{atomic::Ordering, Arc, LazyLock}, thread::{park, Builder, JoinHandle}, time::SystemTime, }; @@ -67,134 +67,148 @@ const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id( const STALE_SPAN_CHECK_WATERMARK: usize = 8000; const SPAN_MAX_HOLD: u64 = 86400; +pub(crate) static COLLECTOR_THREAD: LazyLock> = LazyLock::new(|| { + Arc::new( + Builder::new() + .name("stalwart-collector".to_string()) + .spawn(move || { + Collector::default().collect(); + }) + .expect("Failed to start event collector"), + ) +}); + impl Collector { - fn collect(&mut self) -> bool { + fn collect(&mut self) { let mut do_continue = true; - match CHANNEL_FLAGS.swap(0, Ordering::Relaxed) { - 0 => { - park(); + + // Update + self.update(); + + while do_continue { + match CHANNEL_FLAGS.swap(0, Ordering::Relaxed) { + 0 => { + park(); + } + CHANNEL_UPDATE_MARKER..=u64::MAX => { + do_continue = self.update(); + } + _ => {} } - CHANNEL_UPDATE_MARKER..=u64::MAX => { - do_continue = self.update(); - } - _ => {} - } - // Collect all events - let mut closed_rxs = Vec::new(); - for (rx_idx, rx) in self.receivers.iter_mut().enumerate() { - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map_or(0, |d| d.as_secs()); + // Collect all events + let mut closed_rxs = Vec::new(); + for (rx_idx, rx) in self.receivers.iter_mut().enumerate() { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| d.as_secs()); - loop { - match rx.try_recv() { - Ok(Some(event)) => { - // Build event - let event_id = event.inner.id(); - let mut event = Event { - inner: EventDetails { - level: self.levels[event_id], - typ: event.inner, - timestamp, - span: None, - }, - keys: event.keys, - }; + loop { + match rx.try_recv() { + Ok(Some(event)) => { + // Build event + let event_id = event.inner.id(); + let mut event = Event { + inner: EventDetails { + level: self.levels[event_id], + typ: event.inner, + timestamp, + span: None, + }, + keys: event.keys, + }; - // Track spans - let event = match event_id { - EV_CONN_START | EV_ATTEMPT_START => { - let event = Arc::new(event); - self.active_spans.insert( - event - .span_id() - .unwrap_or_else(|| panic!("Missing span ID: {event:?}")), - event.clone(), - ); - if self.active_spans.len() > STALE_SPAN_CHECK_WATERMARK { - self.active_spans.retain(|_, span| { - timestamp.saturating_sub(span.inner.timestamp) - < SPAN_MAX_HOLD - }); - } - event - } - EV_CONN_END | EV_ATTEMPT_END => { - if let Some(span) = self - .active_spans - .remove(&event.span_id().expect("Missing span ID")) - { - event.inner.span = Some(span.clone()); - } else { - #[cfg(debug_assertions)] - { - if event.span_id().unwrap() != 0 { - panic!("Unregistered span ID: {event:?}"); - } + // Track spans + let event = match event_id { + EV_CONN_START | EV_ATTEMPT_START => { + let event = Arc::new(event); + self.active_spans.insert( + event.span_id().unwrap_or_else(|| { + panic!("Missing span ID: {event:?}") + }), + event.clone(), + ); + if self.active_spans.len() > STALE_SPAN_CHECK_WATERMARK { + self.active_spans.retain(|_, span| { + timestamp.saturating_sub(span.inner.timestamp) + < SPAN_MAX_HOLD + }); } + event } - Arc::new(event) - } - _ => { - if let Some(span_id) = event.span_id() { - if let Some(span) = self.active_spans.get(&span_id) { + EV_CONN_END | EV_ATTEMPT_END => { + if let Some(span) = self + .active_spans + .remove(&event.span_id().expect("Missing span ID")) + { event.inner.span = Some(span.clone()); } else { #[cfg(debug_assertions)] { - if span_id != 0 { + if event.span_id().unwrap() != 0 { panic!("Unregistered span ID: {event:?}"); } } } + Arc::new(event) } + _ => { + if let Some(span_id) = event.span_id() { + if let Some(span) = self.active_spans.get(&span_id) { + event.inner.span = Some(span.clone()); + } else { + #[cfg(debug_assertions)] + { + if span_id != 0 { + panic!("Unregistered span ID: {event:?}"); + } + } + } + } - Arc::new(event) + Arc::new(event) + } + }; + + // Send to subscribers + for subscriber in self.subscribers.iter_mut() { + subscriber.push_event(event_id, event.clone()); } - }; - - // Send to subscribers - for subscriber in self.subscribers.iter_mut() { - subscriber.push_event(event_id, event.clone()); + } + Ok(None) => { + break; + } + Err(_) => { + closed_rxs.push(rx_idx); // Channel is closed, remove. + break; } } - Ok(None) => { - break; - } - Err(_) => { - closed_rxs.push(rx_idx); // Channel is closed, remove. - break; + } + } + + if do_continue { + // Remove closed receivers (should be rare in Tokio) + if !closed_rxs.is_empty() { + let mut receivers = Vec::with_capacity(self.receivers.len() - closed_rxs.len()); + for (rx_idx, rx) in self.receivers.drain(..).enumerate() { + if !closed_rxs.contains(&rx_idx) { + receivers.push(rx); + } } + self.receivers = receivers; + } + + // Send batched events + if !self.subscribers.is_empty() { + self.subscribers + .retain_mut(|subscriber| subscriber.send_batch().is_ok()); } } } - if do_continue { - // Remove closed receivers (should be rare in Tokio) - if !closed_rxs.is_empty() { - let mut receivers = Vec::with_capacity(self.receivers.len() - closed_rxs.len()); - for (rx_idx, rx) in self.receivers.drain(..).enumerate() { - if !closed_rxs.contains(&rx_idx) { - receivers.push(rx); - } - } - self.receivers = receivers; - } - - // Send batched events - if !self.subscribers.is_empty() { - self.subscribers - .retain_mut(|subscriber| subscriber.send_batch().is_ok()); - } - true - } else { - // Send remaining events - for mut subscriber in self.subscribers.drain(..) { - let _ = subscriber.send_batch(); - } - - false + // Send remaining events + for mut subscriber in self.subscribers.drain(..) { + let _ = subscriber.send_batch(); } } @@ -301,31 +315,10 @@ impl Collector { pub fn reload() { CHANNEL_FLAGS.fetch_or(CHANNEL_UPDATE_MARKER, Ordering::Relaxed); - spawn_collector().thread().unpark(); + COLLECTOR_THREAD.thread().unpark(); } } -pub(crate) fn spawn_collector() -> &'static Arc { - static COLLECTOR: OnceLock> = OnceLock::new(); - COLLECTOR.get_or_init(|| { - Arc::new( - Builder::new() - .name("stalwart-collector".to_string()) - .spawn(move || { - // Create collector - let mut collector = Collector::default(); - - // Update - collector.update(); - - // Collect events - while collector.collect() {} - }) - .expect("Failed to start event collector"), - ) - }) -} - impl Default for Collector { fn default() -> Self { let mut c = Collector {