Metric history + Live metrics

This commit is contained in:
mdecimus 2024-08-23 16:21:35 +02:00
parent 0aaf493f94
commit dcc31e8b3e
34 changed files with 1256 additions and 512 deletions

11
Cargo.lock generated
View file

@ -3217,6 +3217,7 @@ dependencies = [
"mail-builder", "mail-builder",
"mail-parser", "mail-parser",
"mail-send", "mail-send",
"memory-stats",
"mime", "mime",
"nlp", "nlp",
"p256", "p256",
@ -3741,6 +3742,16 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "memory-stats"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c73f5c649995a115e1a0220b35e4df0a1294500477f97a91d0660fb5abeb574a"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "memsec" name = "memsec"
version = "0.7.0" version = "0.7.0"

View file

@ -32,6 +32,7 @@ impl Default for Network {
Self { Self {
blocked_ips: Default::default(), blocked_ips: Default::default(),
allowed_ips: Default::default(), allowed_ips: Default::default(),
node_id: 0,
http_response_url: IfBlock::new::<()>( http_response_url: IfBlock::new::<()>(
"server.http.url", "server.http.url",
[], [],
@ -45,6 +46,7 @@ impl Default for Network {
impl Network { impl Network {
pub fn parse(config: &mut Config) -> Self { pub fn parse(config: &mut Config) -> Self {
let mut network = Network { let mut network = Network {
node_id: config.property("cluster.node-id").unwrap_or_default(),
blocked_ips: BlockedIps::parse(config), blocked_ips: BlockedIps::parse(config),
allowed_ips: AllowedIps::parse(config), allowed_ips: AllowedIps::parse(config),
..Default::default() ..Default::default()

View file

@ -137,28 +137,18 @@ impl Telemetry {
}; };
// Parse metrics // Parse metrics
if config apply_events(
.property_or_default("metrics.prometheus.enable", "false") config
.unwrap_or(false) .properties::<EventOrMany>("metrics.disabled-events")
|| ["http", "grpc"].contains( .into_iter()
&config .map(|(_, e)| e),
.value("metrics.open-telemetry.transport") false,
.unwrap_or("disabled"), |event_type| {
) if event_type.is_metric() {
{ telemetry.metrics.set(event_type);
apply_events( }
config },
.properties::<EventOrMany>("metrics.disabled-events") );
.into_iter()
.map(|(_, e)| e),
false,
|event_type| {
if event_type.is_metric() {
telemetry.metrics.set(event_type);
}
},
);
}
telemetry telemetry
} }

View file

@ -12,9 +12,9 @@ use std::time::Duration;
use jmap_proto::types::collection::Collection; use jmap_proto::types::collection::Collection;
use store::{BitmapKey, Store, Stores}; use store::{BitmapKey, Store, Stores};
use utils::config::Config; use utils::config::{cron::SimpleCron, utils::ParseValue, Config};
use super::{license::LicenseValidator, Enterprise}; use super::{license::LicenseValidator, Enterprise, MetricsStore, TraceStore, Undelete};
impl Enterprise { impl Enterprise {
pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> { pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> {
@ -54,18 +54,62 @@ impl Enterprise {
_ => (), _ => (),
} }
Some(Enterprise { let trace_store = if config
license, .property_or_default("tracing.history.enable", "false")
undelete_period: config .unwrap_or(false)
.property_or_default::<Option<Duration>>("storage.undelete.retention", "false") {
.unwrap_or_default(), if let Some(store) = config
trace_hold_period: config
.property_or_default::<Option<Duration>>("tracing.history.retention", "30d")
.unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))),
trace_store: config
.value("tracing.history.store") .value("tracing.history.store")
.and_then(|name| stores.stores.get(name)) .and_then(|name| stores.stores.get(name))
.cloned(), .cloned()
{
TraceStore {
retention: config
.property_or_default::<Option<Duration>>("tracing.history.retention", "30d")
.unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))),
store,
}
.into()
} else {
None
}
} else {
None
};
let metrics_store = if config
.property_or_default("metrics.history.enable", "false")
.unwrap_or(false)
{
if let Some(store) = config
.value("metrics.history.store")
.and_then(|name| stores.stores.get(name))
.cloned()
{
MetricsStore {
retention: config
.property_or_default::<Option<Duration>>("metrics.history.retention", "30d")
.unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))),
store,
interval: config
.property_or_default::<SimpleCron>("metrics.history.interval", "0 * *")
.unwrap_or_else(|| SimpleCron::parse_value("0 * *").unwrap()),
}
.into()
} else {
None
}
} else {
None
};
Some(Enterprise {
license,
undelete: config
.property_or_default::<Option<Duration>>("storage.undelete.retention", "false")
.unwrap_or_default()
.map(|retention| Undelete { retention }),
trace_store,
metrics_store,
}) })
} }
} }

View file

