Alerts implementation
Some checks failed
trivy / Check (push) Failing after -8m22s

This commit is contained in:
mdecimus 2024-08-28 19:33:29 +02:00
parent 62f55ad62b
commit 7e1b6bd06d
18 changed files with 701 additions and 24 deletions

1
Cargo.lock generated
View file

@ -1061,6 +1061,7 @@ dependencies = [
"jmap_proto",
"libc",
"mail-auth",
"mail-builder",
"mail-parser",
"mail-send",
"md5",

View file

@ -13,6 +13,7 @@ directory = { path = "../directory" }
jmap_proto = { path = "../jmap-proto" }
sieve-rs = { version = "0.5" }
mail-parser = { version = "0.9", features = ["full_encoding", "ludicrous_mode"] }
mail-builder = { version = "0.3", features = ["ludicrous_mode"] }
mail-auth = { version = "0.4" }
mail-send = { version = "0.4", default-features = false, features = ["cram-md5", "ring", "tls12"] }
smtp-proto = { version = "0.1", features = ["serde_support"] }

View file

@ -0,0 +1,158 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: LicenseRef-SEL
*
* This file is subject to the Stalwart Enterprise License Agreement (SEL) and
* is NOT open source software.
*
*/
use mail_builder::{
headers::{
address::{Address, EmailAddress},
HeaderType,
},
MessageBuilder,
};
use trc::{Collector, MetricType, TelemetryEvent, TOTAL_EVENT_COUNT};
use super::{AlertContent, AlertContentToken, AlertMethod};
use crate::{
expr::{functions::ResolveVariable, Variable},
Core,
};
use std::fmt::Write;
/// An e-mail notification generated by `Core::process_alerts` for an alert
/// whose condition evaluated to true.
#[derive(Debug, PartialEq, Eq)]
pub struct AlertMessage {
/// Sender address, cloned from the alert's `from_addr` setting.
pub from: String,
/// Recipient addresses, cloned from the alert's `to` setting.
pub to: Vec<String>,
/// Serialized message bytes as produced by `MessageBuilder::write_to_vec`
/// (empty if serialization returned an error).
pub body: Vec<u8>,
}
/// Stateless `ResolveVariable` implementation that answers variable lookups
/// with values read from the `Collector`.
struct CollectorResolver;
impl Core {
    /// Evaluates every configured metric alert and runs the notification
    /// methods of those whose condition holds.
    ///
    /// Event notifications are emitted immediately through `trc::event!`;
    /// e-mail notifications are rendered and handed back to the caller.
    ///
    /// Returns `None` when the enterprise settings are absent, when no alerts
    /// are configured, or when no e-mail message was produced.
    pub async fn process_alerts(&self) -> Option<Vec<AlertMessage>> {
        let alerts = &self.enterprise.as_ref()?.metrics_alerts;
        if alerts.is_empty() {
            return None;
        }

        let mut outbox = Vec::new();

        for alert in alerts {
            // A missing evaluation result (`None`) is treated as "not triggered".
            let triggered = self
                .eval_expr(&alert.condition, &CollectorResolver, &alert.id, 0)
                .await
                .unwrap_or(false);
            if !triggered {
                continue;
            }

            for method in &alert.method {
                match method {
                    AlertMethod::Event { message } => {
                        trc::event!(
                            Telemetry(TelemetryEvent::Alert),
                            Id = alert.id.to_string(),
                            Details = message.as_ref().map(|m| m.build())
                        );

                        // Only in test builds: bump the alert counter directly.
                        #[cfg(feature = "test_mode")]
                        Collector::update_event_counter(
                            trc::EventType::Telemetry(TelemetryEvent::Alert),
                            1,
                        );
                    }
                    AlertMethod::Email {
                        from_name,
                        from_addr,
                        to,
                        subject,
                        body,
                    } => {
                        let sender = Address::Address(EmailAddress {
                            name: from_name.as_ref().map(|name| name.into()),
                            email: from_addr.as_str().into(),
                        });
                        let recipients: Vec<_> = to
                            .iter()
                            .map(|rcpt| {
                                Address::Address(EmailAddress {
                                    name: None,
                                    email: rcpt.as_str().into(),
                                })
                            })
                            .collect();

                        // Subject and body templates are rendered with the
                        // metric values current at this point.
                        let raw_message = MessageBuilder::new()
                            .from(sender)
                            .header("To", HeaderType::Address(Address::List(recipients)))
                            .header("Auto-Submitted", HeaderType::Text("auto-generated".into()))
                            .subject(subject.build())
                            .text_body(body.build())
                            .write_to_vec()
                            .unwrap_or_default();

                        outbox.push(AlertMessage {
                            from: from_addr.clone(),
                            to: to.clone(),
                            body: raw_message,
                        });
                    }
                }
            }
        }

        if outbox.is_empty() {
            None
        } else {
            Some(outbox)
        }
    }
}
impl ResolveVariable for CollectorResolver {
    /// Maps an expression variable id to a live value from the `Collector`.
    ///
    /// Ids below `TOTAL_EVENT_COUNT` are read as event counters (integer).
    /// Larger ids are shifted down by `TOTAL_EVENT_COUNT` and looked up as a
    /// `MetricType` code (float). Ids that match neither resolve to integer `0`.
    fn resolve_variable(&self, variable: u32) -> Variable<'_> {
        let index = variable as usize;
        if index < TOTAL_EVENT_COUNT {
            return Variable::Integer(Collector::read_event_metric(index) as i64);
        }

        // `variable >= TOTAL_EVENT_COUNT` here, so the subtraction cannot underflow.
        match MetricType::from_code(variable as u64 - TOTAL_EVENT_COUNT as u64) {
            Some(metric_type) => Variable::Float(Collector::read_metric(metric_type)),
            None => Variable::Integer(0),
        }
    }
}
impl AlertContent {
    /// Renders all tokens, in order, into a newly allocated string.
    pub fn build(&self) -> String {
        self.0
            .iter()
            .fold(String::with_capacity(self.len()), |mut rendered, token| {
                token.write(&mut rendered);
                rendered
            })
    }

