Metrics collector

This commit is contained in:
mdecimus 2024-08-06 19:23:02 +02:00
parent b0d1561554
commit a3284b8bc3
40 changed files with 1241 additions and 230 deletions

View file

@ -25,7 +25,7 @@ pub mod scripts;
pub mod server;
pub mod smtp;
pub mod storage;
pub mod tracers;
pub mod telemetry;
pub(crate) const CONNECTION_VARS: &[u32; 7] = &[
V_LISTENER,

View file

@ -13,26 +13,30 @@ use hyper::{
HeaderMap,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::export::{logs::LogExporter, trace::SpanExporter};
use trc::{subscriber::Interests, EventType, Level, TracingEvent};
use opentelemetry_sdk::{
export::{logs::LogExporter, trace::SpanExporter},
metrics::exporter::PushMetricsExporter,
};
use trc::{subscriber::Interests, EventType, Level, TelemetryEvent};
use utils::config::{utils::ParseValue, Config};
#[derive(Debug)]
pub struct Tracer {
pub struct TelemetrySubscriber {
pub id: String,
pub interests: Interests,
pub typ: TracerType,
pub typ: TelemetrySubscriberType,
pub lossy: bool,
}
#[derive(Debug)]
pub enum TracerType {
Console(ConsoleTracer),
Log(LogTracer),
Otel(OtelTracer),
pub enum TelemetrySubscriberType {
ConsoleTracer(ConsoleTracer),
LogTracer(LogTracer),
OtelTracer(OtelTracer),
OtelMetrics(OtelMetrics),
Webhook(WebhookTracer),
#[cfg(unix)]
Journal(crate::tracing::journald::Subscriber),
JournalTracer(crate::telemetry::tracers::journald::Subscriber),
}
#[derive(Debug)]
@ -44,6 +48,11 @@ pub struct OtelTracer {
pub throttle: Duration,
}
pub struct OtelMetrics {
pub exporter: Box<dyn PushMetricsExporter>,
pub throttle: Duration,
}
#[derive(Debug)]
pub struct ConsoleTracer {
pub ansi: bool,
@ -80,13 +89,13 @@ pub enum RotationStrategy {
}
#[derive(Debug)]
pub struct Tracers {
pub struct Telemetry {
pub global_interests: Interests,
pub custom_levels: AHashMap<EventType, Level>,
pub tracers: Vec<Tracer>,
pub tracers: Vec<TelemetrySubscriber>,
}
impl Tracers {
impl Telemetry {
pub fn parse(config: &mut Config) -> Self {
// Parse custom logging levels
let mut custom_levels = AHashMap::new();
@ -109,7 +118,7 @@ impl Tracers {
let event_names = EventType::variants()
.into_iter()
.filter_map(|e| {
if e != EventType::Tracing(TracingEvent::WebhookError) {
if e != EventType::Telemetry(TelemetryEvent::WebhookError) {
Some((e, e.name()))
} else {
None
@ -118,7 +127,7 @@ impl Tracers {
.collect::<Vec<_>>();
// Parse tracers
let mut tracers: Vec<Tracer> = Vec::new();
let mut tracers: Vec<TelemetrySubscriber> = Vec::new();
let mut global_interests = Interests::default();
for tracer_id in config
.sub_keys("tracer", ".type")
@ -147,7 +156,7 @@ impl Tracers {
.value_require(("tracer", id, "path"))
.map(|s| s.to_string())
{
TracerType::Log(LogTracer {
TelemetrySubscriberType::LogTracer(LogTracer {
path,
prefix: config
.value(("tracer", id, "prefix"))
@ -179,9 +188,9 @@ impl Tracers {
"console" | "stdout" | "stderr" => {
if !tracers
.iter()
.any(|t| matches!(t.typ, TracerType::Console(_)))
.any(|t| matches!(t.typ, TelemetrySubscriberType::ConsoleTracer(_)))
{
TracerType::Console(ConsoleTracer {
TelemetrySubscriberType::ConsoleTracer(ConsoleTracer {
ansi: config
.property_or_default(("tracer", id, "ansi"), "true")
.unwrap_or(true),
@ -239,7 +248,7 @@ impl Tracers {
log_exporter.build_log_exporter(),
) {
(Ok(span_exporter), Ok(log_exporter)) => {
TracerType::Otel(OtelTracer {
TelemetrySubscriberType::OtelTracer(OtelTracer {
span_exporter: Box::new(span_exporter),
log_exporter: Box::new(log_exporter),
throttle,
@ -308,7 +317,7 @@ impl Tracers {
log_exporter.build_log_exporter(),
) {
(Ok(span_exporter), Ok(log_exporter)) => {
TracerType::Otel(OtelTracer {
TelemetrySubscriberType::OtelTracer(OtelTracer {
span_exporter: Box::new(span_exporter),
log_exporter: Box::new(log_exporter),
throttle,
@ -351,10 +360,12 @@ impl Tracers {
{
if !tracers
.iter()
.any(|t| matches!(t.typ, TracerType::Journal(_)))
.any(|t| matches!(t.typ, TelemetrySubscriberType::JournalTracer(_)))
{
match crate::tracing::journald::Subscriber::new() {
Ok(subscriber) => TracerType::Journal(subscriber),
match crate::telemetry::tracers::journald::Subscriber::new() {
Ok(subscriber) => {
TelemetrySubscriberType::JournalTracer(subscriber)
}
Err(e) => {
config.new_build_error(
("tracer", id, "type"),
@ -391,7 +402,7 @@ impl Tracers {
};
// Create tracer
let mut tracer = Tracer {
let mut tracer = TelemetrySubscriber {
id: format!("t_{id}"),
interests: Default::default(),
lossy: config
@ -413,20 +424,21 @@ impl Tracers {
// Parse disabled events
let mut disabled_events = AHashSet::new();
match &tracer.typ {
TracerType::Console(_) => (),
TracerType::Log(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::LogError));
TelemetrySubscriberType::ConsoleTracer(_) => (),
TelemetrySubscriberType::LogTracer(_) => {
disabled_events.insert(EventType::Telemetry(TelemetryEvent::LogError));
}
TracerType::Otel(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::OtelError));
TelemetrySubscriberType::OtelTracer(_) => {
disabled_events.insert(EventType::Telemetry(TelemetryEvent::OtelError));
}
TracerType::Webhook(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::WebhookError));
TelemetrySubscriberType::Webhook(_) => {
disabled_events.insert(EventType::Telemetry(TelemetryEvent::WebhookError));
}
#[cfg(unix)]
TracerType::Journal(_) => {
disabled_events.insert(EventType::Tracing(TracingEvent::JournalError));
TelemetrySubscriberType::JournalTracer(_) => {
disabled_events.insert(EventType::Telemetry(TelemetryEvent::JournalError));
}
TelemetrySubscriberType::OtelMetrics(_) => todo!(),
}
for (_, event_type) in
config.properties::<EventOrMany>(("tracer", id, "disabled-events"))
@ -502,10 +514,10 @@ impl Tracers {
}
}
tracers.push(Tracer {
tracers.push(TelemetrySubscriber {
id: "default".to_string(),
interests: global_interests.clone(),
typ: TracerType::Console(ConsoleTracer {
typ: TelemetrySubscriberType::ConsoleTracer(ConsoleTracer {
ansi: true,
multiline: false,
buffered: true,
@ -514,7 +526,7 @@ impl Tracers {
});
}
Tracers {
Telemetry {
tracers,
global_interests,
custom_levels,
@ -526,7 +538,7 @@ fn parse_webhook(
config: &mut Config,
id: &str,
global_interests: &mut Interests,
) -> Option<Tracer> {
) -> Option<TelemetrySubscriber> {
let mut headers = HeaderMap::new();
for (header, value) in config
@ -568,13 +580,13 @@ fn parse_webhook(
}
// Build tracer
let mut tracer = Tracer {
let mut tracer = TelemetrySubscriber {
id: format!("w_{id}"),
interests: Default::default(),
lossy: config
.property_or_default(("webhook", id, "lossy"), "false")
.unwrap_or(false),
typ: TracerType::Webhook(WebhookTracer {
typ: TelemetrySubscriberType::Webhook(WebhookTracer {
url: config.value_require(("webhook", id, "url"))?.to_string(),
timeout: config
.property_or_default(("webhook", id, "timeout"), "30s")
@ -600,7 +612,7 @@ fn parse_webhook(
let event_names = EventType::variants()
.into_iter()
.filter_map(|e| {
if e != EventType::Tracing(TracingEvent::WebhookError) {
if e != EventType::Telemetry(TelemetryEvent::WebhookError) {
Some((e, e.name()))
} else {
None
@ -610,7 +622,7 @@ fn parse_webhook(
for (_, event_type) in config.properties::<EventOrMany>(("webhook", id, "events")) {
match event_type {
EventOrMany::Event(event_type) => {
if event_type != EventType::Tracing(TracingEvent::WebhookError) {
if event_type != EventType::Telemetry(TelemetryEvent::WebhookError) {
tracer.interests.set(event_type);
global_interests.set(event_type);
}
@ -670,3 +682,11 @@ impl ParseValue for EventOrMany {
}
}
}
impl std::fmt::Debug for OtelMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OtelMetrics")
.field("throttle", &self.throttle)
.finish()
}
}

View file

@ -39,7 +39,7 @@ pub mod expr;
pub mod listener;
pub mod manager;
pub mod scripts;
pub mod tracing;
pub mod telemetry;
pub static USER_AGENT: &str = concat!("Stalwart/", env!("CARGO_PKG_VERSION"),);
pub static DAEMON_NAME: &str = concat!("Stalwart Mail Server v", env!("CARGO_PKG_VERSION"),);

View file

@ -96,6 +96,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
tokio::spawn(async move {
let start_time = Instant::now();
let protocol = session.instance.protocol;
let session_id;
if is_tls {
@ -189,6 +190,7 @@ pub trait SessionManager: Sync + Send + 'static + Clone {
Network(trc::NetworkEvent::ConnectionEnd),
SpanId = session_id,
Elapsed = start_time.elapsed(),
Protocol = protocol,
);
});
}

View file

@ -18,7 +18,7 @@ use utils::{
};
use crate::{
config::{server::Servers, tracers::Tracers},
config::{server::Servers, telemetry::Telemetry},
Core, SharedCore,
};
@ -162,7 +162,7 @@ impl BootManager {
}
// Enable tracing
Tracers::parse(&mut config).enable();
Telemetry::parse(&mut config).enable();
match import_export {
ImportExport::None => {

View file

@ -12,7 +12,7 @@ use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config};
use crate::{
config::{
server::{tls::parse_certificates, Servers},
tracers::Tracers,
telemetry::Telemetry,
},
listener::blocked::BLOCKED_IP_KEY,
Core,
@ -23,7 +23,7 @@ use super::config::{ConfigManager, Patterns};
pub struct ReloadResult {
pub config: Config,
pub new_core: Option<Core>,
pub tracers: Option<Tracers>,
pub tracers: Option<Telemetry>,
}
impl Core {
@ -84,7 +84,7 @@ impl Core {
let mut config = self.storage.config.build_config("").await?;
// Parse tracers
let tracers = Tracers::parse(&mut config);
let tracers = Telemetry::parse(&mut config);
// Load stores
let mut stores = Stores {

View file

@ -78,7 +78,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> trc::Result<Variable>
trc::event!(
Spam(trc::SpamEvent::Train),
SpanId = ctx.session_id,
Spam = is_spam,
Details = is_spam,
Total = model.weights.len(),
);
@ -256,8 +256,8 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> trc::Result<Variable> {
trc::event!(
Spam(trc::SpamEvent::TrainBalance),
SpanId = ctx.session_id,
Spam = learn_spam,
Details = vec![
trc::Value::from(learn_spam),
trc::Value::from(min_balance),
trc::Value::from(spam_learns),
trc::Value::from(ham_learns),

View file

@ -0,0 +1,5 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

View file

@ -4,26 +4,23 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
#[cfg(unix)]
pub mod journald;
pub mod log;
pub mod otel;
pub mod stdout;
pub mod webhook;
pub mod metrics;
pub mod tracers;
pub mod webhooks;
use std::time::Duration;
use log::spawn_log_tracer;
use otel::spawn_otel_tracer;
use stdout::spawn_console_tracer;
use tracers::log::spawn_log_tracer;
use tracers::otel::spawn_otel_tracer;
use tracers::stdout::spawn_console_tracer;
use trc::{collector::Collector, subscriber::SubscriberBuilder};
use webhook::spawn_webhook_tracer;
use webhooks::spawn_webhook_tracer;
use crate::config::tracers::{TracerType, Tracers};
use crate::config::telemetry::{Telemetry, TelemetrySubscriberType};
pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
impl Tracers {
impl Telemetry {
pub fn enable(self) {
// Spawn tracers
for tracer in self.tracers {
@ -85,7 +82,7 @@ impl Tracers {
SubscriberBuilder::new("stderr".to_string())
.with_interests(interests.clone())
.with_lossy(false),
crate::config::tracers::ConsoleTracer {
crate::config::telemetry::ConsoleTracer {
ansi: true,
multiline: false,
buffered: false,
@ -97,15 +94,20 @@ impl Tracers {
}
}
impl TracerType {
impl TelemetrySubscriberType {
pub fn spawn(self, builder: SubscriberBuilder) {
match self {
TracerType::Console(settings) => spawn_console_tracer(builder, settings),
TracerType::Log(settings) => spawn_log_tracer(builder, settings),
TracerType::Webhook(settings) => spawn_webhook_tracer(builder, settings),
TracerType::Otel(settings) => spawn_otel_tracer(builder, settings),
TelemetrySubscriberType::ConsoleTracer(settings) => {
spawn_console_tracer(builder, settings)
}
TelemetrySubscriberType::LogTracer(settings) => spawn_log_tracer(builder, settings),
TelemetrySubscriberType::Webhook(settings) => spawn_webhook_tracer(builder, settings),
TelemetrySubscriberType::OtelTracer(settings) => spawn_otel_tracer(builder, settings),
#[cfg(unix)]
TracerType::Journal(subscriber) => journald::spawn_journald_tracer(builder, subscriber),
TelemetrySubscriberType::JournalTracer(subscriber) => {
tracers::journald::spawn_journald_tracer(builder, subscriber)
}
TelemetrySubscriberType::OtelMetrics(_) => todo!(),
}
}
}

View file

@ -7,7 +7,7 @@
use ahash::AHashSet;
use std::io::Write;
use trc::subscriber::SubscriberBuilder;
use trc::{Event, EventDetails, Level, TracingEvent};
use trc::{Event, EventDetails, Level, TelemetryEvent};
pub(crate) fn spawn_journald_tracer(builder: SubscriberBuilder, subscriber: Subscriber) {
let (_, mut rx) = builder.register();
@ -52,7 +52,7 @@ impl Subscriber {
if let Err(err) = self.send_payload(&buf) {
trc::event!(
Tracing(TracingEvent::JournalError),
Telemetry(TelemetryEvent::JournalError),
Details = "Failed to send event to journald",
Reason = err.to_string()
);

View file

@ -6,14 +6,14 @@
use std::{path::PathBuf, time::SystemTime};
use crate::config::tracers::{LogTracer, RotationStrategy};
use crate::config::telemetry::{LogTracer, RotationStrategy};
use mail_parser::DateTime;
use tokio::{
fs::{File, OpenOptions},
io::BufWriter,
};
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TracingEvent};
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder, TelemetryEvent};
pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer) {
let (_, mut rx) = builder.register();
@ -30,7 +30,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if roatation_timestamp != 0 && event.inner.timestamp > roatation_timestamp {
if let Err(err) = buf.flush().await {
trc::event!(
Tracing(TracingEvent::LogError),
Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to flush log buffer"
);
@ -46,7 +46,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.write(&event).await {
trc::event!(
Tracing(TracingEvent::LogError),
Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to write event to log"
);
@ -56,7 +56,7 @@ pub(crate) fn spawn_log_tracer(builder: SubscriberBuilder, settings: LogTracer)
if let Err(err) = buf.flush().await {
trc::event!(
Tracing(TracingEvent::LogError),
Telemetry(TelemetryEvent::LogError),
Reason = err.to_string(),
Details = "Failed to flush log buffer"
);
@ -105,7 +105,7 @@ impl LogTracer {
Ok(writer) => Some(BufWriter::new(writer)),
Err(err) => {
trc::event!(
Tracing(TracingEvent::LogError),
Telemetry(TelemetryEvent::LogError),
Details = "Failed to create log file",
Path = path.to_string_lossy().into_owned(),
Reason = err.to_string(),

View file

@ -0,0 +1,11 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
#[cfg(unix)]
pub mod journald;
pub mod log;
pub mod otel;
pub mod stdout;

View file

@ -22,11 +22,9 @@ use opentelemetry_sdk::{
Resource,
};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TracingEvent};
use trc::{subscriber::SubscriberBuilder, Event, EventDetails, Level, TelemetryEvent};
use crate::config::tracers::OtelTracer;
use super::LONG_SLUMBER;
use crate::{config::telemetry::OtelTracer, telemetry::LONG_SLUMBER};
pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer) {
let (_, mut rx) = builder.register();
@ -91,7 +89,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
.await
{
trc::event!(
Tracing(TracingEvent::OtelError),
Telemetry(TelemetryEvent::OtelError),
Details = "Failed to export spans",
Reason = err.to_string()
);
@ -105,7 +103,7 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
.await
{
trc::event!(
Tracing(TracingEvent::OtelError),
Telemetry(TelemetryEvent::OtelError),
Details = "Failed to export logs",
Reason = err.to_string()
);

View file

@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
use crate::config::tracers::ConsoleTracer;
use crate::config::telemetry::ConsoleTracer;
use std::io::Write;
use tokio::io::AsyncWrite;
use trc::{fmt::FmtWriter, subscriber::SubscriberBuilder};

View file

@ -12,7 +12,7 @@ use std::{
time::Instant,
};
use crate::config::tracers::WebhookTracer;
use crate::config::telemetry::WebhookTracer;
use base64::{engine::general_purpose::STANDARD, Engine};
use ring::hmac;
use serde::Serialize;
@ -20,7 +20,7 @@ use store::write::now;
use tokio::sync::mpsc;
use trc::{
subscriber::{EventBatch, SubscriberBuilder},
ServerEvent, TracingEvent,
ServerEvent, TelemetryEvent,
};
use super::LONG_SLUMBER;
@ -53,7 +53,7 @@ pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: Webhook
if discard_count > 0 {
trc::event!(
Tracing(TracingEvent::WebhookError),
Telemetry(TelemetryEvent::WebhookError),
Details = "Discarded stale events",
Total = discard_count
);
@ -111,7 +111,7 @@ fn spawn_webhook_handler(
let wrapper = EventWrapper { events };
if let Err(err) = post_webhook_events(&settings, &wrapper).await {
trc::event!(Tracing(TracingEvent::WebhookError), Details = err);
trc::event!(Telemetry(TelemetryEvent::WebhookError), Details = err);
if webhook_tx.send(wrapper.events).await.is_err() {
trc::event!(

View file

@ -360,8 +360,7 @@ impl<T: SessionStream> SessionData<T> {
trc::ImapEvent::Copy
}),
SpanId = self.session_id,
SourceAccountId = src_mailbox.id.account_id,
SourceMailboxId = src_mailbox.id.mailbox_id,
Source = src_mailbox.id.account_id,
Details = src_uids
.iter()
.map(|r| trc::Value::from(*r))

View file

@ -205,8 +205,8 @@ impl JMAP {
}
}
Err(mut err)
if err.matches(trc::EventType::Store(
trc::StoreEvent::IngestError,
if err.matches(trc::EventType::MessageIngest(
trc::MessageIngestEvent::Error,
)) =>
{
results.push(UndeleteResponse::Error {

View file

@ -138,7 +138,7 @@ impl JMAP {
.with_description("You have exceeded your disk quota."),
);
}
trc::EventType::Store(trc::StoreEvent::IngestError) => {
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => {
response.not_created.append(
id,
SetError::new(SetErrorType::InvalidEmail).with_description(

View file

@ -31,7 +31,7 @@ use store::{
},
BitmapKey, BlobClass, Serialize,
};
use trc::AddContext;
use trc::{AddContext, MessageIngestEvent};
use utils::map::vec_map::VecMap;
use crate::{
@ -90,7 +90,7 @@ impl JMAP {
// Parse message
let mut raw_message = Cow::from(params.raw_message);
let mut message = params.message.ok_or_else(|| {
trc::EventType::Store(trc::StoreEvent::IngestError)
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 550)
.ctx(trc::Key::Reason, "Failed to parse e-mail message.")
})?;
@ -174,7 +174,7 @@ impl JMAP {
.is_empty()
{
trc::event!(
Store(trc::StoreEvent::IngestDuplicate),
MessageIngest(MessageIngestEvent::Duplicate),
SpanId = params.session_id,
AccountId = params.account_id,
MessageId = message_id.to_string(),
@ -216,7 +216,7 @@ impl JMAP {
message = MessageParser::default()
.parse(raw_message.as_ref())
.ok_or_else(|| {
trc::EventType::Store(trc::StoreEvent::IngestError)
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 550)
.ctx(
trc::Key::Reason,
@ -338,7 +338,16 @@ impl JMAP {
let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await;
trc::event!(
Store(trc::StoreEvent::Ingest),
MessageIngest(match params.source {
IngestSource::Smtp =>
if !is_spam {
MessageIngestEvent::Ham
} else {
MessageIngestEvent::Spam
},
IngestSource::Jmap => MessageIngestEvent::JmapAppend,
IngestSource::Imap => MessageIngestEvent::ImapAppend,
}),
SpanId = params.session_id,
AccountId = params.account_id,
DocumentId = document_id,
@ -346,7 +355,6 @@ impl JMAP {
BlobId = blob_id.hash.to_hex(),
ChangeId = change_id,
Size = raw_message_len as u64,
Spam = is_spam,
Elapsed = start_time.elapsed(),
);

View file

@ -29,7 +29,7 @@ impl JMAP {
Ok(Some(raw_message)) => raw_message,
Ok(None) => {
trc::event!(
Store(trc::StoreEvent::IngestError),
MessageIngest(trc::MessageIngestEvent::Error),
Reason = "Blob not found.",
SpanId = message.session_id,
CausedBy = trc::location!()
@ -168,7 +168,7 @@ impl JMAP {
reason: "Mailbox over quota.".into(),
}
}
trc::EventType::Store(trc::StoreEvent::IngestError) => {
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => {
*status = DeliveryResult::PermanentFailure {
code: err
.value(trc::Key::Code)

View file

@ -48,9 +48,11 @@ impl JMAP {
let message = if let Some(message) = MessageParser::new().parse(raw_message) {
message
} else {
return Err(trc::EventType::Store(trc::StoreEvent::IngestError)
.ctx(trc::Key::Code, 550)
.ctx(trc::Key::Reason, "Failed to parse e-mail message."));
return Err(
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 550)
.ctx(trc::Key::Reason, "Failed to parse e-mail message."),
);
};
// Obtain mailboxIds
@ -478,9 +480,11 @@ impl JMAP {
}
if let Some(reject_reason) = reject_reason {
Err(trc::EventType::Store(trc::StoreEvent::IngestError)
.ctx(trc::Key::Code, 571)
.ctx(trc::Key::Reason, reject_reason))
Err(
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error)
.ctx(trc::Key::Code, 571)
.ctx(trc::Key::Reason, reject_reason),
)
} else if has_delivered || last_temp_error.is_none() {
Ok(ingested_message)
} else {

View file

@ -33,7 +33,7 @@ use utils::config::Rate;
use crate::{
core::{Session, SessionAddress, State},
inbound::milter::Modification,
queue::{self, Message, QueueEnvelope, Schedule},
queue::{self, Message, MessageSource, QueueEnvelope, Schedule},
scripts::ScriptResult,
};
@ -703,12 +703,18 @@ impl<T: SessionStream> Session<T> {
let queue_id = message.queue_id;
// Queue message
let source = if self.data.authenticated_as.is_empty() {
MessageSource::Unauthenticated
} else {
MessageSource::Authenticated
};
if message
.queue(
Some(&headers),
raw_message,
self.data.session_id,
&self.core,
source,
)
.await
{

View file

@ -19,8 +19,8 @@ use store::write::now;
use crate::core::SMTP;
use super::{
Domain, Error, ErrorDetails, HostResponse, Message, QueueEnvelope, Recipient, Status,
RCPT_DSN_SENT, RCPT_STATUS_CHANGED,
Domain, Error, ErrorDetails, HostResponse, Message, MessageSource, QueueEnvelope, Recipient,
Status, RCPT_DSN_SENT, RCPT_STATUS_CHANGED,
};
impl SMTP {
@ -48,7 +48,13 @@ impl SMTP {
// Queue DSN
dsn_message
.queue(signature.as_deref(), &dsn, message.span_id, self)
.queue(
signature.as_deref(),
&dsn,
message.span_id,
self,
MessageSource::Dsn,
)
.await;
}
} else {

View file

@ -49,6 +49,15 @@ pub struct Schedule<T> {
pub inner: T,
}
#[derive(Debug, Clone, Copy)]
pub enum MessageSource {
Authenticated,
Unauthenticated,
Dsn,
Report,
Sieve,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Message {
pub queue_id: QueueId,

View file

@ -16,7 +16,8 @@ use utils::BlobHash;
use crate::core::SMTP;
use super::{
Domain, Event, Message, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status,
Domain, Event, Message, MessageSource, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule,
Status,
};
pub const LOCK_EXPIRY: u64 = 300;
@ -174,6 +175,7 @@ impl Message {
raw_message: &[u8],
session_id: u64,
core: &SMTP,
source: MessageSource,
) -> bool {
// Write blob
let message = if let Some(raw_headers) = raw_headers {
@ -225,7 +227,13 @@ impl Message {
}
trc::event!(
Queue(trc::QueueEvent::Scheduled),
Queue(match source {
MessageSource::Authenticated => trc::QueueEvent::QueueMessageSubmission,
MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage,
MessageSource::Dsn => trc::QueueEvent::QueueDsn,
MessageSource::Report => trc::QueueEvent::QueueReport,
MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated,
}),
SpanId = session_id,
QueueId = self.queue_id,
From = if !self.return_path.is_empty() {

View file

@ -461,17 +461,6 @@ impl LogReport for TlsReport {
impl LogReport for Feedback<'_> {
fn log(&self) {
/*
user_agent = self.user_agent().unwrap_or_default(),
auth_failure = ?self.auth_failure(),
dkim_domain = self.dkim_domain().unwrap_or_default(),
dkim_identity = self.dkim_identity().unwrap_or_default(),
dkim_selector = self.dkim_selector().unwrap_or_default(),
identity_alignment = ?self.identity_alignment(),
*/
trc::event!(
IncomingReport(match self.feedback_type() {
mail_auth::report::FeedbackType::Abuse => IncomingReportEvent::AbuseReport,
@ -482,7 +471,7 @@ impl LogReport for Feedback<'_> {
mail_auth::report::FeedbackType::Other => IncomingReportEvent::OtherReport,
mail_auth::report::FeedbackType::Virus => IncomingReportEvent::VirusReport,
}),
Date = trc::Value::Timestamp(
RangeFrom = trc::Value::Timestamp(
self.arrival_date()
.map(|d| d as u64)
.unwrap_or_else(|| { now() })

View file

@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
core::{Session, SMTP},
inbound::DkimSign,
queue::{DomainPart, Message},
queue::{DomainPart, Message, MessageSource},
};
pub mod analysis;
@ -153,7 +153,13 @@ impl SMTP {
// Queue message
message
.queue(signature.as_deref(), &report, parent_session_id, self)
.queue(
signature.as_deref(),
&report,
parent_session_id,
self,
MessageSource::Report,
)
.await;
}

View file

@ -18,7 +18,11 @@ use smtp_proto::{
};
use trc::SieveEvent;
use crate::{core::SMTP, inbound::DkimSign, queue::DomainPart};
use crate::{
core::SMTP,
inbound::DkimSign,
queue::{DomainPart, MessageSource},
};
use super::{ScriptModification, ScriptParameters, ScriptResult};
@ -284,7 +288,13 @@ impl SMTP {
if self.has_quota(&mut message).await {
message
.queue(headers.as_deref(), raw_message, session_id, self)
.queue(
headers.as_deref(),
raw_message,
session_id,
self,
MessageSource::Sieve,
)
.await;
} else {
trc::event!(

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{borrow::Cow, ops::Range};
use std::{borrow::Cow, ops::Range, time::Instant};
use trc::{AddContext, StoreEvent};
use utils::config::utils::ParseValue;
@ -17,7 +17,7 @@ impl BlobStore {
CompressionAlgo::None => range.clone(),
CompressionAlgo::Lz4 => 0..usize::MAX,
};
let start_time = Instant::now();
let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
@ -37,6 +37,15 @@ impl BlobStore {
BlobBackend::S3(store) => store.get_blob(key, read_range).await,
};
trc::event!(
Store(StoreEvent::BlobRead),
Key = key,
Elapsed = start_time.elapsed(),
Size = result
.as_ref()
.map_or(0, |data| data.as_ref().map_or(0, |data| data.len())),
);
let decompressed = match self.compression {
CompressionAlgo::Lz4 => match result.caused_by(trc::location!())? {
Some(data)
@ -84,7 +93,8 @@ impl BlobStore {
}
};
match &self.backend {
let start_time = Instant::now();
let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
Store::SQLite(store) => store.put_blob(key, data.as_ref()).await,
@ -102,11 +112,21 @@ impl BlobStore {
#[cfg(feature = "s3")]
BlobBackend::S3(store) => store.put_blob(key, data.as_ref()).await,
}
.caused_by(trc::location!())
.caused_by(trc::location!());
trc::event!(
Store(StoreEvent::BlobWrite),
Key = key,
Elapsed = start_time.elapsed(),
Size = data.len(),
);
result
}
pub async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
match &self.backend {
let start_time = Instant::now();
let result = match &self.backend {
BlobBackend::Store(store) => match store {
#[cfg(feature = "sqlite")]
Store::SQLite(store) => store.delete_blob(key).await,
@ -124,7 +144,15 @@ impl BlobStore {
#[cfg(feature = "s3")]
BlobBackend::S3(store) => store.delete_blob(key).await,
}
.caused_by(trc::location!())
.caused_by(trc::location!());
trc::event!(
Store(StoreEvent::BlobWrite),
Key = key,
Elapsed = start_time.elapsed(),
);
result
}
pub fn with_compression(self, compression: CompressionAlgo) -> Self {

View file

@ -4,10 +4,13 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::ops::{BitAndAssign, Range};
use std::{
ops::{BitAndAssign, Range},
time::Instant,
};
use roaring::RoaringBitmap;
use trc::AddContext;
use trc::{AddContext, StoreEvent};
use crate::{
write::{
@ -95,7 +98,8 @@ impl Store {
params: IterateParams<T>,
cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send,
) -> trc::Result<()> {
match self {
let start_time = Instant::now();
let result = match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.iterate(params, cb).await,
#[cfg(feature = "foundation")]
@ -108,7 +112,14 @@ impl Store {
Self::RocksDb(store) => store.iterate(params, cb).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
}
.caused_by(trc::location!())
.caused_by(trc::location!());
trc::event!(
Store(StoreEvent::DataIterate),
Elapsed = start_time.elapsed(),
);
result
}
pub async fn get_counter(
@ -220,7 +231,10 @@ impl Store {
return Ok(AssignedIds::default());
}
match self {
let start_time = Instant::now();
let ops = batch.ops.len();
let result = match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.write(batch).await,
#[cfg(feature = "foundation")]
@ -232,7 +246,15 @@ impl Store {
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.write(batch).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
}
};
trc::event!(
Store(StoreEvent::DataWrite),
Elapsed = start_time.elapsed(),
Total = ops,
);
result
}
pub async fn purge_store(&self) -> trc::Result<()> {

View file

@ -67,11 +67,10 @@ pub fn event_type(_attr: TokenStream, item: TokenStream) -> TokenStream {
};
let variants_fn = quote! {
pub fn variants() -> &'static [Self] {
static VARIANTS: &'static [#name] = &[
pub const fn variants() -> &'static [Self] {
&[
#(#name::#variant_names,)*
];
VARIANTS
]
}
};
@ -148,10 +147,18 @@ pub fn event_family(_attr: TokenStream, item: TokenStream) -> TokenStream {
}
}
pub fn variants() -> Vec<#name> {
let mut variants = Vec::new();
pub const fn variants() -> [#name; crate::TOTAL_EVENT_COUNT] {
let mut variants = [crate::EventType::Eval(crate::EvalEvent::Error); crate::TOTAL_EVENT_COUNT];
#(
variants.extend(<#event_types>::variants().iter().copied().map(#name::#variant_idents));
{
let sub_variants = <#event_types>::variants();
let mut i = 0;
while i < sub_variants.len() {
variants[sub_variants[i].id()] = #name::#variant_idents(sub_variants[i]);
i += 1;
}
}
)*
variants
}
@ -291,9 +298,13 @@ pub fn event(input: TokenStream) -> TokenStream {
const ET: trc::EventType = trc::EventType::#event(#param);
const ET_ID: usize = ET.id();
if trc::collector::Collector::has_interest(ET_ID) {
trc::Event::with_keys(ET, vec![#(#key_value_tokens),*]).send();
let keys = vec![#(#key_value_tokens),*];
if trc::collector::Collector::is_metric(ET_ID) {
trc::collector::Collector::record_metric(ET, ET_ID, &keys);
}
trc::Event::with_keys(ET, keys).send();
} else if trc::collector::Collector::is_metric(ET_ID) {
trc::Event::with_keys(ET, vec![#(#key_value_metric_tokens),*]).send();
trc::collector::Collector::record_metric(ET, ET_ID, &[#(#key_value_metric_tokens),*]);
}
}}
} else {
@ -301,9 +312,13 @@ pub fn event(input: TokenStream) -> TokenStream {
let et = trc::EventType::#event(#param);
let et_id = et.id();
if trc::collector::Collector::has_interest(et_id) {
trc::Event::with_keys(et, vec![#(#key_value_tokens),*]).send();
let keys = vec![#(#key_value_tokens),*];
if trc::collector::Collector::is_metric(et_id) {
trc::collector::Collector::record_metric(et, et_id, &keys);
}
trc::Event::with_keys(et, keys).send();
} else if trc::collector::Collector::is_metric(et_id) {
trc::Event::with_keys(et, vec![#(#key_value_metric_tokens),*]).send();
trc::collector::Collector::record_metric(et, et_id, &[#(#key_value_metric_tokens),*]);
}
}}
};

350
crates/trc/src/atomic.rs Normal file
View file

@ -0,0 +1,350 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
pub struct AtomicU32Array<const N: usize>([AtomicU32; N]);
pub struct AtomicU64Array<const N: usize>([AtomicU64; N]);
pub struct AtomicHistogram<const N: usize> {
id: &'static str,
description: &'static str,
unit: &'static str,
buckets: AtomicU64Array<N>,
upper_bounds: [u64; N],
sum: AtomicU64,
count: AtomicU64,
}
pub struct AtomicGauge {
id: &'static str,
description: &'static str,
unit: &'static str,
value: AtomicU64,
}
pub struct AtomicCounter {
id: &'static str,
description: &'static str,
unit: &'static str,
value: AtomicU64,
}
impl<const N: usize> AtomicU32Array<N> {
#[allow(clippy::new_without_default)]
#[allow(clippy::declare_interior_mutable_const)]
pub const fn new() -> Self {
Self({
const INIT: AtomicU32 = AtomicU32::new(0);
let mut array = [INIT; N];
let mut i = 0;
while i < N {
array[i] = AtomicU32::new(0);
i += 1;
}
array
})
}
#[inline(always)]
pub fn get(&self, index: usize) -> u32 {
self.0[index].load(Ordering::Relaxed)
}
#[inline(always)]
pub fn set(&self, index: usize, value: u32) {
self.0[index].store(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn add(&self, index: usize, value: u32) {
self.0[index].fetch_add(value, Ordering::Relaxed);
}
pub fn inner(&self) -> &[AtomicU32; N] {
&self.0
}
}
impl<const N: usize> AtomicU64Array<N> {
#[allow(clippy::new_without_default)]
#[allow(clippy::declare_interior_mutable_const)]
pub const fn new() -> Self {
Self({
const INIT: AtomicU64 = AtomicU64::new(0);
let mut array = [INIT; N];
let mut i = 0;
while i < N {
array[i] = AtomicU64::new(0);
i += 1;
}
array
})
}
#[inline(always)]
pub fn get(&self, index: usize) -> u64 {
self.0[index].load(Ordering::Relaxed)
}
#[inline(always)]
pub fn set(&self, index: usize, value: u64) {
self.0[index].store(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn add(&self, index: usize, value: u64) {
self.0[index].fetch_add(value, Ordering::Relaxed);
}
pub fn inner(&self) -> &[AtomicU64; N] {
&self.0
}
}
impl<const N: usize> AtomicHistogram<N> {
pub const fn new(
id: &'static str,
description: &'static str,
unit: &'static str,
upper_bounds: [u64; N],
) -> Self {
Self {
buckets: AtomicU64Array::new(),
upper_bounds,
sum: AtomicU64::new(0),
count: AtomicU64::new(0),
id,
description,
unit,
}
}
pub fn observe(&self, value: u64) {
self.sum.fetch_add(value, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
for (idx, upper_bound) in self.upper_bounds.iter().enumerate() {
if value < *upper_bound {
self.buckets.add(idx, value);
return;
}
}
unreachable!()
}
pub fn id(&self) -> &'static str {
self.id
}
pub fn description(&self) -> &'static str {
self.description
}
pub fn unit(&self) -> &'static str {
self.unit
}
pub const fn new_message_sizes(
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new(
id,
description,
"bytes",
[
500, // 500 bytes
1_000, // 1 KB
10_000, // 10 KB
100_000, // 100 KB
1_000_000, // 1 MB
5_000_000, // 5 MB
10_000_000, // 10 MB
25_000_000, // 25 MB
50_000_000, // 50 MB
100_000_000, // 100 MB
500_000_000, // 500 MB
u64::MAX, // Catch-all for any larger sizes
],
)
}
pub const fn new_short_durations(
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new(
id,
description,
"milliseconds",
[
5, // 5 milliseconds
10, // 10 milliseconds
50, // 50 milliseconds
100, // 100 milliseconds
500, // 0.5 seconds
1_000, // 1 second
2_000, // 2 seconds
5_000, // 5 seconds
10_000, // 10 seconds
30_000, // 30 seconds
60_000, // 1 minute
u64::MAX, // Catch-all for any longer durations
],
)
}
pub const fn new_medium_durations(
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new(
id,
description,
"milliseconds",
[
250,
500,
1_000,
5_000,
10_000, // For quick connections (seconds)
60_000,
(60 * 5) * 1_000,
(60 * 10) * 1_000,
(60 * 30) * 1_000, // For medium-length connections (minutes)
(60 * 60) * 1_000,
(60 * 60 * 5) * 1_000,
u64::MAX, // For extreme cases (8 hours and 1 day)
],
)
}
pub const fn new_long_durations(
id: &'static str,
description: &'static str,
) -> AtomicHistogram<12> {
AtomicHistogram::new(
id,
description,
"milliseconds",
[
1_000, // 1 second
30_000, // 30 seconds
300_000, // 5 minutes
600_000, // 10 minutes
1_800_000, // 30 minutes
3_600_000, // 1 hour
14_400_000, // 5 hours
28_800_000, // 8 hours
43_200_000, // 12 hours
86_400_000, // 1 day
604_800_000, // 1 week
u64::MAX, // Catch-all for any longer durations
],
)
}
}
impl AtomicCounter {
pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self {
Self {
id,
description,
unit,
value: AtomicU64::new(0),
}
}
#[inline(always)]
pub fn increment(&self) {
self.value.fetch_add(1, Ordering::Relaxed);
}
#[inline(always)]
pub fn increment_by(&self, value: u64) {
self.value.fetch_add(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn decrement(&self) {
self.value.fetch_sub(1, Ordering::Relaxed);
}
#[inline(always)]
pub fn decrement_by(&self, value: u64) {
self.value.fetch_sub(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn get(&self) -> u64 {
self.value.load(Ordering::Relaxed)
}
pub fn id(&self) -> &'static str {
self.id
}
pub fn description(&self) -> &'static str {
self.description
}
pub fn unit(&self) -> &'static str {
self.unit
}
}
impl AtomicGauge {
pub const fn new(id: &'static str, description: &'static str, unit: &'static str) -> Self {
Self {
id,
description,
unit,
value: AtomicU64::new(0),
}
}
#[inline(always)]
pub fn increment(&self) {
self.value.fetch_add(1, Ordering::Relaxed);
}
#[inline(always)]
pub fn set(&self, value: u64) {
self.value.store(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn decrement(&self) {
self.value.fetch_sub(1, Ordering::Relaxed);
}
#[inline(always)]
pub fn get(&self) -> u64 {
self.value.load(Ordering::Relaxed)
}
#[inline(always)]
pub fn add(&self, value: u64) {
self.value.fetch_add(value, Ordering::Relaxed);
}
#[inline(always)]
pub fn subtract(&self, value: u64) {
self.value.fetch_sub(value, Ordering::Relaxed);
}
pub fn id(&self) -> &'static str {
self.id
}
pub fn description(&self) -> &'static str {
self.description
}
pub fn unit(&self) -> &'static str {
self.unit
}
}

View file

@ -17,18 +17,20 @@ use crate::{
bitset::{AtomicBitset, USIZE_BITS},
channel::{EVENT_COUNT, EVENT_RXS},
subscriber::{Interests, Subscriber},
DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TracingEvent,
DeliveryEvent, Event, EventDetails, EventType, Level, NetworkEvent, TelemetryEvent,
TOTAL_EVENT_COUNT,
};
type GlobalInterests = AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>;
pub(crate) type GlobalInterests =
AtomicBitset<{ (TOTAL_EVENT_COUNT + USIZE_BITS - 1) / USIZE_BITS }>;
pub(crate) static INTERESTS: GlobalInterests = GlobalInterests::new();
pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new();
pub(crate) static TRACE_INTERESTS: GlobalInterests = GlobalInterests::new();
pub(crate) type CollectorThread = JoinHandle<()>;
pub(crate) static ACTIVE_SUBSCRIBERS: Mutex<Vec<String>> = Mutex::new(Vec::new());
pub(crate) static COLLECTOR_UPDATES: Mutex<Vec<Update>> = Mutex::new(Vec::new());
pub(crate) const EVENT_TYPES: [EventType; TOTAL_EVENT_COUNT] = EventType::variants();
#[allow(clippy::enum_variant_names)]
pub(crate) enum Update {
Register {
@ -43,15 +45,14 @@ pub(crate) enum Update {
lossy: bool,
},
UpdateLevels {
custom_levels: AHashMap<EventType, Level>,
levels: AHashMap<EventType, Level>,
},
Shutdown,
}
#[derive(Default)]
pub struct Collector {
subscribers: Vec<Subscriber>,
custom_levels: AHashMap<EventType, Level>,
levels: [Level; TOTAL_EVENT_COUNT],
active_spans: AHashMap<u64, Arc<Event<EventDetails>>>,
}
@ -59,7 +60,7 @@ const EV_CONN_START: usize = EventType::Network(NetworkEvent::ConnectionStart).i
const EV_CONN_END: usize = EventType::Network(NetworkEvent::ConnectionEnd).id();
const EV_ATTEMPT_START: usize = EventType::Delivery(DeliveryEvent::AttemptStart).id();
const EV_ATTEMPT_END: usize = EventType::Delivery(DeliveryEvent::AttemptEnd).id();
const EV_COLLECTOR_UPDATE: usize = EventType::Tracing(TracingEvent::Update).id();
const EV_COLLECTOR_UPDATE: usize = EventType::Telemetry(TelemetryEvent::Update).id();
const STALE_SPAN_CHECK_WATERMARK: usize = 8000;
const SPAN_MAX_HOLD: u64 = 86400;
@ -81,13 +82,10 @@ impl Collector {
match rx.try_recv() {
Ok(Some(event)) => {
// Build event
let event_id = event.inner.id();
let mut event = Event {
inner: EventDetails {
level: self
.custom_levels
.get(&event.inner)
.copied()
.unwrap_or_else(|| event.inner.level()),
level: self.levels[event_id],
typ: event.inner,
timestamp,
span: None,
@ -96,7 +94,6 @@ impl Collector {
};
// Track spans
let event_id = event.inner.typ.id();
let event = match event_id {
EV_CONN_START | EV_ATTEMPT_START => {
let event = Arc::new(event);
@ -213,8 +210,15 @@ impl Collector {
}
}
}
Update::UpdateLevels { custom_levels } => {
self.custom_levels = custom_levels;
Update::UpdateLevels { levels } => {
for event in EVENT_TYPES.iter() {
let event_id = event.id();
if let Some(level) = levels.get(event) {
self.levels[event_id] = *level;
} else {
self.levels[event_id] = event.level();
}
}
}
Update::Shutdown => return false,
}
@ -235,43 +239,26 @@ impl Collector {
}
}
INTERESTS.update(interests);
TRACE_INTERESTS.update(interests);
}
pub fn union_interests(interests: Interests) {
INTERESTS.union(interests);
}
pub fn enable_event(event: impl Into<usize>) {
INTERESTS.set(event);
}
pub fn disable_event(event: impl Into<usize>) {
INTERESTS.clear(event);
}
pub fn disable_all_events() {
INTERESTS.clear_all();
TRACE_INTERESTS.union(interests);
}
#[inline(always)]
pub fn has_interest(event: impl Into<usize>) -> bool {
INTERESTS.get(event)
}
#[inline(always)]
pub fn is_metric(event: impl Into<usize>) -> bool {
METRIC_INTERESTS.get(event)
TRACE_INTERESTS.get(event)
}
pub fn get_subscribers() -> Vec<String> {
ACTIVE_SUBSCRIBERS.lock().clone()
}
pub fn update_custom_levels(custom_levels: AHashMap<EventType, Level>) {
pub fn update_custom_levels(levels: AHashMap<EventType, Level>) {
COLLECTOR_UPDATES
.lock()
.push(Update::UpdateLevels { custom_levels });
.push(Update::UpdateLevels { levels });
}
pub fn update_subscriber(id: String, interests: Interests, lossy: bool) {
@ -292,11 +279,11 @@ impl Collector {
}
pub fn is_enabled() -> bool {
!INTERESTS.is_empty()
!TRACE_INTERESTS.is_empty()
}
pub fn reload() {
Event::new(EventType::Tracing(TracingEvent::Update)).send()
Event::new(EventType::Telemetry(TelemetryEvent::Update)).send()
}
}
@ -315,3 +302,20 @@ pub(crate) fn spawn_collector() -> &'static Arc<CollectorThread> {
)
})
}
impl Default for Collector {
fn default() -> Self {
let mut c = Collector {
subscribers: Vec::new(),
levels: [Level::Disable; TOTAL_EVENT_COUNT],
active_spans: AHashMap::new(),
};
for event in EVENT_TYPES.iter() {
let event_id = event.id();
c.levels[event_id] = event.level();
}
c
}
}

View file

@ -274,7 +274,6 @@ impl StoreEvent {
Self::NotSupported => "Operation not supported",
Self::UnexpectedError => "Unexpected error",
Self::CryptoError => "Crypto error",
Self::IngestError => "Message Ingest error",
_ => "Store error",
}
}
@ -845,18 +844,24 @@ impl EventType {
EventType::MtaSts(event) => event.description(),
EventType::IncomingReport(event) => event.description(),
EventType::OutgoingReport(event) => event.description(),
EventType::Tracing(event) => event.description(),
EventType::Telemetry(event) => event.description(),
EventType::MessageIngest(event) => event.description(),
}
}
pub fn level(&self) -> Level {
match self {
EventType::Store(event) => match event {
StoreEvent::SqlQuery | StoreEvent::LdapQuery | StoreEvent::LdapBind => Level::Trace,
StoreEvent::DataWrite
| StoreEvent::DataIterate
| StoreEvent::BlobRead
| StoreEvent::BlobWrite
| StoreEvent::BlobDelete
| StoreEvent::SqlQuery
| StoreEvent::LdapQuery
| StoreEvent::LdapBind => Level::Trace,
StoreEvent::NotFound => Level::Debug,
StoreEvent::Ingest | StoreEvent::IngestDuplicate => Level::Info,
StoreEvent::IngestError
| StoreEvent::AssertValueFailed
StoreEvent::AssertValueFailed
| StoreEvent::FoundationdbError
| StoreEvent::MysqlError
| StoreEvent::PostgresqlError
@ -1299,9 +1304,13 @@ impl EventType {
DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace,
},
EventType::Queue(event) => match event {
QueueEvent::RateLimitExceeded
QueueEvent::QueueMessage
| QueueEvent::QueueMessageSubmission
| QueueEvent::QueueReport
| QueueEvent::QueueDsn
| QueueEvent::QueueAutogenerated
| QueueEvent::RateLimitExceeded
| QueueEvent::ConcurrencyLimitExceeded
| QueueEvent::Scheduled
| QueueEvent::Rescheduled
| QueueEvent::QuotaExceeded => Level::Info,
QueueEvent::LockBusy | QueueEvent::Locked | QueueEvent::BlobNotFound => {
@ -1355,10 +1364,18 @@ impl EventType {
| OutgoingReportEvent::SubmissionError
| OutgoingReportEvent::NoRecipientsFound => Level::Info,
},
EventType::Tracing(event) => match event {
TracingEvent::Update => Level::Disable,
EventType::Telemetry(event) => match event {
TelemetryEvent::Update => Level::Disable,
_ => Level::Warn,
},
EventType::MessageIngest(event) => match event {
MessageIngestEvent::Ham
| MessageIngestEvent::Spam
| MessageIngestEvent::ImapAppend
| MessageIngestEvent::JmapAppend
| MessageIngestEvent::Duplicate => Level::Info,
MessageIngestEvent::Error => Level::Error,
},
}
}
}
@ -1650,7 +1667,6 @@ impl DeliveryEvent {
impl QueueEvent {
pub fn description(&self) -> &'static str {
match self {
QueueEvent::Scheduled => "Message scheduled for delivery",
QueueEvent::Rescheduled => "Message rescheduled for delivery",
QueueEvent::LockBusy => "Queue lock is busy",
QueueEvent::Locked => "Queue is locked",
@ -1658,6 +1674,11 @@ impl QueueEvent {
QueueEvent::RateLimitExceeded => "Rate limit exceeded",
QueueEvent::ConcurrencyLimitExceeded => "Concurrency limit exceeded",
QueueEvent::QuotaExceeded => "Quota exceeded",
QueueEvent::QueueMessage => "Queued message for delivery",
QueueEvent::QueueMessageSubmission => "Queued message submissions for delivery",
QueueEvent::QueueReport => "Queued report for delivery",
QueueEvent::QueueDsn => "Queued DSN for delivery",
QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery",
}
}
}
@ -1882,14 +1903,15 @@ impl ServerEvent {
}
}
impl TracingEvent {
impl TelemetryEvent {
pub fn description(&self) -> &'static str {
match self {
TracingEvent::Update => "Tracing update",
TracingEvent::LogError => "Log collector error",
TracingEvent::WebhookError => "Webhook collector error",
TracingEvent::OtelError => "OpenTelemetry collector error",
TracingEvent::JournalError => "Journal collector error",
TelemetryEvent::Update => "Tracing update",
TelemetryEvent::LogError => "Log collector error",
TelemetryEvent::WebhookError => "Webhook collector error",
TelemetryEvent::OtelError => "OpenTelemetry collector error",
TelemetryEvent::JournalError => "Journal collector error",
TelemetryEvent::MetricsError => "Metrics collector error",
}
}
}
@ -2069,7 +2091,6 @@ impl MailAuthEvent {
impl StoreEvent {
pub fn description(&self) -> &'static str {
match self {
StoreEvent::IngestError => "Message ingestion error",
StoreEvent::AssertValueFailed => "Another process modified the record",
StoreEvent::FoundationdbError => "FoundationDB error",
StoreEvent::MysqlError => "MySQL error",
@ -2091,11 +2112,27 @@ impl StoreEvent {
StoreEvent::UnexpectedError => "Unexpected store error",
StoreEvent::CryptoError => "Store crypto error",
StoreEvent::BlobMissingMarker => "Blob missing marker",
StoreEvent::Ingest => "Message ingested",
StoreEvent::IngestDuplicate => "Skipping duplicate message",
StoreEvent::SqlQuery => "SQL query executed",
StoreEvent::LdapQuery => "LDAP query executed",
StoreEvent::LdapBind => "LDAP bind operation",
StoreEvent::DataWrite => "Write batch operation",
StoreEvent::BlobRead => "Blob read operation",
StoreEvent::BlobWrite => "Blob write operation",
StoreEvent::BlobDelete => "Blob delete operation",
StoreEvent::DataIterate => "Data store iteration operation",
}
}
}
impl MessageIngestEvent {
pub fn description(&self) -> &'static str {
match self {
MessageIngestEvent::Ham => "Message ingested",
MessageIngestEvent::Spam => "Possible spam message ingested",
MessageIngestEvent::ImapAppend => "Message appended via IMAP",
MessageIngestEvent::JmapAppend => "Message appended via JMAP",
MessageIngestEvent::Duplicate => "Skipping duplicate message",
MessageIngestEvent::Error => "Message ingestion error",
}
}
}

View file

@ -4,6 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod atomic;
pub mod bitset;
pub mod channel;
pub mod collector;
@ -11,6 +12,7 @@ pub mod conv;
pub mod fmt;
pub mod imple;
pub mod macros;
pub mod metrics;
pub mod serializer;
pub mod subscriber;
@ -83,7 +85,6 @@ pub enum Key {
Code,
Collection,
Contents,
Date,
Details,
DkimFail,
DkimNone,
@ -122,9 +123,7 @@ pub enum Key {
ReportId,
Result,
Size,
SourceAccountId,
SourceMailboxId,
Spam,
Source,
SpanId,
SpfFail,
SpfNone,
@ -154,6 +153,7 @@ pub enum EventType {
Eval(EvalEvent),
Acme(AcmeEvent),
Store(StoreEvent),
MessageIngest(MessageIngestEvent),
Jmap(JmapEvent),
Imap(ImapEvent),
ManageSieve(ManageSieveEvent),
@ -188,7 +188,7 @@ pub enum EventType {
MtaSts(MtaStsEvent),
IncomingReport(IncomingReportEvent),
OutgoingReport(OutgoingReportEvent),
Tracing(TracingEvent),
Telemetry(TelemetryEvent),
}
#[event_type]
@ -459,7 +459,11 @@ pub enum DeliveryEvent {
#[event_type]
pub enum QueueEvent {
Scheduled,
QueueMessage,
QueueMessageSubmission,
QueueReport,
QueueDsn,
QueueAutogenerated,
Rescheduled,
LockBusy,
Locked,
@ -644,12 +648,13 @@ pub enum ServerEvent {
}
#[event_type]
pub enum TracingEvent {
pub enum TelemetryEvent {
Update,
LogError,
WebhookError,
OtelError,
JournalError,
MetricsError,
}
#[event_type]
@ -797,7 +802,6 @@ pub enum MailAuthEvent {
#[event_type]
pub enum StoreEvent {
// Errors
IngestError,
AssertValueFailed,
FoundationdbError,
MysqlError,
@ -823,13 +827,25 @@ pub enum StoreEvent {
BlobMissingMarker,
// Traces
DataWrite,
DataIterate,
BlobRead,
BlobWrite,
BlobDelete,
SqlQuery,
LdapQuery,
LdapBind,
}
#[event_type]
pub enum MessageIngestEvent {
// Events
Ingest,
IngestDuplicate,
Ham,
Spam,
ImapAppend,
JmapAppend,
Duplicate,
Error,
}
#[event_type]

View file

@ -24,9 +24,10 @@ macro_rules! error {
let err = $err;
let event_id = err.as_ref().id();
if $crate::collector::Collector::has_interest(event_id)
|| $crate::collector::Collector::is_metric(event_id)
{
if $crate::collector::Collector::is_metric(event_id) {
$crate::collector::Collector::record_metric(*err.as_ref(), event_id, &err.keys);
}
if $crate::collector::Collector::has_interest(event_id) {
err.send();
}
};

455
crates/trc/src/metrics.rs Normal file
View file

@ -0,0 +1,455 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::atomic::Ordering;
use crate::{
atomic::{AtomicCounter, AtomicGauge, AtomicHistogram, AtomicU32Array},
collector::{Collector, GlobalInterests, EVENT_TYPES},
subscriber::Interests,
DeliveryEvent, EventType, FtsIndexEvent, HttpEvent, ImapEvent, Key, ManageSieveEvent,
MessageIngestEvent, NetworkEvent, Pop3Event, Protocol, QueueEvent, SmtpEvent, StoreEvent,
Value, TOTAL_EVENT_COUNT,
};
pub(crate) static METRIC_INTERESTS: GlobalInterests = GlobalInterests::new();
static EVENT_COUNTERS: AtomicU32Array<TOTAL_EVENT_COUNT> = AtomicU32Array::new();
static CONNECTION_METRICS: [ConnectionMetrics; TOTAL_CONN_TYPES] = init_conn_metrics();
static MESSAGE_INGESTION_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("message.ingestion-time", "Message ingestion time");
static MESSAGE_INDEX_TIME: AtomicHistogram<12> = AtomicHistogram::<10>::new_short_durations(
"message.fts-index-time",
"Message full-text indexing time",
);
static MESSAGE_DELIVERY_TIME: AtomicHistogram<12> = AtomicHistogram::<18>::new_long_durations(
"message.outgoing-delivery-time",
"Total message delivery time from submission to delivery",
);
static MESSAGE_INCOMING_SIZE: AtomicHistogram<12> =
AtomicHistogram::<12>::new_message_sizes("message.incoming-size", "Received message size");
static MESSAGE_SUBMISSION_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes(
"message.incoming-submission-size",
"Received message size from authenticated users",
);
static MESSAGE_OUT_REPORT_SIZE: AtomicHistogram<12> = AtomicHistogram::<12>::new_message_sizes(
"message.outgoing-report-size",
"Outgoing report size",
);
static STORE_DATA_READ_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.data-read-time", "Data store read time");
static STORE_DATA_WRITE_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.data-write-time", "Data store write time");
static STORE_BLOB_READ_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.blob-read-time", "Blob store read time");
static STORE_BLOB_WRITE_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("store.blob-write-time", "Blob store write time");
static DNS_LOOKUP_TIME: AtomicHistogram<12> =
AtomicHistogram::<10>::new_short_durations("dns.lookup-time", "DNS lookup time");
const CONN_SMTP_IN: usize = 0;
const CONN_SMTP_OUT: usize = 1;
const CONN_IMAP: usize = 2;
const CONN_POP3: usize = 3;
const CONN_HTTP: usize = 4;
const CONN_SIEVE: usize = 5;
const TOTAL_CONN_TYPES: usize = 6;
pub struct ConnectionMetrics {
pub total_connections: AtomicCounter,
pub active_connections: AtomicGauge,
pub bytes_sent: AtomicCounter,
pub bytes_received: AtomicCounter,
pub elapsed: AtomicHistogram<12>,
}
pub struct EventCounter {
id: &'static str,
description: &'static str,
value: u32,
}
impl Collector {
pub fn record_metric(event: EventType, event_id: usize, keys: &[(Key, Value)]) {
// Increment the event counter
EVENT_COUNTERS.add(event_id, 1);
// Extract variables
let mut elapsed = 0;
let mut size = 0;
let mut protocol = Protocol::Gossip;
for (key, value) in keys {
match (key, value) {
(Key::Elapsed, Value::Duration(d)) => elapsed = *d,
(Key::Size, Value::UInt(s)) => size = *s,
(Key::Protocol, Value::Protocol(p)) => protocol = *p,
_ => {}
}
}
match event {
EventType::Network(NetworkEvent::ConnectionStart) => {
let conn = &CONNECTION_METRICS[protocol.idx()];
conn.total_connections.increment();
conn.active_connections.increment();
}
EventType::Network(NetworkEvent::ConnectionEnd) => {
let conn = &CONNECTION_METRICS[protocol.idx()];
conn.active_connections.decrement();
conn.elapsed.observe(elapsed);
}
EventType::Delivery(DeliveryEvent::AttemptStart) => {
let conn = &CONNECTION_METRICS[CONN_SMTP_OUT];
conn.total_connections.increment();
conn.active_connections.increment();
}
EventType::Delivery(DeliveryEvent::AttemptEnd) => {
let conn = &CONNECTION_METRICS[CONN_SMTP_OUT];
conn.active_connections.decrement();
conn.elapsed.observe(elapsed);
}
EventType::Delivery(DeliveryEvent::Completed) => {
MESSAGE_DELIVERY_TIME.observe(elapsed);
}
EventType::Smtp(SmtpEvent::RawInput) => {
CONNECTION_METRICS[CONN_SMTP_IN]
.bytes_received
.increment_by(size);
}
EventType::Smtp(SmtpEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SMTP_IN]
.bytes_sent
.increment_by(size);
}
EventType::Imap(ImapEvent::RawInput) => {
CONNECTION_METRICS[CONN_IMAP]
.bytes_received
.increment_by(size);
}
EventType::Imap(ImapEvent::RawOutput) => {
CONNECTION_METRICS[CONN_IMAP].bytes_sent.increment_by(size);
}
EventType::Http(HttpEvent::RequestBody) => {
CONNECTION_METRICS[CONN_HTTP]
.bytes_received
.increment_by(size);
}
EventType::Http(HttpEvent::ResponseBody) => {
CONNECTION_METRICS[CONN_HTTP].bytes_sent.increment_by(size);
}
EventType::Pop3(Pop3Event::RawInput) => {
CONNECTION_METRICS[CONN_POP3]
.bytes_received
.increment_by(size);
}
EventType::Pop3(Pop3Event::RawOutput) => {
CONNECTION_METRICS[CONN_POP3].bytes_sent.increment_by(size);
}
EventType::ManageSieve(ManageSieveEvent::RawInput) => {
CONNECTION_METRICS[CONN_SIEVE]
.bytes_received
.increment_by(size);
}
EventType::ManageSieve(ManageSieveEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SIEVE].bytes_sent.increment_by(size);
}
EventType::Delivery(DeliveryEvent::RawInput) => {
CONNECTION_METRICS[CONN_SMTP_OUT]
.bytes_received
.increment_by(size);
}
EventType::Delivery(DeliveryEvent::RawOutput) => {
CONNECTION_METRICS[CONN_SMTP_OUT]
.bytes_sent
.increment_by(size);
}
EventType::Delivery(
DeliveryEvent::MxLookup | DeliveryEvent::IpLookup | DeliveryEvent::NullMx,
)
| EventType::TlsRpt(_)
| EventType::MtaSts(_)
| EventType::Dane(_) => {
if elapsed > 0 {
DNS_LOOKUP_TIME.observe(elapsed);
}
}
EventType::MessageIngest(
MessageIngestEvent::Ham
| MessageIngestEvent::Spam
| MessageIngestEvent::ImapAppend
| MessageIngestEvent::JmapAppend,
) => {
MESSAGE_INGESTION_TIME.observe(elapsed);
}
EventType::Queue(QueueEvent::QueueMessage) => {
MESSAGE_INCOMING_SIZE.observe(size);
}
EventType::Queue(QueueEvent::QueueMessageSubmission) => {
MESSAGE_SUBMISSION_SIZE.observe(size);
}
EventType::Queue(QueueEvent::QueueReport) => {
MESSAGE_OUT_REPORT_SIZE.observe(size);
}
EventType::FtsIndex(FtsIndexEvent::Index) => {
MESSAGE_INDEX_TIME.observe(elapsed);
}
EventType::Store(StoreEvent::BlobWrite) => {
STORE_BLOB_WRITE_TIME.observe(elapsed);
}
EventType::Store(StoreEvent::BlobRead) => {
STORE_BLOB_READ_TIME.observe(elapsed);
}
EventType::Store(StoreEvent::DataWrite) => {
STORE_DATA_WRITE_TIME.observe(elapsed);
}
EventType::Store(StoreEvent::DataIterate) => {
STORE_DATA_READ_TIME.observe(elapsed);
}
_ => {}
}
}
#[inline(always)]
pub fn is_metric(event: impl Into<usize>) -> bool {
METRIC_INTERESTS.get(event)
}
pub fn set_metrics(interests: Interests) {
METRIC_INTERESTS.update(interests);
}
pub fn collect_event_counters() -> impl Iterator<Item = EventCounter> {
EVENT_COUNTERS
.inner()
.iter()
.enumerate()
.filter_map(|(event_id, value)| {
let value = value.load(Ordering::Relaxed);
if value > 0 {
let event = EVENT_TYPES[event_id];
Some(EventCounter {
id: event.name(),
description: event.description(),
value,
})
} else {
None
}
})
}
pub fn collect_counters() -> impl Iterator<Item = &'static AtomicCounter> {
CONNECTION_METRICS
.iter()
.flat_map(|m| [&m.total_connections, &m.bytes_sent, &m.bytes_received])
}
pub fn collect_gauges() -> impl Iterator<Item = &'static AtomicGauge> {
CONNECTION_METRICS.iter().map(|m| &m.active_connections)
}
pub fn collect_histograms() -> impl Iterator<Item = &'static AtomicHistogram<12>> {
[
&MESSAGE_INGESTION_TIME,
&MESSAGE_INDEX_TIME,
&MESSAGE_DELIVERY_TIME,
&MESSAGE_INCOMING_SIZE,
&MESSAGE_SUBMISSION_SIZE,
&MESSAGE_OUT_REPORT_SIZE,
&STORE_DATA_READ_TIME,
&STORE_DATA_WRITE_TIME,
&STORE_BLOB_READ_TIME,
&STORE_BLOB_WRITE_TIME,
&DNS_LOOKUP_TIME,
]
.into_iter()
.chain(CONNECTION_METRICS.iter().map(|m| &m.elapsed))
}
}
impl EventCounter {
pub fn id(&self) -> &'static str {
self.id
}
pub fn description(&self) -> &'static str {
self.description
}
pub fn value(&self) -> u32 {
self.value
}
}
impl ConnectionMetrics {
#[allow(clippy::new_without_default)]
pub const fn new() -> Self {
Self {
total_connections: AtomicCounter::new("", "", ""),
active_connections: AtomicGauge::new("", "", ""),
bytes_sent: AtomicCounter::new("", "", ""),
bytes_received: AtomicCounter::new("", "", ""),
elapsed: AtomicHistogram::<18>::new_medium_durations("", ""),
}
}
}
#[allow(clippy::declare_interior_mutable_const)]
const fn init_conn_metrics() -> [ConnectionMetrics; TOTAL_CONN_TYPES] {
const INIT: ConnectionMetrics = ConnectionMetrics::new();
let mut array = [INIT; TOTAL_CONN_TYPES];
let mut i = 0;
while i < TOTAL_CONN_TYPES {
let text = match i {
CONN_HTTP => &[
("http.total-connections", "Total HTTP connections", "number"),
(
"http.active-connections",
"Active HTTP connections",
"number",
),
("http.bytes-sent", "Bytes sent over HTTP", "bytes"),
("http.bytes-received", "Bytes received over HTTP", "bytes"),
("http.request-time", "HTTP request duration", "milliseconds"),
],
CONN_IMAP => &[
("imap.total-connections", "Total IMAP connections", "number"),
(
"imap.active-connections",
"Active IMAP connections",
"number",
),
("imap.bytes-sent", "Bytes sent over IMAP", "bytes"),
("imap.bytes-received", "Bytes received over IMAP", "bytes"),
("imap.request-time", "IMAP request duration", "milliseconds"),
],
CONN_POP3 => &[
("pop3.total-connections", "Total POP3 connections", "number"),
(
"pop3.active-connections",
"Active POP3 connections",
"number",
),
("pop3.bytes-sent", "Bytes sent over POP3", "bytes"),
("pop3.bytes-received", "Bytes received over POP3", "bytes"),
("pop3.request-time", "POP3 request duration", "milliseconds"),
],
CONN_SMTP_IN => &[
(
"smtp-in.total-connections",
"Total SMTP incoming connections",
"number",
),
(
"smtp-in.active-connections",
"Active SMTP incoming connections",
"number",
),
(
"smtp-in.bytes-sent",
"Bytes sent over SMTP incoming",
"bytes",
),
(
"smtp-in.bytes-received",
"Bytes received over SMTP incoming",
"bytes",
),
(
"smtp-in.request-time",
"SMTP incoming request duration",
"milliseconds",
),
],
CONN_SMTP_OUT => &[
(
"smtp-out.total-connections",
"Total SMTP outgoing connections",
"number",
),
(
"smtp-out.active-connections",
"Active SMTP outgoing connections",
"number",
),
(
"smtp-out.bytes-sent",
"Bytes sent over SMTP outgoing",
"bytes",
),
(
"smtp-out.bytes-received",
"Bytes received over SMTP outgoing",
"bytes",
),
(
"smtp-out.request-time",
"SMTP outgoing request duration",
"milliseconds",
),
],
CONN_SIEVE => &[
(
"sieve.total-connections",
"Total ManageSieve connections",
"number",
),
(
"sieve.active-connections",
"Active ManageSieve connections",
"number",
),
("sieve.bytes-sent", "Bytes sent over ManageSieve", "bytes"),
(
"sieve.bytes-received",
"Bytes received over ManageSieve",
"bytes",
),
(
"sieve.request-time",
"ManageSieve request duration",
"milliseconds",
),
],
_ => &[
("", "", ""),
("", "", ""),
("", "", ""),
("", "", ""),
("", "", ""),
],
};
array[i] = ConnectionMetrics {
total_connections: AtomicCounter::new(text[0].0, text[0].1, text[0].2),
active_connections: AtomicGauge::new(text[1].0, text[1].1, text[1].2),
bytes_sent: AtomicCounter::new(text[2].0, text[2].1, text[2].2),
bytes_received: AtomicCounter::new(text[3].0, text[3].1, text[3].2),
elapsed: AtomicHistogram::<18>::new_medium_durations(text[4].0, text[4].1),
};
i += 1;
}
array
}
impl Protocol {
fn idx(&self) -> usize {
match self {
Protocol::Jmap => CONN_IMAP,
Protocol::Imap => CONN_IMAP,
Protocol::Lmtp | Protocol::Smtp => CONN_SMTP_IN,
Protocol::ManageSieve => CONN_SIEVE,
Protocol::Pop3 => CONN_POP3,
Protocol::Http => CONN_HTTP,
_ => unreachable!(),
}
}
}

View file

@ -29,7 +29,7 @@ use ::managesieve::core::ManageSieveSessionManager;
use common::{
config::{
server::{ServerProtocol, Servers},
tracers::Tracers,
telemetry::Telemetry,
},
Core, Ipc, IPC_CHANNEL_BUFFER,
};
@ -312,7 +312,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
let stores = Stores::parse_all(&mut config).await;
// Parse core
let tracers = Tracers::parse(&mut config);
let tracers = Telemetry::parse(&mut config);
let core = Core::parse(&mut config, stores, Default::default()).await;
let store = core.storage.data.clone();
let shared_core = core.into_shared();

View file

@ -13,7 +13,7 @@ use base64::{
use common::{
config::{
server::{ServerProtocol, Servers},
tracers::Tracers,
telemetry::Telemetry,
},
manager::config::{ConfigManager, Patterns},
Core, Ipc, IPC_CHANNEL_BUFFER,
@ -454,7 +454,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
.cloned()
.unwrap_or_default(),
};
let tracers = Tracers::parse(&mut config);
let tracers = Telemetry::parse(&mut config);
let core = Core::parse(&mut config, stores, config_manager).await;
let store = core.storage.data.clone();
let shared_core = core.into_shared();

View file

@ -64,11 +64,11 @@ impl AssertConfig for utils::config::Config {
#[cfg(test)]
pub fn enable_logging() {
use common::config::tracers::Tracers;
use common::config::telemetry::Telemetry;
if let Ok(level) = std::env::var("LOG") {
if !Collector::is_enabled() {
Tracers::test_tracer(level.parse().expect("Invalid log level"));
Telemetry::test_tracer(level.parse().expect("Invalid log level"));
}
}
}