mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2024-09-20 07:16:18 +08:00
This commit is contained in:
parent
18a24f7220
commit
62f55ad62b
|
@ -62,7 +62,7 @@ Key features:
|
|||
- Inbound throttling and filtering with granular configuration rules, sieve scripting, MTA hooks and milter integration.
|
||||
- Distributed virtual queues with delayed delivery, priority delivery, quotas, routing rules and throttling support.
|
||||
- Envelope rewriting and message modification.
|
||||
- **Spam Phishing** filter:
|
||||
- **Spam and Phishing** filter:
|
||||
- Comprehensive set of filtering **rules** on par with popular solutions.
|
||||
- Statistical **spam classifier** with automatic training capabilities.
|
||||
- DNS Blocklists (**DNSBLs**) checking of IP addresses, domains, and hashes.
|
||||
|
|
|
@ -14,7 +14,7 @@ use jmap_proto::types::collection::Collection;
|
|||
use store::{BitmapKey, Store, Stores};
|
||||
use utils::config::{cron::SimpleCron, utils::ParseValue, Config};
|
||||
|
||||
use super::{license::LicenseValidator, Enterprise, MetricsStore, TraceStore, Undelete};
|
||||
use super::{license::LicenseValidator, Enterprise, MetricStore, TraceStore, Undelete};
|
||||
|
||||
impl Enterprise {
|
||||
pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> {
|
||||
|
@ -85,10 +85,10 @@ impl Enterprise {
|
|||
.and_then(|name| stores.stores.get(name))
|
||||
.cloned()
|
||||
{
|
||||
MetricsStore {
|
||||
MetricStore {
|
||||
retention: config
|
||||
.property_or_default::<Option<Duration>>("metrics.history.retention", "30d")
|
||||
.unwrap_or(Some(Duration::from_secs(30 * 24 * 60 * 60))),
|
||||
.property_or_default::<Option<Duration>>("metrics.history.retention", "90d")
|
||||
.unwrap_or(Some(Duration::from_secs(90 * 24 * 60 * 60))),
|
||||
store,
|
||||
interval: config
|
||||
.property_or_default::<SimpleCron>("metrics.history.interval", "0 * *")
|
||||
|
|
|
@ -26,7 +26,7 @@ pub struct Enterprise {
|
|||
pub license: LicenseKey,
|
||||
pub undelete: Option<Undelete>,
|
||||
pub trace_store: Option<TraceStore>,
|
||||
pub metrics_store: Option<MetricsStore>,
|
||||
pub metrics_store: Option<MetricStore>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -41,7 +41,7 @@ pub struct TraceStore {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MetricsStore {
|
||||
pub struct MetricStore {
|
||||
pub retention: Option<Duration>,
|
||||
pub store: Store,
|
||||
pub interval: SimpleCron,
|
||||
|
|
|
@ -21,6 +21,7 @@ use config::{
|
|||
};
|
||||
use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy, Type};
|
||||
use expr::if_block::IfBlock;
|
||||
use jmap_proto::types::collection::Collection;
|
||||
use listener::{
|
||||
blocked::{AllowedIps, BlockedIps},
|
||||
tls::TlsManager,
|
||||
|
@ -29,10 +30,11 @@ use mail_send::Credentials;
|
|||
|
||||
use sieve::Sieve;
|
||||
use store::{
|
||||
write::{QueueClass, ValueClass},
|
||||
IterateParams, LookupStore, ValueKey,
|
||||
write::{DirectoryClass, QueueClass, ValueClass},
|
||||
BitmapKey, IterateParams, LookupStore, ValueKey,
|
||||
};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use trc::AddContext;
|
||||
use utils::BlobHash;
|
||||
|
||||
pub mod addresses;
|
||||
|
@ -308,7 +310,7 @@ impl Core {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn message_queue_size(&self) -> trc::Result<u64> {
|
||||
pub async fn total_queued_messages(&self) -> trc::Result<u64> {
|
||||
let mut total = 0;
|
||||
self.storage
|
||||
.data
|
||||
|
@ -327,6 +329,39 @@ impl Core {
|
|||
.await
|
||||
.map(|_| total)
|
||||
}
|
||||
|
||||
pub async fn total_accounts(&self) -> trc::Result<u64> {
|
||||
self.storage
|
||||
.data
|
||||
.get_bitmap(BitmapKey::document_ids(u32::MAX, Collection::Principal))
|
||||
.await
|
||||
.caused_by(trc::location!())
|
||||
.map(|bitmap| bitmap.map_or(0, |b| b.len()))
|
||||
}
|
||||
|
||||
pub async fn total_domains(&self) -> trc::Result<u64> {
|
||||
let mut total = 0;
|
||||
self.storage
|
||||
.data
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
ValueKey::from(ValueClass::Directory(DirectoryClass::Domain(vec![]))),
|
||||
ValueKey::from(ValueClass::Directory(DirectoryClass::Domain(vec![
|
||||
u8::MAX;
|
||||
10
|
||||
]))),
|
||||
)
|
||||
.no_values()
|
||||
.ascending(),
|
||||
|_, _| {
|
||||
total += 1;
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())
|
||||
.map(|_| total)
|
||||
}
|
||||
}
|
||||
|
||||
trait CredentialsUsername {
|
||||
|
|
|
@ -29,6 +29,7 @@ pub trait MetricsStore: Sync + Send {
|
|||
fn write_metrics(
|
||||
&self,
|
||||
core: Arc<Core>,
|
||||
timestamp: u64,
|
||||
history: SharedMetricHistory,
|
||||
) -> impl Future<Output = trc::Result<()>> + Send;
|
||||
fn query_metrics(
|
||||
|
@ -51,14 +52,19 @@ struct HistogramHistory {
|
|||
count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum Metric<CI, MI, T> {
|
||||
Counter {
|
||||
id: CI,
|
||||
timestamp: T,
|
||||
value: u32,
|
||||
value: u64,
|
||||
},
|
||||
Gauge {
|
||||
id: MI,
|
||||
timestamp: T,
|
||||
value: u64,
|
||||
},
|
||||
Histogram {
|
||||
id: MI,
|
||||
|
@ -72,16 +78,17 @@ pub type SharedMetricHistory = Arc<Mutex<MetricsHistory>>;
|
|||
|
||||
const TYPE_COUNTER: u64 = 0x00;
|
||||
const TYPE_HISTOGRAM: u64 = 0x01;
|
||||
const TYPE_GAUGE: u64 = 0x02;
|
||||
|
||||
impl MetricsStore for Store {
|
||||
async fn write_metrics(
|
||||
&self,
|
||||
core: Arc<Core>,
|
||||
timestamp: u64,
|
||||
history_: SharedMetricHistory,
|
||||
) -> trc::Result<()> {
|
||||
let mut batch = BatchBuilder::new();
|
||||
{
|
||||
let timestamp = now();
|
||||
let node_id = core.network.node_id;
|
||||
let mut history = history_.lock();
|
||||
for event in [
|
||||
|
@ -91,9 +98,14 @@ impl MetricsStore for Store {
|
|||
EventType::ManageSieve(ManageSieveEvent::ConnectionStart),
|
||||
EventType::Http(HttpEvent::ConnectionStart),
|
||||
EventType::Delivery(DeliveryEvent::AttemptStart),
|
||||
EventType::Delivery(DeliveryEvent::Completed),
|
||||
EventType::Queue(QueueEvent::QueueMessage),
|
||||
EventType::Queue(QueueEvent::QueueMessageAuthenticated),
|
||||
EventType::Queue(QueueEvent::QueueDsn),
|
||||
EventType::Queue(QueueEvent::QueueReport),
|
||||
EventType::MessageIngest(MessageIngestEvent::Ham),
|
||||
EventType::MessageIngest(MessageIngestEvent::Spam),
|
||||
EventType::Auth(AuthEvent::Failed),
|
||||
EventType::Auth(AuthEvent::Banned),
|
||||
EventType::Network(NetworkEvent::DropBlocked),
|
||||
EventType::IncomingReport(IncomingReportEvent::DmarcReport),
|
||||
EventType::IncomingReport(IncomingReportEvent::DmarcReportWithWarnings),
|
||||
|
@ -118,6 +130,23 @@ impl MetricsStore for Store {
|
|||
}
|
||||
}
|
||||
|
||||
for gauge in Collector::collect_gauges(true) {
|
||||
let gauge_id = gauge.id();
|
||||
if matches!(gauge_id, MetricType::QueueCount | MetricType::ServerMemory) {
|
||||
let value = gauge.get();
|
||||
if value > 0 {
|
||||
batch.set(
|
||||
ValueClass::Telemetry(TelemetryClass::Metric {
|
||||
timestamp,
|
||||
metric_id: (gauge_id.code() << 2) | TYPE_GAUGE,
|
||||
node_id,
|
||||
}),
|
||||
KeySerializer::new(U32_LEN).write_leb128(value).finalize(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for histogram in Collector::collect_histograms(true) {
|
||||
let histogram_id = histogram.id();
|
||||
if matches!(
|
||||
|
@ -191,7 +220,7 @@ impl MetricsStore for Store {
|
|||
let id = EventType::from_code(metric_type >> 2).ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, None, trc::location!())
|
||||
})?;
|
||||
let (value, _) = value.read_leb128::<u32>().ok_or_else(|| {
|
||||
let (value, _) = value.read_leb128::<u64>().ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, value.into(), trc::location!())
|
||||
})?;
|
||||
metrics.push(Metric::Counter {
|
||||
|
@ -220,6 +249,19 @@ impl MetricsStore for Store {
|
|||
sum,
|
||||
});
|
||||
}
|
||||
TYPE_GAUGE => {
|
||||
let id = MetricType::from_code(metric_type >> 2).ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, None, trc::location!())
|
||||
})?;
|
||||
let (value, _) = value.read_leb128::<u64>().ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, value.into(), trc::location!())
|
||||
})?;
|
||||
metrics.push(Metric::Gauge {
|
||||
id,
|
||||
timestamp,
|
||||
value,
|
||||
});
|
||||
}
|
||||
_ => return Err(trc::Error::corrupted_key(key, None, trc::location!())),
|
||||
}
|
||||
|
||||
|
|
|
@ -302,10 +302,10 @@ impl JMAP {
|
|||
.path()
|
||||
.strip_prefix("/api/telemetry/")
|
||||
.and_then(|p| {
|
||||
p.strip_suffix("traces/live/")
|
||||
p.strip_prefix("traces/live/")
|
||||
.map(|t| ("traces", t))
|
||||
.or_else(|| {
|
||||
p.strip_suffix("metrics/live/")
|
||||
p.strip_prefix("metrics/live/")
|
||||
.map(|t| ("metrics", t))
|
||||
})
|
||||
})
|
||||
|
|
|
@ -263,7 +263,7 @@ impl JMAP {
|
|||
if elapsed >= throttle {
|
||||
last_message = Instant::now();
|
||||
yield Ok(Frame::data(Bytes::from(format!(
|
||||
"event: state\ndata: {}\n\n",
|
||||
"event: trace\ndata: {}\n\n",
|
||||
serde_json::to_string(
|
||||
&JsonEventSerializer::new(std::mem::take(&mut events))
|
||||
.with_description()
|
||||
|
@ -339,7 +339,7 @@ impl JMAP {
|
|||
let before = params
|
||||
.parse::<Timestamp>("before")
|
||||
.map(|t| t.into_inner())
|
||||
.unwrap_or(0);
|
||||
.unwrap_or(u64::MAX);
|
||||
let after = params
|
||||
.parse::<Timestamp>("after")
|
||||
.map(|t| t.into_inner())
|
||||
|
@ -377,6 +377,15 @@ impl JMAP {
|
|||
count,
|
||||
sum,
|
||||
},
|
||||
Metric::Gauge {
|
||||
id,
|
||||
timestamp,
|
||||
value,
|
||||
} => Metric::Gauge {
|
||||
id: id.name(),
|
||||
timestamp: DateTime::from_timestamp(timestamp as i64).to_rfc3339(),
|
||||
value,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -405,6 +414,23 @@ impl JMAP {
|
|||
}
|
||||
}
|
||||
|
||||
// Refresh expensive metrics
|
||||
for metric_type in [
|
||||
MetricType::QueueCount,
|
||||
MetricType::UserCount,
|
||||
MetricType::DomainCount,
|
||||
] {
|
||||
if metric_types.contains(&metric_type) {
|
||||
let value = match metric_type {
|
||||
MetricType::QueueCount => self.core.total_queued_messages().await?,
|
||||
MetricType::UserCount => self.core.total_accounts().await?,
|
||||
MetricType::DomainCount => self.core.total_domains().await?,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
Collector::update_gauge(metric_type, value);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: "text/event-stream".into(),
|
||||
|
|
|
@ -115,16 +115,29 @@ pub fn decode_path_element(item: &str) -> Cow<'_, str> {
|
|||
.unwrap_or_else(|| item.into())
|
||||
}
|
||||
|
||||
pub(super) struct FutureTimestamp(u64);
|
||||
pub(super) struct Timestamp(u64);
|
||||
|
||||
impl FromStr for Timestamp {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if let Some(dt) = DateTime::parse_rfc3339(s) {
|
||||
Ok(Timestamp(dt.to_timestamp() as u64))
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for FutureTimestamp {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if let Some(dt) = DateTime::parse_rfc3339(s) {
|
||||
let instant = dt.to_timestamp() as u64;
|
||||
if instant >= now() {
|
||||
return Ok(Timestamp(instant));
|
||||
return Ok(FutureTimestamp(instant));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,6 +145,12 @@ impl FromStr for Timestamp {
|
|||
}
|
||||
}
|
||||
|
||||
impl FutureTimestamp {
|
||||
pub fn into_inner(self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Timestamp {
|
||||
pub fn into_inner(self) -> u64 {
|
||||
self.0
|
||||
|
|
|
@ -26,7 +26,7 @@ use crate::{
|
|||
JMAP,
|
||||
};
|
||||
|
||||
use super::{decode_path_element, Timestamp};
|
||||
use super::{decode_path_element, FutureTimestamp};
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
|
||||
pub struct Message {
|
||||
|
@ -117,8 +117,12 @@ impl JMAP {
|
|||
let text = params.get("text");
|
||||
let from = params.get("from");
|
||||
let to = params.get("to");
|
||||
let before = params.parse::<Timestamp>("before").map(|t| t.into_inner());
|
||||
let after = params.parse::<Timestamp>("after").map(|t| t.into_inner());
|
||||
let before = params
|
||||
.parse::<FutureTimestamp>("before")
|
||||
.map(|t| t.into_inner());
|
||||
let after = params
|
||||
.parse::<FutureTimestamp>("after")
|
||||
.map(|t| t.into_inner());
|
||||
let page = params.parse::<usize>("page").unwrap_or_default();
|
||||
let limit = params.parse::<usize>("limit").unwrap_or_default();
|
||||
let values = params.has_key("values");
|
||||
|
@ -228,7 +232,7 @@ impl JMAP {
|
|||
}
|
||||
("messages", Some(queue_id), &Method::PATCH) => {
|
||||
let time = params
|
||||
.parse::<Timestamp>("at")
|
||||
.parse::<FutureTimestamp>("at")
|
||||
.map(|t| t.into_inner())
|
||||
.unwrap_or_else(now);
|
||||
let item = params.get("filter");
|
||||
|
|
|
@ -148,6 +148,7 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
|
|||
// Metrics history
|
||||
#[cfg(feature = "enterprise")]
|
||||
let metrics_history = SharedMetricHistory::default();
|
||||
let mut next_metric_update = Instant::now();
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(queue.wake_up_time(), rx.recv()).await {
|
||||
|
@ -425,12 +426,20 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
|
|||
ActionClass::OtelMetrics,
|
||||
);
|
||||
|
||||
let update_other_metrics = if Instant::now() >= next_metric_update {
|
||||
next_metric_update =
|
||||
Instant::now() + Duration::from_secs(86400);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let core = core_.clone();
|
||||
tokio::spawn(async move {
|
||||
#[cfg(feature = "enterprise")]
|
||||
if core.is_enterprise_edition() {
|
||||
// Obtain queue size
|
||||
match core.message_queue_size().await {
|
||||
match core.total_queued_messages().await {
|
||||
Ok(total) => {
|
||||
Collector::update_gauge(
|
||||
MetricType::QueueCount,
|
||||
|
@ -445,6 +454,36 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
|
|||
}
|
||||
}
|
||||
|
||||
if update_other_metrics {
|
||||
match core.total_accounts().await {
|
||||
Ok(total) => {
|
||||
Collector::update_gauge(
|
||||
MetricType::UserCount,
|
||||
total,
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
trc::error!(
|
||||
err.details("Failed to obtain account count")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
match core.total_domains().await {
|
||||
Ok(total) => {
|
||||
Collector::update_gauge(
|
||||
MetricType::DomainCount,
|
||||
total,
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
trc::error!(
|
||||
err.details("Failed to obtain domain count")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match tokio::task::spawn_blocking(memory_stats::memory_stats)
|
||||
.await
|
||||
{
|
||||
|
@ -486,8 +525,9 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
|
|||
let metrics_history = metrics_history.clone();
|
||||
let core = core_.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
metrics_store.write_metrics(core, metrics_history).await
|
||||
if let Err(err) = metrics_store
|
||||
.write_metrics(core, now(), metrics_history)
|
||||
.await
|
||||
{
|
||||
trc::error!(err.details("Failed to write metrics"));
|
||||
}
|
||||
|
|
|
@ -9,8 +9,8 @@ use super::MetricType;
|
|||
impl MetricType {
|
||||
pub fn name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::MessageIngestionTime => "message.ingestion-time",
|
||||
Self::MessageFtsIndexTime => "message.fts-index-time",
|
||||
Self::MessageIngestionTime => "message-ingest.time",
|
||||
Self::MessageFtsIndexTime => "message-ingest.index-time",
|
||||
Self::DeliveryTotalTime => "delivery.total-time",
|
||||
Self::DeliveryTime => "delivery.attempt-time",
|
||||
Self::MessageSize => "message.size",
|
||||
|
@ -34,6 +34,8 @@ impl MetricType {
|
|||
Self::DeliveryActiveConnections => "delivery.active-connections",
|
||||
Self::ServerMemory => "server.memory",
|
||||
Self::QueueCount => "queue.count",
|
||||
Self::UserCount => "user.count",
|
||||
Self::DomainCount => "domain.count",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,6 +66,8 @@ impl MetricType {
|
|||
Self::DeliveryActiveConnections => "Active delivery connections",
|
||||
Self::ServerMemory => "Server memory usage",
|
||||
Self::QueueCount => "Total number of messages in the queue",
|
||||
Self::UserCount => "Total number of users",
|
||||
Self::DomainCount => "Total number of domains",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,6 +98,8 @@ impl MetricType {
|
|||
| Self::SieveActiveConnections
|
||||
| Self::DeliveryActiveConnections => "connections",
|
||||
Self::QueueCount => "messages",
|
||||
Self::UserCount => "users",
|
||||
Self::DomainCount => "domains",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,6 +130,8 @@ impl MetricType {
|
|||
Self::DeliveryActiveConnections => 22,
|
||||
Self::ServerMemory => 23,
|
||||
Self::QueueCount => 24,
|
||||
Self::UserCount => 25,
|
||||
Self::DomainCount => 26,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,14 +162,16 @@ impl MetricType {
|
|||
22 => Some(Self::DeliveryActiveConnections),
|
||||
23 => Some(Self::ServerMemory),
|
||||
24 => Some(Self::QueueCount),
|
||||
25 => Some(Self::UserCount),
|
||||
26 => Some(Self::DomainCount),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_parse(name: &str) -> Option<Self> {
|
||||
match name {
|
||||
"message.ingestion-time" => Some(Self::MessageIngestionTime),
|
||||
"message.fts-index-time" => Some(Self::MessageFtsIndexTime),
|
||||
"message-ingest.time" => Some(Self::MessageIngestionTime),
|
||||
"message-ingest.index-time" => Some(Self::MessageFtsIndexTime),
|
||||
"delivery.total-time" => Some(Self::DeliveryTotalTime),
|
||||
"delivery.attempt-time" => Some(Self::DeliveryTime),
|
||||
"message.size" => Some(Self::MessageSize),
|
||||
|
@ -185,6 +195,8 @@ impl MetricType {
|
|||
"delivery.active-connections" => Some(Self::DeliveryActiveConnections),
|
||||
"server.memory" => Some(Self::ServerMemory),
|
||||
"queue.count" => Some(Self::QueueCount),
|
||||
"user.count" => Some(Self::UserCount),
|
||||
"domain.count" => Some(Self::DomainCount),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,8 @@ static DNS_LOOKUP_TIME: AtomicHistogram<12> =
|
|||
|
||||
static SERVER_MEMORY: AtomicGauge = AtomicGauge::new(MetricType::ServerMemory);
|
||||
static QUEUE_COUNT: AtomicGauge = AtomicGauge::new(MetricType::QueueCount);
|
||||
static USER_COUNT: AtomicGauge = AtomicGauge::new(MetricType::UserCount);
|
||||
static DOMAIN_COUNT: AtomicGauge = AtomicGauge::new(MetricType::DomainCount);
|
||||
|
||||
const CONN_SMTP_IN: usize = 0;
|
||||
const CONN_SMTP_OUT: usize = 1;
|
||||
|
@ -224,8 +226,9 @@ impl Collector {
|
|||
}
|
||||
|
||||
pub fn collect_gauges(is_enterprise: bool) -> impl Iterator<Item = &'static AtomicGauge> {
|
||||
static E_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY, &QUEUE_COUNT];
|
||||
static C_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY];
|
||||
static E_GAUGES: &[&AtomicGauge] =
|
||||
&[&SERVER_MEMORY, &QUEUE_COUNT, &USER_COUNT, &DOMAIN_COUNT];
|
||||
static C_GAUGES: &[&AtomicGauge] = &[&SERVER_MEMORY, &USER_COUNT, &DOMAIN_COUNT];
|
||||
|
||||
if is_enterprise { E_GAUGES } else { C_GAUGES }
|
||||
.iter()
|
||||
|
@ -275,6 +278,23 @@ impl Collector {
|
|||
match metric_type {
|
||||
MetricType::ServerMemory => SERVER_MEMORY.set(value),
|
||||
MetricType::QueueCount => QUEUE_COUNT.set(value),
|
||||
MetricType::UserCount => USER_COUNT.set(value),
|
||||
MetricType::DomainCount => DOMAIN_COUNT.set(value),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_event_counter(event_type: EventType, value: u32) {
|
||||
EVENT_COUNTERS.add(event_type.into(), value);
|
||||
}
|
||||
|
||||
pub fn update_histogram(metric_type: MetricType, value: u64) {
|
||||
match metric_type {
|
||||
MetricType::MessageIngestionTime => MESSAGE_INGESTION_TIME.observe(value),
|
||||
MetricType::MessageFtsIndexTime => MESSAGE_INDEX_TIME.observe(value),
|
||||
MetricType::DeliveryTotalTime => MESSAGE_DELIVERY_TIME.observe(value),
|
||||
MetricType::DeliveryTime => CONNECTION_METRICS[CONN_SMTP_OUT].elapsed.observe(value),
|
||||
MetricType::DnsLookupTime => DNS_LOOKUP_TIME.observe(value),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -954,6 +954,8 @@ pub enum MetricType {
|
|||
SmtpRequestTime,
|
||||
SieveActiveConnections,
|
||||
SieveRequestTime,
|
||||
UserCount,
|
||||
DomainCount,
|
||||
}
|
||||
|
||||
pub const TOTAL_EVENT_COUNT: usize = total_event_count!();
|
||||
|
|
|
@ -8,21 +8,28 @@
|
|||
*
|
||||
*/
|
||||
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use common::{
|
||||
config::telemetry::{StoreTracer, TelemetrySubscriberType},
|
||||
enterprise::{
|
||||
license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricsStore, TraceStore, Undelete,
|
||||
license::LicenseKey, undelete::DeletedBlob, Enterprise, MetricStore, TraceStore, Undelete,
|
||||
},
|
||||
telemetry::tracers::store::{TracingQuery, TracingStore},
|
||||
telemetry::{
|
||||
metrics::store::{Metric, MetricsStore, SharedMetricHistory},
|
||||
tracers::store::{TracingQuery, TracingStore},
|
||||
},
|
||||
Core,
|
||||
};
|
||||
use imap_proto::ResponseType;
|
||||
use jmap::api::management::enterprise::undelete::{UndeleteRequest, UndeleteResponse};
|
||||
use store::write::now;
|
||||
use store::{
|
||||
rand::{self, Rng},
|
||||
write::now,
|
||||
};
|
||||
use trc::{
|
||||
ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
|
||||
DeliveryEvent, EventType, SmtpEvent,
|
||||
*,
|
||||
};
|
||||
use utils::config::cron::SimpleCron;
|
||||
|
||||
|
@ -52,7 +59,7 @@ pub async fn test(params: &mut JMAPTest) {
|
|||
store: core.storage.data.clone(),
|
||||
}
|
||||
.into(),
|
||||
metrics_store: MetricsStore {
|
||||
metrics_store: MetricStore {
|
||||
retention: Some(Duration::from_secs(1)),
|
||||
store: core.storage.data.clone(),
|
||||
interval: SimpleCron::Day { hour: 0, minute: 0 },
|
||||
|
@ -71,6 +78,7 @@ pub async fn test(params: &mut JMAPTest) {
|
|||
|
||||
undelete(params).await;
|
||||
tracing(params).await;
|
||||
metrics(params).await;
|
||||
|
||||
// Disable Enterprise
|
||||
let mut core = params.server.shared_core.load_full().as_ref().clone();
|
||||
|
@ -97,6 +105,7 @@ async fn tracing(params: &mut JMAPTest) {
|
|||
);
|
||||
|
||||
// Make sure there are no span entries in the db
|
||||
store.purge_spans(Duration::from_secs(0)).await.unwrap();
|
||||
assert_eq!(
|
||||
store
|
||||
.query_spans(
|
||||
|
@ -177,6 +186,26 @@ async fn tracing(params: &mut JMAPTest) {
|
|||
}
|
||||
}
|
||||
|
||||
async fn metrics(params: &mut JMAPTest) {
|
||||
// Make sure there are no span entries in the db
|
||||
let store = params.server.core.storage.data.clone();
|
||||
assert_eq!(
|
||||
store.query_metrics(0, u64::MAX).await.unwrap(),
|
||||
Vec::<Metric<EventType, MetricType, u64>>::new()
|
||||
);
|
||||
|
||||
insert_test_metrics(params.server.core.clone()).await;
|
||||
|
||||
let total = store.query_metrics(0, u64::MAX).await.unwrap();
|
||||
assert!(!total.is_empty(), "{total:?}");
|
||||
|
||||
store.purge_metrics(Duration::from_secs(0)).await.unwrap();
|
||||
assert_eq!(
|
||||
store.query_metrics(0, u64::MAX).await.unwrap(),
|
||||
Vec::<Metric<EventType, MetricType, u64>>::new()
|
||||
);
|
||||
}
|
||||
|
||||
async fn undelete(_params: &mut JMAPTest) {
|
||||
// Authenticate
|
||||
let mut imap = ImapConnection::connect(b"_x ").await;
|
||||
|
@ -276,6 +305,69 @@ async fn undelete(_params: &mut JMAPTest) {
|
|||
.assert_contains("Subject: undelete test");
|
||||
}
|
||||
|
||||
pub async fn insert_test_metrics(core: Arc<Core>) {
|
||||
let store = core.storage.data.clone();
|
||||
store.purge_metrics(Duration::from_secs(0)).await.unwrap();
|
||||
let mut start_time = now() - (90 * 24 * 60 * 60);
|
||||
let timestamp = now();
|
||||
let history = SharedMetricHistory::default();
|
||||
|
||||
while start_time < timestamp {
|
||||
for event_type in [
|
||||
EventType::Smtp(SmtpEvent::ConnectionStart),
|
||||
EventType::Imap(ImapEvent::ConnectionStart),
|
||||
EventType::Pop3(Pop3Event::ConnectionStart),
|
||||
EventType::ManageSieve(ManageSieveEvent::ConnectionStart),
|
||||
EventType::Http(HttpEvent::ConnectionStart),
|
||||
EventType::Delivery(DeliveryEvent::AttemptStart),
|
||||
EventType::Queue(QueueEvent::QueueMessage),
|
||||
EventType::Queue(QueueEvent::QueueMessageAuthenticated),
|
||||
EventType::Queue(QueueEvent::QueueDsn),
|
||||
EventType::Queue(QueueEvent::QueueReport),
|
||||
EventType::MessageIngest(MessageIngestEvent::Ham),
|
||||
EventType::MessageIngest(MessageIngestEvent::Spam),
|
||||
EventType::Auth(AuthEvent::Banned),
|
||||
EventType::Auth(AuthEvent::Failed),
|
||||
EventType::Network(NetworkEvent::DropBlocked),
|
||||
EventType::IncomingReport(IncomingReportEvent::DmarcReport),
|
||||
EventType::IncomingReport(IncomingReportEvent::DmarcReportWithWarnings),
|
||||
EventType::IncomingReport(IncomingReportEvent::TlsReport),
|
||||
EventType::IncomingReport(IncomingReportEvent::TlsReportWithWarnings),
|
||||
] {
|
||||
// Generate a random value between 0 and 100
|
||||
Collector::update_event_counter(event_type, rand::thread_rng().gen_range(0..=100))
|
||||
}
|
||||
|
||||
Collector::update_gauge(
|
||||
MetricType::QueueCount,
|
||||
rand::thread_rng().gen_range(0..=1000),
|
||||
);
|
||||
Collector::update_gauge(
|
||||
MetricType::ServerMemory,
|
||||
rand::thread_rng().gen_range(100 * 1024 * 1024..=300 * 1024 * 1024),
|
||||
);
|
||||
|
||||
for metric_type in [
|
||||
MetricType::MessageIngestionTime,
|
||||
MetricType::MessageFtsIndexTime,
|
||||
MetricType::DeliveryTime,
|
||||
MetricType::DnsLookupTime,
|
||||
] {
|
||||
Collector::update_histogram(metric_type, rand::thread_rng().gen_range(2..=1000))
|
||||
}
|
||||
Collector::update_histogram(
|
||||
MetricType::DeliveryTotalTime,
|
||||
rand::thread_rng().gen_range(1000..=5000),
|
||||
);
|
||||
|
||||
store
|
||||
.write_metrics(core.clone(), start_time, history.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
start_time += 60 * 60;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub(super) struct List<T> {
|
||||
|
|
|
@ -18,6 +18,7 @@ use common::{
|
|||
manager::config::{ConfigManager, Patterns},
|
||||
Core, Ipc, IPC_CHANNEL_BUFFER,
|
||||
};
|
||||
use enterprise::insert_test_metrics;
|
||||
use hyper::{header::AUTHORIZATION, Method};
|
||||
use imap::core::{ImapSessionManager, IMAP};
|
||||
use jmap::{api::JmapSessionManager, JMAP};
|
||||
|
@ -346,6 +347,19 @@ pub async fn jmap_stress_tests() {
|
|||
params.temp_dir.delete();
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
pub async fn jmap_metric_tests() {
|
||||
let params = init_jmap_tests(
|
||||
&std::env::var("STORE")
|
||||
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
insert_test_metrics(params.server.core.clone()).await;
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct JMAPTest {
|
||||
server: Arc<JMAP>,
|
||||
|
|
Loading…
Reference in a new issue