    /// Estimated rendered size in bytes: the sum of every token's `len()`.
    /// Used as the initial capacity in [`AlertContent::build`].
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        let mut total = 0;
        for token in &self.0 {
            total += token.len();
        }
        total
    }
}
impl AlertContentToken {
    /// Appends this token's rendering to `buf`: literal text as-is, metrics
    /// and events as the value currently reported by the `Collector`.
    fn write(&self, buf: &mut String) {
        match self {
            Self::Event(event_type) => {
                let count = Collector::read_event_metric(event_type.id());
                // Formatting errors are deliberately ignored.
                let _ = write!(buf, "{}", count);
            }
            Self::Metric(metric_type) => {
                let value = Collector::read_metric(*metric_type);
                let _ = write!(buf, "{}", value);
            }
            Self::Text(text) => buf.push_str(text),
        }
    }

    /// Size estimate for this token: exact for text, a fixed guess of 10
    /// bytes for metric and event values.
    fn len(&self) -> usize {
        if let Self::Text(text) = self {
            text.len()
        } else {
            10
        }
    }
}

View file

@ -12,9 +12,19 @@ use std::time::Duration;
use jmap_proto::types::collection::Collection;
use store::{BitmapKey, Store, Stores};
use utils::config::{cron::SimpleCron, utils::ParseValue, Config};
use trc::{EventType, MetricType, TOTAL_EVENT_COUNT};
use utils::config::{
cron::SimpleCron,
utils::{AsKey, ParseValue},
Config,
};
use super::{license::LicenseValidator, Enterprise, MetricStore, TraceStore, Undelete};
use crate::expr::{tokenizer::TokenMap, Expression};
use super::{
license::LicenseValidator, AlertContent, AlertContentToken, AlertMethod, Enterprise,
MetricAlert, MetricStore, TraceStore, Undelete,
};
impl Enterprise {
pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> {
@ -110,6 +120,197 @@ impl Enterprise {
.map(|retention| Undelete { retention }),
trace_store,
metrics_store,
metrics_alerts: parse_metric_alerts(config),
})
}
}
/// Parses every alert declared under `metrics.alerts` and returns the ones
/// for which `parse_metric_alert` yields a definition.
pub fn parse_metric_alerts(config: &mut Config) -> Vec<MetricAlert> {
    // Copy the ids into owned strings first; presumably `sub_keys` borrows
    // `config`, which the per-alert parser needs mutably afterwards.
    let alert_ids: Vec<String> = config
        .sub_keys("metrics.alerts", ".enable")
        .map(|key| key.to_string())
        .collect();

    alert_ids
        .into_iter()
        .filter_map(|alert_id| parse_metric_alert(config, alert_id))
        .collect()
}
/// Parses the alert stored under `metrics.alerts.<id>`.
///
/// Returns `None` when the alert is disabled, when its condition cannot be
/// parsed, or when e-mail notification is enabled but the sender address,
/// subject or body setting is missing. Invalid (but present) e-mail settings
/// and the absence of any notification method are reported through
/// `Config::new_build_error`; the alert is still returned in those cases.
fn parse_metric_alert(config: &mut Config, id: String) -> Option<MetricAlert> {
    let is_enabled =
        config.property_or_default::<bool>(("metrics.alerts", id.as_str(), "enable"), "false")?;
    if !is_enabled {
        return None;
    }

    // Variables usable in the condition: every event type keeps its own id,
    // metric codes are placed after them by adding `TOTAL_EVENT_COUNT`.
    let event_vars = EventType::variants()
        .into_iter()
        .map(|event| (sanitize_metric_name(event.name()), event.id() as u32));
    let metric_vars = MetricType::variants().iter().map(|metric| {
        (
            sanitize_metric_name(metric.name()),
            metric.code() as u32 + TOTAL_EVENT_COUNT as u32,
        )
    });
    let token_map = TokenMap::default().with_variables_map(event_vars.chain(metric_vars));
    let condition = Expression::try_parse(
        config,
        ("metrics.alerts", id.as_str(), "condition"),
        &token_map,
    )?;

    let mut alert = MetricAlert {
        id,
        condition,
        method: Vec::new(),
    };
    let id_str = alert.id.as_str();

    // Event notification
    let event_enabled = config
        .property_or_default::<bool>(("metrics.alerts", id_str, "notify.event.enable"), "false")
        .unwrap_or_default();
    if event_enabled {
        let message = parse_alert_content(
            ("metrics.alerts", id_str, "notify.event.message"),
            config,
        );
        alert.method.push(AlertMethod::Event { message });
    }

    // E-mail notification
    let email_enabled = config
        .property_or_default::<bool>(("metrics.alerts", id_str, "notify.email.enable"), "false")
        .unwrap_or_default();
    if email_enabled {
        let from_addr = config
            .value_require(("metrics.alerts", id_str, "notify.email.from-addr"))?
            .trim()
            .to_string();
        let from_name = config
            .value(("metrics.alerts", id_str, "notify.email.from-name"))
            .map(|name| name.to_string());
        // Entries without an '@' are silently dropped.
        let to: Vec<String> = config
            .values(("metrics.alerts", id_str, "notify.email.to"))
            .filter_map(|(_, rcpt)| rcpt.contains('@').then(|| rcpt.trim().to_string()))
            .collect();
        let subject =
            parse_alert_content(("metrics.alerts", id_str, "notify.email.subject"), config)?;
        let body = parse_alert_content(("metrics.alerts", id_str, "notify.email.body"), config)?;

        // (check failed, offending property, error text), reported in this order.
        let checks = [
            (
                !from_addr.contains('@'),
                "notify.email.from-addr",
                "Invalid from email address",
            ),
            (
                to.is_empty(),
                "notify.email.to",
                "Missing recipient address(es)",
            ),
            (
                subject.0.is_empty(),
                "notify.email.subject",
                "Missing email subject",
            ),
            (
                body.0.is_empty(),
                "notify.email.body",
                "Missing email body",
            ),
        ];
        for (failed, property, reason) in checks {
            if failed {
                config.new_build_error(("metrics.alerts", id_str, property), reason);
            }
        }

        alert.method.push(AlertMethod::Email {
            from_name,
            from_addr,
            to,
            subject,
            body,
        });
    }

    if alert.method.is_empty() {
        config.new_build_error(
            ("metrics.alerts", id_str),
            "No notification method enabled for alert",
        );
    }

    Some(alert)
}
/// Parses the template stored under `key` into an [`AlertContent`].
///
/// Placeholders have the form `%{name}%`, where `name` is resolved first with
/// `EventType::try_parse` and then with `MetricType::try_parse`. Everything
/// else, including malformed placeholders and placeholders with an unknown
/// name, is kept verbatim as literal text.
///
/// Returns `None` when the setting is not present.
fn parse_alert_content(key: impl AsKey, config: &mut Config) -> Option<AlertContent> {
    let mut tokens = Vec::new();
    let mut value = config.value(key)?.chars().peekable();
    let mut buf = String::new();

    while let Some(ch) = value.next() {
        if ch != '%' || value.peek() != Some(&'{') {
            buf.push(ch);
            continue;
        }

        // Skip the '{' and read the name up to (not including) the closing '}'.
        value.next();
        let mut var_name = String::new();
        let mut found_curly = false;
        for ch in value.by_ref() {
            if ch == '}' {
                found_curly = true;
                break;
            }
            var_name.push(ch);
        }

        // A placeholder is only complete when "}" is directly followed by "%".
        let terminated = found_curly && value.peek() == Some(&'%');
        let token = if terminated {
            value.next();
            EventType::try_parse(&var_name)
                .map(AlertContentToken::Event)
                .or_else(|| MetricType::try_parse(&var_name).map(AlertContentToken::Metric))
        } else {
            None
        };

        match token {
            Some(token) => {
                if !buf.is_empty() {
                    tokens.push(AlertContentToken::Text(std::mem::take(&mut buf)));
                }
                tokens.push(token);
            }
            None => {
                // Not a valid placeholder: put back exactly what was consumed.
                buf.push_str("%{");
                buf.push_str(&var_name);
                if found_curly {
                    // Previously the '}' was lost when no '%' followed it.
                    buf.push('}');
                }
                if terminated {
                    buf.push('%');
                }
            }
        }
    }

    if !buf.is_empty() {
        tokens.push(AlertContentToken::Text(buf));
    }

    Some(AlertContent(tokens))
}
/// Returns `name` with every character that is not an ASCII letter or digit
/// replaced by `_`, so it can be used as an expression variable name.
fn sanitize_metric_name(name: &str) -> String {
    name.chars()
        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
        .collect()
}

