diff --git a/Cargo.lock b/Cargo.lock index 1f38eb22..bc04f6b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3217,6 +3217,7 @@ dependencies = [ "mail-builder", "mail-parser", "mail-send", + "memory-stats", "mime", "nlp", "p256", @@ -3741,6 +3742,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memory-stats" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c73f5c649995a115e1a0220b35e4df0a1294500477f97a91d0660fb5abeb574a" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "memsec" version = "0.7.0" diff --git a/crates/common/src/config/network.rs b/crates/common/src/config/network.rs index bda6d9a7..c3829dbf 100644 --- a/crates/common/src/config/network.rs +++ b/crates/common/src/config/network.rs @@ -32,6 +32,7 @@ impl Default for Network { Self { blocked_ips: Default::default(), allowed_ips: Default::default(), + node_id: 0, http_response_url: IfBlock::new::<()>( "server.http.url", [], @@ -45,6 +46,7 @@ impl Default for Network { impl Network { pub fn parse(config: &mut Config) -> Self { let mut network = Network { + node_id: config.property("cluster.node-id").unwrap_or_default(), blocked_ips: BlockedIps::parse(config), allowed_ips: AllowedIps::parse(config), ..Default::default() diff --git a/crates/common/src/config/telemetry.rs b/crates/common/src/config/telemetry.rs index 30896e7d..b54cc1ba 100644 --- a/crates/common/src/config/telemetry.rs +++ b/crates/common/src/config/telemetry.rs @@ -137,28 +137,18 @@ impl Telemetry { }; // Parse metrics - if config - .property_or_default("metrics.prometheus.enable", "false") - .unwrap_or(false) - || ["http", "grpc"].contains( - &config - .value("metrics.open-telemetry.transport") - .unwrap_or("disabled"), - ) - { - apply_events( - config - .properties::("metrics.disabled-events") - .into_iter() - .map(|(_, e)| e), - false, - |event_type| { - if event_type.is_metric() { - telemetry.metrics.set(event_type); - } - }, - ); - } + apply_events( + config + .properties::("metrics.disabled-events") + .into_iter() + .map(|(_, e)| e), + false, + |event_type| { + if event_type.is_metric() { + telemetry.metrics.set(event_type); + } + }, + ); telemetry } diff --git a/crates/common/src/enterprise/config.rs b/crates/common/src/enterprise/config.rs index 57570d2d..3339f6ed 100644 --- a/crates/common/src/enterprise/config.rs +++ b/crates/common/src/enterprise/config.rs @@ -12,9 +12,9 @@ use std::time::Duration; use jmap_proto::types::collection::Collection; use store::{BitmapKey, Store, Stores}; -use utils::config::Config; +use utils::config::{cron::SimpleCron, utils::ParseValue, Config}; -use super::{license::LicenseValidator, Enterprise}; +use super::{license::LicenseValidator, Enterprise, MetricsStore, TraceStore, Undelete}; impl Enterprise { pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option { @@ -54,18 +54,62 @@ impl Enterprise { _ => (), } - Some(Enterprise { - license, - undelete_period: config - .property_or_default::>("storage.undelete.retention", "false") - .unwrap_or_default(), - trace_hold_period: config - .property_or_default::>("tracing.history.retention", "30d") - .unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))), - trace_store: config + let trace_store = if config + .property_or_default("tracing.history.enable", "false") + .unwrap_or(false) + { + if let Some(store) = config .value("tracing.history.store") .and_then(|name| stores.stores.get(name)) - .cloned(), + .cloned() + { + TraceStore { + retention: config + 
.property_or_default::>("tracing.history.retention", "30d") + .unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))), + store, + } + .into() + } else { + None + } + } else { + None + }; + let metrics_store = if config + .property_or_default("metrics.history.enable", "false") + .unwrap_or(false) + { + if let Some(store) = config + .value("metrics.history.store") + .and_then(|name| stores.stores.get(name)) + .cloned() + { + MetricsStore { + retention: config + .property_or_default::>("metrics.history.retention", "30d") + .unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))), + store, + interval: config + .property_or_default::("metrics.history.interval", "0 * *") + .unwrap_or_else(|| SimpleCron::parse_value("0 * *").unwrap()), + } + .into() + } else { + None + } + } else { + None + }; + + Some(Enterprise { + license, + undelete: config + .property_or_default::>("storage.undelete.retention", "false") + .unwrap_or_default() + .map(|retention| Undelete { retention }), + trace_store, + metrics_store, }) } } diff --git a/crates/common/src/enterprise/mod.rs b/crates/common/src/enterprise/mod.rs index f5155217..0539a09f 100644 --- a/crates/common/src/enterprise/mod.rs +++ b/crates/common/src/enterprise/mod.rs @@ -17,15 +17,34 @@ use std::time::Duration; use license::LicenseKey; use mail_parser::DateTime; use store::Store; +use utils::config::cron::SimpleCron; use crate::Core; #[derive(Clone)] pub struct Enterprise { pub license: LicenseKey, - pub undelete_period: Option, - pub trace_hold_period: Option, - pub trace_store: Option, + pub undelete: Option, + pub trace_store: Option, + pub metrics_store: Option, +} + +#[derive(Clone)] +pub struct Undelete { + pub retention: Duration, +} + +#[derive(Clone)] +pub struct TraceStore { + pub retention: Option, + pub store: Store, +} + +#[derive(Clone)] +pub struct MetricsStore { + pub retention: Option, + pub store: Store, + pub interval: SimpleCron, } impl Core { diff --git a/crates/common/src/enterprise/undelete.rs b/crates/common/src/enterprise/undelete.rs index f99abd8a..57355c43 100644 --- a/crates/common/src/enterprise/undelete.rs +++ b/crates/common/src/enterprise/undelete.rs @@ -40,13 +40,13 @@ impl Core { blob_hash: &BlobHash, blob_size: usize, ) { - if let Some(hold_period) = self.enterprise.as_ref().and_then(|e| e.undelete_period) { + if let Some(undelete) = self.enterprise.as_ref().and_then(|e| e.undelete.as_ref()) { let now = now(); batch.set( BlobOp::Reserve { hash: blob_hash.clone(), - until: now + hold_period.as_secs(), + until: now + undelete.retention.as_secs(), }, KeySerializer::new(U64_LEN + U64_LEN) .write(blob_size as u32) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 21c6e890..9d6f8367 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -28,7 +28,10 @@ use listener::{ use mail_send::Credentials; use sieve::Sieve; -use store::LookupStore; +use store::{ + write::{QueueClass, ValueClass}, + IterateParams, LookupStore, ValueKey, +}; use tokio::sync::{mpsc, oneshot}; use utils::BlobHash; @@ -65,6 +68,7 @@ pub struct Core { #[derive(Clone)] pub struct Network { + pub node_id: u64, pub blocked_ips: BlockedIps, pub allowed_ips: AllowedIps, pub http_response_url: IfBlock, @@ -303,6 +307,26 @@ impl Core { .ctx(trc::Key::AccountName, credentials.login().to_string())) } } + + pub async fn message_queue_size(&self) -> trc::Result { + let mut total = 0; + self.storage + .data + .iterate( + IterateParams::new( + ValueKey::from(ValueClass::Queue(QueueClass::Message(0))), + 
ValueKey::from(ValueClass::Queue(QueueClass::Message(u64::MAX))), + ) + .no_values(), + |_, _| { + total += 1; + + Ok(true) + }, + ) + .await + .map(|_| total) + } } trait CredentialsUsername { diff --git a/crates/common/src/telemetry/metrics/mod.rs b/crates/common/src/telemetry/metrics/mod.rs index c998e920..ebd02504 100644 --- a/crates/common/src/telemetry/metrics/mod.rs +++ b/crates/common/src/telemetry/metrics/mod.rs @@ -6,3 +6,6 @@ pub mod otel; pub mod prometheus; + +#[cfg(feature = "enterprise")] +pub mod store; diff --git a/crates/common/src/telemetry/metrics/otel.rs b/crates/common/src/telemetry/metrics/otel.rs index e2f2ab2e..a4e10c9b 100644 --- a/crates/common/src/telemetry/metrics/otel.rs +++ b/crates/common/src/telemetry/metrics/otel.rs @@ -29,28 +29,8 @@ impl OtelMetrics { // Add counters for counter in Collector::collect_counters(is_enterprise) { metrics.push(Metric { - name: counter.id().into(), - description: counter.description().into(), - unit: counter.unit().into(), - data: Box::new(Sum { - data_points: vec![DataPoint { - attributes: vec![], - start_time: start_time.into(), - time: now.into(), - value: counter.get(), - exemplars: vec![], - }], - temporality: Temporality::Cumulative, - is_monotonic: true, - }), - }); - } - - // Add event counters - for counter in Collector::collect_event_counters(is_enterprise) { - metrics.push(Metric { - name: counter.id().into(), - description: counter.description().into(), + name: counter.id().name().into(), + description: counter.id().description().into(), unit: "events".into(), data: Box::new(Sum { data_points: vec![DataPoint { @@ -69,9 +49,9 @@ impl OtelMetrics { // Add gauges for gauge in Collector::collect_gauges(is_enterprise) { metrics.push(Metric { - name: gauge.id().into(), - description: gauge.description().into(), - unit: gauge.unit().into(), + name: gauge.id().name().into(), + description: gauge.id().description().into(), + unit: gauge.id().unit().into(), data: Box::new(Gauge { data_points: vec![DataPoint { attributes: vec![], @@ -87,9 +67,9 @@ impl OtelMetrics { // Add histograms for histogram in Collector::collect_histograms(is_enterprise) { metrics.push(Metric { - name: histogram.id().into(), - description: histogram.description().into(), - unit: histogram.unit().into(), + name: histogram.id().name().into(), + description: histogram.id().description().into(), + unit: histogram.id().unit().into(), data: Box::new(Histogram { data_points: vec![HistogramDataPoint { attributes: vec![], diff --git a/crates/common/src/telemetry/metrics/prometheus.rs b/crates/common/src/telemetry/metrics/prometheus.rs index 929b8f65..35ff3c01 100644 --- a/crates/common/src/telemetry/metrics/prometheus.rs +++ b/crates/common/src/telemetry/metrics/prometheus.rs @@ -25,18 +25,8 @@ impl Core { // Add counters for counter in Collector::collect_counters(is_enterprise) { let mut metric = MetricFamily::default(); - metric.set_name(metric_name(counter.id())); - metric.set_help(counter.description().into()); - metric.set_field_type(MetricType::COUNTER); - metric.set_metric(vec![new_counter(counter.get())]); - metrics.push(metric); - } - - // Add event counters - for counter in Collector::collect_event_counters(is_enterprise) { - let mut metric = MetricFamily::default(); - metric.set_name(metric_name(counter.id())); - metric.set_help(counter.description().into()); + metric.set_name(metric_name(counter.id().name())); + metric.set_help(counter.id().description().into()); metric.set_field_type(MetricType::COUNTER); 
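// Export the collector's cumulative event count as the Prometheus counter value.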
metric.set_metric(vec![new_counter(counter.value())]); metrics.push(metric); @@ -45,8 +35,8 @@ impl Core { // Add gauges for gauge in Collector::collect_gauges(is_enterprise) { let mut metric = MetricFamily::default(); - metric.set_name(metric_name(gauge.id())); - metric.set_help(gauge.description().into()); + metric.set_name(metric_name(gauge.id().name())); + metric.set_help(gauge.id().description().into()); metric.set_field_type(MetricType::GAUGE); metric.set_metric(vec![new_gauge(gauge.get())]); metrics.push(metric); @@ -55,8 +45,8 @@ impl Core { // Add histograms for histogram in Collector::collect_histograms(is_enterprise) { let mut metric = MetricFamily::default(); - metric.set_name(metric_name(histogram.id())); - metric.set_help(histogram.description().into()); + metric.set_name(metric_name(histogram.id().name())); + metric.set_help(histogram.id().description().into()); metric.set_field_type(MetricType::HISTOGRAM); metric.set_metric(vec![new_histogram(histogram)]); metrics.push(metric); diff --git a/crates/common/src/telemetry/metrics/store.rs b/crates/common/src/telemetry/metrics/store.rs new file mode 100644 index 00000000..89146cdb --- /dev/null +++ b/crates/common/src/telemetry/metrics/store.rs @@ -0,0 +1,257 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: LicenseRef-SEL + * + * This file is subject to the Stalwart Enterprise License Agreement (SEL) and + * is NOT open source software. + * + */ + +use std::{future::Future, sync::Arc, time::Duration}; + +use ahash::AHashMap; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use store::{ + write::{ + key::{DeserializeBigEndian, KeySerializer}, + now, BatchBuilder, TelemetryClass, ValueClass, + }, + IterateParams, Store, ValueKey, U32_LEN, U64_LEN, +}; +use trc::*; +use utils::codec::leb128::Leb128Reader; + +use crate::Core; + +pub trait MetricsStore: Sync + Send { + fn write_metrics( + &self, + core: Arc, + history: SharedMetricHistory, + ) -> impl Future> + Send; + fn query_metrics( + &self, + from_timestamp: u64, + to_timestamp: u64, + ) -> impl Future>>> + Send; + fn purge_metrics(&self, period: Duration) -> impl Future> + Send; +} + +#[derive(Default)] +pub struct MetricsHistory { + events: AHashMap, + histograms: AHashMap, +} + +#[derive(Default)] +struct HistogramHistory { + sum: u64, + count: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum Metric { + Counter { + id: CI, + timestamp: T, + value: u32, + }, + Histogram { + id: MI, + timestamp: T, + count: u64, + sum: u64, + }, +} + +pub type SharedMetricHistory = Arc>; + +const TYPE_COUNTER: u64 = 0x00; +const TYPE_HISTOGRAM: u64 = 0x01; + +impl MetricsStore for Store { + async fn write_metrics( + &self, + core: Arc, + history_: SharedMetricHistory, + ) -> trc::Result<()> { + let mut batch = BatchBuilder::new(); + { + let timestamp = now(); + let node_id = core.network.node_id; + let mut history = history_.lock(); + for event in [ + EventType::Smtp(SmtpEvent::ConnectionStart), + EventType::Imap(ImapEvent::ConnectionStart), + EventType::Pop3(Pop3Event::ConnectionStart), + EventType::ManageSieve(ManageSieveEvent::ConnectionStart), + EventType::Http(HttpEvent::ConnectionStart), + EventType::Delivery(DeliveryEvent::AttemptStart), + EventType::Delivery(DeliveryEvent::Completed), + EventType::MessageIngest(MessageIngestEvent::Ham), + EventType::MessageIngest(MessageIngestEvent::Spam), + EventType::Network(NetworkEvent::DropBlocked), + 
EventType::IncomingReport(IncomingReportEvent::DmarcReport), + EventType::IncomingReport(IncomingReportEvent::DmarcReportWithWarnings), + EventType::IncomingReport(IncomingReportEvent::TlsReport), + EventType::IncomingReport(IncomingReportEvent::TlsReportWithWarnings), + ] { + let reading = Collector::read_event_metric(event.id()); + if reading > 0 { + let history = history.events.entry(event).or_insert(0); + let diff = reading - *history; + if diff > 0 { + batch.set( + ValueClass::Telemetry(TelemetryClass::Metric { + timestamp, + metric_id: (event.code() << 2) | TYPE_COUNTER, + node_id, + }), + KeySerializer::new(U32_LEN).write_leb128(diff).finalize(), + ); + } + *history = reading; + } + } + + for histogram in Collector::collect_histograms(true) { + let histogram_id = histogram.id(); + if matches!( + histogram_id, + MetricType::MessageIngestionTime + | MetricType::MessageFtsIndexTime + | MetricType::DeliveryTotalTime + | MetricType::DeliveryTime + | MetricType::DnsLookupTime + ) { + let history = history.histograms.entry(histogram_id).or_default(); + let sum = histogram.sum(); + let count = histogram.count(); + let diff_sum = sum - history.sum; + let diff_count = count - history.count; + if diff_sum > 0 || diff_count > 0 { + batch.set( + ValueClass::Telemetry(TelemetryClass::Metric { + timestamp, + metric_id: (histogram_id.code() << 2) | TYPE_HISTOGRAM, + node_id, + }), + KeySerializer::new(U32_LEN) + .write_leb128(diff_count) + .write_leb128(diff_sum) + .finalize(), + ); + } + history.sum = sum; + history.count = count; + } + } + } + + if !batch.is_empty() { + self.write(batch.build()) + .await + .caused_by(trc::location!())?; + } + + Ok(()) + } + + async fn query_metrics( + &self, + from_timestamp: u64, + to_timestamp: u64, + ) -> trc::Result>> { + let mut metrics = Vec::new(); + self.iterate( + IterateParams::new( + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric { + timestamp: from_timestamp, + metric_id: 0, + node_id: 0, + })), + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric { + timestamp: to_timestamp, + metric_id: 0, + node_id: 0, + })), + ), + |key, value| { + let timestamp = key.deserialize_be_u64(0).caused_by(trc::location!())?; + let (metric_type, _) = key + .get(U64_LEN..) + .and_then(|bytes| bytes.read_leb128::()) + .ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?; + match metric_type & 0x03 { + TYPE_COUNTER => { + let id = EventType::from_code(metric_type >> 2).ok_or_else(|| { + trc::Error::corrupted_key(key, None, trc::location!()) + })?; + let (value, _) = value.read_leb128::().ok_or_else(|| { + trc::Error::corrupted_key(key, value.into(), trc::location!()) + })?; + metrics.push(Metric::Counter { + id, + timestamp, + value, + }); + } + TYPE_HISTOGRAM => { + let id = MetricType::from_code(metric_type >> 2).ok_or_else(|| { + trc::Error::corrupted_key(key, None, trc::location!()) + })?; + let (count, bytes_read) = value.read_leb128::().ok_or_else(|| { + trc::Error::corrupted_key(key, value.into(), trc::location!()) + })?; + let (sum, _) = value + .get(bytes_read..) 
+ .and_then(|bytes| bytes.read_leb128::()) + .ok_or_else(|| { + trc::Error::corrupted_key(key, value.into(), trc::location!()) + })?; + metrics.push(Metric::Histogram { + id, + timestamp, + count, + sum, + }); + } + _ => return Err(trc::Error::corrupted_key(key, None, trc::location!())), + } + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + + Ok(metrics) + } + + async fn purge_metrics(&self, period: Duration) -> trc::Result<()> { + self.delete_range( + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric { + timestamp: 0, + metric_id: 0, + node_id: 0, + })), + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric { + timestamp: now() - period.as_secs(), + metric_id: 0, + node_id: 0, + })), + ) + .await + .caused_by(trc::location!()) + } +} + +impl MetricsHistory { + pub fn init() -> SharedMetricHistory { + Arc::new(Mutex::new(Self::default())) + } +} diff --git a/crates/common/src/telemetry/tracers/store.rs b/crates/common/src/telemetry/tracers/store.rs index a47ce4f5..c7e71c19 100644 --- a/crates/common/src/telemetry/tracers/store.rs +++ b/crates/common/src/telemetry/tracers/store.rs @@ -12,7 +12,7 @@ use std::{future::Future, time::Duration}; use ahash::{AHashMap, AHashSet}; use store::{ - write::{key::DeserializeBigEndian, BatchBuilder, MaybeDynamicId, TraceClass, ValueClass}, + write::{key::DeserializeBigEndian, BatchBuilder, MaybeDynamicId, TelemetryClass, ValueClass}, Deserialize, IterateParams, Store, ValueKey, U64_LEN, }; use trc::{ @@ -81,7 +81,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac if !queue_ids.is_empty() { // Serialize events batch.set( - ValueClass::Trace(TraceClass::Span { span_id }), + ValueClass::Telemetry(TelemetryClass::Span { span_id }), serialize_events( [span.as_ref()] .into_iter() @@ -93,7 +93,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac // Build index batch.set( - ValueClass::Trace(TraceClass::Index { + ValueClass::Telemetry(TelemetryClass::Index { span_id, value: (span.inner.typ.code() as u16).to_be_bytes().to_vec(), }), @@ -101,7 +101,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac ); for queue_id in queue_ids { batch.set( - ValueClass::Trace(TraceClass::Index { + ValueClass::Telemetry(TelemetryClass::Index { span_id, value: queue_id.to_be_bytes().to_vec(), }), @@ -110,7 +110,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac } for value in values { batch.set( - ValueClass::Trace(TraceClass::Index { + ValueClass::Telemetry(TelemetryClass::Index { span_id, value: value.into_bytes(), }), @@ -158,18 +158,18 @@ pub trait TracingStore: Sync + Send { impl TracingStore for Store { async fn get_span(&self, span_id: u64) -> trc::Result>> { - self.get_value::(ValueKey::from(ValueClass::Trace(TraceClass::Span { - span_id, - }))) + self.get_value::(ValueKey::from(ValueClass::Telemetry( + TelemetryClass::Span { span_id }, + ))) .await .caused_by(trc::location!()) .map(|span| span.map(|span| span.0).unwrap_or_default()) } async fn get_raw_span(&self, span_id: u64) -> trc::Result>> { - self.get_value::(ValueKey::from(ValueClass::Trace(TraceClass::Span { - span_id, - }))) + self.get_value::(ValueKey::from(ValueClass::Telemetry( + TelemetryClass::Span { span_id }, + ))) .await .caused_by(trc::location!()) .map(|span| span.map(|span| span.0)) @@ -206,11 +206,11 @@ impl TracingStore for Store { let mut param_spans = SpanCollector::new(num_params); self.iterate( IterateParams::new( - 
ValueKey::from(ValueClass::Trace(TraceClass::Index { + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index { span_id: 0, value: value.clone(), })), - ValueKey::from(ValueClass::Trace(TraceClass::Index { + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index { span_id: u64::MAX, value, })), @@ -253,8 +253,8 @@ impl TracingStore for Store { })?; self.delete_range( - ValueKey::from(ValueClass::Trace(TraceClass::Span { span_id: 0 })), - ValueKey::from(ValueClass::Trace(TraceClass::Span { + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span { span_id: 0 })), + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span { span_id: until_span_id, })), ) @@ -264,11 +264,11 @@ impl TracingStore for Store { let mut delete_keys: Vec> = Vec::new(); self.iterate( IterateParams::new( - ValueKey::from(ValueClass::Trace(TraceClass::Index { + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index { span_id: 0, value: vec![], })), - ValueKey::from(ValueClass::Trace(TraceClass::Index { + ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index { span_id: u64::MAX, value: vec![u8::MAX; 16], })), @@ -279,7 +279,7 @@ impl TracingStore for Store { .deserialize_be_u64(key.len() - U64_LEN) .caused_by(trc::location!())?; if span_id < until_span_id { - delete_keys.push(ValueClass::Trace(TraceClass::Index { + delete_keys.push(ValueClass::Telemetry(TelemetryClass::Index { span_id, value: key[0..key.len() - U64_LEN].to_vec(), })); diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml index 5d81e496..41a8a425 100644 --- a/crates/jmap/Cargo.toml +++ b/crates/jmap/Cargo.toml @@ -57,7 +57,7 @@ lz4_flex = { version = "0.11", default-features = false } rev_lines = "0.3.0" x509-parser = "0.16.0" quick-xml = "0.36" - +memory-stats = "1.2.0" [features] test_mode = [] diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs index 0e1b94d7..2ff5acaf 100644 --- a/crates/jmap/src/api/http.rs +++ b/crates/jmap/src/api/http.rs @@ -297,16 +297,26 @@ impl JMAP { if err.matches(trc::EventType::Auth(trc::AuthEvent::Failed)) && self.core.is_enterprise_edition() { - if let Some(token) = - req.uri().path().strip_prefix("/api/tracing/live/") + if let Some((live_path, token)) = req + .uri() + .path() + .strip_prefix("/api/telemetry/") + .and_then(|p| { + p.strip_suffix("traces/live/") + .map(|t| ("traces", t)) + .or_else(|| { + p.strip_suffix("metrics/live/") + .map(|t| ("metrics", t)) + }) + }) { let (account_id, _, _) = - self.validate_access_token("live_tracing", token).await?; + self.validate_access_token("live_telemetry", token).await?; return self - .handle_tracing_api_request( + .handle_telemetry_api_request( &req, - vec!["", "live"], + vec!["", live_path, "live"], account_id, ) .await; diff --git a/crates/jmap/src/api/management/enterprise/mod.rs b/crates/jmap/src/api/management/enterprise/mod.rs index 4c8bbf69..9fc035c3 100644 --- a/crates/jmap/src/api/management/enterprise/mod.rs +++ b/crates/jmap/src/api/management/enterprise/mod.rs @@ -8,5 +8,5 @@ * */ -pub mod tracing; +pub mod telemetry; pub mod undelete; diff --git a/crates/jmap/src/api/management/enterprise/tracing.rs b/crates/jmap/src/api/management/enterprise/telemetry.rs similarity index 67% rename from crates/jmap/src/api/management/enterprise/tracing.rs rename to crates/jmap/src/api/management/enterprise/telemetry.rs index 6e630c40..735c70c9 100644 --- a/crates/jmap/src/api/management/enterprise/tracing.rs +++ b/crates/jmap/src/api/management/enterprise/telemetry.rs @@ -8,9 +8,15 @@ * */ -use std::time::{Duration, Instant}; 
+use std::{ + fmt::Write, + time::{Duration, Instant}, +}; -use common::telemetry::tracers::store::{TracingQuery, TracingStore}; +use common::telemetry::{ + metrics::store::{Metric, MetricsStore}, + tracers::store::{TracingQuery, TracingStore}, +}; use directory::backend::internal::manage; use http_body_util::{combinators::BoxBody, StreamBody}; use hyper::{ @@ -23,7 +29,7 @@ use store::ahash::{AHashMap, AHashSet}; use trc::{ ipc::{bitset::Bitset, subscriber::SubscriberBuilder}, serializers::json::JsonEventSerializer, - DeliveryEvent, EventType, Key, QueueEvent, Value, + Collector, DeliveryEvent, EventType, Key, MetricType, QueueEvent, Value, }; use utils::{snowflake::SnowflakeIdGenerator, url_params::UrlParams}; @@ -36,7 +42,7 @@ use crate::{ }; impl JMAP { - pub async fn handle_tracing_api_request( + pub async fn handle_telemetry_api_request( &self, req: &HttpRequest, path: Vec<&str>, @@ -49,7 +55,7 @@ impl JMAP { path.get(2).copied(), req.method(), ) { - ("spans", None, &Method::GET) => { + ("traces", None, &Method::GET) => { let page: usize = params.parse("page").unwrap_or(0); let limit: usize = params.parse("limit").unwrap_or(0); let mut tracing_query = Vec::new(); @@ -100,12 +106,13 @@ impl JMAP { .and_then(SnowflakeIdGenerator::from_timestamp) .unwrap_or(0); let values = params.get("values").is_some(); - let store = self + let store = &self .core .enterprise .as_ref() .and_then(|e| e.trace_store.as_ref()) - .ok_or_else(|| manage::unsupported("No tracing store has been configured"))?; + .ok_or_else(|| manage::unsupported("No tracing store has been configured"))? + .store; let span_ids = store.query_spans(&tracing_query, after, before).await?; let (total, span_ids) = if limit > 0 { @@ -154,52 +161,7 @@ impl JMAP { .into_http_response()) } } - ("span", id, &Method::GET) => { - let store = self - .core - .enterprise - .as_ref() - .and_then(|e| e.trace_store.as_ref()) - .ok_or_else(|| manage::unsupported("No tracing store has been configured"))?; - - let mut events = Vec::new(); - for span_id in id - .or_else(|| params.get("id")) - .unwrap_or_default() - .split(',') - { - if let Ok(span_id) = span_id.parse::() { - events.push( - JsonEventSerializer::new(store.get_span(span_id).await?) - .with_description() - .with_explanation(), - ); - } else { - events.push(JsonEventSerializer::new(Vec::new())); - } - } - - if events.len() == 1 && id.is_some() { - Ok(JsonResponse::new(json!({ - "data": events.into_iter().next().unwrap(), - })) - .into_http_response()) - } else { - Ok(JsonResponse::new(json!({ - "data": events, - })) - .into_http_response()) - } - } - ("live", Some("token"), &Method::GET) => { - // Issue a live tracing token valid for 60 seconds - - Ok(JsonResponse::new(json!({ - "data": self.issue_custom_token(account_id, "live_tracing", "web", 60).await?, - })) - .into_http_response()) - } - ("live", _, &Method::GET) => { + ("traces", Some("live"), &Method::GET) => { let mut key_filters = AHashMap::new(); let mut filter = None; @@ -327,6 +289,190 @@ impl JMAP { ))), }) } + ("trace", id, &Method::GET) => { + let store = &self + .core + .enterprise + .as_ref() + .and_then(|e| e.trace_store.as_ref()) + .ok_or_else(|| manage::unsupported("No tracing store has been configured"))? + .store; + + let mut events = Vec::new(); + for span_id in id + .or_else(|| params.get("id")) + .unwrap_or_default() + .split(',') + { + if let Ok(span_id) = span_id.parse::() { + events.push( + JsonEventSerializer::new(store.get_span(span_id).await?) 
+ .with_description() + .with_explanation(), + ); + } else { + events.push(JsonEventSerializer::new(Vec::new())); + } + } + + if events.len() == 1 && id.is_some() { + Ok(JsonResponse::new(json!({ + "data": events.into_iter().next().unwrap(), + })) + .into_http_response()) + } else { + Ok(JsonResponse::new(json!({ + "data": events, + })) + .into_http_response()) + } + } + ("live", Some("token"), &Method::GET) => { + // Issue a live telemetry token valid for 60 seconds + + Ok(JsonResponse::new(json!({ + "data": self.issue_custom_token(account_id, "live_telemetry", "web", 60).await?, + })) + .into_http_response()) + } + ("metrics", None, &Method::GET) => { + let before = params + .parse::("before") + .map(|t| t.into_inner()) + .unwrap_or(0); + let after = params + .parse::("after") + .map(|t| t.into_inner()) + .unwrap_or(0); + let results = self + .core + .enterprise + .as_ref() + .and_then(|e| e.metrics_store.as_ref()) + .ok_or_else(|| manage::unsupported("No metrics store has been configured"))? + .store + .query_metrics(after, before) + .await?; + let mut metrics = Vec::with_capacity(results.len()); + + for metric in results { + metrics.push(match metric { + Metric::Counter { + id, + timestamp, + value, + } => Metric::Counter { + id: id.name(), + timestamp: DateTime::from_timestamp(timestamp as i64).to_rfc3339(), + value, + }, + Metric::Histogram { + id, + timestamp, + count, + sum, + } => Metric::Histogram { + id: id.name(), + timestamp: DateTime::from_timestamp(timestamp as i64).to_rfc3339(), + count, + sum, + }, + }); + } + + Ok(JsonResponse::new(json!({ + "data": metrics, + })) + .into_http_response()) + } + ("metrics", Some("live"), &Method::GET) => { + let interval = Duration::from_secs( + params + .parse::("interval") + .filter(|interval| *interval >= 1) + .unwrap_or(30), + ); + let mut event_types = AHashSet::new(); + let mut metric_types = AHashSet::new(); + for metric_name in params.get("metrics").unwrap_or_default().split(',') { + let metric_name = metric_name.trim(); + if !metric_name.is_empty() { + if let Some(event_type) = EventType::try_parse(metric_name) { + event_types.insert(event_type); + } else if let Some(metric_type) = MetricType::try_parse(metric_name) { + metric_types.insert(metric_type); + } + } + } + + Ok(HttpResponse { + status: StatusCode::OK, + content_type: "text/event-stream".into(), + content_disposition: "".into(), + cache_control: "no-store".into(), + body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new( + async_stream::stream! 
{ + + loop { + let mut metrics = String::with_capacity(512); + metrics.push_str("event: metrics\ndata: ["); + let mut is_first = true; + + for counter in Collector::collect_counters(true) { + if event_types.is_empty() || event_types.contains(&counter.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; + } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}", + counter.id().name(), + counter.value() + ); + } + } + for gauge in Collector::collect_gauges(true) { + if metric_types.is_empty() || metric_types.contains(&gauge.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; + } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}", + gauge.id().name(), + gauge.get() + ); + } + } + for histogram in Collector::collect_histograms(true) { + if metric_types.is_empty() || metric_types.contains(&histogram.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; + } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}", + histogram.id().name(), + histogram.count(), + histogram.sum() + ); + } + } + metrics.push_str("]\n\n"); + + yield Ok(Frame::data(Bytes::from(metrics))); + tokio::time::sleep(interval).await; + } + }, + ))), + }) + } _ => Err(trc::ResourceEvent::NotFound.into_err()), } } diff --git a/crates/jmap/src/api/management/mod.rs b/crates/jmap/src/api/management/mod.rs index ce167c91..d46e62ce 100644 --- a/crates/jmap/src/api/management/mod.rs +++ b/crates/jmap/src/api/management/mod.rs @@ -83,7 +83,7 @@ impl JMAP { // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd // SPDX-License-Identifier: LicenseRef-SEL #[cfg(feature = "enterprise")] - "tracing" if is_superuser => { + "telemetry" if is_superuser => { // WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED // Any attempt to modify, bypass, or disable this license validation mechanism // constitutes a severe violation of the Stalwart Enterprise License Agreement. @@ -94,7 +94,7 @@ impl JMAP { // for copyright infringement, breach of contract, and fraud. 
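// Telemetry management requests are served only on enterprise builds; otherwise an enterprise-required error is returned below.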
if self.core.is_enterprise_edition() { - self.handle_tracing_api_request(req, path, access_token.primary_id()) + self.handle_telemetry_api_request(req, path, access_token.primary_id()) .await } else { Err(manage::enterprise()) diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs index 686ff7b8..1b63ff38 100644 --- a/crates/jmap/src/services/housekeeper.rs +++ b/crates/jmap/src/services/housekeeper.rs @@ -12,14 +12,17 @@ use std::{ use common::{config::telemetry::OtelMetrics, IPC_CHANNEL_BUFFER}; #[cfg(feature = "enterprise")] -use common::telemetry::tracers::store::TracingStore; +use common::telemetry::{ + metrics::store::{MetricsStore, SharedMetricHistory}, + tracers::store::TracingStore, +}; use store::{ write::{now, purge::PurgeStore}, BlobStore, LookupStore, Store, }; use tokio::sync::mpsc; -use trc::HousekeeperEvent; +use trc::{Collector, HousekeeperEvent, MetricType}; use utils::map::ttl_dashmap::TtlMap; use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER}; @@ -55,6 +58,9 @@ enum ActionClass { Acme(String), OtelMetrics, #[cfg(feature = "enterprise")] + InternalMetrics, + CalculateMetrics, + #[cfg(feature = "enterprise")] ReloadSettings, } @@ -99,6 +105,9 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { queue.schedule(Instant::now() + otel.interval, ActionClass::OtelMetrics); } + // Calculate expensive metrics + queue.schedule(Instant::now(), ActionClass::CalculateMetrics); + // Add all ACME renewals to heap for provider in core_.tls.acme_providers.values() { match core_.init_acme(provider).await { @@ -125,10 +134,21 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { Instant::now() + enterprise.license.expires_in(), ActionClass::ReloadSettings, ); + + if let Some(metrics_store) = enterprise.metrics_store.as_ref() { + queue.schedule( + Instant::now() + metrics_store.interval.time_to_next(), + ActionClass::InternalMetrics, + ); + } } // SPDX-SnippetEnd } + // Metrics history + #[cfg(feature = "enterprise")] + let metrics_history = SharedMetricHistory::default(); + loop { match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await { Ok(Some(event)) => match event { @@ -185,12 +205,21 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd // SPDX-License-Identifier: LicenseRef-SEL #[cfg(feature = "enterprise")] - let trace_hold_period = core + let trace_retention = core .core .load() .enterprise .as_ref() - .and_then(|e| e.trace_hold_period); + .and_then(|e| e.trace_store.as_ref()) + .and_then(|t| t.retention); + #[cfg(feature = "enterprise")] + let metrics_retention = core + .core + .load() + .enterprise + .as_ref() + .and_then(|e| e.metrics_store.as_ref()) + .and_then(|m| m.retention); // SPDX-SnippetEnd tokio::spawn(async move { @@ -206,11 +235,18 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd // SPDX-License-Identifier: LicenseRef-SEL #[cfg(feature = "enterprise")] - if let Some(trace_hold_period) = trace_hold_period { - if let Err(err) = store.purge_spans(trace_hold_period).await { + if let Some(trace_retention) = trace_retention { + if let Err(err) = store.purge_spans(trace_retention).await { trc::error!(err.details("Failed to purge tracing spans")); } } + + #[cfg(feature = "enterprise")] + if let Some(metrics_retention) = metrics_retention { + if let Err(err) = store.purge_metrics(metrics_retention).await { + trc::error!(err.details("Failed to 
purge metrics")); + } + } // SPDX-SnippetEnd }); } @@ -382,10 +418,83 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver) { }); } } + ActionClass::CalculateMetrics => { + // Calculate expensive metrics every 5 minutes + queue.schedule( + Instant::now() + Duration::from_secs(5 * 60), + ActionClass::OtelMetrics, + ); + + let core = core_.clone(); + tokio::spawn(async move { + #[cfg(feature = "enterprise")] + if core.is_enterprise_edition() { + // Obtain queue size + match core.message_queue_size().await { + Ok(total) => { + Collector::update_gauge( + MetricType::QueueCount, + total, + ); + } + Err(err) => { + trc::error!( + err.details("Failed to obtain queue size") + ); + } + } + } + + match tokio::task::spawn_blocking(memory_stats::memory_stats) + .await + { + Ok(Some(stats)) => { + Collector::update_gauge( + MetricType::ServerMemory, + stats.physical_mem as u64, + ); + } + Ok(None) => {} + Err(err) => { + trc::error!(trc::EventType::Server( + trc::ServerEvent::ThreadError, + ) + .reason(err) + .caused_by(trc::location!()) + .details("Join Error")); + } + } + }); + } // SPDX-SnippetBegin // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd // SPDX-License-Identifier: LicenseRef-SEL + #[cfg(feature = "enterprise")] + ActionClass::InternalMetrics => { + if let Some(metrics_store) = &core_ + .enterprise + .as_ref() + .and_then(|e| e.metrics_store.as_ref()) + { + queue.schedule( + Instant::now() + metrics_store.interval.time_to_next(), + ActionClass::InternalMetrics, + ); + + let metrics_store = metrics_store.store.clone(); + let metrics_history = metrics_history.clone(); + let core = core_.clone(); + tokio::spawn(async move { + if let Err(err) = + metrics_store.write_metrics(core, metrics_history).await + { + trc::error!(err.details("Failed to write metrics")); + } + }); + } + } + #[cfg(feature = "enterprise")] ActionClass::ReloadSettings => { match core_.reload().await { diff --git a/crates/store/src/backend/mysql/main.rs b/crates/store/src/backend/mysql/main.rs index 781896a9..2d9e7f0a 100644 --- a/crates/store/src/backend/mysql/main.rs +++ b/crates/store/src/backend/mysql/main.rs @@ -105,8 +105,9 @@ impl MysqlStore { SUBSPACE_REPORT_IN, SUBSPACE_FTS_INDEX, SUBSPACE_LOGS, - SUBSPACE_TRACE, - SUBSPACE_TRACE_INDEX, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, ] { let table = char::from(table); conn.query_drop(&format!( diff --git a/crates/store/src/backend/postgres/main.rs b/crates/store/src/backend/postgres/main.rs index b67240c7..3f140d87 100644 --- a/crates/store/src/backend/postgres/main.rs +++ b/crates/store/src/backend/postgres/main.rs @@ -93,8 +93,9 @@ impl PostgresStore { SUBSPACE_FTS_INDEX, SUBSPACE_LOGS, SUBSPACE_BLOBS, - SUBSPACE_TRACE, - SUBSPACE_TRACE_INDEX, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, ] { let table = char::from(table); conn.execute( diff --git a/crates/store/src/backend/rocksdb/main.rs b/crates/store/src/backend/rocksdb/main.rs index 2f77de1f..c933d281 100644 --- a/crates/store/src/backend/rocksdb/main.rs +++ b/crates/store/src/backend/rocksdb/main.rs @@ -87,8 +87,9 @@ impl RocksDbStore { SUBSPACE_FTS_INDEX, SUBSPACE_LOGS, SUBSPACE_BLOBS, - SUBSPACE_TRACE, - SUBSPACE_TRACE_INDEX, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, ] { let cf_opts = Options::default(); cfs.push(ColumnFamilyDescriptor::new( diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs index c92960dc..716670f8 100644 --- 
a/crates/store/src/backend/sqlite/main.rs +++ b/crates/store/src/backend/sqlite/main.rs @@ -104,8 +104,9 @@ impl SqliteStore { SUBSPACE_FTS_INDEX, SUBSPACE_LOGS, SUBSPACE_BLOBS, - SUBSPACE_TRACE, - SUBSPACE_TRACE_INDEX, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, ] { let table = char::from(table); conn.execute( diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 0e8c89e0..9a938120 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -560,8 +560,9 @@ impl Store { SUBSPACE_REPORT_OUT, SUBSPACE_REPORT_IN, SUBSPACE_FTS_INDEX, - SUBSPACE_TRACE, - SUBSPACE_TRACE_INDEX, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, ] { self.delete_range( AnyKey { @@ -745,8 +746,9 @@ impl Store { (SUBSPACE_BITMAP_TAG, false), (SUBSPACE_BITMAP_TEXT, false), (SUBSPACE_INDEXES, false), - (SUBSPACE_TRACE, true), - (SUBSPACE_TRACE_INDEX, true), + (SUBSPACE_TELEMETRY_SPAN, true), + (SUBSPACE_TELEMETRY_METRIC, true), + (SUBSPACE_TELEMETRY_INDEX, true), ] { let from_key = crate::write::AnyKey { subspace, diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 114d0088..d102e719 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -147,12 +147,12 @@ pub const SUBSPACE_QUOTA: u8 = b'u'; pub const SUBSPACE_REPORT_OUT: u8 = b'h'; pub const SUBSPACE_REPORT_IN: u8 = b'r'; pub const SUBSPACE_FTS_INDEX: u8 = b'g'; -pub const SUBSPACE_TRACE: u8 = b'o'; -pub const SUBSPACE_TRACE_INDEX: u8 = b'w'; +pub const SUBSPACE_TELEMETRY_SPAN: u8 = b'o'; +pub const SUBSPACE_TELEMETRY_INDEX: u8 = b'w'; +pub const SUBSPACE_TELEMETRY_METRIC: u8 = b'x'; -pub const SUBSPACE_RESERVED_1: u8 = b'x'; -pub const SUBSPACE_RESERVED_2: u8 = b'y'; -pub const SUBSPACE_RESERVED_3: u8 = b'z'; +pub const SUBSPACE_RESERVED_1: u8 = b'y'; +pub const SUBSPACE_RESERVED_2: u8 = b'z'; #[derive(Clone)] pub struct IterateParams { diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index 86649255..eec920f9 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -13,13 +13,13 @@ use crate::{ SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX, SUBSPACE_FTS_QUEUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_IN, - SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TRACE, SUBSPACE_TRACE_INDEX, U32_LEN, U64_LEN, - WITH_SUBSPACE, + SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TELEMETRY_INDEX, SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN, WITH_SUBSPACE, }; use super::{ AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass, - ReportEvent, ResolveId, TagValue, TraceClass, ValueClass, + ReportEvent, ResolveId, TagValue, TelemetryClass, ValueClass, }; pub struct KeySerializer { @@ -366,11 +366,19 @@ impl ValueClass { serializer.write(2u8).write(*expires).write(*id) } }, - ValueClass::Trace(trace) => match trace { - TraceClass::Span { span_id } => serializer.write(*span_id), - TraceClass::Index { span_id, value } => { + ValueClass::Telemetry(telemetry) => match telemetry { + TelemetryClass::Span { span_id } => serializer.write(*span_id), + TelemetryClass::Index { span_id, value } => { serializer.write(value.as_slice()).write(*span_id) } + TelemetryClass::Metric { + timestamp, + metric_id, + node_id, + } => serializer + .write(*timestamp) + 
.write_leb128(*metric_id) + .write_leb128(*node_id), }, ValueClass::Any(any) => serializer.write(any.key.as_slice()), } @@ -550,9 +558,10 @@ impl ValueClass { QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(), }, ValueClass::Report(_) => U64_LEN * 2 + 1, - ValueClass::Trace(trace) => match trace { - TraceClass::Span { .. } => U64_LEN + 1, - TraceClass::Index { value, .. } => U64_LEN + value.len() + 1, + ValueClass::Telemetry(telemetry) => match telemetry { + TelemetryClass::Span { .. } => U64_LEN + 1, + TelemetryClass::Index { value, .. } => U64_LEN + value.len() + 1, + TelemetryClass::Metric { .. } => U64_LEN * 2 + 1, }, ValueClass::Any(v) => v.key.len(), } @@ -595,9 +604,10 @@ impl ValueClass { QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA, }, ValueClass::Report(_) => SUBSPACE_REPORT_IN, - ValueClass::Trace(trace) => match trace { - TraceClass::Span { .. } => SUBSPACE_TRACE, - TraceClass::Index { .. } => SUBSPACE_TRACE_INDEX, + ValueClass::Telemetry(telemetry) => match telemetry { + TelemetryClass::Span { .. } => SUBSPACE_TELEMETRY_SPAN, + TelemetryClass::Index { .. } => SUBSPACE_TELEMETRY_INDEX, + TelemetryClass::Metric { .. } => SUBSPACE_TELEMETRY_METRIC, }, ValueClass::Any(any) => any.subspace, } diff --git a/crates/store/src/write/mod.rs b/crates/store/src/write/mod.rs index 5c3faff0..671ad340 100644 --- a/crates/store/src/write/mod.rs +++ b/crates/store/src/write/mod.rs @@ -153,7 +153,7 @@ pub enum ValueClass { Config(Vec), Queue(QueueClass), Report(ReportClass), - Trace(TraceClass), + Telemetry(TelemetryClass), Any(AnyClass), } @@ -206,9 +206,19 @@ pub enum ReportClass { } #[derive(Debug, PartialEq, Clone, Eq, Hash)] -pub enum TraceClass { - Span { span_id: u64 }, - Index { span_id: u64, value: Vec }, +pub enum TelemetryClass { + Span { + span_id: u64, + }, + Metric { + timestamp: u64, + metric_id: u64, + node_id: u64, + }, + Index { + span_id: u64, + value: Vec, + }, } #[derive(Debug, PartialEq, Clone, Eq, Hash)] diff --git a/crates/trc/src/atomics/gauge.rs b/crates/trc/src/atomics/gauge.rs index 5b7b088b..4f27d923 100644 --- a/crates/trc/src/atomics/gauge.rs +++ b/crates/trc/src/atomics/gauge.rs @@ -6,19 +6,17 @@ use std::sync::atomic::{AtomicU64, Ordering}; +use crate::MetricType; + pub struct AtomicGauge { - id: &'static str, - description: &'static str, - unit: &'static str, + id: MetricType, value: AtomicU64, } impl AtomicGauge { - pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self { + pub const fn new(id: MetricType) -> Self { Self { id, - description, - unit, value: AtomicU64::new(0), } } @@ -53,15 +51,7 @@ impl AtomicGauge { self.value.fetch_sub(value, Ordering::Relaxed); } - pub fn id(&self) -> &'static str { + pub fn id(&self) -> MetricType { self.id } - - pub fn description(&self) -> &'static str { - self.description - } - - pub fn unit(&self) -> &'static str { - self.unit - } } diff --git a/crates/trc/src/atomics/histogram.rs b/crates/trc/src/atomics/histogram.rs index d2add79e..1be9f556 100644 --- a/crates/trc/src/atomics/histogram.rs +++ b/crates/trc/src/atomics/histogram.rs @@ -6,12 +6,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; +use crate::MetricType; + use super::array::AtomicU64Array; pub struct AtomicHistogram { - id: &'static str, - description: &'static str, - unit: &'static str, + id: MetricType, buckets: AtomicU64Array, upper_bounds: [u64; N], sum: AtomicU64, @@ -21,12 +21,7 @@ pub struct AtomicHistogram { } impl AtomicHistogram { - pub const fn new( - id: &'static str, - 
description: &'static str, - unit: &'static str, - upper_bounds: [u64; N], - ) -> Self { + pub const fn new(id: MetricType, upper_bounds: [u64; N]) -> Self { Self { buckets: AtomicU64Array::new(), upper_bounds, @@ -35,8 +30,6 @@ impl AtomicHistogram { min: AtomicU64::new(u64::MAX), max: AtomicU64::new(0), id, - description, - unit, } } @@ -56,18 +49,10 @@ impl AtomicHistogram { unreachable!() } - pub fn id(&self) -> &'static str { + pub fn id(&self) -> MetricType { self.id } - pub fn description(&self) -> &'static str { - self.description - } - - pub fn unit(&self) -> &'static str { - self.unit - } - pub fn sum(&self) -> u64 { self.sum.load(Ordering::Relaxed) } @@ -129,14 +114,9 @@ impl AtomicHistogram { self.count.load(Ordering::Relaxed) > 0 } - pub const fn new_message_sizes( - id: &'static str, - description: &'static str, - ) -> AtomicHistogram<12> { + pub const fn new_message_sizes(id: MetricType) -> AtomicHistogram<12> { AtomicHistogram::new( id, - description, - "bytes", [ 500, // 500 bytes 1_000, // 1 KB @@ -154,14 +134,9 @@ impl AtomicHistogram { ) } - pub const fn new_short_durations( - id: &'static str, - description: &'static str, - ) -> AtomicHistogram<12> { + pub const fn new_short_durations(id: MetricType) -> AtomicHistogram<12> { AtomicHistogram::new( id, - description, - "milliseconds", [ 5, // 5 milliseconds 10, // 10 milliseconds @@ -179,14 +154,9 @@ impl AtomicHistogram { ) } - pub const fn new_medium_durations( - id: &'static str, - description: &'static str, - ) -> AtomicHistogram<12> { + pub const fn new_medium_durations(id: MetricType) -> AtomicHistogram<12> { AtomicHistogram::new( id, - description, - "milliseconds", [ 250, 500, @@ -204,14 +174,9 @@ impl AtomicHistogram { ) } - pub const fn new_long_durations( - id: &'static str, - description: &'static str, - ) -> AtomicHistogram<12> { + pub const fn new_long_durations(id: MetricType) -> AtomicHistogram<12> { AtomicHistogram::new( id, - description, - "milliseconds", [ 1_000, // 1 second 30_000, // 30 seconds diff --git a/crates/trc/src/event/metrics.rs b/crates/trc/src/event/metrics.rs new file mode 100644 index 00000000..5f39e5e9 --- /dev/null +++ b/crates/trc/src/event/metrics.rs @@ -0,0 +1,191 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::MetricType; + +impl MetricType { + pub fn name(&self) -> &'static str { + match self { + Self::MessageIngestionTime => "message.ingestion-time", + Self::MessageFtsIndexTime => "message.fts-index-time", + Self::DeliveryTotalTime => "delivery.total-time", + Self::DeliveryTime => "delivery.attempt-time", + Self::MessageSize => "message.size", + Self::MessageAuthSize => "message.authenticated-size", + Self::ReportOutgoingSize => "outgoing-report.size", + Self::StoreReadTime => "store.data-read-time", + Self::StoreWriteTime => "store.data-write-time", + Self::BlobReadTime => "store.blob-read-time", + Self::BlobWriteTime => "store.blob-write-time", + Self::DnsLookupTime => "dns.lookup-time", + Self::HttpRequestTime => "http.request-time", + Self::ImapRequestTime => "imap.request-time", + Self::Pop3RequestTime => "pop3.request-time", + Self::SmtpRequestTime => "smtp.request-time", + Self::SieveRequestTime => "sieve.request-time", + Self::HttpActiveConnections => "http.active-connections", + Self::ImapActiveConnections => "imap.active-connections", + Self::Pop3ActiveConnections => "pop3.active-connections", + Self::SmtpActiveConnections => "smtp.active-connections", + 
Self::SieveActiveConnections => "sieve.active-connections", + Self::DeliveryActiveConnections => "delivery.active-connections", + Self::ServerMemory => "server.memory", + Self::QueueCount => "queue.count", + } + } + + pub fn description(&self) -> &'static str { + match self { + Self::MessageIngestionTime => "Message ingestion time", + Self::MessageFtsIndexTime => "Message full-text indexing time", + Self::DeliveryTotalTime => "Total message delivery time from submission to delivery", + Self::DeliveryTime => "Message delivery time", + Self::MessageSize => "Received message size", + Self::MessageAuthSize => "Received message size from authenticated users", + Self::ReportOutgoingSize => "Outgoing report size", + Self::StoreReadTime => "Data store read time", + Self::StoreWriteTime => "Data store write time", + Self::BlobReadTime => "Blob store read time", + Self::BlobWriteTime => "Blob store write time", + Self::DnsLookupTime => "DNS lookup time", + Self::HttpRequestTime => "HTTP request duration", + Self::ImapRequestTime => "IMAP request duration", + Self::Pop3RequestTime => "POP3 request duration", + Self::SmtpRequestTime => "SMTP request duration", + Self::SieveRequestTime => "ManageSieve request duration", + Self::HttpActiveConnections => "Active HTTP connections", + Self::ImapActiveConnections => "Active IMAP connections", + Self::Pop3ActiveConnections => "Active POP3 connections", + Self::SmtpActiveConnections => "Active SMTP connections", + Self::SieveActiveConnections => "Active ManageSieve connections", + Self::DeliveryActiveConnections => "Active delivery connections", + Self::ServerMemory => "Server memory usage", + Self::QueueCount => "Total number of messages in the queue", + } + } + + pub fn unit(&self) -> &'static str { + match self { + Self::MessageIngestionTime + | Self::MessageFtsIndexTime + | Self::DeliveryTotalTime + | Self::DeliveryTime + | Self::StoreReadTime + | Self::StoreWriteTime + | Self::BlobReadTime + | Self::BlobWriteTime + | Self::DnsLookupTime + | Self::HttpRequestTime + | Self::ImapRequestTime + | Self::Pop3RequestTime + | Self::SmtpRequestTime + | Self::SieveRequestTime => "milliseconds", + Self::MessageSize + | Self::MessageAuthSize + | Self::ReportOutgoingSize + | Self::ServerMemory => "bytes", + Self::HttpActiveConnections + | Self::ImapActiveConnections + | Self::Pop3ActiveConnections + | Self::SmtpActiveConnections + | Self::SieveActiveConnections + | Self::DeliveryActiveConnections => "connections", + Self::QueueCount => "messages", + } + } + + pub fn code(&self) -> u64 { + match self { + Self::MessageIngestionTime => 0, + Self::MessageFtsIndexTime => 1, + Self::DeliveryTotalTime => 2, + Self::DeliveryTime => 3, + Self::MessageSize => 4, + Self::MessageAuthSize => 5, + Self::ReportOutgoingSize => 6, + Self::StoreReadTime => 7, + Self::StoreWriteTime => 8, + Self::BlobReadTime => 9, + Self::BlobWriteTime => 10, + Self::DnsLookupTime => 11, + Self::HttpRequestTime => 12, + Self::ImapRequestTime => 13, + Self::Pop3RequestTime => 14, + Self::SmtpRequestTime => 15, + Self::SieveRequestTime => 16, + Self::HttpActiveConnections => 17, + Self::ImapActiveConnections => 18, + Self::Pop3ActiveConnections => 19, + Self::SmtpActiveConnections => 20, + Self::SieveActiveConnections => 21, + Self::DeliveryActiveConnections => 22, + Self::ServerMemory => 23, + Self::QueueCount => 24, + } + } + + pub fn from_code(code: u64) -> Option { + match code { + 0 => Some(Self::MessageIngestionTime), + 1 => Some(Self::MessageFtsIndexTime), + 2 => Some(Self::DeliveryTotalTime), + 3 
=> Some(Self::DeliveryTime), + 4 => Some(Self::MessageSize), + 5 => Some(Self::MessageAuthSize), + 6 => Some(Self::ReportOutgoingSize), + 7 => Some(Self::StoreReadTime), + 8 => Some(Self::StoreWriteTime), + 9 => Some(Self::BlobReadTime), + 10 => Some(Self::BlobWriteTime), + 11 => Some(Self::DnsLookupTime), + 12 => Some(Self::HttpRequestTime), + 13 => Some(Self::ImapRequestTime), + 14 => Some(Self::Pop3RequestTime), + 15 => Some(Self::SmtpRequestTime), + 16 => Some(Self::SieveRequestTime), + 17 => Some(Self::HttpActiveConnections), + 18 => Some(Self::ImapActiveConnections), + 19 => Some(Self::Pop3ActiveConnections), + 20 => Some(Self::SmtpActiveConnections), + 21 => Some(Self::SieveActiveConnections), + 22 => Some(Self::DeliveryActiveConnections), + 23 => Some(Self::ServerMemory), + 24 => Some(Self::QueueCount), + _ => None, + } + } + + pub fn try_parse(name: &str) -> Option { + match name { + "message.ingestion-time" => Some(Self::MessageIngestionTime), + "message.fts-index-time" => Some(Self::MessageFtsIndexTime), + "delivery.total-time" => Some(Self::DeliveryTotalTime), + "delivery.attempt-time" => Some(Self::DeliveryTime), + "message.size" => Some(Self::MessageSize), + "message.authenticated-size" => Some(Self::MessageAuthSize), + "outgoing-report.size" => Some(Self::ReportOutgoingSize), + "store.data-read-time" => Some(Self::StoreReadTime), + "store.data-write-time" => Some(Self::StoreWriteTime), + "store.blob-read-time" => Some(Self::BlobReadTime), + "store.blob-write-time" => Some(Self::BlobWriteTime), + "dns.lookup-time" => Some(Self::DnsLookupTime), + "http.request-time" => Some(Self::HttpRequestTime), + "imap.request-time" => Some(Self::ImapRequestTime), + "pop3.request-time" => Some(Self::Pop3RequestTime), + "smtp.request-time" => Some(Self::SmtpRequestTime), + "sieve.request-time" => Some(Self::SieveRequestTime), + "http.active-connections" => Some(Self::HttpActiveConnections), + "imap.active-connections" => Some(Self::ImapActiveConnections), + "pop3.active-connections" => Some(Self::Pop3ActiveConnections), + "smtp.active-connections" => Some(Self::SmtpActiveConnections), + "sieve.active-connections" => Some(Self::SieveActiveConnections), + "delivery.active-connections" => Some(Self::DeliveryActiveConnections), + "server.memory" => Some(Self::ServerMemory), + "queue.count" => Some(Self::QueueCount), + _ => None, + } + } +} diff --git a/crates/trc/src/event/mod.rs b/crates/trc/src/event/mod.rs index 30e832cf..c320ed01 100644 --- a/crates/trc/src/event/mod.rs +++ b/crates/trc/src/event/mod.rs @@ -7,6 +7,7 @@ pub mod conv; pub mod description; pub mod level; +pub mod metrics; use std::{borrow::Cow, fmt::Display}; diff --git a/crates/trc/src/ipc/metrics.rs b/crates/trc/src/ipc/metrics.rs index b81c454f..20cba148 100644 --- a/crates/trc/src/ipc/metrics.rs +++ b/crates/trc/src/ipc/metrics.rs @@ -6,9 +6,7 @@ use std::sync::atomic::Ordering; -use atomics::{ - array::AtomicU32Array, counter::AtomicCounter, gauge::AtomicGauge, histogram::AtomicHistogram, -}; +use atomics::{array::AtomicU32Array, gauge::AtomicGauge, histogram::AtomicHistogram}; use ipc::{ collector::{Collector, GlobalInterests, EVENT_TYPES}, subscriber::Interests, @@ -22,38 +20,33 @@ static EVENT_COUNTERS: AtomicU32Array = AtomicU32Array::new() static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics(); static MESSAGE_INGESTION_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("message.ingestion-time", "Message ingestion time"); -static MESSAGE_INDEX_TIME: 
AtomicHistogram<12> = AtomicHistogram::<10>::new_short_durations( - "message.fts-index-time", - "Message full-text indexing time", -); -static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = AtomicHistogram::<18>::new_long_durations( - "message.outgoing-delivery-time", - "Total message delivery time from submission to delivery", -); + AtomicHistogram::<10>::new_short_durations(MetricType::MessageIngestionTime); +static MESSAGE_INDEX_TIME: AtomicHistogram<12> = + AtomicHistogram::<10>::new_short_durations(MetricType::MessageFtsIndexTime); +static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = + AtomicHistogram::<18>::new_long_durations(MetricType::DeliveryTotalTime); static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> = - AtomicHistogram::<12>::new_message_sizes("message.incoming-size", "Received message size"); -static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes( - "message.incoming-submission-size", - "Received message size from authenticated users", -); -static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes( - "message.outgoing-report-size", - "Outgoing report size", -); + AtomicHistogram::<12>::new_message_sizes(MetricType::MessageSize); +static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = + AtomicHistogram::<12>::new_message_sizes(MetricType::MessageAuthSize); +static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = + AtomicHistogram::<12>::new_message_sizes(MetricType::ReportOutgoingSize); static STORE_DATA_READ_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("store.data-read-time", "Data store read time"); + AtomicHistogram::<10>::new_short_durations(MetricType::StoreReadTime); static STORE_DATA_WRITE_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("store.data-write-time", "Data store write time"); + AtomicHistogram::<10>::new_short_durations(MetricType::StoreWriteTime); static STORE_BLOB_READ_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("store.blob-read-time", "Blob store read time"); + AtomicHistogram::<10>::new_short_durations(MetricType::BlobReadTime); static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("store.blob-write-time", "Blob store write time"); + AtomicHistogram::<10>::new_short_durations(MetricType::BlobWriteTime); static DNS_LOOKUP_TIME: AtomicHistogram<12> = - AtomicHistogram::<10>::new_short_durations("dns.lookup-time", "DNS lookup time"); + AtomicHistogram::<10>::new_short_durations(MetricType::DnsLookupTime); + +static SERVER_MEMORY: AtomicGauge = AtomicGauge::new(MetricType::ServerMemory); +static QUEUE_COUNT: AtomicGauge = AtomicGauge::new(MetricType::QueueCount); const CONN_SMTP_IN: usize = 0; const CONN_SMTP_OUT: usize = 1; @@ -64,16 +57,12 @@ const CONN_SIEVE: usize = 5; const TOTAL_CONN_TYPES: usize = 6; pub struct ConnectionMetrics { - pub total_connections: AtomicCounter, pub active_connections: AtomicGauge, - pub bytes_sent: AtomicCounter, - pub bytes_received: AtomicCounter, pub elapsed: AtomicHistogram<12>, } pub struct EventCounter { - id: &'static str, - description: &'static str, + id: EventType, value: u32, } @@ -98,7 +87,6 @@ impl Collector { match event { EventType::Smtp(SmtpEvent::ConnectionStart) => { let conn = &CONNECTION_METRICS[CONN_SMTP_IN]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::Smtp(SmtpEvent::ConnectionEnd) => { @@ -108,7 +96,6 @@ impl Collector { } EventType::Imap(ImapEvent::ConnectionStart) 
=> { let conn = &CONNECTION_METRICS[CONN_IMAP]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::Imap(ImapEvent::ConnectionEnd) => { @@ -118,7 +105,6 @@ impl Collector { } EventType::Pop3(Pop3Event::ConnectionStart) => { let conn = &CONNECTION_METRICS[CONN_POP3]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::Pop3(Pop3Event::ConnectionEnd) => { @@ -128,7 +114,6 @@ impl Collector { } EventType::Http(HttpEvent::ConnectionStart) => { let conn = &CONNECTION_METRICS[CONN_HTTP]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::Http(HttpEvent::ConnectionEnd) => { @@ -138,7 +123,6 @@ impl Collector { } EventType::ManageSieve(ManageSieveEvent::ConnectionStart) => { let conn = &CONNECTION_METRICS[CONN_SIEVE]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::ManageSieve(ManageSieveEvent::ConnectionEnd) => { @@ -148,7 +132,6 @@ impl Collector { } EventType::Delivery(DeliveryEvent::AttemptStart) => { let conn = &CONNECTION_METRICS[CONN_SMTP_OUT]; - conn.total_connections.increment(); conn.active_connections.increment(); } EventType::Delivery(DeliveryEvent::AttemptEnd) => { @@ -157,60 +140,9 @@ impl Collector { conn.elapsed.observe(elapsed); } EventType::Delivery(DeliveryEvent::Completed) => { + QUEUE_COUNT.decrement(); MESSAGE_DELIVERY_TIME.observe(elapsed); } - EventType::Smtp(SmtpEvent::RawInput) => { - CONNECTION_METRICS[CONN_SMTP_IN] - .bytes_received - .increment_by(size); - } - EventType::Smtp(SmtpEvent::RawOutput) => { - CONNECTION_METRICS[CONN_SMTP_IN] - .bytes_sent - .increment_by(size); - } - EventType::Imap(ImapEvent::RawInput) => { - CONNECTION_METRICS[CONN_IMAP] - .bytes_received - .increment_by(size); - } - EventType::Imap(ImapEvent::RawOutput) => { - CONNECTION_METRICS[CONN_IMAP].bytes_sent.increment_by(size); - } - EventType::Http(HttpEvent::RequestBody) => { - CONNECTION_METRICS[CONN_HTTP] - .bytes_received - .increment_by(size); - } - EventType::Http(HttpEvent::ResponseBody) => { - CONNECTION_METRICS[CONN_HTTP].bytes_sent.increment_by(size); - } - EventType::Pop3(Pop3Event::RawInput) => { - CONNECTION_METRICS[CONN_POP3] - .bytes_received - .increment_by(size); - } - EventType::Pop3(Pop3Event::RawOutput) => { - CONNECTION_METRICS[CONN_POP3].bytes_sent.increment_by(size); - } - EventType::ManageSieve(ManageSieveEvent::RawInput) => { - CONNECTION_METRICS[CONN_SIEVE] - .bytes_received - .increment_by(size); - } - EventType::ManageSieve(ManageSieveEvent::RawOutput) => { - CONNECTION_METRICS[CONN_SIEVE].bytes_sent.increment_by(size); - } - EventType::Delivery(DeliveryEvent::RawInput) => { - CONNECTION_METRICS[CONN_SMTP_OUT] - .bytes_received - .increment_by(size); - } - EventType::Delivery(DeliveryEvent::RawOutput) => { - CONNECTION_METRICS[CONN_SMTP_OUT] - .bytes_sent - .increment_by(size); - } EventType::Delivery( DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx, ) @@ -231,12 +163,18 @@ impl Collector { } EventType::Queue(QueueEvent::QueueMessage) => { MESSAGE_INCOMING_SIZE.observe(size); + QUEUE_COUNT.increment(); } EventType::Queue(QueueEvent::QueueMessageAuthenticated) => { MESSAGE_SUBMISSION_SIZE.observe(size); + QUEUE_COUNT.increment(); } EventType::Queue(QueueEvent::QueueReport) => { MESSAGE_OUT_REPORT_SIZE.observe(size); + QUEUE_COUNT.increment(); + } + EventType::Queue(QueueEvent::QueueAutogenerated | QueueEvent::QueueDsn) => { + QUEUE_COUNT.increment(); } EventType::FtsIndex(FtsIndexEvent::Index) => 
{ MESSAGE_INDEX_TIME.observe(elapsed); @@ -267,7 +205,7 @@ impl Collector { METRIC_INTERESTS.update(interests); } - pub fn collect_event_counters(_is_enterprise: bool) -> impl Iterator { + pub fn collect_counters(_is_enterprise: bool) -> impl Iterator { EVENT_COUNTERS .inner() .iter() @@ -275,11 +213,8 @@ impl Collector { .filter_map(|(event_id, value)| { let value = value.load(Ordering::Relaxed); if value > 0 { - let event = EVENT_TYPES[event_id]; - Some(EventCounter { - id: event.name(), - description: event.description(), + id: EVENT_TYPES[event_id], value, }) } else { @@ -288,15 +223,14 @@ impl Collector { }) } - pub fn collect_counters(_is_enterprise: bool) -> impl Iterator { - CONNECTION_METRICS - .iter() - .flat_map(|m| [&m.total_connections, &m.bytes_sent, &m.bytes_received]) - .filter(|c| c.is_active()) - } + pub fn collect_gauges(is_enterprise: bool) -> impl Iterator { + static E_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY, &QUEUE_COUNT]; + static C_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY]; - pub fn collect_gauges(_is_enterprise: bool) -> impl Iterator { - CONNECTION_METRICS.iter().map(|m| &m.active_connections) + if is_enterprise { E_GAUGES } else { C_GAUGES } + .iter() + .copied() + .chain(CONNECTION_METRICS.iter().map(|m| &m.active_connections)) } pub fn collect_histograms( @@ -331,17 +265,26 @@ impl Collector { .chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed)) .filter(|h| h.is_active()) } + + #[inline(always)] + pub fn read_event_metric(metric_id: usize) -> u32 { + EVENT_COUNTERS.get(metric_id) + } + + pub fn update_gauge(metric_type: MetricType, value: u64) { + match metric_type { + MetricType::ServerMemory => SERVER_MEMORY.set(value), + MetricType::QueueCount => QUEUE_COUNT.set(value), + _ => {} + } + } } impl EventCounter { - pub fn id(&self) -> &'static str { + pub fn id(&self) -> EventType { self.id } - pub fn description(&self) -> &'static str { - self.description - } - pub fn value(&self) -> u64 { self.value as u64 } @@ -351,11 +294,8 @@ impl ConnectionMetrics { #[allow(clippy::new_without_default)] pub const fn new() -> Self { Self { - total_connections: AtomicCounter::new("", "", ""), - active_connections: AtomicGauge::new("", "", ""), - bytes_sent: AtomicCounter::new("", "", ""), - bytes_received: AtomicCounter::new("", "", ""), - elapsed: AtomicHistogram::<18>::new_medium_durations("", ""), + active_connections: AtomicGauge::new(MetricType::BlobReadTime), + elapsed: AtomicHistogram::<18>::new_medium_durations(MetricType::BlobReadTime), } } } @@ -366,131 +306,37 @@ const fn init_conn_metrics() -> [ConnectionMetrics; TOTAL_CONN_TYPES] { let mut array = [INIT; TOTAL_CONN_TYPES]; let mut i = 0; while i < TOTAL_CONN_TYPES { - let text = match i { + let metric = match i { CONN_HTTP => &[ - ("http.total-connections", "Total HTTP connections", "number"), - ( - "http.active-connections", - "Active HTTP connections", - "number", - ), - ("http.bytes-sent", "Bytes sent over HTTP", "bytes"), - ("http.bytes-received", "Bytes received over HTTP", "bytes"), - ("http.request-time", "HTTP request duration", "milliseconds"), + MetricType::HttpRequestTime, + MetricType::HttpActiveConnections, ], CONN_IMAP => &[ - ("imap.total-connections", "Total IMAP connections", "number"), - ( - "imap.active-connections", - "Active IMAP connections", - "number", - ), - ("imap.bytes-sent", "Bytes sent over IMAP", "bytes"), - ("imap.bytes-received", "Bytes received over IMAP", "bytes"), - ("imap.request-time", "IMAP request duration", "milliseconds"), + MetricType::ImapRequestTime, + 
MetricType::ImapActiveConnections, ], CONN_POP3 => &[ - ("pop3.total-connections", "Total POP3 connections", "number"), - ( - "pop3.active-connections", - "Active POP3 connections", - "number", - ), - ("pop3.bytes-sent", "Bytes sent over POP3", "bytes"), - ("pop3.bytes-received", "Bytes received over POP3", "bytes"), - ("pop3.request-time", "POP3 request duration", "milliseconds"), + MetricType::Pop3RequestTime, + MetricType::Pop3ActiveConnections, ], CONN_SMTP_IN => &[ - ( - "smtp-in.total-connections", - "Total SMTP incoming connections", - "number", - ), - ( - "smtp-in.active-connections", - "Active SMTP incoming connections", - "number", - ), - ( - "smtp-in.bytes-sent", - "Bytes sent over SMTP incoming", - "bytes", - ), - ( - "smtp-in.bytes-received", - "Bytes received over SMTP incoming", - "bytes", - ), - ( - "smtp-in.request-time", - "SMTP incoming request duration", - "milliseconds", - ), + MetricType::SmtpRequestTime, + MetricType::SmtpActiveConnections, ], CONN_SMTP_OUT => &[ - ( - "smtp-out.total-connections", - "Total SMTP outgoing connections", - "number", - ), - ( - "smtp-out.active-connections", - "Active SMTP outgoing connections", - "number", - ), - ( - "smtp-out.bytes-sent", - "Bytes sent over SMTP outgoing", - "bytes", - ), - ( - "smtp-out.bytes-received", - "Bytes received over SMTP outgoing", - "bytes", - ), - ( - "smtp-out.request-time", - "SMTP outgoing request duration", - "milliseconds", - ), + MetricType::DeliveryTime, + MetricType::DeliveryActiveConnections, ], CONN_SIEVE => &[ - ( - "sieve.total-connections", - "Total ManageSieve connections", - "number", - ), - ( - "sieve.active-connections", - "Active ManageSieve connections", - "number", - ), - ("sieve.bytes-sent", "Bytes sent over ManageSieve", "bytes"), - ( - "sieve.bytes-received", - "Bytes received over ManageSieve", - "bytes", - ), - ( - "sieve.request-time", - "ManageSieve request duration", - "milliseconds", - ), - ], - _ => &[ - ("", "", ""), - ("", "", ""), - ("", "", ""), - ("", "", ""), - ("", "", ""), + MetricType::SieveRequestTime, + MetricType::SieveActiveConnections, ], + _ => &[MetricType::BlobReadTime, MetricType::BlobReadTime], }; + array[i] = ConnectionMetrics { - total_connections: AtomicCounter::new(text[0].0, text[0].1, text[0].2), - active_connections: AtomicGauge::new(text[1].0, text[1].1, text[1].2), - bytes_sent: AtomicCounter::new(text[2].0, text[2].1, text[2].2), - bytes_received: AtomicCounter::new(text[3].0, text[3].1, text[3].2), - elapsed: AtomicHistogram::<18>::new_medium_durations(text[4].0, text[4].1), + elapsed: AtomicHistogram::<18>::new_medium_durations(metric[0]), + active_connections: AtomicGauge::new(metric[1]), }; i += 1; } @@ -501,23 +347,15 @@ impl EventType { pub fn is_metric(&self) -> bool { match self { EventType::Server(ServerEvent::ThreadError) => true, - EventType::Purge( - PurgeEvent::Started - | PurgeEvent::Error - | PurgeEvent::AutoExpunge - | PurgeEvent::TombstoneCleanup, - ) => true, + EventType::Purge(PurgeEvent::Error) => true, EventType::Eval( EvalEvent::Error | EvalEvent::StoreNotFound | EvalEvent::DirectoryNotFound, ) => true, EventType::Acme( AcmeEvent::TlsAlpnError - | AcmeEvent::OrderStart | AcmeEvent::OrderCompleted | AcmeEvent::AuthError - | AcmeEvent::AuthCompleted | AcmeEvent::AuthTooManyAttempts - | AcmeEvent::DnsRecordCreated | AcmeEvent::DnsRecordCreationFailed | AcmeEvent::DnsRecordDeletionFailed | AcmeEvent::DnsRecordPropagationTimeout @@ -569,17 +407,61 @@ impl EventType { | JmapEvent::RequestTooLarge | JmapEvent::UnknownMethod, ) => 
true, - EventType::Imap(_) => true, - EventType::ManageSieve(_) => true, - EventType::Pop3(_) => true, - EventType::Smtp(_) => true, + EventType::Imap(ImapEvent::ConnectionStart | ImapEvent::ConnectionEnd) => true, + EventType::ManageSieve( + ManageSieveEvent::ConnectionStart | ManageSieveEvent::ConnectionEnd, + ) => true, + EventType::Pop3(Pop3Event::ConnectionStart | Pop3Event::ConnectionEnd) => true, + EventType::Smtp( + SmtpEvent::ConnectionStart + | SmtpEvent::ConnectionEnd + | SmtpEvent::Error + | SmtpEvent::ConcurrencyLimitExceeded + | SmtpEvent::TransferLimitExceeded + | SmtpEvent::RateLimitExceeded + | SmtpEvent::TimeLimitExceeded + | SmtpEvent::MessageParseFailed + | SmtpEvent::MessageTooLarge + | SmtpEvent::LoopDetected + | SmtpEvent::DkimPass + | SmtpEvent::DkimFail + | SmtpEvent::ArcPass + | SmtpEvent::ArcFail + | SmtpEvent::SpfEhloPass + | SmtpEvent::SpfEhloFail + | SmtpEvent::SpfFromPass + | SmtpEvent::SpfFromFail + | SmtpEvent::DmarcPass + | SmtpEvent::DmarcFail + | SmtpEvent::IprevPass + | SmtpEvent::IprevFail + | SmtpEvent::TooManyMessages + | SmtpEvent::InvalidEhlo + | SmtpEvent::DidNotSayEhlo + | SmtpEvent::MailFromUnauthenticated + | SmtpEvent::MailFromUnauthorized + | SmtpEvent::MailFromMissing + | SmtpEvent::MultipleMailFrom + | SmtpEvent::MailboxDoesNotExist + | SmtpEvent::RelayNotAllowed + | SmtpEvent::RcptToDuplicate + | SmtpEvent::RcptToMissing + | SmtpEvent::TooManyRecipients + | SmtpEvent::TooManyInvalidRcpt + | SmtpEvent::AuthMechanismNotSupported + | SmtpEvent::AuthExchangeTooLong + | SmtpEvent::CommandNotImplemented + | SmtpEvent::InvalidCommand + | SmtpEvent::SyntaxError + | SmtpEvent::RequestTooLarge, + ) => true, EventType::Http( HttpEvent::Error | HttpEvent::RequestBody | HttpEvent::ResponseBody | HttpEvent::XForwardedMissing, ) => true, - EventType::Network(_) => true, + EventType::Network(NetworkEvent::Timeout | NetworkEvent::DropBlocked) => true, EventType::Limit(_) => true, EventType::Manage(_) => false, EventType::Auth( @@ -593,15 +475,41 @@ impl EventType { EventType::Resource( ResourceEvent::NotFound | ResourceEvent::BadParameters | ResourceEvent::Error, ) => true, - EventType::Arc(_) => true, + EventType::Arc( + ArcEvent::ChainTooLong + | ArcEvent::InvalidInstance + | ArcEvent::InvalidCv + | ArcEvent::HasHeaderTag + | ArcEvent::BrokenChain, + ) => true, EventType::Dkim(_) => true, EventType::Dmarc(_) => true, EventType::Iprev(_) => true, - EventType::Dane(_) => true, + EventType::Dane( + DaneEvent::AuthenticationSuccess + | DaneEvent::AuthenticationFailure + | DaneEvent::NoCertificatesFound + | DaneEvent::CertificateParseError + | DaneEvent::TlsaRecordFetchError + | DaneEvent::TlsaRecordNotFound + | DaneEvent::TlsaRecordNotDnssecSigned + | DaneEvent::TlsaRecordInvalid, + ) => true, EventType::Spf(_) => true, EventType::MailAuth(_) => true, - EventType::Tls(_) => true, - EventType::Sieve(_) => true, + EventType::Tls(TlsEvent::HandshakeError) => true, + EventType::Sieve( + SieveEvent::ActionAccept + | SieveEvent::ActionAcceptReplace + | SieveEvent::ActionDiscard + | SieveEvent::ActionReject + | SieveEvent::SendMessage + | SieveEvent::MessageTooLarge + | SieveEvent::RuntimeError + | SieveEvent::UnexpectedError + | SieveEvent::NotSupported + | SieveEvent::QuotaExceeded, + ) => true, EventType::Spam( SpamEvent::PyzorError | SpamEvent::ListUpdated @@ -627,9 +535,41 @@ impl EventType { | FtsIndexEvent::BlobNotFound | FtsIndexEvent::MetadataNotFound, ) => true, - EventType::Milter(_) => true, + EventType::Milter( + MilterEvent::ActionAccept + | 
MilterEvent::ActionDiscard + | MilterEvent::ActionReject + | MilterEvent::ActionTempFail + | MilterEvent::ActionReplyCode + | MilterEvent::ActionConnectionFailure + | MilterEvent::ActionShutdown, + ) => true, EventType::MtaHook(_) => true, - EventType::Delivery(_) => true, + EventType::Delivery( + DeliveryEvent::AttemptStart + | DeliveryEvent::AttemptEnd + | DeliveryEvent::MxLookupFailed + | DeliveryEvent::IpLookupFailed + | DeliveryEvent::NullMx + | DeliveryEvent::GreetingFailed + | DeliveryEvent::EhloRejected + | DeliveryEvent::AuthFailed + | DeliveryEvent::MailFromRejected + | DeliveryEvent::Delivered + | DeliveryEvent::RcptToRejected + | DeliveryEvent::RcptToFailed + | DeliveryEvent::MessageRejected + | DeliveryEvent::StartTlsUnavailable + | DeliveryEvent::StartTlsError + | DeliveryEvent::StartTlsDisabled + | DeliveryEvent::ImplicitTlsError + | DeliveryEvent::ConcurrencyLimitExceeded + | DeliveryEvent::RateLimitExceeded + | DeliveryEvent::DoubleBounce + | DeliveryEvent::DsnSuccess + | DeliveryEvent::DsnTempFail + | DeliveryEvent::DsnPermFail, + ) => true, EventType::Queue( QueueEvent::QueueMessage | QueueEvent::QueueMessageAuthenticated @@ -642,8 +582,10 @@ impl EventType { | QueueEvent::ConcurrencyLimitExceeded | QueueEvent::QuotaExceeded, ) => true, - EventType::TlsRpt(_) => true, - EventType::MtaSts(_) => true, + EventType::TlsRpt(_) => false, + EventType::MtaSts( + MtaStsEvent::Authorized | MtaStsEvent::NotAuthorized | MtaStsEvent::InvalidPolicy, + ) => true, EventType::IncomingReport(_) => true, EventType::OutgoingReport( OutgoingReportEvent::SpfReport diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index e6c877e8..d2262c64 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -927,6 +927,35 @@ pub enum ResourceEvent { WebadminUnpacked, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetricType { + ServerMemory, + MessageIngestionTime, + MessageFtsIndexTime, + MessageSize, + MessageAuthSize, + DeliveryTotalTime, + DeliveryTime, + DeliveryActiveConnections, + QueueCount, + ReportOutgoingSize, + StoreReadTime, + StoreWriteTime, + BlobReadTime, + BlobWriteTime, + DnsLookupTime, + HttpActiveConnections, + HttpRequestTime, + ImapActiveConnections, + ImapRequestTime, + Pop3ActiveConnections, + Pop3RequestTime, + SmtpActiveConnections, + SmtpRequestTime, + SieveActiveConnections, + SieveRequestTime, +} + pub const TOTAL_EVENT_COUNT: usize = total_event_count!(); pub trait AddContext { diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs index 0ba4f681..1d22a8ac 100644 --- a/crates/trc/src/serializers/binary.rs +++ b/crates/trc/src/serializers/binary.rs @@ -857,7 +857,7 @@ impl EventType { } } - fn from_code(code: u64) -> Option { + pub fn from_code(code: u64) -> Option { match code { 0 => Some(EventType::Acme(AcmeEvent::AuthCompleted)), 1 => Some(EventType::Acme(AcmeEvent::AuthError)), diff --git a/tests/src/jmap/enterprise.rs b/tests/src/jmap/enterprise.rs index cfd45bbe..17b4bcd5 100644 --- a/tests/src/jmap/enterprise.rs +++ b/tests/src/jmap/enterprise.rs @@ -12,7 +12,9 @@ use std::time::Duration; use common::{ config::telemetry::{StoreTracer, TelemetrySubscriberType}, - enterprise::{license::LicenseKey, undelete::DeletedBlob, Enterprise}, + enterprise::{ + license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricsStore, TraceStore, Undelete, + }, telemetry::tracers::store::{TracingQuery, TracingStore}, }; use imap_proto::ResponseType; @@ -22,6 +24,7 @@ use trc::{ ipc::{bitset::Bitset, 
subscriber::SubscriberBuilder},
     DeliveryEvent, EventType, SmtpEvent,
 };
+use utils::config::cron::SimpleCron;
 
 use crate::{
     imap::{ImapConnection, Type},
@@ -40,9 +43,21 @@ pub async fn test(params: &mut JMAPTest) {
             hostname: String::new(),
             accounts: 100,
         },
-        undelete_period: Duration::from_secs(2).into(),
-        trace_hold_period: Duration::from_secs(1).into(),
-        trace_store: core.storage.data.clone().into(),
+        undelete: Undelete {
+            retention: Duration::from_secs(2),
+        }
+        .into(),
+        trace_store: TraceStore {
+            retention: Some(Duration::from_secs(1)),
+            store: core.storage.data.clone(),
+        }
+        .into(),
+        metrics_store: MetricsStore {
+            retention: Some(Duration::from_secs(1)),
+            store: core.storage.data.clone(),
+            interval: SimpleCron::Day { hour: 0, minute: 0 },
+        }
+        .into(),
     }
     .into();
     params.server.shared_core.store(core.into());
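
Note on the new `MetricType` mappings: the enum carries both a stable numeric code (`code()` / `from_code()`) and a configuration identifier (`try_parse()` plus the string-returning method whose match arms appear above, assumed here to be `name()`). A quick way to convince yourself the mappings stay in sync is a round-trip test such as the sketch below; it is illustrative only and not part of this patch, and it assumes `MetricType` and these methods are publicly reachable from the `trc` crate root as declared in the diff.

```rust
// Illustrative round-trip test (not part of this patch). Assumes `MetricType`,
// `code`, `from_code`, `name` and `try_parse` are public on the `trc` crate.
#[test]
fn metric_type_mappings_round_trip() {
    for code in 0..=24u64 {
        let metric = trc::MetricType::from_code(code).expect("codes 0..=24 map to a metric");
        // Numeric code and metric type must round-trip.
        assert_eq!(metric.code(), code);
        // The configuration identifier must parse back to the same metric type.
        assert_eq!(trc::MetricType::try_parse(metric.name()), Some(metric));
    }
}
```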
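`ServerMemory` and `QueueCount` are point-in-time gauges rather than counters: the collector bumps `QUEUE_COUNT` on the queue and delivery-completion events shown above, while `Collector::update_gauge` lets a background task overwrite either value wholesale. A minimal sketch of such a task follows; the two sampling helpers are hypothetical stand-ins (this patch does not define them), and the `trc::ipc::collector::Collector` path is assumed to be public.

```rust
use trc::{ipc::collector::Collector, MetricType};

// Hypothetical sampling helpers: stand-ins for however the server actually
// measures its resident memory and queued-message count.
fn sample_memory_bytes() -> u64 {
    0
}
fn sample_queue_depth() -> u64 {
    0
}

// Periodically refresh the two gauges introduced in this patch.
fn refresh_gauges() {
    Collector::update_gauge(MetricType::ServerMemory, sample_memory_bytes());
    Collector::update_gauge(MetricType::QueueCount, sample_queue_depth());
}
```

Also worth noting from the `collect_gauges` change above: `QUEUE_COUNT` is only exposed when `is_enterprise` is true, while `SERVER_MEMORY` is reported in both editions.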
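`EventCounter` no longer stores a name/description pair; `collect_counters` now hands back the `EventType` itself, so an exporter derives metric metadata from the event at render time. The hedged sketch below shows one way a text exporter might consume the reshaped API; the output format is illustrative and the `Collector` module path is again an assumption.

```rust
use trc::ipc::collector::Collector;

// Illustrative text exporter over the reshaped counter API: the event type
// supplies the metric name and description, the counter supplies the value.
fn export_event_counters(is_enterprise: bool) -> String {
    let mut out = String::new();
    for counter in Collector::collect_counters(is_enterprise) {
        let event = counter.id();
        out.push_str(&format!(
            "# {}: {}\n{} {}\n",
            event.name(),
            event.description(),
            event.name(),
            counter.value()
        ));
    }
    out
}
```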