@ -17,15 +17,34 @@ use std::time::Duration;
use license::LicenseKey; use license::LicenseKey;
use mail_parser::DateTime; use mail_parser::DateTime;
use store::Store; use store::Store;
use utils::config::cron::SimpleCron;
use crate::Core; use crate::Core;
#[derive(Clone)] #[derive(Clone)]
pub struct Enterprise { pub struct Enterprise {
pub license: LicenseKey, pub license: LicenseKey,
pub undelete_period: Option<Duration>, pub undelete: Option<Undelete>,
pub trace_hold_period: Option<Duration>, pub trace_store: Option<TraceStore>,
pub trace_store: Option<Store>, pub metrics_store: Option<MetricsStore>,
}
#[derive(Clone)]
pub struct Undelete {
pub retention: Duration,
}
#[derive(Clone)]
pub struct TraceStore {
pub retention: Option<Duration>,
pub store: Store,
}
#[derive(Clone)]
pub struct MetricsStore {
pub retention: Option<Duration>,
pub store: Store,
pub interval: SimpleCron,
} }
impl Core { impl Core {

View file

@ -40,13 +40,13 @@ impl Core {
blob_hash: &BlobHash, blob_hash: &BlobHash,
blob_size: usize, blob_size: usize,
) { ) {
if let Some(hold_period) = self.enterprise.as_ref().and_then(|e| e.undelete_period) { if let Some(undelete) = self.enterprise.as_ref().and_then(|e| e.undelete.as_ref()) {
let now = now(); let now = now();
batch.set( batch.set(
BlobOp::Reserve { BlobOp::Reserve {
hash: blob_hash.clone(), hash: blob_hash.clone(),
until: now + hold_period.as_secs(), until: now + undelete.retention.as_secs(),
}, },
KeySerializer::new(U64_LEN + U64_LEN) KeySerializer::new(U64_LEN + U64_LEN)
.write(blob_size as u32) .write(blob_size as u32)

View file

@ -28,7 +28,10 @@ use listener::{
use mail_send::Credentials; use mail_send::Credentials;
use sieve::Sieve; use sieve::Sieve;
use store::LookupStore; use store::{
write::{QueueClass, ValueClass},
IterateParams, LookupStore, ValueKey,
};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use utils::BlobHash; use utils::BlobHash;
@ -65,6 +68,7 @@ pub struct Core {
#[derive(Clone)] #[derive(Clone)]
pub struct Network { pub struct Network {
pub node_id: u64,
pub blocked_ips: BlockedIps, pub blocked_ips: BlockedIps,
pub allowed_ips: AllowedIps, pub allowed_ips: AllowedIps,
pub http_response_url: IfBlock, pub http_response_url: IfBlock,
@ -303,6 +307,26 @@ impl Core {
.ctx(trc::Key::AccountName, credentials.login().to_string())) .ctx(trc::Key::AccountName, credentials.login().to_string()))
} }
} }
/// Counts the messages currently held in the delivery queue.
///
/// Iterates the full `QueueClass::Message` key range (keys only — values are
/// skipped via `no_values()`) and returns the number of entries found.
pub async fn message_queue_size(&self) -> trc::Result<u64> {
    let mut total = 0;
    self.storage
        .data
        .iterate(
            IterateParams::new(
                ValueKey::from(ValueClass::Queue(QueueClass::Message(0))),
                ValueKey::from(ValueClass::Queue(QueueClass::Message(u64::MAX))),
            )
            .no_values(),
            |_, _| {
                total += 1;
                // Returning Ok(true) continues the iteration.
                Ok(true)
            },
        )
        .await
        .map(|_| total)
}
} }
trait CredentialsUsername { trait CredentialsUsername {

View file

@ -6,3 +6,6 @@
pub mod otel; pub mod otel;
pub mod prometheus; pub mod prometheus;
#[cfg(feature = "enterprise")]
pub mod store;

View file

@ -29,28 +29,8 @@ impl OtelMetrics {
// Add counters // Add counters
for counter in Collector::collect_counters(is_enterprise) { for counter in Collector::collect_counters(is_enterprise) {
metrics.push(Metric { metrics.push(Metric {
name: counter.id().into(), name: counter.id().name().into(),
description: counter.description().into(), description: counter.id().description().into(),
unit: counter.unit().into(),
data: Box::new(Sum {
data_points: vec![DataPoint {
attributes: vec![],
start_time: start_time.into(),
time: now.into(),
value: counter.get(),
exemplars: vec![],
}],
temporality: Temporality::Cumulative,
is_monotonic: true,
}),
});
}
// Add event counters
for counter in Collector::collect_event_counters(is_enterprise) {
metrics.push(Metric {
name: counter.id().into(),
description: counter.description().into(),
unit: "events".into(), unit: "events".into(),
data: Box::new(Sum { data: Box::new(Sum {
data_points: vec![DataPoint { data_points: vec![DataPoint {
@ -69,9 +49,9 @@ impl OtelMetrics {
// Add gauges // Add gauges
for gauge in Collector::collect_gauges(is_enterprise) { for gauge in Collector::collect_gauges(is_enterprise) {
metrics.push(Metric { metrics.push(Metric {
name: gauge.id().into(), name: gauge.id().name().into(),
description: gauge.description().into(), description: gauge.id().description().into(),
unit: gauge.unit().into(), unit: gauge.id().unit().into(),
data: Box::new(Gauge { data: Box::new(Gauge {
data_points: vec![DataPoint { data_points: vec![DataPoint {
attributes: vec![], attributes: vec![],
@ -87,9 +67,9 @@ impl OtelMetrics {
// Add histograms // Add histograms
for histogram in Collector::collect_histograms(is_enterprise) { for histogram in Collector::collect_histograms(is_enterprise) {
metrics.push(Metric { metrics.push(Metric {
name: histogram.id().into(), name: histogram.id().name().into(),
description: histogram.description().into(), description: histogram.id().description().into(),
unit: histogram.unit().into(), unit: histogram.id().unit().into(),
data: Box::new(Histogram { data: Box::new(Histogram {
data_points: vec![HistogramDataPoint { data_points: vec![HistogramDataPoint {
attributes: vec![], attributes: vec![],

View file

@ -25,18 +25,8 @@ impl Core {
// Add counters // Add counters
for counter in Collector::collect_counters(is_enterprise) { for counter in Collector::collect_counters(is_enterprise) {
let mut metric = MetricFamily::default(); let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id())); metric.set_name(metric_name(counter.id().name()));
metric.set_help(counter.description().into()); metric.set_help(counter.id().description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.get())]);
metrics.push(metric);
}
// Add event counters
for counter in Collector::collect_event_counters(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id()));
metric.set_help(counter.description().into());
metric.set_field_type(MetricType::COUNTER); metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.value())]); metric.set_metric(vec![new_counter(counter.value())]);
metrics.push(metric); metrics.push(metric);
@ -45,8 +35,8 @@ impl Core {
// Add gauges // Add gauges
for gauge in Collector::collect_gauges(is_enterprise) { for gauge in Collector::collect_gauges(is_enterprise) {
let mut metric = MetricFamily::default(); let mut metric = MetricFamily::default();
metric.set_name(metric_name(gauge.id())); metric.set_name(metric_name(gauge.id().name()));
metric.set_help(gauge.description().into()); metric.set_help(gauge.id().description().into());
metric.set_field_type(MetricType::GAUGE); metric.set_field_type(MetricType::GAUGE);
metric.set_metric(vec![new_gauge(gauge.get())]); metric.set_metric(vec![new_gauge(gauge.get())]);
metrics.push(metric); metrics.push(metric);
@ -55,8 +45,8 @@ impl Core {
// Add histograms // Add histograms
for histogram in Collector::collect_histograms(is_enterprise) { for histogram in Collector::collect_histograms(is_enterprise) {
let mut metric = MetricFamily::default(); let mut metric = MetricFamily::default();
metric.set_name(metric_name(histogram.id())); metric.set_name(metric_name(histogram.id().name()));
metric.set_help(histogram.description().into()); metric.set_help(histogram.id().description().into());
metric.set_field_type(MetricType::HISTOGRAM); metric.set_field_type(MetricType::HISTOGRAM);
metric.set_metric(vec![new_histogram(histogram)]); metric.set_metric(vec![new_histogram(histogram)]);
metrics.push(metric); metrics.push(metric);

View file

@ -0,0 +1,257 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: LicenseRef-SEL
*
* This file is subject to the Stalwart Enterprise License Agreement (SEL) and
* is NOT open source software.
*
*/
use std::{future::Future, sync::Arc, time::Duration};
use ahash::AHashMap;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use store::{
write::{
key::{DeserializeBigEndian, KeySerializer},
now, BatchBuilder, TelemetryClass, ValueClass,
},
IterateParams, Store, ValueKey, U32_LEN, U64_LEN,
};
use trc::*;
use utils::codec::leb128::Leb128Reader;
use crate::Core;
/// Persistence interface for periodic metric snapshots.
pub trait MetricsStore: Sync + Send {
    /// Persists the counter/histogram deltas accumulated since the last
    /// snapshot recorded in `history`.
    fn write_metrics(
        &self,
        core: Arc<Core>,
        history: SharedMetricHistory,
    ) -> impl Future<Output = trc::Result<()>> + Send;
    /// Returns the stored metric data points whose timestamps lie between
    /// `from_timestamp` and `to_timestamp`.
    fn query_metrics(
        &self,
        from_timestamp: u64,
        to_timestamp: u64,
    ) -> impl Future<Output = trc::Result<Vec<Metric<EventType, MetricType, u64>>>> + Send;
    /// Deletes metric data points older than `period`.
    fn purge_metrics(&self, period: Duration) -> impl Future<Output = trc::Result<()>> + Send;
}
/// Last-seen cumulative readings, used to turn the in-memory collectors'
/// monotonically increasing totals into per-interval deltas.
#[derive(Default)]
pub struct MetricsHistory {
    // Last observed cumulative count for each tracked event counter.
    events: AHashMap<EventType, u32>,
    // Last observed cumulative sum/count for each tracked histogram.
    histograms: AHashMap<MetricType, HistogramHistory>,
}
/// Last observed cumulative state of a single histogram.
#[derive(Default)]
struct HistogramHistory {
    // Cumulative sum of all observations at the previous snapshot.
    sum: u64,
    // Cumulative number of observations at the previous snapshot.
    count: u64,
}
/// A single stored metric data point, tagged by kind for JSON serialization.
///
/// Generic over the counter id (`CI`), histogram/metric id (`MI`) and the
/// timestamp representation (`T`), so the same shape can carry raw ids with
/// `u64` timestamps or display names with formatted timestamps.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum Metric<CI, MI, T> {
    /// Delta of an event counter over one snapshot interval.
    Counter {
        id: CI,
        timestamp: T,
        value: u32,
    },
    /// Delta of a histogram (observation count and sum) over one interval.
    Histogram {
        id: MI,
        timestamp: T,
        count: u64,
        sum: u64,
    },
}
/// Snapshot history shared between the writer task and its callers.
pub type SharedMetricHistory = Arc<Mutex<MetricsHistory>>;

// The low two bits of a stored metric id discriminate the data point kind.
const TYPE_COUNTER: u64 = 0x00;
const TYPE_HISTOGRAM: u64 = 0x01;
impl MetricsStore for Store {
    /// Snapshots the in-memory telemetry collectors and persists, in a single
    /// batch, the increase of each tracked metric since the previous snapshot.
    ///
    /// Each key carries the snapshot timestamp, a metric id whose low two
    /// bits encode counter vs. histogram, and this node's id; values are the
    /// LEB128-encoded deltas.
    async fn write_metrics(
        &self,
        core: Arc<Core>,
        history_: SharedMetricHistory,
    ) -> trc::Result<()> {
        let mut batch = BatchBuilder::new();
        {
            // Inner scope: the mutex guard is dropped here, before the async
            // store write below.
            let timestamp = now();
            let node_id = core.network.node_id;
            let mut history = history_.lock();
            // Only this fixed set of event counters is recorded historically.
            for event in [
                EventType::Smtp(SmtpEvent::ConnectionStart),
                EventType::Imap(ImapEvent::ConnectionStart),
                EventType::Pop3(Pop3Event::ConnectionStart),
                EventType::ManageSieve(ManageSieveEvent::ConnectionStart),
                EventType::Http(HttpEvent::ConnectionStart),
                EventType::Delivery(DeliveryEvent::AttemptStart),
                EventType::Delivery(DeliveryEvent::Completed),
                EventType::MessageIngest(MessageIngestEvent::Ham),
                EventType::MessageIngest(MessageIngestEvent::Spam),
                EventType::Network(NetworkEvent::DropBlocked),
                EventType::IncomingReport(IncomingReportEvent::DmarcReport),
                EventType::IncomingReport(IncomingReportEvent::DmarcReportWithWarnings),
                EventType::IncomingReport(IncomingReportEvent::TlsReport),
                EventType::IncomingReport(IncomingReportEvent::TlsReportWithWarnings),
            ] {
                let reading = Collector::read_event_metric(event.id());
                if reading > 0 {
                    // Persist only the increase since the last snapshot.
                    // NOTE(review): assumes `reading` is monotonically
                    // non-decreasing within the process; a reset would
                    // underflow this subtraction — confirm collector semantics.
                    let history = history.events.entry(event).or_insert(0);
                    let diff = reading - *history;
                    if diff > 0 {
                        batch.set(
                            ValueClass::Telemetry(TelemetryClass::Metric {
                                timestamp,
                                metric_id: (event.code() << 2) | TYPE_COUNTER,
                                node_id,
                            }),
                            KeySerializer::new(U32_LEN).write_leb128(diff).finalize(),
                        );
                    }
                    *history = reading;
                }
            }
            for histogram in Collector::collect_histograms(true) {
                let histogram_id = histogram.id();
                // Only a subset of histograms is persisted.
                if matches!(
                    histogram_id,
                    MetricType::MessageIngestionTime
                        | MetricType::MessageFtsIndexTime
                        | MetricType::DeliveryTotalTime
                        | MetricType::DeliveryTime
                        | MetricType::DnsLookupTime
                ) {
                    let history = history.histograms.entry(histogram_id).or_default();
                    let sum = histogram.sum();
                    let count = histogram.count();
                    // Deltas of the cumulative sum/count since the last snapshot.
                    let diff_sum = sum - history.sum;
                    let diff_count = count - history.count;
                    if diff_sum > 0 || diff_count > 0 {
                        batch.set(
                            ValueClass::Telemetry(TelemetryClass::Metric {
                                timestamp,
                                metric_id: (histogram_id.code() << 2) | TYPE_HISTOGRAM,
                                node_id,
                            }),
                            // Value layout: LEB128 count followed by LEB128 sum.
                            KeySerializer::new(U32_LEN)
                                .write_leb128(diff_count)
                                .write_leb128(diff_sum)
                                .finalize(),
                        );
                    }
                    history.sum = sum;
                    history.count = count;
                }
            }
        }
        // Skip the store round-trip entirely when nothing changed.
        if !batch.is_empty() {
            self.write(batch.build())
                .await
                .caused_by(trc::location!())?;
        }
        Ok(())
    }

    /// Reads back all metric data points whose key timestamp lies between
    /// `from_timestamp` and `to_timestamp`, decoding each into a
    /// [`Metric`] value.
    async fn query_metrics(
        &self,
        from_timestamp: u64,
        to_timestamp: u64,
    ) -> trc::Result<Vec<Metric<EventType, MetricType, u64>>> {
        let mut metrics = Vec::new();
        self.iterate(
            IterateParams::new(
                ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric {
                    timestamp: from_timestamp,
                    metric_id: 0,
                    node_id: 0,
                })),
                ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric {
                    timestamp: to_timestamp,
                    metric_id: 0,
                    node_id: 0,
                })),
            ),
            |key, value| {
                // Key layout: big-endian u64 timestamp, then the LEB128
                // metric id whose low two bits select the data point kind.
                let timestamp = key.deserialize_be_u64(0).caused_by(trc::location!())?;
                let (metric_type, _) = key
                    .get(U64_LEN..)
                    .and_then(|bytes| bytes.read_leb128::<u64>())
                    .ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?;
                match metric_type & 0x03 {
                    TYPE_COUNTER => {
                        // Remaining bits (>> 2) are the event type code.
                        let id = EventType::from_code(metric_type >> 2).ok_or_else(|| {
                            trc::Error::corrupted_key(key, None, trc::location!())
                        })?;
                        let (value, _) = value.read_leb128::<u32>().ok_or_else(|| {
                            trc::Error::corrupted_key(key, value.into(), trc::location!())
                        })?;
                        metrics.push(Metric::Counter {
                            id,
                            timestamp,
                            value,
                        });
                    }
                    TYPE_HISTOGRAM => {
                        let id = MetricType::from_code(metric_type >> 2).ok_or_else(|| {
                            trc::Error::corrupted_key(key, None, trc::location!())
                        })?;
                        // Value layout mirrors write_metrics: count, then sum.
                        let (count, bytes_read) = value.read_leb128::<u64>().ok_or_else(|| {
                            trc::Error::corrupted_key(key, value.into(), trc::location!())
                        })?;
                        let (sum, _) = value
                            .get(bytes_read..)
                            .and_then(|bytes| bytes.read_leb128::<u64>())
                            .ok_or_else(|| {
                                trc::Error::corrupted_key(key, value.into(), trc::location!())
                            })?;
                        metrics.push(Metric::Histogram {
                            id,
                            timestamp,
                            count,
                            sum,
                        });
                    }
                    // Unknown tag: the key is corrupted.
                    _ => return Err(trc::Error::corrupted_key(key, None, trc::location!())),
                }
                Ok(true)
            },
        )
        .await
        .caused_by(trc::location!())?;
        Ok(metrics)
    }

    /// Deletes every stored metric data point older than `period`
    /// (i.e. with a timestamp before `now() - period`).
    async fn purge_metrics(&self, period: Duration) -> trc::Result<()> {
        self.delete_range(
            ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric {
                timestamp: 0,
                metric_id: 0,
                node_id: 0,
            })),
            ValueKey::from(ValueClass::Telemetry(TelemetryClass::Metric {
                timestamp: now() - period.as_secs(),
                metric_id: 0,
                node_id: 0,
            })),
        )
        .await
        .caused_by(trc::location!())
    }
}
impl MetricsHistory {
pub fn init() -> SharedMetricHistory {
Arc::new(Mutex::new(Self::default()))
}
}

View file

@ -12,7 +12,7 @@ use std::{future::Future, time::Duration};
use ahash::{AHashMap, AHashSet}; use ahash::{AHashMap, AHashSet};
use store::{ use store::{
write::{key::DeserializeBigEndian, BatchBuilder, MaybeDynamicId, TraceClass, ValueClass}, write::{key::DeserializeBigEndian, BatchBuilder, MaybeDynamicId, TelemetryClass, ValueClass},
Deserialize, IterateParams, Store, ValueKey, U64_LEN, Deserialize, IterateParams, Store, ValueKey, U64_LEN,
}; };
use trc::{ use trc::{
@ -81,7 +81,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac
if !queue_ids.is_empty() { if !queue_ids.is_empty() {
// Serialize events // Serialize events
batch.set( batch.set(
ValueClass::Trace(TraceClass::Span { span_id }), ValueClass::Telemetry(TelemetryClass::Span { span_id }),
serialize_events( serialize_events(
[span.as_ref()] [span.as_ref()]
.into_iter() .into_iter()
@ -93,7 +93,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac
// Build index // Build index
batch.set( batch.set(
ValueClass::Trace(TraceClass::Index { ValueClass::Telemetry(TelemetryClass::Index {
span_id, span_id,
value: (span.inner.typ.code() as u16).to_be_bytes().to_vec(), value: (span.inner.typ.code() as u16).to_be_bytes().to_vec(),
}), }),
@ -101,7 +101,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac
); );
for queue_id in queue_ids { for queue_id in queue_ids {
batch.set( batch.set(
ValueClass::Trace(TraceClass::Index { ValueClass::Telemetry(TelemetryClass::Index {
span_id, span_id,
value: queue_id.to_be_bytes().to_vec(), value: queue_id.to_be_bytes().to_vec(),
}), }),
@ -110,7 +110,7 @@ pub(crate) fn spawn_store_tracer(builder: SubscriberBuilder, settings: StoreTrac
} }
for value in values { for value in values {
batch.set( batch.set(
ValueClass::Trace(TraceClass::Index { ValueClass::Telemetry(TelemetryClass::Index {
span_id, span_id,
value: value.into_bytes(), value: value.into_bytes(),
}), }),
@ -158,18 +158,18 @@ pub trait TracingStore: Sync + Send {
impl TracingStore for Store { impl TracingStore for Store {
async fn get_span(&self, span_id: u64) -> trc::Result<Vec<Event<EventDetails>>> { async fn get_span(&self, span_id: u64) -> trc::Result<Vec<Event<EventDetails>>> {
self.get_value::<Span>(ValueKey::from(ValueClass::Trace(TraceClass::Span { self.get_value::<Span>(ValueKey::from(ValueClass::Telemetry(
span_id, TelemetryClass::Span { span_id },
}))) )))
.await .await
.caused_by(trc::location!()) .caused_by(trc::location!())
.map(|span| span.map(|span| span.0).unwrap_or_default()) .map(|span| span.map(|span| span.0).unwrap_or_default())
} }
async fn get_raw_span(&self, span_id: u64) -> trc::Result<Option<Vec<u8>>> { async fn get_raw_span(&self, span_id: u64) -> trc::Result<Option<Vec<u8>>> {
self.get_value::<RawSpan>(ValueKey::from(ValueClass::Trace(TraceClass::Span { self.get_value::<RawSpan>(ValueKey::from(ValueClass::Telemetry(
span_id, TelemetryClass::Span { span_id },
}))) )))
.await .await
.caused_by(trc::location!()) .caused_by(trc::location!())
.map(|span| span.map(|span| span.0)) .map(|span| span.map(|span| span.0))
@ -206,11 +206,11 @@ impl TracingStore for Store {
let mut param_spans = SpanCollector::new(num_params); let mut param_spans = SpanCollector::new(num_params);
self.iterate( self.iterate(
IterateParams::new( IterateParams::new(
ValueKey::from(ValueClass::Trace(TraceClass::Index { ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index {
span_id: 0, span_id: 0,
value: value.clone(), value: value.clone(),
})), })),
ValueKey::from(ValueClass::Trace(TraceClass::Index { ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index {
span_id: u64::MAX, span_id: u64::MAX,
value, value,
})), })),
@ -253,8 +253,8 @@ impl TracingStore for Store {
})?; })?;
self.delete_range( self.delete_range(
ValueKey::from(ValueClass::Trace(TraceClass::Span { span_id: 0 })), ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span { span_id: 0 })),
ValueKey::from(ValueClass::Trace(TraceClass::Span { ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span {
span_id: until_span_id, span_id: until_span_id,
})), })),
) )
@ -264,11 +264,11 @@ impl TracingStore for Store {
let mut delete_keys: Vec<ValueClass<MaybeDynamicId>> = Vec::new(); let mut delete_keys: Vec<ValueClass<MaybeDynamicId>> = Vec::new();
self.iterate( self.iterate(
IterateParams::new( IterateParams::new(
ValueKey::from(ValueClass::Trace(TraceClass::Index { ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index {
span_id: 0, span_id: 0,
value: vec![], value: vec![],
})), })),
ValueKey::from(ValueClass::Trace(TraceClass::Index { ValueKey::from(ValueClass::Telemetry(TelemetryClass::Index {
span_id: u64::MAX, span_id: u64::MAX,
value: vec![u8::MAX; 16], value: vec![u8::MAX; 16],
})), })),
@ -279,7 +279,7 @@ impl TracingStore for Store {
.deserialize_be_u64(key.len() - U64_LEN) .deserialize_be_u64(key.len() - U64_LEN)
.caused_by(trc::location!())?; .caused_by(trc::location!())?;
if span_id < until_span_id { if span_id < until_span_id {
delete_keys.push(ValueClass::Trace(TraceClass::Index { delete_keys.push(ValueClass::Telemetry(TelemetryClass::Index {
span_id, span_id,
value: key[0..key.len() - U64_LEN].to_vec(), value: key[0..key.len() - U64_LEN].to_vec(),
})); }));

View file

@ -57,7 +57,7 @@ lz4_flex = { version = "0.11", default-features = false }
rev_lines = "0.3.0" rev_lines = "0.3.0"
x509-parser = "0.16.0" x509-parser = "0.16.0"
quick-xml = "0.36" quick-xml = "0.36"
memory-stats = "1.2.0"
[features] [features]
test_mode = [] test_mode = []

View file

@ -297,16 +297,26 @@ impl JMAP {
if err.matches(trc::EventType::Auth(trc::AuthEvent::Failed)) if err.matches(trc::EventType::Auth(trc::AuthEvent::Failed))
&& self.core.is_enterprise_edition() && self.core.is_enterprise_edition()
{ {
if let Some(token) = if let Some((live_path, token)) = req
req.uri().path().strip_prefix("/api/tracing/live/") .uri()
.path()
.strip_prefix("/api/telemetry/")
.and_then(|p| {
p.strip_suffix("traces/live/")
.map(|t| ("traces", t))
.or_else(|| {
p.strip_suffix("metrics/live/")
.map(|t| ("metrics", t))
})
})
{ {
let (account_id, _, _) = let (account_id, _, _) =
self.validate_access_token("live_tracing", token).await?; self.validate_access_token("live_telemetry", token).await?;
return self return self
.handle_tracing_api_request( .handle_telemetry_api_request(
&req, &req,
vec!["", "live"], vec!["", live_path, "live"],
account_id, account_id,
) )
.await; .await;

View file

@ -8,5 +8,5 @@
* *
*/ */
pub mod tracing; pub mod telemetry;
pub mod undelete; pub mod undelete;

View file

@ -8,9 +8,15 @@
* *
*/ */
use std::time::{Duration, Instant}; use std::{
fmt::Write,
time::{Duration, Instant},
};
use common::telemetry::tracers::store::{TracingQuery, TracingStore}; use common::telemetry::{
metrics::store::{Metric, MetricsStore},
tracers::store::{TracingQuery, TracingStore},
};
use directory::backend::internal::manage; use directory::backend::internal::manage;
use http_body_util::{combinators::BoxBody, StreamBody}; use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::{ use hyper::{
@ -23,7 +29,7 @@ use store::ahash::{AHashMap, AHashSet};
use trc::{ use trc::{
ipc::{bitset::Bitset, subscriber::SubscriberBuilder}, ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
serializers::json::JsonEventSerializer, serializers::json::JsonEventSerializer,
DeliveryEvent, EventType, Key, QueueEvent, Value, Collector, DeliveryEvent, EventType, Key, MetricType, QueueEvent, Value,
}; };
use utils::{snowflake::SnowflakeIdGenerator, url_params::UrlParams}; use utils::{snowflake::SnowflakeIdGenerator, url_params::UrlParams};
@ -36,7 +42,7 @@ use crate::{
}; };
impl JMAP { impl JMAP {
pub async fn handle_tracing_api_request( pub async fn handle_telemetry_api_request(
&self, &self,
req: &HttpRequest, req: &HttpRequest,
path: Vec<&str>, path: Vec<&str>,
@ -49,7 +55,7 @@ impl JMAP {
path.get(2).copied(), path.get(2).copied(),
req.method(), req.method(),
) { ) {
("spans", None, &Method::GET) => { ("traces", None, &Method::GET) => {
let page: usize = params.parse("page").unwrap_or(0); let page: usize = params.parse("page").unwrap_or(0);
let limit: usize = params.parse("limit").unwrap_or(0); let limit: usize = params.parse("limit").unwrap_or(0);
let mut tracing_query = Vec::new(); let mut tracing_query = Vec::new();
@ -100,12 +106,13 @@ impl JMAP {
.and_then(SnowflakeIdGenerator::from_timestamp) .and_then(SnowflakeIdGenerator::from_timestamp)
.unwrap_or(0); .unwrap_or(0);
let values = params.get("values").is_some(); let values = params.get("values").is_some();
let store = self let store = &self
.core .core
.enterprise .enterprise
.as_ref() .as_ref()
.and_then(|e| e.trace_store.as_ref()) .and_then(|e| e.trace_store.as_ref())
.ok_or_else(|| manage::unsupported("No tracing store has been configured"))?; .ok_or_else(|| manage::unsupported("No tracing store has been configured"))?
.store;
let span_ids = store.query_spans(&tracing_query, after, before).await?; let span_ids = store.query_spans(&tracing_query, after, before).await?;
let (total, span_ids) = if limit > 0 { let (total, span_ids) = if limit > 0 {
@ -154,52 +161,7 @@ impl JMAP {
.into_http_response()) .into_http_response())
} }
} }
("span", id, &Method::GET) => { ("traces", Some("live"), &Method::GET) => {
let store = self
.core
.enterprise
.as_ref()
.and_then(|e| e.trace_store.as_ref())
.ok_or_else(|| manage::unsupported("No tracing store has been configured"))?;
let mut events = Vec::new();
for span_id in id
.or_else(|| params.get("id"))
.unwrap_or_default()
.split(',')
{
if let Ok(span_id) = span_id.parse::<u64>() {
events.push(
JsonEventSerializer::new(store.get_span(span_id).await?)
.with_description()
.with_explanation(),
);
} else {
events.push(JsonEventSerializer::new(Vec::new()));
}
}
if events.len() == 1 && id.is_some() {
Ok(JsonResponse::new(json!({
"data": events.into_iter().next().unwrap(),
}))
.into_http_response())
} else {
Ok(JsonResponse::new(json!({
"data": events,
}))
.into_http_response())
}
}
("live", Some("token"), &Method::GET) => {
// Issue a live tracing token valid for 60 seconds
Ok(JsonResponse::new(json!({
"data": self.issue_custom_token(account_id, "live_tracing", "web", 60).await?,
}))
.into_http_response())
}
("live", _, &Method::GET) => {
let mut key_filters = AHashMap::new(); let mut key_filters = AHashMap::new();
let mut filter = None; let mut filter = None;
@ -327,6 +289,190 @@ impl JMAP {
))), ))),
}) })
} }
("trace", id, &Method::GET) => {
let store = &self
.core
.enterprise
.as_ref()
.and_then(|e| e.trace_store.as_ref())
.ok_or_else(|| manage::unsupported("No tracing store has been configured"))?
.store;
let mut events = Vec::new();
for span_id in id
.or_else(|| params.get("id"))
.unwrap_or_default()
.split(',')
{
if let Ok(span_id) = span_id.parse::<u64>() {
events.push(
JsonEventSerializer::new(store.get_span(span_id).await?)
.with_description()
.with_explanation(),
);
} else {
events.push(JsonEventSerializer::new(Vec::new()));
}
}
if events.len() == 1 && id.is_some() {
Ok(JsonResponse::new(json!({
"data": events.into_iter().next().unwrap(),
}))
.into_http_response())
} else {
Ok(JsonResponse::new(json!({
"data": events,
}))
.into_http_response())
}
}
("live", Some("token"), &Method::GET) => {
// Issue a live telemetry token valid for 60 seconds
Ok(JsonResponse::new(json!({
"data": self.issue_custom_token(account_id, "live_telemetry", "web", 60).await?,
}))
.into_http_response())
}
("metrics", None, &Method::GET) => {
let before = params
.parse::<Timestamp>("before")
.map(|t| t.into_inner())
.unwrap_or(0);
let after = params
.parse::<Timestamp>("after")
.map(|t| t.into_inner())
.unwrap_or(0);
let results = self
.core
.enterprise
.as_ref()
.and_then(|e| e.metrics_store.as_ref())
.ok_or_else(|| manage::unsupported("No metrics store has been configured"))?
.store
.query_metrics(after, before)
.await?;
let mut metrics = Vec::with_capacity(results.len());
for metric in results {
metrics.push(match metric {
Metric::Counter {
id,
timestamp,
value,
} => Metric::Counter {
id: id.name(),
timestamp: DateTime::from_timestamp(timestamp as i64).to_rfc3339(),
value,
},
Metric::Histogram {
id,
timestamp,
count,
sum,
} => Metric::Histogram {
id: id.name(),
timestamp: DateTime::from_timestamp(timestamp as i64).to_rfc3339(),
count,
sum,
},
});
}
Ok(JsonResponse::new(json!({
"data": metrics,
}))
.into_http_response())
}
("metrics", Some("live"), &Method::GET) => {
let interval = Duration::from_secs(
params
.parse::<u64>("interval")
.filter(|interval| *interval >= 1)
.unwrap_or(30),
);
let mut event_types = AHashSet::new();
let mut metric_types = AHashSet::new();
for metric_name in params.get("metrics").unwrap_or_default().split(',') {
let metric_name = metric_name.trim();
if !metric_name.is_empty() {
if let Some(event_type) = EventType::try_parse(metric_name) {
event_types.insert(event_type);
} else if let Some(metric_type) = MetricType::try_parse(metric_name) {
metric_types.insert(metric_type);
}
}
}
Ok(HttpResponse {
status: StatusCode::OK,
content_type: "text/event-stream".into(),
content_disposition: "".into(),
cache_control: "no-store".into(),
body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(
async_stream::stream! {
loop {
let mut metrics = String::with_capacity(512);
metrics.push_str("event: metrics\ndata: [");
let mut is_first = true;
for counter in Collector::collect_counters(true) {
if event_types.is_empty() || event_types.contains(&counter.id()) {
if !is_first {
metrics.push(',');
} else {
is_first = false;
}
let _ = write!(
&mut metrics,
"{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}",
counter.id().name(),
counter.value()
);
}
}
for gauge in Collector::collect_gauges(true) {
if metric_types.is_empty() || metric_types.contains(&gauge.id()) {
if !is_first {
metrics.push(',');
} else {
is_first = false;
}
let _ = write!(
&mut metrics,
"{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}",
gauge.id().name(),
gauge.get()
);
}
}
for histogram in Collector::collect_histograms(true) {
if metric_types.is_empty() || metric_types.contains(&histogram.id()) {
if !is_first {
metrics.push(',');
} else {
is_first = false;
}
let _ = write!(
&mut metrics,
"{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}",
histogram.id().name(),
histogram.count(),
histogram.sum()
);
}
}
metrics.push_str("]\n\n");
yield Ok(Frame::data(Bytes::from(metrics)));
tokio::time::sleep(interval).await;
}
},
))),
})
}
_ => Err(trc::ResourceEvent::NotFound.into_err()), _ => Err(trc::ResourceEvent::NotFound.into_err()),
} }
} }

View file

@ -83,7 +83,7 @@ impl JMAP {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
"tracing" if is_superuser => { "telemetry" if is_superuser => {
// WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED // WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED
// Any attempt to modify, bypass, or disable this license validation mechanism // Any attempt to modify, bypass, or disable this license validation mechanism
// constitutes a severe violation of the Stalwart Enterprise License Agreement. // constitutes a severe violation of the Stalwart Enterprise License Agreement.
@ -94,7 +94,7 @@ impl JMAP {
// for copyright infringement, breach of contract, and fraud. // for copyright infringement, breach of contract, and fraud.
if self.core.is_enterprise_edition() { if self.core.is_enterprise_edition() {
self.handle_tracing_api_request(req, path, access_token.primary_id()) self.handle_telemetry_api_request(req, path, access_token.primary_id())
.await .await
} else { } else {
Err(manage::enterprise()) Err(manage::enterprise())

View file

@ -12,14 +12,17 @@ use std::{
use common::{config::telemetry::OtelMetrics, IPC_CHANNEL_BUFFER}; use common::{config::telemetry::OtelMetrics, IPC_CHANNEL_BUFFER};
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
use common::telemetry::tracers::store::TracingStore; use common::telemetry::{
metrics::store::{MetricsStore, SharedMetricHistory},
tracers::store::TracingStore,
};
use store::{ use store::{
write::{now, purge::PurgeStore}, write::{now, purge::PurgeStore},
BlobStore, LookupStore, Store, BlobStore, LookupStore, Store,
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use trc::HousekeeperEvent; use trc::{Collector, HousekeeperEvent, MetricType};
use utils::map::ttl_dashmap::TtlMap; use utils::map::ttl_dashmap::TtlMap;
use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER}; use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER};
@ -55,6 +58,9 @@ enum ActionClass {
Acme(String), Acme(String),
OtelMetrics, OtelMetrics,
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
InternalMetrics,
CalculateMetrics,
#[cfg(feature = "enterprise")]
ReloadSettings, ReloadSettings,
} }
@ -99,6 +105,9 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
queue.schedule(Instant::now() + otel.interval, ActionClass::OtelMetrics); queue.schedule(Instant::now() + otel.interval, ActionClass::OtelMetrics);
} }
// Calculate expensive metrics
queue.schedule(Instant::now(), ActionClass::CalculateMetrics);
// Add all ACME renewals to heap // Add all ACME renewals to heap
for provider in core_.tls.acme_providers.values() { for provider in core_.tls.acme_providers.values() {
match core_.init_acme(provider).await { match core_.init_acme(provider).await {
@ -125,10 +134,21 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
Instant::now() + enterprise.license.expires_in(), Instant::now() + enterprise.license.expires_in(),
ActionClass::ReloadSettings, ActionClass::ReloadSettings,
); );
if let Some(metrics_store) = enterprise.metrics_store.as_ref() {
queue.schedule(
Instant::now() + metrics_store.interval.time_to_next(),
ActionClass::InternalMetrics,
);
}
} }
// SPDX-SnippetEnd // SPDX-SnippetEnd
} }
// Metrics history
#[cfg(feature = "enterprise")]
let metrics_history = SharedMetricHistory::default();
loop { loop {
match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await { match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await {
Ok(Some(event)) => match event { Ok(Some(event)) => match event {
@ -185,12 +205,21 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
let trace_hold_period = core let trace_retention = core
.core .core
.load() .load()
.enterprise .enterprise
.as_ref() .as_ref()
.and_then(|e| e.trace_hold_period); .and_then(|e| e.trace_store.as_ref())
.and_then(|t| t.retention);
#[cfg(feature = "enterprise")]
let metrics_retention = core
.core
.load()
.enterprise
.as_ref()
.and_then(|e| e.metrics_store.as_ref())
.and_then(|m| m.retention);
// SPDX-SnippetEnd // SPDX-SnippetEnd
tokio::spawn(async move { tokio::spawn(async move {
@ -206,11 +235,18 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
if let Some(trace_hold_period) = trace_hold_period { if let Some(trace_retention) = trace_retention {
if let Err(err) = store.purge_spans(trace_hold_period).await { if let Err(err) = store.purge_spans(trace_retention).await {
trc::error!(err.details("Failed to purge tracing spans")); trc::error!(err.details("Failed to purge tracing spans"));
} }
} }
#[cfg(feature = "enterprise")]
if let Some(metrics_retention) = metrics_retention {
if let Err(err) = store.purge_metrics(metrics_retention).await {
trc::error!(err.details("Failed to purge metrics"));
}
}
// SPDX-SnippetEnd // SPDX-SnippetEnd
}); });
} }
@ -382,10 +418,83 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
}); });
} }
} }
ActionClass::CalculateMetrics => {
// Calculate expensive metrics every 5 minutes
queue.schedule(
Instant::now() + Duration::from_secs(5 * 60),
ActionClass::OtelMetrics,
);
let core = core_.clone();
tokio::spawn(async move {
#[cfg(feature = "enterprise")]
if core.is_enterprise_edition() {
// Obtain queue size
match core.message_queue_size().await {
Ok(total) => {
Collector::update_gauge(
MetricType::QueueCount,
total,
);
}
Err(err) => {
trc::error!(
err.details("Failed to obtain queue size")
);
}
}
}
match tokio::task::spawn_blocking(memory_stats::memory_stats)
.await
{
Ok(Some(stats)) => {
Collector::update_gauge(
MetricType::ServerMemory,
stats.physical_mem as u64,
);
}
Ok(None) => {}
Err(err) => {
trc::error!(trc::EventType::Server(
trc::ServerEvent::ThreadError,
)
.reason(err)
.caused_by(trc::location!())
.details("Join Error"));
}
}
});
}
// SPDX-SnippetBegin // SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")]
ActionClass::InternalMetrics => {
if let Some(metrics_store) = &core_
.enterprise
.as_ref()
.and_then(|e| e.metrics_store.as_ref())
{
queue.schedule(
Instant::now() + metrics_store.interval.time_to_next(),
ActionClass::InternalMetrics,
);
let metrics_store = metrics_store.store.clone();
let metrics_history = metrics_history.clone();
let core = core_.clone();
tokio::spawn(async move {
if let Err(err) =
metrics_store.write_metrics(core, metrics_history).await
{
trc::error!(err.details("Failed to write metrics"));
}
});
}
}
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
ActionClass::ReloadSettings => { ActionClass::ReloadSettings => {
match core_.reload().await { match core_.reload().await {

View file

@ -105,8 +105,9 @@ impl MysqlStore {
SUBSPACE_REPORT_IN, SUBSPACE_REPORT_IN,
SUBSPACE_FTS_INDEX, SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS, SUBSPACE_LOGS,
SUBSPACE_TRACE, SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TRACE_INDEX, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] { ] {
let table = char::from(table); let table = char::from(table);
conn.query_drop(&format!( conn.query_drop(&format!(

View file

@ -93,8 +93,9 @@ impl PostgresStore {
SUBSPACE_FTS_INDEX, SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS, SUBSPACE_LOGS,
SUBSPACE_BLOBS, SUBSPACE_BLOBS,
SUBSPACE_TRACE, SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TRACE_INDEX, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] { ] {
let table = char::from(table); let table = char::from(table);
conn.execute( conn.execute(

View file

@ -87,8 +87,9 @@ impl RocksDbStore {
SUBSPACE_FTS_INDEX, SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS, SUBSPACE_LOGS,
SUBSPACE_BLOBS, SUBSPACE_BLOBS,
SUBSPACE_TRACE, SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TRACE_INDEX, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] { ] {
let cf_opts = Options::default(); let cf_opts = Options::default();
cfs.push(ColumnFamilyDescriptor::new( cfs.push(ColumnFamilyDescriptor::new(

View file

@ -104,8 +104,9 @@ impl SqliteStore {
SUBSPACE_FTS_INDEX, SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS, SUBSPACE_LOGS,
SUBSPACE_BLOBS, SUBSPACE_BLOBS,
SUBSPACE_TRACE, SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TRACE_INDEX, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] { ] {
let table = char::from(table); let table = char::from(table);
conn.execute( conn.execute(

View file

@ -560,8 +560,9 @@ impl Store {
SUBSPACE_REPORT_OUT, SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN, SUBSPACE_REPORT_IN,
SUBSPACE_FTS_INDEX, SUBSPACE_FTS_INDEX,
SUBSPACE_TRACE, SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TRACE_INDEX, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_INDEX,
] { ] {
self.delete_range( self.delete_range(
AnyKey { AnyKey {
@ -745,8 +746,9 @@ impl Store {
(SUBSPACE_BITMAP_TAG, false), (SUBSPACE_BITMAP_TAG, false),
(SUBSPACE_BITMAP_TEXT, false), (SUBSPACE_BITMAP_TEXT, false),
(SUBSPACE_INDEXES, false), (SUBSPACE_INDEXES, false),
(SUBSPACE_TRACE, true), (SUBSPACE_TELEMETRY_SPAN, true),
(SUBSPACE_TRACE_INDEX, true), (SUBSPACE_TELEMETRY_METRIC, true),
(SUBSPACE_TELEMETRY_INDEX, true),
] { ] {
let from_key = crate::write::AnyKey { let from_key = crate::write::AnyKey {
subspace, subspace,

View file

@ -147,12 +147,12 @@ pub const SUBSPACE_QUOTA: u8 = b'u';
pub const SUBSPACE_REPORT_OUT: u8 = b'h'; pub const SUBSPACE_REPORT_OUT: u8 = b'h';
pub const SUBSPACE_REPORT_IN: u8 = b'r'; pub const SUBSPACE_REPORT_IN: u8 = b'r';
pub const SUBSPACE_FTS_INDEX: u8 = b'g'; pub const SUBSPACE_FTS_INDEX: u8 = b'g';
pub const SUBSPACE_TRACE: u8 = b'o'; pub const SUBSPACE_TELEMETRY_SPAN: u8 = b'o';
pub const SUBSPACE_TRACE_INDEX: u8 = b'w'; pub const SUBSPACE_TELEMETRY_INDEX: u8 = b'w';
pub const SUBSPACE_TELEMETRY_METRIC: u8 = b'x';
pub const SUBSPACE_RESERVED_1: u8 = b'x'; pub const SUBSPACE_RESERVED_1: u8 = b'y';
pub const SUBSPACE_RESERVED_2: u8 = b'y'; pub const SUBSPACE_RESERVED_2: u8 = b'z';
pub const SUBSPACE_RESERVED_3: u8 = b'z';
#[derive(Clone)] #[derive(Clone)]
pub struct IterateParams<T: Key> { pub struct IterateParams<T: Key> {

View file

@ -13,13 +13,13 @@ use crate::{
SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX, SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY, SUBSPACE_FTS_QUEUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY,
SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_IN, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_IN,
SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TRACE, SUBSPACE_TRACE_INDEX, U32_LEN, U64_LEN, SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TELEMETRY_INDEX, SUBSPACE_TELEMETRY_METRIC,
WITH_SUBSPACE, SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN, WITH_SUBSPACE,
}; };
use super::{ use super::{
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass, AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass,
ReportEvent, ResolveId, TagValue, TraceClass, ValueClass, ReportEvent, ResolveId, TagValue, TelemetryClass, ValueClass,
}; };
pub struct KeySerializer { pub struct KeySerializer {
@ -366,11 +366,19 @@ impl<T: ResolveId> ValueClass<T> {
serializer.write(2u8).write(*expires).write(*id) serializer.write(2u8).write(*expires).write(*id)
} }
}, },
ValueClass::Trace(trace) => match trace { ValueClass::Telemetry(telemetry) => match telemetry {
TraceClass::Span { span_id } => serializer.write(*span_id), TelemetryClass::Span { span_id } => serializer.write(*span_id),
TraceClass::Index { span_id, value } => { TelemetryClass::Index { span_id, value } => {
serializer.write(value.as_slice()).write(*span_id) serializer.write(value.as_slice()).write(*span_id)
} }
TelemetryClass::Metric {
timestamp,
metric_id,
node_id,
} => serializer
.write(*timestamp)
.write_leb128(*metric_id)
.write_leb128(*node_id),
}, },
ValueClass::Any(any) => serializer.write(any.key.as_slice()), ValueClass::Any(any) => serializer.write(any.key.as_slice()),
} }
@ -550,9 +558,10 @@ impl<T> ValueClass<T> {
QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(), QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(),
}, },
ValueClass::Report(_) => U64_LEN * 2 + 1, ValueClass::Report(_) => U64_LEN * 2 + 1,
ValueClass::Trace(trace) => match trace { ValueClass::Telemetry(telemetry) => match telemetry {
TraceClass::Span { .. } => U64_LEN + 1, TelemetryClass::Span { .. } => U64_LEN + 1,
TraceClass::Index { value, .. } => U64_LEN + value.len() + 1, TelemetryClass::Index { value, .. } => U64_LEN + value.len() + 1,
TelemetryClass::Metric { .. } => U64_LEN * 2 + 1,
}, },
ValueClass::Any(v) => v.key.len(), ValueClass::Any(v) => v.key.len(),
} }
@ -595,9 +604,10 @@ impl<T> ValueClass<T> {
QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA, QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA,
}, },
ValueClass::Report(_) => SUBSPACE_REPORT_IN, ValueClass::Report(_) => SUBSPACE_REPORT_IN,
ValueClass::Trace(trace) => match trace { ValueClass::Telemetry(telemetry) => match telemetry {
TraceClass::Span { .. } => SUBSPACE_TRACE, TelemetryClass::Span { .. } => SUBSPACE_TELEMETRY_SPAN,
TraceClass::Index { .. } => SUBSPACE_TRACE_INDEX, TelemetryClass::Index { .. } => SUBSPACE_TELEMETRY_INDEX,
TelemetryClass::Metric { .. } => SUBSPACE_TELEMETRY_METRIC,
}, },
ValueClass::Any(any) => any.subspace, ValueClass::Any(any) => any.subspace,
} }

View file

@ -153,7 +153,7 @@ pub enum ValueClass<T> {
Config(Vec<u8>), Config(Vec<u8>),
Queue(QueueClass), Queue(QueueClass),
Report(ReportClass), Report(ReportClass),
Trace(TraceClass), Telemetry(TelemetryClass),
Any(AnyClass), Any(AnyClass),
} }
@ -206,9 +206,19 @@ pub enum ReportClass {
} }
#[derive(Debug, PartialEq, Clone, Eq, Hash)] #[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum TraceClass { pub enum TelemetryClass {
Span { span_id: u64 }, Span {
Index { span_id: u64, value: Vec<u8> }, span_id: u64,
},
Metric {
timestamp: u64,
metric_id: u64,
node_id: u64,
},
Index {
span_id: u64,
value: Vec<u8>,
},
} }
#[derive(Debug, PartialEq, Clone, Eq, Hash)] #[derive(Debug, PartialEq, Clone, Eq, Hash)]

View file

@ -6,19 +6,17 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use crate::MetricType;
pub struct AtomicGauge { pub struct AtomicGauge {
id: &'static str, id: MetricType,
description: &'static str,
unit: &'static str,
value: AtomicU64, value: AtomicU64,
} }
impl AtomicGauge { impl AtomicGauge {
pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self { pub const fn new(id: MetricType) -> Self {
Self { Self {
id, id,
description,
unit,
value: AtomicU64::new(0), value: AtomicU64::new(0),
} }
} }
@ -53,15 +51,7 @@ impl AtomicGauge {
self.value.fetch_sub(value, Ordering::Relaxed); self.value.fetch_sub(value, Ordering::Relaxed);
} }
pub fn id(&self) -> &'static str { pub fn id(&self) -> MetricType {
self.id self.id
} }
pub fn description(&self) -> &'static str {
self.description
}
pub fn unit(&self) -> &'static str {
self.unit
}
} }

View file

@ -6,12 +6,12 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use crate::MetricType;
use super::array::AtomicU64Array; use super::array::AtomicU64Array;
pub struct AtomicHistogram<const N: usize> { pub struct AtomicHistogram<const N: usize> {
id: &'static str, id: MetricType,
description: &'static str,
unit: &'static str,
buckets: AtomicU64Array<N>, buckets: AtomicU64Array<N>,
upper_bounds: [u64; N], upper_bounds: [u64; N],
sum: AtomicU64, sum: AtomicU64,
@ -21,12 +21,7 @@ pub struct AtomicHistogram<const N: usize> {
} }
impl<const N: usize> AtomicHistogram<N> { impl<const N: usize> AtomicHistogram<N> {
pub const fn new( pub const fn new(id: MetricType, upper_bounds: [u64; N]) -> Self {
id: &'static str,
description: &'static str,
unit: &'static str,
upper_bounds: [u64; N],
) -> Self {
Self { Self {
buckets: AtomicU64Array::new(), buckets: AtomicU64Array::new(),
upper_bounds, upper_bounds,
@ -35,8 +30,6 @@ impl<const N: usize> AtomicHistogram<N> {
min: AtomicU64::new(u64::MAX), min: AtomicU64::new(u64::MAX),
max: AtomicU64::new(0), max: AtomicU64::new(0),
id, id,
description,
unit,
} }
} }
@ -56,18 +49,10 @@ impl<const N: usize> AtomicHistogram<N> {
unreachable!() unreachable!()
} }
pub fn id(&self) -> &'static str { pub fn id(&self) -> MetricType {
self.id self.id
} }
pub fn description(&self) -> &'static str {
self.description
}
pub fn unit(&self) -> &'static str {
self.unit
}
pub fn sum(&self) -> u64 { pub fn sum(&self) -> u64 {
self.sum.load(Ordering::Relaxed) self.sum.load(Ordering::Relaxed)
} }
@ -129,14 +114,9 @@ impl<const N: usize> AtomicHistogram<N> {
self.count.load(Ordering::Relaxed) > 0 self.count.load(Ordering::Relaxed) > 0
} }
pub const fn new_message_sizes( pub const fn new_message_sizes(id: MetricType) -> AtomicHistogram<12> {
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new( AtomicHistogram::new(
id, id,
description,
"bytes",
[ [
500, // 500 bytes 500, // 500 bytes
1_000, // 1 KB 1_000, // 1 KB
@ -154,14 +134,9 @@ impl<const N: usize> AtomicHistogram<N> {
) )
} }
pub const fn new_short_durations( pub const fn new_short_durations(id: MetricType) -> AtomicHistogram<12> {
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new( AtomicHistogram::new(
id, id,
description,
"milliseconds",
[ [
5, // 5 milliseconds 5, // 5 milliseconds
10, // 10 milliseconds 10, // 10 milliseconds
@ -179,14 +154,9 @@ impl<const N: usize> AtomicHistogram<N> {
) )
} }
pub const fn new_medium_durations( pub const fn new_medium_durations(id: MetricType) -> AtomicHistogram<12> {
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new( AtomicHistogram::new(
id, id,
description,
"milliseconds",
[ [
250, 250,
500, 500,
@ -204,14 +174,9 @@ impl<const N: usize> AtomicHistogram<N> {
) )
} }
pub const fn new_long_durations( pub const fn new_long_durations(id: MetricType) -> AtomicHistogram<12> {
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new( AtomicHistogram::new(
id, id,
description,
"milliseconds",
[ [
1_000, // 1 second 1_000, // 1 second
30_000, // 30 seconds 30_000, // 30 seconds

View file

@ -0,0 +1,191 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::MetricType;
impl MetricType {
pub fn name(&self) -> &'static str {
match self {
Self::MessageIngestionTime => "message.ingestion-time",
Self::MessageFtsIndexTime => "message.fts-index-time",
Self::DeliveryTotalTime => "delivery.total-time",
Self::DeliveryTime => "delivery.attempt-time",
Self::MessageSize => "message.size",
Self::MessageAuthSize => "message.authenticated-size",
Self::ReportOutgoingSize => "outgoing-report.size",
Self::StoreReadTime => "store.data-read-time",
Self::StoreWriteTime => "store.data-write-time",
Self::BlobReadTime => "store.blob-read-time",
Self::BlobWriteTime => "store.blob-write-time",
Self::DnsLookupTime => "dns.lookup-time",
Self::HttpRequestTime => "http.request-time",
Self::ImapRequestTime => "imap.request-time",
Self::Pop3RequestTime => "pop3.request-time",
Self::SmtpRequestTime => "smtp.request-time",
Self::SieveRequestTime => "sieve.request-time",
Self::HttpActiveConnections => "http.active-connections",
Self::ImapActiveConnections => "imap.active-connections",
Self::Pop3ActiveConnections => "pop3.active-connections",
Self::SmtpActiveConnections => "smtp.active-connections",
Self::SieveActiveConnections => "sieve.active-connections",
Self::DeliveryActiveConnections => "delivery.active-connections",
Self::ServerMemory => "server.memory",
Self::QueueCount => "queue.count",
}
}
pub fn description(&self) -> &'static str {
match self {
Self::MessageIngestionTime => "Message ingestion time",
Self::MessageFtsIndexTime => "Message full-text indexing time",
Self::DeliveryTotalTime => "Total message delivery time from submission to delivery",
Self::DeliveryTime => "Message delivery time",
Self::MessageSize => "Received message size",
Self::MessageAuthSize => "Received message size from authenticated users",
Self::ReportOutgoingSize => "Outgoing report size",
Self::StoreReadTime => "Data store read time",
Self::StoreWriteTime => "Data store write time",
Self::BlobReadTime => "Blob store read time",
Self::BlobWriteTime => "Blob store write time",
Self::DnsLookupTime => "DNS lookup time",
Self::HttpRequestTime => "HTTP request duration",
Self::ImapRequestTime => "IMAP request duration",
Self::Pop3RequestTime => "POP3 request duration",
Self::SmtpRequestTime => "SMTP request duration",
Self::SieveRequestTime => "ManageSieve request duration",
Self::HttpActiveConnections => "Active HTTP connections",
Self::ImapActiveConnections => "Active IMAP connections",
Self::Pop3ActiveConnections => "Active POP3 connections",
Self::SmtpActiveConnections => "Active SMTP connections",
Self::SieveActiveConnections => "Active ManageSieve connections",
Self::DeliveryActiveConnections => "Active delivery connections",
Self::ServerMemory => "Server memory usage",
Self::QueueCount => "Total number of messages in the queue",
}
}
pub fn unit(&self) -> &'static str {
match self {
Self::MessageIngestionTime
| Self::MessageFtsIndexTime
| Self::DeliveryTotalTime
| Self::DeliveryTime
| Self::StoreReadTime
| Self::StoreWriteTime
| Self::BlobReadTime
| Self::BlobWriteTime
| Self::DnsLookupTime
| Self::HttpRequestTime
| Self::ImapRequestTime
| Self::Pop3RequestTime
| Self::SmtpRequestTime
| Self::SieveRequestTime => "milliseconds",
Self::MessageSize
| Self::MessageAuthSize
| Self::ReportOutgoingSize
| Self::ServerMemory => "bytes",
Self::HttpActiveConnections
| Self::ImapActiveConnections
| Self::Pop3ActiveConnections
| Self::SmtpActiveConnections
| Self::SieveActiveConnections
| Self::DeliveryActiveConnections => "connections",
Self::QueueCount => "messages",
}
}
pub fn code(&self) -> u64 {
match self {
Self::MessageIngestionTime => 0,
Self::MessageFtsIndexTime => 1,
Self::DeliveryTotalTime => 2,
Self::DeliveryTime => 3,
Self::MessageSize => 4,
Self::MessageAuthSize => 5,
Self::ReportOutgoingSize => 6,
Self::StoreReadTime => 7,
Self::StoreWriteTime => 8,
Self::BlobReadTime => 9,
Self::BlobWriteTime => 10,
Self::DnsLookupTime => 11,
Self::HttpRequestTime => 12,
Self::ImapRequestTime => 13,
Self::Pop3RequestTime => 14,
Self::SmtpRequestTime => 15,
Self::SieveRequestTime => 16,
Self::HttpActiveConnections => 17,
Self::ImapActiveConnections => 18,
Self::Pop3ActiveConnections => 19,
Self::SmtpActiveConnections => 20,
Self::SieveActiveConnections => 21,
Self::DeliveryActiveConnections => 22,
Self::ServerMemory => 23,
Self::QueueCount => 24,
}
}
pub fn from_code(code: u64) -> Option<Self> {
match code {
0 => Some(Self::MessageIngestionTime),
1 => Some(Self::MessageFtsIndexTime),
2 => Some(Self::DeliveryTotalTime),
3 => Some(Self::DeliveryTime),
4 => Some(Self::MessageSize),
5 => Some(Self::MessageAuthSize),
6 => Some(Self::ReportOutgoingSize),
7 => Some(Self::StoreReadTime),
8 => Some(Self::StoreWriteTime),
9 => Some(Self::BlobReadTime),
10 => Some(Self::BlobWriteTime),
11 => Some(Self::DnsLookupTime),
12 => Some(Self::HttpRequestTime),
13 => Some(Self::ImapRequestTime),
14 => Some(Self::Pop3RequestTime),
15 => Some(Self::SmtpRequestTime),
16 => Some(Self::SieveRequestTime),
17 => Some(Self::HttpActiveConnections),
18 => Some(Self::ImapActiveConnections),
19 => Some(Self::Pop3ActiveConnections),
20 => Some(Self::SmtpActiveConnections),
21 => Some(Self::SieveActiveConnections),
22 => Some(Self::DeliveryActiveConnections),
23 => Some(Self::ServerMemory),
24 => Some(Self::QueueCount),
_ => None,
}
}
pub fn try_parse(name: &str) -> Option<Self> {
match name {
"message.ingestion-time" => Some(Self::MessageIngestionTime),
"message.fts-index-time" => Some(Self::MessageFtsIndexTime),
"delivery.total-time" => Some(Self::DeliveryTotalTime),
"delivery.attempt-time" => Some(Self::DeliveryTime),
"message.size" => Some(Self::MessageSize),
"message.authenticated-size" => Some(Self::MessageAuthSize),
"outgoing-report.size" => Some(Self::ReportOutgoingSize),
"store.data-read-time" => Some(Self::StoreReadTime),
"store.data-write-time" => Some(Self::StoreWriteTime),
"store.blob-read-time" => Some(Self::BlobReadTime),
"store.blob-write-time" => Some(Self::BlobWriteTime),
"dns.lookup-time" => Some(Self::DnsLookupTime),
"http.request-time" => Some(Self::HttpRequestTime),
"imap.request-time" => Some(Self::ImapRequestTime),
"pop3.request-time" => Some(Self::Pop3RequestTime),
"smtp.request-time" => Some(Self::SmtpRequestTime),
"sieve.request-time" => Some(Self::SieveRequestTime),
"http.active-connections" => Some(Self::HttpActiveConnections),
"imap.active-connections" => Some(Self::ImapActiveConnections),
"pop3.active-connections" => Some(Self::Pop3ActiveConnections),
"smtp.active-connections" => Some(Self::SmtpActiveConnections),
"sieve.active-connections" => Some(Self::SieveActiveConnections),
"delivery.active-connections" => Some(Self::DeliveryActiveConnections),
"server.memory" => Some(Self::ServerMemory),
"queue.count" => Some(Self::QueueCount),
_ => None,
}
}
}

View file

@ -7,6 +7,7 @@
pub mod conv; pub mod conv;
pub mod description; pub mod description;
pub mod level; pub mod level;
pub mod metrics;
use std::{borrow::Cow, fmt::Display}; use std::{borrow::Cow, fmt::Display};

View file

@ -6,9 +6,7 @@
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use atomics::{ use atomics::{array::AtomicU32Array, gauge::AtomicGauge, histogram::AtomicHistogram};
array::AtomicU32Array, counter::AtomicCounter, gauge::AtomicGauge, histogram::AtomicHistogram,
};
use ipc::{ use ipc::{
collector::{Collector, GlobalInterests, EVENT_TYPES}, collector::{Collector, GlobalInterests, EVENT_TYPES},
subscriber::Interests, subscriber::Interests,
@ -22,38 +20,33 @@ static EVENT_COUNTERS: AtomicU32Array<TOTAL_EVENT_COUNT> = AtomicU32Array::new()
static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics(); static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics();
static MESSAGE_INGESTION_TIME: AtomicHistogram<12> = static MESSAGE_INGESTION_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("message.ingestion-time", "Message ingestion time"); AtomicHistogram::<10>::new_short_durations(MetricType::MessageIngestionTime);
static MESSAGE_INDEX_TIME: AtomicHistogram<12> = AtomicHistogram::<10>::new_short_durations( static MESSAGE_INDEX_TIME: AtomicHistogram<12> =
"message.fts-index-time", AtomicHistogram::<10>::new_short_durations(MetricType::MessageFtsIndexTime);
"Message full-text indexing time", static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> =
); AtomicHistogram::<18>::new_long_durations(MetricType::DeliveryTotalTime);
static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = AtomicHistogram::<18>::new_long_durations(
"message.outgoing-delivery-time",
"Total message delivery time from submission to delivery",
);
static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> = static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> =
AtomicHistogram::<12>::new_message_sizes("message.incoming-size", "Received message size"); AtomicHistogram::<12>::new_message_sizes(MetricType::MessageSize);
static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes( static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> =
"message.incoming-submission-size", AtomicHistogram::<12>::new_message_sizes(MetricType::MessageAuthSize);
"Received message size from authenticated users", static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> =
); AtomicHistogram::<12>::new_message_sizes(MetricType::ReportOutgoingSize);
static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes(
"message.outgoing-report-size",
"Outgoing report size",
);
static STORE_DATA_READ_TIME: AtomicHistogram<12> = static STORE_DATA_READ_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.data-read-time", "Data store read time"); AtomicHistogram::<10>::new_short_durations(MetricType::StoreReadTime);
static STORE_DATA_WRITE_TIME: AtomicHistogram<12> = static STORE_DATA_WRITE_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.data-write-time", "Data store write time"); AtomicHistogram::<10>::new_short_durations(MetricType::StoreWriteTime);
static STORE_BLOB_READ_TIME: AtomicHistogram<12> = static STORE_BLOB_READ_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.blob-read-time", "Blob store read time"); AtomicHistogram::<10>::new_short_durations(MetricType::BlobReadTime);
static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> = static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.blob-write-time", "Blob store write time"); AtomicHistogram::<10>::new_short_durations(MetricType::BlobWriteTime);
static DNS_LOOKUP_TIME: AtomicHistogram<12> = static DNS_LOOKUP_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("dns.lookup-time", "DNS lookup time"); AtomicHistogram::<10>::new_short_durations(MetricType::DnsLookupTime);
static SERVER_MEMORY: AtomicGauge = AtomicGauge::new(MetricType::ServerMemory);
static QUEUE_COUNT: AtomicGauge = AtomicGauge::new(MetricType::QueueCount);
const CONN_SMTP_IN: usize = 0; const CONN_SMTP_IN: usize = 0;
const CONN_SMTP_OUT: usize = 1; const CONN_SMTP_OUT: usize = 1;
@ -64,16 +57,12 @@ const CONN_SIEVE: usize = 5;
const TOTAL_CONN_TYPES: usize = 6; const TOTAL_CONN_TYPES: usize = 6;
pub struct ConnectionMetrics { pub struct ConnectionMetrics {
pub total_connections: AtomicCounter,
pub active_connections: AtomicGauge, pub active_connections: AtomicGauge,
pub bytes_sent: AtomicCounter,
pub bytes_received: AtomicCounter,
pub elapsed: AtomicHistogram<12>, pub elapsed: AtomicHistogram<12>,
} }
pub struct EventCounter { pub struct EventCounter {
id: &'static str, id: EventType,
description: &'static str,
value: u32, value: u32,
} }
@ -98,7 +87,6 @@ impl Collector {
match event { match event {
EventType::Smtp(SmtpEvent::ConnectionStart) => { EventType::Smtp(SmtpEvent::ConnectionStart) => {
let conn = &CONNECTION_METRICS[CONN_SMTP_IN]; let conn = &CONNECTION_METRICS[CONN_SMTP_IN];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::Smtp(SmtpEvent::ConnectionEnd) => { EventType::Smtp(SmtpEvent::ConnectionEnd) => {
@ -108,7 +96,6 @@ impl Collector {
} }
EventType::Imap(ImapEvent::ConnectionStart) => { EventType::Imap(ImapEvent::ConnectionStart) => {
let conn = &CONNECTION_METRICS[CONN_IMAP]; let conn = &CONNECTION_METRICS[CONN_IMAP];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::Imap(ImapEvent::ConnectionEnd) => { EventType::Imap(ImapEvent::ConnectionEnd) => {
@ -118,7 +105,6 @@ impl Collector {
} }
EventType::Pop3(Pop3Event::ConnectionStart) => { EventType::Pop3(Pop3Event::ConnectionStart) => {
let conn = &CONNECTION_METRICS[CONN_POP3]; let conn = &CONNECTION_METRICS[CONN_POP3];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::Pop3(Pop3Event::ConnectionEnd) => { EventType::Pop3(Pop3Event::ConnectionEnd) => {
@ -128,7 +114,6 @@ impl Collector {
} }
EventType::Http(HttpEvent::ConnectionStart) => { EventType::Http(HttpEvent::ConnectionStart) => {
let conn = &CONNECTION_METRICS[CONN_HTTP]; let conn = &CONNECTION_METRICS[CONN_HTTP];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::Http(HttpEvent::ConnectionEnd) => { EventType::Http(HttpEvent::ConnectionEnd) => {
@ -138,7 +123,6 @@ impl Collector {
} }
EventType::ManageSieve(ManageSieveEvent::ConnectionStart) => { EventType::ManageSieve(ManageSieveEvent::ConnectionStart) => {
let conn = &CONNECTION_METRICS[CONN_SIEVE]; let conn = &CONNECTION_METRICS[CONN_SIEVE];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::ManageSieve(ManageSieveEvent::ConnectionEnd) => { EventType::ManageSieve(ManageSieveEvent::ConnectionEnd) => {
@ -148,7 +132,6 @@ impl Collector {
} }
EventType::Delivery(DeliveryEvent::AttemptStart) => { EventType::Delivery(DeliveryEvent::AttemptStart) => {
let conn = &CONNECTION_METRICS[CONN_SMTP_OUT]; let conn = &CONNECTION_METRICS[CONN_SMTP_OUT];
conn.total_connections.increment();
conn.active_connections.increment(); conn.active_connections.increment();
} }
EventType::Delivery(DeliveryEvent::AttemptEnd) => { EventType::Delivery(DeliveryEvent::AttemptEnd) => {
@ -157,60 +140,9 @@ impl Collector {
conn.elapsed.observe(elapsed); conn.elapsed.observe(elapsed);
} }
EventType::Delivery(DeliveryEvent::Completed) => { EventType::Delivery(DeliveryEvent::Completed) => {
QUEUE_COUNT.decrement();
MESSAGE_DELIVERY_TIME.observe(elapsed); MESSAGE_DELIVERY_TIME.observe(elapsed);
} }
EventType::Smtp(SmtpEvent::RawInput) => {
CONNECTION_METRICS[CONN_SMTP_IN]
.bytes_received
.increment_by(size);
}
EventType::Smtp(SmtpEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SMTP_IN]
.bytes_sent
.increment_by(size);
}
EventType::Imap(ImapEvent::RawInput) => {
CONNECTION_METRICS[CONN_IMAP]
.bytes_received
.increment_by(size);
}
EventType::Imap(ImapEvent::RawOutput) => {
CONNECTION_METRICS[CONN_IMAP].bytes_sent.increment_by(size);
}
EventType::Http(HttpEvent::RequestBody) => {
CONNECTION_METRICS[CONN_HTTP]
.bytes_received
.increment_by(size);
}
EventType::Http(HttpEvent::ResponseBody) => {
CONNECTION_METRICS[CONN_HTTP].bytes_sent.increment_by(size);
}
EventType::Pop3(Pop3Event::RawInput) => {
CONNECTION_METRICS[CONN_POP3]
.bytes_received
.increment_by(size);
}
EventType::Pop3(Pop3Event::RawOutput) => {
CONNECTION_METRICS[CONN_POP3].bytes_sent.increment_by(size);
}
EventType::ManageSieve(ManageSieveEvent::RawInput) => {
CONNECTION_METRICS[CONN_SIEVE]
.bytes_received
.increment_by(size);
}
EventType::ManageSieve(ManageSieveEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SIEVE].bytes_sent.increment_by(size);
}
EventType::Delivery(DeliveryEvent::RawInput) => {
CONNECTION_METRICS[CONN_SMTP_OUT]
.bytes_received
.increment_by(size);
}
EventType::Delivery(DeliveryEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SMTP_OUT]
.bytes_sent
.increment_by(size);
}
EventType::Delivery( EventType::Delivery(
DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx, DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx,
) )
@ -231,12 +163,18 @@ impl Collector {
} }
EventType::Queue(QueueEvent::QueueMessage) => { EventType::Queue(QueueEvent::QueueMessage) => {
MESSAGE_INCOMING_SIZE.observe(size); MESSAGE_INCOMING_SIZE.observe(size);
QUEUE_COUNT.increment();
} }
EventType::Queue(QueueEvent::QueueMessageAuthenticated) => { EventType::Queue(QueueEvent::QueueMessageAuthenticated) => {
MESSAGE_SUBMISSION_SIZE.observe(size); MESSAGE_SUBMISSION_SIZE.observe(size);
QUEUE_COUNT.increment();
} }
EventType::Queue(QueueEvent::QueueReport) => { EventType::Queue(QueueEvent::QueueReport) => {
MESSAGE_OUT_REPORT_SIZE.observe(size); MESSAGE_OUT_REPORT_SIZE.observe(size);
QUEUE_COUNT.increment();
}
EventType::Queue(QueueEvent::QueueAutogenerated | QueueEvent::QueueDsn) => {
QUEUE_COUNT.increment();
} }
EventType::FtsIndex(FtsIndexEvent::Index) => { EventType::FtsIndex(FtsIndexEvent::Index) => {
MESSAGE_INDEX_TIME.observe(elapsed); MESSAGE_INDEX_TIME.observe(elapsed);
@ -267,7 +205,7 @@ impl Collector {
METRIC_INTERESTS.update(interests); METRIC_INTERESTS.update(interests);
} }
pub fn collect_event_counters(_is_enterprise: bool) -> impl Iterator<Item = EventCounter> { pub fn collect_counters(_is_enterprise: bool) -> impl Iterator<Item = EventCounter> {
EVENT_COUNTERS EVENT_COUNTERS
.inner() .inner()
.iter() .iter()
@ -275,11 +213,8 @@ impl Collector {
.filter_map(|(event_id, value)| { .filter_map(|(event_id, value)| {
let value = value.load(Ordering::Relaxed); let value = value.load(Ordering::Relaxed);
if value > 0 { if value > 0 {
let event = EVENT_TYPES[event_id];
Some(EventCounter { Some(EventCounter {
id: event.name(), id: EVENT_TYPES[event_id],
description: event.description(),
value, value,
}) })
} else { } else {
@ -288,15 +223,14 @@ impl Collector {
}) })
} }
pub fn collect_counters(_is_enterprise: bool) -> impl Iterator<Item = &'static AtomicCounter> { pub fn collect_gauges(is_enterprise: bool) -> impl Iterator<Item = &'static AtomicGauge> {
CONNECTION_METRICS static E_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY, &QUEUE_COUNT];
.iter() static C_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY];
.flat_map(|m| [&m.total_connections, &m.bytes_sent, &m.bytes_received])
.filter(|c| c.is_active())
}
pub fn collect_gauges(_is_enterprise: bool) -> impl Iterator<Item = &'static AtomicGauge> { if is_enterprise { E_GAUGES } else { C_GAUGES }
CONNECTION_METRICS.iter().map(|m| &m.active_connections) .iter()
.copied()
.chain(CONNECTION_METRICS.iter().map(|m| &m.active_connections))
} }
pub fn collect_histograms( pub fn collect_histograms(
@ -331,17 +265,26 @@ impl Collector {
.chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed)) .chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed))
.filter(|h| h.is_active()) .filter(|h| h.is_active())
} }
#[inline(always)]
pub fn read_event_metric(metric_id: usize) -> u32 {
EVENT_COUNTERS.get(metric_id)
}
pub fn update_gauge(metric_type: MetricType, value: u64) {
match metric_type {
MetricType::ServerMemory => SERVER_MEMORY.set(value),
MetricType::QueueCount => QUEUE_COUNT.set(value),
_ => {}
}
}
} }
impl EventCounter { impl EventCounter {
pub fn id(&self) -> &'static str { pub fn id(&self) -> EventType {
self.id self.id
} }
pub fn description(&self) -> &'static str {
self.description
}
pub fn value(&self) -> u64 { pub fn value(&self) -> u64 {
self.value as u64 self.value as u64
} }
@ -351,11 +294,8 @@ impl ConnectionMetrics {
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub const fn new() -> Self { pub const fn new() -> Self {
Self { Self {
total_connections: AtomicCounter::new("", "", ""), active_connections: AtomicGauge::new(MetricType::BlobReadTime),
active_connections: AtomicGauge::new("", "", ""), elapsed: AtomicHistogram::<18>::new_medium_durations(MetricType::BlobReadTime),
bytes_sent: AtomicCounter::new("", "", ""),
bytes_received: AtomicCounter::new("", "", ""),
elapsed: AtomicHistogram::<18>::new_medium_durations("", ""),
} }
} }
} }
@ -366,131 +306,37 @@ const fn init_conn_metrics() -> [ConnectionMetrics; TOTAL_CONN_TYPES] {
let mut array = [INIT; TOTAL_CONN_TYPES]; let mut array = [INIT; TOTAL_CONN_TYPES];
let mut i = 0; let mut i = 0;
while i < TOTAL_CONN_TYPES { while i < TOTAL_CONN_TYPES {
let text = match i { let metric = match i {
CONN_HTTP => &[ CONN_HTTP => &[
("http.total-connections", "Total HTTP connections", "number"), MetricType::HttpRequestTime,
( MetricType::HttpActiveConnections,
"http.active-connections",
"Active HTTP connections",
"number",
),
("http.bytes-sent", "Bytes sent over HTTP", "bytes"),
("http.bytes-received", "Bytes received over HTTP", "bytes"),
("http.request-time", "HTTP request duration", "milliseconds"),
], ],
CONN_IMAP => &[ CONN_IMAP => &[
("imap.total-connections", "Total IMAP connections", "number"), MetricType::ImapRequestTime,
( MetricType::ImapActiveConnections,
"imap.active-connections",
"Active IMAP connections",
"number",
),
("imap.bytes-sent", "Bytes sent over IMAP", "bytes"),
("imap.bytes-received", "Bytes received over IMAP", "bytes"),
("imap.request-time", "IMAP request duration", "milliseconds"),
], ],
CONN_POP3 => &[ CONN_POP3 => &[
("pop3.total-connections", "Total POP3 connections", "number"), MetricType::Pop3RequestTime,
( MetricType::Pop3ActiveConnections,
"pop3.active-connections",
"Active POP3 connections",
"number",
),
("pop3.bytes-sent", "Bytes sent over POP3", "bytes"),
("pop3.bytes-received", "Bytes received over POP3", "bytes"),
("pop3.request-time", "POP3 request duration", "milliseconds"),
], ],
CONN_SMTP_IN => &[ CONN_SMTP_IN => &[
( MetricType::SmtpRequestTime,
"smtp-in.total-connections", MetricType::SmtpActiveConnections,
"Total SMTP incoming connections",
"number",
),
(
"smtp-in.active-connections",
"Active SMTP incoming connections",
"number",
),
(
"smtp-in.bytes-sent",
"Bytes sent over SMTP incoming",
"bytes",
),
(
"smtp-in.bytes-received",
"Bytes received over SMTP incoming",
"bytes",
),
(
"smtp-in.request-time",
"SMTP incoming request duration",
"milliseconds",
),
], ],
CONN_SMTP_OUT => &[ CONN_SMTP_OUT => &[
( MetricType::DeliveryTime,
"smtp-out.total-connections", MetricType::DeliveryActiveConnections,
"Total SMTP outgoing connections",
"number",
),
(
"smtp-out.active-connections",
"Active SMTP outgoing connections",
"number",
),
(
"smtp-out.bytes-sent",
"Bytes sent over SMTP outgoing",
"bytes",
),
(
"smtp-out.bytes-received",
"Bytes received over SMTP outgoing",
"bytes",
),
(
"smtp-out.request-time",
"SMTP outgoing request duration",
"milliseconds",
),
], ],
CONN_SIEVE => &[ CONN_SIEVE => &[
( MetricType::SieveRequestTime,
"sieve.total-connections", MetricType::SieveActiveConnections,
"Total ManageSieve connections",
"number",
),
(
"sieve.active-connections",
"Active ManageSieve connections",
"number",
),
("sieve.bytes-sent", "Bytes sent over ManageSieve", "bytes"),
(
"sieve.bytes-received",
"Bytes received over ManageSieve",
"bytes",
),
(
"sieve.request-time",
"ManageSieve request duration",
"milliseconds",
),
],
_ => &[
("", "", ""),
("", "", ""),
("", "", ""),
("", "", ""),
("", "", ""),
], ],
_ => &[MetricType::BlobReadTime, MetricType::BlobReadTime],
}; };
array[i] = ConnectionMetrics { array[i] = ConnectionMetrics {
total_connections: AtomicCounter::new(text[0].0, text[0].1, text[0].2), elapsed: AtomicHistogram::<18>::new_medium_durations(metric[0]),
active_connections: AtomicGauge::new(text[1].0, text[1].1, text[1].2), active_connections: AtomicGauge::new(metric[1]),
bytes_sent: AtomicCounter::new(text[2].0, text[2].1, text[2].2),
bytes_received: AtomicCounter::new(text[3].0, text[3].1, text[3].2),
elapsed: AtomicHistogram::<18>::new_medium_durations(text[4].0, text[4].1),
}; };
i += 1; i += 1;
} }
@ -501,23 +347,15 @@ impl EventType {
pub fn is_metric(&self) -> bool { pub fn is_metric(&self) -> bool {
match self { match self {
EventType::Server(ServerEvent::ThreadError) => true, EventType::Server(ServerEvent::ThreadError) => true,
EventType::Purge( EventType::Purge(PurgeEvent::Error) => true,
PurgeEvent::Started
| PurgeEvent::Error
| PurgeEvent::AutoExpunge
| PurgeEvent::TombstoneCleanup,
) => true,
EventType::Eval( EventType::Eval(
EvalEvent::Error | EvalEvent::StoreNotFound | EvalEvent::DirectoryNotFound, EvalEvent::Error | EvalEvent::StoreNotFound | EvalEvent::DirectoryNotFound,
) => true, ) => true,
EventType::Acme( EventType::Acme(
AcmeEvent::TlsAlpnError AcmeEvent::TlsAlpnError
| AcmeEvent::OrderStart
| AcmeEvent::OrderCompleted | AcmeEvent::OrderCompleted
| AcmeEvent::AuthError | AcmeEvent::AuthError
| AcmeEvent::AuthCompleted
| AcmeEvent::AuthTooManyAttempts | AcmeEvent::AuthTooManyAttempts
| AcmeEvent::DnsRecordCreated
| AcmeEvent::DnsRecordCreationFailed | AcmeEvent::DnsRecordCreationFailed
| AcmeEvent::DnsRecordDeletionFailed | AcmeEvent::DnsRecordDeletionFailed
| AcmeEvent::DnsRecordPropagationTimeout | AcmeEvent::DnsRecordPropagationTimeout
@ -569,17 +407,61 @@ impl EventType {
| JmapEvent::RequestTooLarge | JmapEvent::RequestTooLarge
| JmapEvent::UnknownMethod, | JmapEvent::UnknownMethod,
) => true, ) => true,
EventType::Imap(_) => true, EventType::Imap(ImapEvent::ConnectionStart | ImapEvent::ConnectionEnd) => true,
EventType::ManageSieve(_) => true, EventType::ManageSieve(
EventType::Pop3(_) => true, ManageSieveEvent::ConnectionStart | ManageSieveEvent::ConnectionEnd,
EventType::Smtp(_) => true, ) => true,
EventType::Pop3(Pop3Event::ConnectionStart | Pop3Event::ConnectionEnd) => true,
EventType::Smtp(
SmtpEvent::ConnectionStart
| SmtpEvent::ConnectionEnd
| SmtpEvent::Error
| SmtpEvent::ConcurrencyLimitExceeded
| SmtpEvent::TransferLimitExceeded
| SmtpEvent::RateLimitExceeded
| SmtpEvent::TimeLimitExceeded
| SmtpEvent::MessageParseFailed
| SmtpEvent::MessageTooLarge
| SmtpEvent::LoopDetected
| SmtpEvent::DkimPass
| SmtpEvent::DkimFail
| SmtpEvent::ArcPass
| SmtpEvent::ArcFail
| SmtpEvent::SpfEhloPass
| SmtpEvent::SpfEhloFail
| SmtpEvent::SpfFromPass
| SmtpEvent::SpfFromFail
| SmtpEvent::DmarcPass
| SmtpEvent::DmarcFail
| SmtpEvent::IprevPass
| SmtpEvent::IprevFail
| SmtpEvent::TooManyMessages
| SmtpEvent::InvalidEhlo
| SmtpEvent::DidNotSayEhlo
| SmtpEvent::MailFromUnauthenticated
| SmtpEvent::MailFromUnauthorized
| SmtpEvent::MailFromMissing
| SmtpEvent::MultipleMailFrom
| SmtpEvent::MailboxDoesNotExist
| SmtpEvent::RelayNotAllowed
| SmtpEvent::RcptToDuplicate
| SmtpEvent::RcptToMissing
| SmtpEvent::TooManyRecipients
| SmtpEvent::TooManyInvalidRcpt
| SmtpEvent::AuthMechanismNotSupported
| SmtpEvent::AuthExchangeTooLong
| SmtpEvent::CommandNotImplemented
| SmtpEvent::InvalidCommand
| SmtpEvent::SyntaxError
| SmtpEvent::RequestTooLarge,
) => true,
EventType::Http( EventType::Http(
HttpEvent::Error HttpEvent::Error
| HttpEvent::RequestBody | HttpEvent::RequestBody
| HttpEvent::ResponseBody | HttpEvent::ResponseBody
| HttpEvent::XForwardedMissing, | HttpEvent::XForwardedMissing,
) => true, ) => true,
EventType::Network(_) => true, EventType::Network(NetworkEvent::Timeout | NetworkEvent::DropBlocked) => true,
EventType::Limit(_) => true, EventType::Limit(_) => true,
EventType::Manage(_) => false, EventType::Manage(_) => false,
EventType::Auth( EventType::Auth(
@ -593,15 +475,41 @@ impl EventType {
EventType::Resource( EventType::Resource(
ResourceEvent::NotFound | ResourceEvent::BadParameters | ResourceEvent::Error, ResourceEvent::NotFound | ResourceEvent::BadParameters | ResourceEvent::Error,
) => true, ) => true,
EventType::Arc(_) => true, EventType::Arc(
ArcEvent::ChainTooLong
| ArcEvent::InvalidInstance
| ArcEvent::InvalidCv
| ArcEvent::HasHeaderTag
| ArcEvent::BrokenChain,
) => true,
EventType::Dkim(_) => true, EventType::Dkim(_) => true,
EventType::Dmarc(_) => true, EventType::Dmarc(_) => true,
EventType::Iprev(_) => true, EventType::Iprev(_) => true,
EventType::Dane(_) => true, EventType::Dane(
DaneEvent::AuthenticationSuccess
| DaneEvent::AuthenticationFailure
| DaneEvent::NoCertificatesFound
| DaneEvent::CertificateParseError
| DaneEvent::TlsaRecordFetchError
| DaneEvent::TlsaRecordNotFound
| DaneEvent::TlsaRecordNotDnssecSigned
| DaneEvent::TlsaRecordInvalid,
) => true,
EventType::Spf(_) => true, EventType::Spf(_) => true,
EventType::MailAuth(_) => true, EventType::MailAuth(_) => true,
EventType::Tls(_) => true, EventType::Tls(TlsEvent::HandshakeError) => true,
EventType::Sieve(_) => true, EventType::Sieve(
SieveEvent::ActionAccept
| SieveEvent::ActionAcceptReplace
| SieveEvent::ActionDiscard
| SieveEvent::ActionReject
| SieveEvent::SendMessage
| SieveEvent::MessageTooLarge
| SieveEvent::RuntimeError
| SieveEvent::UnexpectedError
| SieveEvent::NotSupported
| SieveEvent::QuotaExceeded,
) => true,
EventType::Spam( EventType::Spam(
SpamEvent::PyzorError SpamEvent::PyzorError
| SpamEvent::ListUpdated | SpamEvent::ListUpdated
@ -627,9 +535,41 @@ impl EventType {
| FtsIndexEvent::BlobNotFound | FtsIndexEvent::BlobNotFound
| FtsIndexEvent::MetadataNotFound, | FtsIndexEvent::MetadataNotFound,
) => true, ) => true,
EventType::Milter(_) => true, EventType::Milter(
MilterEvent::ActionAccept
| MilterEvent::ActionDiscard
| MilterEvent::ActionReject
| MilterEvent::ActionTempFail
| MilterEvent::ActionReplyCode
| MilterEvent::ActionConnectionFailure
| MilterEvent::ActionShutdown,
) => true,
EventType::MtaHook(_) => true, EventType::MtaHook(_) => true,
EventType::Delivery(_) => true, EventType::Delivery(
DeliveryEvent::AttemptStart
| DeliveryEvent::AttemptEnd
| DeliveryEvent::MxLookupFailed
| DeliveryEvent::IpLookupFailed
| DeliveryEvent::NullMx
| DeliveryEvent::GreetingFailed
| DeliveryEvent::EhloRejected
| DeliveryEvent::AuthFailed
| DeliveryEvent::MailFromRejected
| DeliveryEvent::Delivered
| DeliveryEvent::RcptToRejected
| DeliveryEvent::RcptToFailed
| DeliveryEvent::MessageRejected
| DeliveryEvent::StartTlsUnavailable
| DeliveryEvent::StartTlsError
| DeliveryEvent::StartTlsDisabled
| DeliveryEvent::ImplicitTlsError
| DeliveryEvent::ConcurrencyLimitExceeded
| DeliveryEvent::RateLimitExceeded
| DeliveryEvent::DoubleBounce
| DeliveryEvent::DsnSuccess
| DeliveryEvent::DsnTempFail
| DeliveryEvent::DsnPermFail,
) => true,
EventType::Queue( EventType::Queue(
QueueEvent::QueueMessage QueueEvent::QueueMessage
| QueueEvent::QueueMessageAuthenticated | QueueEvent::QueueMessageAuthenticated
@ -642,8 +582,10 @@ impl EventType {
| QueueEvent::ConcurrencyLimitExceeded | QueueEvent::ConcurrencyLimitExceeded
| QueueEvent::QuotaExceeded, | QueueEvent::QuotaExceeded,
) => true, ) => true,
EventType::TlsRpt(_) => true, EventType::TlsRpt(_) => false,
EventType::MtaSts(_) => true, EventType::MtaSts(
MtaStsEvent::Authorized | MtaStsEvent::NotAuthorized | MtaStsEvent::InvalidPolicy,
) => true,
EventType::IncomingReport(_) => true, EventType::IncomingReport(_) => true,
EventType::OutgoingReport( EventType::OutgoingReport(
OutgoingReportEvent::SpfReport OutgoingReportEvent::SpfReport

View file

@ -927,6 +927,35 @@ pub enum ResourceEvent {
WebadminUnpacked, WebadminUnpacked,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricType {
ServerMemory,
MessageIngestionTime,
MessageFtsIndexTime,
MessageSize,
MessageAuthSize,
DeliveryTotalTime,
DeliveryTime,
DeliveryActiveConnections,
QueueCount,
ReportOutgoingSize,
StoreReadTime,
StoreWriteTime,
BlobReadTime,
BlobWriteTime,
DnsLookupTime,
HttpActiveConnections,
HttpRequestTime,
ImapActiveConnections,
ImapRequestTime,
Pop3ActiveConnections,
Pop3RequestTime,
SmtpActiveConnections,
SmtpRequestTime,
SieveActiveConnections,
SieveRequestTime,
}
pub const TOTAL_EVENT_COUNT: usize = total_event_count!(); pub const TOTAL_EVENT_COUNT: usize = total_event_count!();
pub trait AddContext<T> { pub trait AddContext<T> {

View file

@ -857,7 +857,7 @@ impl EventType {
} }
} }
fn from_code(code: u64) -> Option<Self> { pub fn from_code(code: u64) -> Option<Self> {
match code { match code {
0 => Some(EventType::Acme(AcmeEvent::AuthCompleted)), 0 => Some(EventType::Acme(AcmeEvent::AuthCompleted)),
1 => Some(EventType::Acme(AcmeEvent::AuthError)), 1 => Some(EventType::Acme(AcmeEvent::AuthError)),

View file

@ -12,7 +12,9 @@ use std::time::Duration;
use common::{ use common::{
config::telemetry::{StoreTracer, TelemetrySubscriberType}, config::telemetry::{StoreTracer, TelemetrySubscriberType},
enterprise::{license::LicenseKey, undelete::DeletedBlob, Enterprise}, enterprise::{
license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricsStore, TraceStore, Undelete,
},
telemetry::tracers::store::{TracingQuery, TracingStore}, telemetry::tracers::store::{TracingQuery, TracingStore},
}; };
use imap_proto::ResponseType; use imap_proto::ResponseType;
@ -22,6 +24,7 @@ use trc::{
ipc::{bitset::Bitset, subscriber::SubscriberBuilder}, ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
DeliveryEvent, EventType, SmtpEvent, DeliveryEvent, EventType, SmtpEvent,
}; };
use utils::config::cron::SimpleCron;
use crate::{ use crate::{
imap::{ImapConnection, Type}, imap::{ImapConnection, Type},
@ -40,9 +43,21 @@ pub async fn test(params: &mut JMAPTest) {
hostname: String::new(), hostname: String::new(),
accounts: 100, accounts: 100,
}, },
undelete_period: Duration::from_secs(2).into(), undelete: Undelete {
trace_hold_period: Duration::from_secs(1).into(), retention: Duration::from_secs(2),
trace_store: core.storage.data.clone().into(), }
.into(),
trace_store: TraceStore {
retention: Some(Duration::from_secs(1)),
store: core.storage.data.clone(),
}
.into(),
metrics_store: MetricsStore {
retention: Some(Duration::from_secs(1)),
store: core.storage.data.clone(),
interval: SimpleCron::Day { hour: 0, minute: 0 },
}
.into(),
} }
.into(); .into();
params.server.shared_core.store(core.into()); params.server.shared_core.store(core.into());