View file

@ -8,6 +8,7 @@
*
*/
pub mod alerts;
pub mod config;
pub mod license;
pub mod undelete;
@ -17,9 +18,10 @@ use std::time::Duration;
use license::LicenseKey;
use mail_parser::DateTime;
use store::Store;
use trc::{EventType, MetricType};
use utils::config::cron::SimpleCron;
use crate::Core;
use crate::{expr::Expression, Core};
#[derive(Clone)]
pub struct Enterprise {
@ -27,6 +29,7 @@ pub struct Enterprise {
pub undelete: Option<Undelete>,
pub trace_store: Option<TraceStore>,
pub metrics_store: Option<MetricStore>,
pub metrics_alerts: Vec<MetricAlert>,
}
#[derive(Clone)]
@ -47,6 +50,37 @@ pub struct MetricStore {
pub interval: SimpleCron,
}
/// A configured metric alert: a condition plus the notification methods to
/// run when `Core::process_alerts` evaluates that condition to true.
#[derive(Clone, Debug)]
pub struct MetricAlert {
/// Alert identifier, taken from the sub-key under `metrics.alerts` it was
/// parsed from.
pub id: String,
/// Expression evaluated against collector events and metrics.
pub condition: Expression,
/// Notification methods enabled for this alert.
pub method: Vec<AlertMethod>,
}
/// How a triggered alert is reported.
#[derive(Clone, Debug)]
pub enum AlertMethod {
/// Render an e-mail message and return it from `Core::process_alerts`.
Email {
// Optional display name for the `From` header.
from_name: Option<String>,
// Sender address.
from_addr: String,
// Recipient addresses.
to: Vec<String>,
// Subject template.
subject: AlertContent,
// Text body template.
body: AlertContent,
},
/// Emit a `TelemetryEvent::Alert` event.
Event {
// Optional template rendered into the event's `Details` field.
message: Option<AlertContent>,
},
}
/// A message template stored as an ordered list of tokens; rendered with
/// `AlertContent::build`.
#[derive(Clone, Debug)]
pub struct AlertContent(pub Vec<AlertContentToken>);
/// One piece of an `AlertContent` template.
#[derive(Clone, Debug)]
pub enum AlertContentToken {
/// Literal text, copied verbatim when rendering.
Text(String),
/// Rendered as the value returned by `Collector::read_metric`.
Metric(MetricType),
/// Rendered as the value returned by `Collector::read_event_metric`.
Event(EventType),
}
impl Core {
// WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED
// Any attempt to modify, bypass, or disable this license validation mechanism

View file

@ -30,7 +30,7 @@ pub struct Tokenizer<'x> {
#[derive(Debug, Default, Clone)]
pub struct TokenMap {
pub tokens: AHashMap<&'static str, Token>,
pub tokens: AHashMap<Cow<'static, str>, Token>,
}
impl<'x> Tokenizer<'x> {
@ -359,19 +359,21 @@ impl TokenMap {
pub fn with_variables(mut self, variables: &[u32]) -> Self {
for (name, idx) in VARIABLES_MAP {
if variables.contains(idx) {
self.tokens.insert(name, Token::Variable(*idx));
self.tokens
.insert(Cow::Borrowed(name), Token::Variable(*idx));
}
}
self
}
pub fn with_variables_map<I>(mut self, vars: I) -> Self
pub fn with_variables_map<I, V>(mut self, vars: I) -> Self
where
I: IntoIterator<Item = (&'static str, u32)>,
I: IntoIterator<Item = (V, u32)>,
V: Into<Cow<'static, str>>,
{
for (name, idx) in vars {
self.tokens.insert(name, Token::Variable(idx));
self.tokens.insert(name.into(), Token::Variable(idx));
}
self
@ -383,7 +385,8 @@ impl TokenMap {
T: Into<Constant>,
{
for (name, constant) in consts {
self.tokens.insert(name, Token::Constant(constant.into()));
self.tokens
.insert(Cow::Borrowed(name), Token::Constant(constant.into()));
}
self
@ -395,7 +398,8 @@ impl TokenMap {
}
pub fn add_constant(&mut self, name: &'static str, constant: impl Into<Constant>) -> &mut Self {
self.tokens.insert(name, Token::Constant(constant.into()));
self.tokens
.insert(Cow::Borrowed(name), Token::Constant(constant.into()));
self
}
}

View file

@ -17,6 +17,7 @@ use common::telemetry::{
tracers::store::TracingStore,
};
use smtp::core::SMTP;
use store::{
write::{now, purge::PurgeStore},
BlobStore, LookupStore, Store,
@ -61,7 +62,9 @@ enum ActionClass {
InternalMetrics,
CalculateMetrics,
#[cfg(feature = "enterprise")]
ReloadSettings,
AlertMetrics,
#[cfg(feature = "enterprise")]
ValidateLicense,
}
#[derive(Default)]
@ -69,6 +72,9 @@ struct Queue {
heap: BinaryHeap<Action>,
}
#[cfg(feature = "enterprise")]
const METRIC_ALERTS_INTERVAL: Duration = Duration::from_secs(5 * 60);
pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
tokio::spawn(async move {
trc::event!(Housekeeper(HousekeeperEvent::Start));
@ -132,7 +138,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
if let Some(enterprise) = &core_.enterprise {
queue.schedule(
Instant::now() + enterprise.license.expires_in(),
ActionClass::ReloadSettings,
ActionClass::ValidateLicense,
);
if let Some(metrics_store) = enterprise.metrics_store.as_ref() {
@ -141,6 +147,13 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
ActionClass::InternalMetrics,
);
}
if !enterprise.metrics_alerts.is_empty() {
queue.schedule(
Instant::now() + METRIC_ALERTS_INTERVAL,
ActionClass::AlertMetrics,
);
}
}
// SPDX-SnippetEnd
}
@ -170,6 +183,35 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
_ => {}
}
// SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")]
if let Some(enterprise) = &core_.enterprise {
if !queue.has_action(&ActionClass::ValidateLicense) {
queue.schedule(
Instant::now() + enterprise.license.expires_in(),
ActionClass::ValidateLicense,
);
}
if let Some(metrics_store) = enterprise.metrics_store.as_ref() {
if !queue.has_action(&ActionClass::InternalMetrics) {
queue.schedule(
Instant::now() + metrics_store.interval.time_to_next(),
ActionClass::InternalMetrics,
);
}
}
if !enterprise.metrics_alerts.is_empty()
&& !queue.has_action(&ActionClass::AlertMetrics)
{
queue.schedule(Instant::now(), ActionClass::AlertMetrics);
}
}
// SPDX-SnippetEnd
// Reload ACME certificates
tokio::spawn(async move {
for provider in core_.tls.acme_providers.values() {
@ -536,7 +578,30 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
}
#[cfg(feature = "enterprise")]
ActionClass::ReloadSettings => {
ActionClass::AlertMetrics => {
let smtp = SMTP {
core: core_.clone(),
inner: core.smtp_inner.clone(),
};
tokio::spawn(async move {
if let Some(messages) = smtp.core.process_alerts().await {
for message in messages {
smtp.send_autogenerated(
message.from,
message.to.into_iter(),
message.body,
None,
0,
)
.await;
}
}
});
}
#[cfg(feature = "enterprise")]
ActionClass::ValidateLicense => {
match core_.reload().await {
Ok(result) => {
if let Some(new_core) = result.new_core {
@ -544,7 +609,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
queue.schedule(
Instant::now()
+ enterprise.license.expires_in(),
ActionClass::ReloadSettings,
ActionClass::ValidateLicense,
);
}

View file

@ -55,7 +55,7 @@ pub enum MessageSource {
Unauthenticated,
Dsn,
Report,
Sieve,
Autogenerated,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]

View file

@ -232,7 +232,7 @@ impl Message {
MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage,
MessageSource::Dsn => trc::QueueEvent::QueueDsn,
MessageSource::Report => trc::QueueEvent::QueueReport,
MessageSource::Sieve => trc::QueueEvent::QueueAutogenerated,
MessageSource::Autogenerated => trc::QueueEvent::QueueAutogenerated,
}),
SpanId = session_id,
QueueId = self.queue_id,

View file

@ -163,6 +163,48 @@ impl SMTP {
.await;
}
/// Queues an automatically generated message.
///
/// A new message from `from_addr` is created, every address in `rcpts` is
/// added as a recipient, the message is signed when `sign_config` is given,
/// and it is finally queued with `MessageSource::Autogenerated`.
pub async fn send_autogenerated(
    &self,
    from_addr: impl Into<String>,
    rcpts: impl Iterator<Item = impl Into<String>>,
    raw_message: Vec<u8>,
    sign_config: Option<&IfBlock>,
    parent_session_id: u64,
) {
    // Build message
    let sender: String = from_addr.into();
    let sender_lcase = sender.to_lowercase();
    let sender_domain = sender_lcase.domain_part().to_string();
    let mut message = self.new_message(sender, sender_lcase, sender_domain, parent_session_id);
    for rcpt in rcpts {
        message.add_recipient(rcpt, self).await;
    }

    // Sign message (only when a signing configuration was supplied)
    let signature = match sign_config {
        Some(config) => self.sign_message(&mut message, config, &raw_message).await,
        None => None,
    };

    // Queue message
    message
        .queue(
            signature.as_deref(),
            &raw_message,
            parent_session_id,
            self,
            MessageSource::Autogenerated,
        )
        .await;
}
pub async fn schedule_report(&self, report: impl Into<Event>) {
if self.inner.report_tx.send(report.into()).await.is_err() {
trc::event!(

View file

@ -298,7 +298,7 @@ impl SMTP {
raw_message,
session_id,
self,
MessageSource::Sieve,
MessageSource::Autogenerated,
)
.await;
} else {

View file

@ -61,6 +61,16 @@ impl<const N: usize> AtomicHistogram<N> {
self.count.load(Ordering::Relaxed)
}
/// Returns `sum() / count()` as a float, or `0.0` when nothing has been
/// recorded yet (count is zero).
pub fn average(&self) -> f64 {
    let total = self.sum();
    let samples = self.count();

    if samples > 0 {
        return total as f64 / samples as f64;
    }
    0.0
}
pub fn min(&self) -> Option<u64> {
let min = self.min.load(Ordering::Relaxed);
if min != u64::MAX {

View file

@ -1168,6 +1168,7 @@ impl ServerEvent {
impl TelemetryEvent {
pub fn description(&self) -> &'static str {
match self {
TelemetryEvent::Alert => "Alert triggered",
TelemetryEvent::LogError => "Log collector error",
TelemetryEvent::WebhookError => "Webhook collector error",
TelemetryEvent::JournalError => "Journal collector error",
@ -1179,6 +1180,7 @@ impl TelemetryEvent {
pub fn explain(&self) -> &'static str {
match self {
TelemetryEvent::Alert => "An alert was triggered",
TelemetryEvent::LogError => "An error occurred with the log collector",
TelemetryEvent::WebhookError => "An error occurred with the webhook collector",
TelemetryEvent::JournalError => "An error occurred with the journal collector",

View file

@ -200,4 +200,36 @@ impl MetricType {
_ => None,
}
}
/// Returns a static slice listing the metric types below, so callers can
/// iterate over them (for example to register one variable per metric).
pub fn variants() -> &'static [Self] {
&[
Self::MessageIngestionTime,
Self::MessageFtsIndexTime,
Self::DeliveryTotalTime,
Self::DeliveryTime,
Self::MessageSize,
Self::MessageAuthSize,
Self::ReportOutgoingSize,
Self::StoreReadTime,
Self::StoreWriteTime,
Self::BlobReadTime,
Self::BlobWriteTime,
Self::DnsLookupTime,
Self::HttpRequestTime,
Self::ImapRequestTime,
Self::Pop3RequestTime,
Self::SmtpRequestTime,
Self::SieveRequestTime,
Self::HttpActiveConnections,
Self::ImapActiveConnections,
Self::Pop3ActiveConnections,
Self::SmtpActiveConnections,
Self::SieveActiveConnections,
Self::DeliveryActiveConnections,
Self::ServerMemory,
Self::QueueCount,
Self::UserCount,
Self::DomainCount,
]
}
}

View file

@ -274,6 +274,50 @@ impl Collector {
EVENT_COUNTERS.get(metric_id)
}
/// Returns the current value of `metric_type` as an `f64`.
///
/// Depending on the metric, the value is either the result of `.get()` on
/// the backing static (cast to `f64`) or the result of `.average()`.
pub fn read_metric(metric_type: MetricType) -> f64 {
match metric_type {
MetricType::ServerMemory => SERVER_MEMORY.get() as f64,
MetricType::MessageIngestionTime => MESSAGE_INGESTION_TIME.average(),
MetricType::MessageFtsIndexTime => MESSAGE_INDEX_TIME.average(),
MetricType::MessageSize => MESSAGE_INCOMING_SIZE.average(),
MetricType::MessageAuthSize => MESSAGE_SUBMISSION_SIZE.average(),
MetricType::DeliveryTotalTime => MESSAGE_DELIVERY_TIME.average(),
// Outbound SMTP connection metrics back the delivery time and
// delivery connection count.
MetricType::DeliveryTime => CONNECTION_METRICS[CONN_SMTP_OUT].elapsed.average(),
MetricType::DeliveryActiveConnections => {
CONNECTION_METRICS[CONN_SMTP_OUT].active_connections.get() as f64
}
MetricType::QueueCount => QUEUE_COUNT.get() as f64,
MetricType::ReportOutgoingSize => MESSAGE_OUT_REPORT_SIZE.average(),
MetricType::StoreReadTime => STORE_DATA_READ_TIME.average(),
MetricType::StoreWriteTime => STORE_DATA_WRITE_TIME.average(),
MetricType::BlobReadTime => STORE_BLOB_READ_TIME.average(),
MetricType::BlobWriteTime => STORE_BLOB_WRITE_TIME.average(),
MetricType::DnsLookupTime => DNS_LOOKUP_TIME.average(),
// Per-protocol connection metrics, indexed by the `CONN_*` constants.
MetricType::HttpActiveConnections => {
CONNECTION_METRICS[CONN_HTTP].active_connections.get() as f64
}
MetricType::HttpRequestTime => CONNECTION_METRICS[CONN_HTTP].elapsed.average(),
MetricType::ImapActiveConnections => {
CONNECTION_METRICS[CONN_IMAP].active_connections.get() as f64
}
MetricType::ImapRequestTime => CONNECTION_METRICS[CONN_IMAP].elapsed.average(),
MetricType::Pop3ActiveConnections => {
CONNECTION_METRICS[CONN_POP3].active_connections.get() as f64
}
MetricType::Pop3RequestTime => CONNECTION_METRICS[CONN_POP3].elapsed.average(),
MetricType::SmtpActiveConnections => {
CONNECTION_METRICS[CONN_SMTP_IN].active_connections.get() as f64
}
MetricType::SmtpRequestTime => CONNECTION_METRICS[CONN_SMTP_IN].elapsed.average(),
MetricType::SieveActiveConnections => {
CONNECTION_METRICS[CONN_SIEVE].active_connections.get() as f64
}
MetricType::SieveRequestTime => CONNECTION_METRICS[CONN_SIEVE].elapsed.average(),
MetricType::UserCount => USER_COUNT.get() as f64,
MetricType::DomainCount => DOMAIN_COUNT.get() as f64,
}
}
pub fn update_gauge(metric_type: MetricType, value: u64) {
match metric_type {
MetricType::ServerMemory => SERVER_MEMORY.set(value),

View file

@ -653,6 +653,7 @@ pub enum ServerEvent {
#[event_type]
pub enum TelemetryEvent {
Alert,
LogError,
WebhookError,
OtelExporterError,

View file

@ -854,6 +854,7 @@ impl EventType {
EventType::Tls(TlsEvent::MultipleCertificatesAvailable) => 545,
EventType::Tls(TlsEvent::NoCertificatesAvailable) => 546,
EventType::Tls(TlsEvent::NotConfigured) => 547,
EventType::Telemetry(TelemetryEvent::Alert) => 548,
}
}
@ -1447,6 +1448,7 @@ impl EventType {
545 => Some(EventType::Tls(TlsEvent::MultipleCertificatesAvailable)),
546 => Some(EventType::Tls(TlsEvent::NoCertificatesAvailable)),
547 => Some(EventType::Tls(TlsEvent::NotConfigured)),
548 => Some(EventType::Telemetry(TelemetryEvent::Alert)),
_ => None,
}
}

View file

@ -13,7 +13,8 @@ use std::{sync::Arc, time::Duration};
use common::{
config::telemetry::{StoreTracer, TelemetrySubscriberType},
enterprise::{
license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricStore, TraceStore, Undelete,
config::parse_metric_alerts, license::LicenseKey, undelete::DeletedBlob, Enterprise,
MetricStore, TraceStore, Undelete,
},
telemetry::{
metrics::store::{Metric, MetricsStore, SharedMetricHistory},
@ -31,18 +32,54 @@ use trc::{
ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
*,
};
use utils::config::cron::SimpleCron;
use utils::config::{cron::SimpleCron, Config};
use crate::{
imap::{ImapConnection, Type},
jmap::delivery::SmtpConnection,
AssertConfig,
};
use super::{delivery::AssertResult, JMAPTest, ManagementApi};
const METRICS_CONFIG: &str = r#"
[metrics.alerts.expected]
enable = true
condition = "domain_count > 1 && cluster_error > 3"
[metrics.alerts.expected.notify.event]
enable = true
message = "Yikes! Found %{cluster.error}% cluster errors!"
[metrics.alerts.expected.notify.email]
enable = true
from-name = "Alert Subsystem"
from-addr = "alert@example.com"
to = ["jdoe@example.com"]
subject = "Found %{cluster.error}% cluster errors"
body = "Sorry for the bad news, but we found %{domain.count}% domains and %{cluster.error}% cluster errors."
[metrics.alerts.unexpected]
enable = true
condition = "domain_count < 1 || cluster_error < 3"
[metrics.alerts.unexpected.notify.event]
enable = true
message = "this should not have happened"
"#;
const RAW_MESSAGE: &str = "From: john@example.com
To: john@example.com
Subject: undelete test
test
";
pub async fn test(params: &mut JMAPTest) {
// Enable Enterprise
let mut core = params.server.shared_core.load_full().as_ref().clone();
let mut config = Config::new(METRICS_CONFIG).unwrap();
core.enterprise = Enterprise {
license: LicenseKey {
valid_to: now() + 3600,
@ -65,8 +102,11 @@ pub async fn test(params: &mut JMAPTest) {
interval: SimpleCron::Day { hour: 0, minute: 0 },
}
.into(),
metrics_alerts: parse_metric_alerts(&mut config),
}
.into();
config.assert_no_errors();
assert_ne!(core.enterprise.as_ref().unwrap().metrics_alerts.len(), 0);
params.server.shared_core.store(core.into());
assert!(params.server.shared_core.load().is_enterprise_edition());
@ -76,6 +116,7 @@ pub async fn test(params: &mut JMAPTest) {
.create_test_user_with_email("jdoe@example.com", "secret", "John Doe")
.await;
alerts(&params.server.shared_core.load()).await;
undelete(params).await;
tracing(params).await;
metrics(params).await;
@ -86,12 +127,51 @@ pub async fn test(params: &mut JMAPTest) {
params.server.shared_core.store(core.into());
}
const RAW_MESSAGE: &str = "From: john@example.com
To: john@example.com
Subject: undelete test
async fn alerts(core: &Core) {
// Make sure the required metrics are set to 0
assert_eq!(
Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()),
0
);
assert_eq!(Collector::read_metric(MetricType::DomainCount), 0.0);
assert_eq!(
Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()),
0
);
test
";
// Increment metrics to trigger alerts
Collector::update_event_counter(EventType::Cluster(ClusterEvent::Error), 5);
Collector::update_gauge(MetricType::DomainCount, 3);
// Make sure the values were set
assert_eq!(
Collector::read_event_metric(EventType::Cluster(ClusterEvent::Error).id()),
5
);
assert_eq!(Collector::read_metric(MetricType::DomainCount), 3.0);
// Process alerts
let message = core.process_alerts().await.unwrap().pop().unwrap();
assert_eq!(message.from, "alert@example.com");
assert_eq!(message.to, vec!["jdoe@example.com".to_string()]);
let body = String::from_utf8(message.body).unwrap();
assert!(
body.contains("Sorry for the bad news, but we found 3 domains and 5 cluster errors."),
"{body:?}"
);
assert!(body.contains("Subject: Found 5 cluster errors"), "{body:?}");
assert!(
body.contains("From: \"Alert Subsystem\" <alert@example.com>"),
"{body:?}"
);
assert!(body.contains("To: <jdoe@example.com>"), "{body:?}");
// Make sure the event was triggered
assert_eq!(
Collector::read_event_metric(EventType::Telemetry(TelemetryEvent::Alert).id()),
1
);
}
async fn tracing(params: &mut JMAPTest) {
// Enable tracing