Live tracing + Tracing history

This commit is contained in:
mdecimus 2024-08-20 17:01:36 +02:00
parent 547c67120a
commit 0b91feffad
33 changed files with 1768 additions and 466 deletions

624
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,7 +5,7 @@ authors = ["Stalwart Labs Ltd. <hello@stalw.art>"]
license = "AGPL-3.0-only OR LicenseRef-SEL" license = "AGPL-3.0-only OR LicenseRef-SEL"
repository = "https://github.com/stalwartlabs/cli" repository = "https://github.com/stalwartlabs/cli"
homepage = "https://github.com/stalwartlabs/cli" homepage = "https://github.com/stalwartlabs/cli"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
readme = "README.md" readme = "README.md"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "common" name = "common"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -57,10 +57,10 @@ impl Enterprise {
Some(Enterprise { Some(Enterprise {
license, license,
undelete_period: config undelete_period: config
.property_or_default::<Option<Duration>>("storage.undelete.hold-for", "false") .property_or_default::<Option<Duration>>("storage.undelete.retention", "false")
.unwrap_or_default(), .unwrap_or_default(),
trace_hold_period: config trace_hold_period: config
.property_or_default::<Option<Duration>>("tracing.history.hold-for", "90d") .property_or_default::<Option<Duration>>("tracing.history.retention", "90d")
.unwrap_or(Some(Duration::from_secs(90 * 24 * 60 * 60))), .unwrap_or(Some(Duration::from_secs(90 * 24 * 60 * 60))),
trace_store: config trace_store: config
.value("tracing.history.store") .value("tracing.history.store")

View file

@ -181,7 +181,8 @@ impl TracingStore for Store {
from_span_id: u64, from_span_id: u64,
to_span_id: u64, to_span_id: u64,
) -> trc::Result<Vec<u64>> { ) -> trc::Result<Vec<u64>> {
let mut spans = AHashSet::new(); let mut spans = SpanCollector::Empty;
let num_params = params.len();
for (param_num, param) in params.iter().enumerate() { for (param_num, param) in params.iter().enumerate() {
let (value, exact_len) = match param { let (value, exact_len) = match param {
@ -202,7 +203,7 @@ impl TracingStore for Store {
} }
}; };
let mut param_spans = AHashSet::new(); let mut param_spans = SpanCollector::new(num_params);
self.iterate( self.iterate(
IterateParams::new( IterateParams::new(
ValueKey::from(ValueClass::Trace(TraceClass::Index { ValueKey::from(ValueClass::Trace(TraceClass::Index {
@ -236,17 +237,12 @@ impl TracingStore for Store {
if param_num == 0 { if param_num == 0 {
spans = param_spans; spans = param_spans;
} else { } else if spans.intersect(param_spans) {
spans = spans.intersection(&param_spans).copied().collect(); return Ok(Vec::new());
if spans.is_empty() {
break;
}
} }
} }
let mut spans: Vec<u64> = spans.into_iter().collect(); Ok(spans.into_vec())
spans.sort_unstable();
Ok(spans)
} }
async fn purge_spans(&self, period: Duration) -> trc::Result<()> { async fn purge_spans(&self, period: Duration) -> trc::Result<()> {
@ -316,6 +312,57 @@ impl TracingStore for Store {
} }
} }
enum SpanCollector {
Vec(Vec<u64>),
HashSet(AHashSet<u64>),
Empty,
}
impl SpanCollector {
fn new(num_params: usize) -> Self {
if num_params == 1 {
Self::Vec(Vec::new())
} else {
Self::HashSet(AHashSet::new())
}
}
fn insert(&mut self, span_id: u64) {
match self {
Self::Vec(vec) => vec.push(span_id),
Self::HashSet(set) => {
set.insert(span_id);
}
_ => unreachable!(),
}
}
fn into_vec(self) -> Vec<u64> {
match self {
Self::Vec(mut vec) => {
vec.sort_unstable();
vec
}
Self::HashSet(set) => {
let mut vec: Vec<u64> = set.into_iter().collect();
vec.sort_unstable();
vec
}
Self::Empty => Vec::new(),
}
}
fn intersect(&mut self, other_span: Self) -> bool {
match (self, other_span) {
(Self::HashSet(set), Self::HashSet(other_set)) => {
set.retain(|span_id| other_set.contains(span_id));
set.is_empty()
}
_ => unreachable!(),
}
}
}
impl StoreTracer { impl StoreTracer {
pub fn default_events() -> impl IntoIterator<Item = EventType> { pub fn default_events() -> impl IntoIterator<Item = EventType> {
EventType::variants().into_iter().filter(|event| { EventType::variants().into_iter().filter(|event| {
@ -339,7 +386,7 @@ impl StoreTracer {
| EventType::MailAuth(_) | EventType::MailAuth(_)
| EventType::Queue( | EventType::Queue(
QueueEvent::QueueMessage QueueEvent::QueueMessage
| QueueEvent::QueueMessageSubmission | QueueEvent::QueueMessageAuthenticated
| QueueEvent::QueueReport | QueueEvent::QueueReport
| QueueEvent::QueueDsn | QueueEvent::QueueDsn
| QueueEvent::QueueAutogenerated | QueueEvent::QueueAutogenerated

View file

@ -20,7 +20,8 @@ use store::write::now;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use trc::{ use trc::{
ipc::subscriber::{EventBatch, SubscriberBuilder}, ipc::subscriber::{EventBatch, SubscriberBuilder},
ServerEvent, TelemetryEvent, serializers::json::JsonEventSerializer,
Event, EventDetails, ServerEvent, TelemetryEvent,
}; };
use super::LONG_SLUMBER; use super::LONG_SLUMBER;
@ -97,7 +98,7 @@ pub(crate) fn spawn_webhook_tracer(builder: SubscriberBuilder, settings: Webhook
#[derive(Serialize)] #[derive(Serialize)]
struct EventWrapper { struct EventWrapper {
events: EventBatch, events: JsonEventSerializer<Vec<Arc<Event<EventDetails>>>>,
} }
fn spawn_webhook_handler( fn spawn_webhook_handler(
@ -108,12 +109,14 @@ fn spawn_webhook_handler(
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
in_flight.store(true, Ordering::Relaxed); in_flight.store(true, Ordering::Relaxed);
let wrapper = EventWrapper { events }; let wrapper = EventWrapper {
events: JsonEventSerializer::new(events).with_id(),
};
if let Err(err) = post_webhook_events(&settings, &wrapper).await { if let Err(err) = post_webhook_events(&settings, &wrapper).await {
trc::event!(Telemetry(TelemetryEvent::WebhookError), Details = err); trc::event!(Telemetry(TelemetryEvent::WebhookError), Details = err);
if webhook_tx.send(wrapper.events).await.is_err() { if webhook_tx.send(wrapper.events.into_inner()).await.is_err() {
trc::event!( trc::event!(
Server(ServerEvent::ThreadError), Server(ServerEvent::ThreadError),
Details = "Failed to send failed webhook events back to main thread", Details = "Failed to send failed webhook events back to main thread",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "directory" name = "directory"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1073,6 +1073,10 @@ pub fn unsupported(details: impl Into<trc::Value>) -> trc::Error {
trc::ManageEvent::NotSupported.ctx(trc::Key::Details, details) trc::ManageEvent::NotSupported.ctx(trc::Key::Details, details)
} }
pub fn enterprise() -> trc::Error {
trc::ManageEvent::NotSupported.ctx(trc::Key::Details, "Enterprise feature")
}
pub fn error(details: impl Into<trc::Value>, reason: Option<impl Into<trc::Value>>) -> trc::Error { pub fn error(details: impl Into<trc::Value>, reason: Option<impl Into<trc::Value>>) -> trc::Error {
trc::ManageEvent::Error trc::ManageEvent::Error
.ctx(trc::Key::Details, details) .ctx(trc::Key::Details, details)

View file

@ -1,6 +1,6 @@
[package] [package]
name = "imap" name = "imap"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "jmap" name = "jmap"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -279,11 +279,45 @@ impl JMAP {
} }
// Authenticate user // Authenticate user
let (_, access_token) = self.authenticate_headers(&req, &session).await?; match self.authenticate_headers(&req, &session).await {
let body = fetch_body(&mut req, 1024 * 1024, session.session_id).await; Ok((_, access_token)) => {
return self let body = fetch_body(&mut req, 1024 * 1024, session.session_id).await;
.handle_api_manage_request(&req, body, access_token, &session) return self
.await; .handle_api_manage_request(&req, body, access_token, &session)
.await;
}
Err(err) => {
#[cfg(feature = "enterprise")]
{
// SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL
// Eventsource does not support authentication, validate the token instead
if err.matches(trc::EventType::Auth(trc::AuthEvent::Failed))
&& self.core.is_enterprise_edition()
{
if let Some(token) =
req.uri().path().strip_prefix("/api/tracing/live/")
{
let (account_id, _, _) =
self.validate_access_token("live_tracing", token).await?;
return self
.handle_tracing_api_request(
&req,
vec!["", "live"],
account_id,
)
.await;
}
}
// SPDX-SnippetEnd
}
return Err(err);
}
}
} }
"mail" => { "mail" => {
if req.method() == Method::GET if req.method() == Method::GET

View file

@ -22,7 +22,8 @@ use serde_json::json;
use store::ahash::{AHashMap, AHashSet}; use store::ahash::{AHashMap, AHashSet};
use trc::{ use trc::{
ipc::{bitset::Bitset, subscriber::SubscriberBuilder}, ipc::{bitset::Bitset, subscriber::SubscriberBuilder},
Key, Value, serializers::json::JsonEventSerializer,
DeliveryEvent, EventType, Key, QueueEvent, Value,
}; };
use utils::{snowflake::SnowflakeIdGenerator, url_params::UrlParams}; use utils::{snowflake::SnowflakeIdGenerator, url_params::UrlParams};
@ -39,12 +40,13 @@ impl JMAP {
&self, &self,
req: &HttpRequest, req: &HttpRequest,
path: Vec<&str>, path: Vec<&str>,
account_id: u32,
) -> trc::Result<HttpResponse> { ) -> trc::Result<HttpResponse> {
let params = UrlParams::new(req.uri().query()); let params = UrlParams::new(req.uri().query());
match ( match (
path.get(2).copied().unwrap(), path.get(1).copied().unwrap_or_default(),
path.get(3).copied(), path.get(2).copied(),
req.method(), req.method(),
) { ) {
("spans", None, &Method::GET) => { ("spans", None, &Method::GET) => {
@ -97,16 +99,14 @@ impl JMAP {
.map(|t| t.into_inner()) .map(|t| t.into_inner())
.and_then(SnowflakeIdGenerator::from_timestamp) .and_then(SnowflakeIdGenerator::from_timestamp)
.unwrap_or(0); .unwrap_or(0);
let span_ids = self let values = params.get("values").is_some();
let store = self
.core .core
.enterprise .enterprise
.as_ref() .as_ref()
.and_then(|e| e.trace_store.as_ref()) .and_then(|e| e.trace_store.as_ref())
.ok_or_else(|| { .ok_or_else(|| manage::unsupported("No tracing store has been configured"))?;
manage::error("Unavailable", "No tracing store has been configured".into()) let span_ids = store.query_spans(&tracing_query, after, before).await?;
})?
.query_spans(&tracing_query, after, before)
.await?;
let (total, span_ids) = if limit > 0 { let (total, span_ids) = if limit > 0 {
let offset = page.saturating_sub(1) * limit; let offset = page.saturating_sub(1) * limit;
@ -118,13 +118,41 @@ impl JMAP {
(span_ids.len(), span_ids) (span_ids.len(), span_ids)
}; };
Ok(JsonResponse::new(json!({ if values && !span_ids.is_empty() {
"data": { let mut values = Vec::with_capacity(span_ids.len());
"items": span_ids,
"total": total, for span_id in span_ids {
}, for event in store.get_span(span_id).await? {
})) if matches!(
.into_http_response()) event.inner.typ,
EventType::Delivery(DeliveryEvent::AttemptStart)
| EventType::Queue(
QueueEvent::QueueMessage
| QueueEvent::QueueMessageAuthenticated
)
) {
values.push(event);
break;
}
}
}
Ok(JsonResponse::new(json!({
"data": {
"items": JsonEventSerializer::new(values).with_spans(),
"total": total,
},
}))
.into_http_response())
} else {
Ok(JsonResponse::new(json!({
"data": {
"items": span_ids,
"total": total,
},
}))
.into_http_response())
}
} }
("span", id, &Method::GET) => { ("span", id, &Method::GET) => {
let store = self let store = self
@ -132,9 +160,7 @@ impl JMAP {
.enterprise .enterprise
.as_ref() .as_ref()
.and_then(|e| e.trace_store.as_ref()) .and_then(|e| e.trace_store.as_ref())
.ok_or_else(|| { .ok_or_else(|| manage::unsupported("No tracing store has been configured"))?;
manage::error("Unavailable", "No tracing store has been configured".into())
})?;
let mut events = Vec::new(); let mut events = Vec::new();
for span_id in id for span_id in id
@ -143,21 +169,45 @@ impl JMAP {
.split(',') .split(',')
{ {
if let Ok(span_id) = span_id.parse::<u64>() { if let Ok(span_id) = span_id.parse::<u64>() {
events.push(store.get_span(span_id).await?); events.push(
JsonEventSerializer::new(store.get_span(span_id).await?)
.with_description()
.with_explanation(),
);
} else {
events.push(JsonEventSerializer::new(Vec::new()));
} }
} }
Ok(JsonResponse::new(json!({ if events.len() == 1 && id.is_some() {
"data": events, Ok(JsonResponse::new(json!({
})) "data": events.into_iter().next().unwrap(),
.into_http_response()) }))
.into_http_response())
} else {
Ok(JsonResponse::new(json!({
"data": events,
}))
.into_http_response())
}
} }
("live", None, &Method::GET) => { ("live", Some("token"), &Method::GET) => {
let mut filters = AHashMap::new(); // Issue a live tracing token valid for 60 seconds
Ok(JsonResponse::new(json!({
"data": self.issue_custom_token(account_id, "live_tracing", "web", 60).await?,
}))
.into_http_response())
}
("live", _, &Method::GET) => {
let mut key_filters = AHashMap::new();
let mut filter = None;
for (key, value) in params.into_inner() { for (key, value) in params.into_inner() {
if let Some(key) = Key::try_parse(key.as_ref()) { if key == "filter" {
filters.insert(key, value.into_owned()); filter = value.into_owned().into();
} else if let Some(key) = Key::try_parse(key.to_ascii_lowercase().as_str()) {
key_filters.insert(key, value.into_owned());
} }
} }
@ -189,7 +239,7 @@ impl JMAP {
match tokio::time::timeout(timeout, rx.recv()).await { match tokio::time::timeout(timeout, rx.recv()).await {
Ok(Some(event_batch)) => { Ok(Some(event_batch)) => {
for event in event_batch { for event in event_batch {
if filters.is_empty() if (filter.is_none() && key_filters.is_empty())
|| event || event
.span_id() .span_id()
.map_or(false, |span_id| active_span_ids.contains(&span_id)) .map_or(false, |span_id| active_span_ids.contains(&span_id))
@ -202,34 +252,32 @@ impl JMAP {
.iter() .iter()
.chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter())) .chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter()))
{ {
if let Some(needle) = filters.get(key) { if let Some(needle) = key_filters.get(key).or(filter.as_ref()) {
let matches = match value { let matches = match value {
Value::Static(haystack) => haystack.contains(needle), Value::Static(haystack) => haystack.contains(needle),
Value::String(haystack) => haystack.contains(needle), Value::String(haystack) => haystack.contains(needle),
Value::UInt(haystack) => haystack.to_string().contains(needle),
Value::Int(haystack) => haystack.to_string().contains(needle),
Value::Float(haystack) => haystack.to_string().contains(needle),
Value::Timestamp(haystack) => { Value::Timestamp(haystack) => {
DateTime::from_timestamp(*haystack as i64) DateTime::from_timestamp(*haystack as i64)
.to_rfc3339() .to_rfc3339()
.contains(needle) .contains(needle)
} }
Value::Duration(haystack) => {
haystack.to_string().contains(needle)
}
Value::Bytes(haystack) => std::str::from_utf8(haystack)
.unwrap_or_default()
.contains(needle),
Value::Bool(true) => needle == "true", Value::Bool(true) => needle == "true",
Value::Bool(false) => needle == "false", Value::Bool(false) => needle == "false",
Value::Ipv4(haystack) => haystack.to_string().contains(needle), Value::Ipv4(haystack) => haystack.to_string().contains(needle),
Value::Ipv6(haystack) => haystack.to_string().contains(needle), Value::Ipv6(haystack) => haystack.to_string().contains(needle),
Value::Event(_) | Value::Array(_) | Value::None => false, Value::Event(_) |
Value::Array(_) |
Value::UInt(_) |
Value::Int(_) |
Value::Float(_) |
Value::Duration(_) |
Value::Bytes(_) |
Value::None => false,
}; };
if matches { if matches {
matched_keys.insert(*key); matched_keys.insert(*key);
if matched_keys.len() == filters.len() { if filter.is_some() || matched_keys.len() == key_filters.len() {
if let Some(span_id) = event.span_id() { if let Some(span_id) = event.span_id() {
active_span_ids.insert(span_id); active_span_ids.insert(span_id);
} }
@ -254,10 +302,12 @@ impl JMAP {
last_message = Instant::now(); last_message = Instant::now();
yield Ok(Frame::data(Bytes::from(format!( yield Ok(Frame::data(Bytes::from(format!(
"event: state\ndata: {}\n\n", "event: state\ndata: {}\n\n",
serde_json::to_string(&events).unwrap() serde_json::to_string(
&JsonEventSerializer::new(std::mem::take(&mut events))
.with_description()
.with_explanation()).unwrap_or_default()
)))); ))));
events.clear();
ping_interval ping_interval
} else { } else {
throttle - elapsed throttle - elapsed

View file

@ -21,7 +21,9 @@ use crate::{
struct LogEntry { struct LogEntry {
timestamp: String, timestamp: String,
level: String, level: String,
message: String, event: String,
event_id: String,
details: String,
} }
impl JMAP { impl JMAP {
@ -118,12 +120,15 @@ impl LogEntry {
fn from_line(line: &str) -> Option<Self> { fn from_line(line: &str) -> Option<Self> {
let (timestamp, rest) = line.split_once(' ')?; let (timestamp, rest) = line.split_once(' ')?;
let timestamp = DateTime::parse_from_rfc3339(timestamp).ok()?; let timestamp = DateTime::parse_from_rfc3339(timestamp).ok()?;
let (level, message) = rest.trim().split_once(' ')?; let (level, rest) = rest.trim().split_once(' ')?;
let message = message.split_once(": ").map_or(message, |(_, v)| v); let (event, rest) = rest.trim().split_once(" (")?;
let (event_id, details) = rest.split_once(")")?;
Some(Self { Some(Self {
timestamp: timestamp.to_rfc3339_opts(chrono::SecondsFormat::Secs, true), timestamp: timestamp.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
level: level.to_string(), level: level.to_string(),
message: message.to_string(), event: event.to_string(),
event_id: event_id.to_string(),
details: details.trim().to_string(),
}) })
} }
} }

View file

@ -83,7 +83,7 @@ impl JMAP {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
"tracing" if is_superuser && self.core.is_enterprise_edition() => { "tracing" if is_superuser => {
// WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED // WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED
// Any attempt to modify, bypass, or disable this license validation mechanism // Any attempt to modify, bypass, or disable this license validation mechanism
// constitutes a severe violation of the Stalwart Enterprise License Agreement. // constitutes a severe violation of the Stalwart Enterprise License Agreement.
@ -93,7 +93,12 @@ impl JMAP {
// violators to the fullest extent of the law, including but not limited to claims // violators to the fullest extent of the law, including but not limited to claims
// for copyright infringement, breach of contract, and fraud. // for copyright infringement, breach of contract, and fraud.
self.handle_tracing_api_request(req, path).await if self.core.is_enterprise_edition() {
self.handle_tracing_api_request(req, path, access_token.primary_id())
.await
} else {
Err(manage::enterprise())
}
} }
// SPDX-SnippetEnd // SPDX-SnippetEnd
_ => Err(trc::ResourceEvent::NotFound.into_err()), _ => Err(trc::ResourceEvent::NotFound.into_err()),

View file

@ -6,7 +6,7 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use common::manager::webadmin::Resource; use common::manager::webadmin::Resource;
use directory::backend::internal::manage::ManageDirectory; use directory::backend::internal::manage::{self, ManageDirectory};
use hyper::Method; use hyper::Method;
use serde_json::json; use serde_json::json;
use utils::url_params::UrlParams; use utils::url_params::UrlParams;
@ -123,7 +123,7 @@ impl JMAP {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art> // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL // SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
(Some("undelete"), _, _, _) if self.core.is_enterprise_edition() => { (Some("undelete"), _, _, _) => {
// WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED // WARNING: TAMPERING WITH THIS FUNCTION IS STRICTLY PROHIBITED
// Any attempt to modify, bypass, or disable this license validation mechanism // Any attempt to modify, bypass, or disable this license validation mechanism
// constitutes a severe violation of the Stalwart Enterprise License Agreement. // constitutes a severe violation of the Stalwart Enterprise License Agreement.
@ -133,8 +133,12 @@ impl JMAP {
// violators to the fullest extent of the law, including but not limited to claims // violators to the fullest extent of the law, including but not limited to claims
// for copyright infringement, breach of contract, and fraud. // for copyright infringement, breach of contract, and fraud.
self.handle_undelete_api_request(req, path, body, session) if self.core.is_enterprise_edition() {
.await self.handle_undelete_api_request(req, path, body, session)
.await
} else {
Err(manage::enterprise())
}
} }
// SPDX-SnippetEnd // SPDX-SnippetEnd
_ => Err(trc::ResourceEvent::NotFound.into_err()), _ => Err(trc::ResourceEvent::NotFound.into_err()),

View file

@ -232,6 +232,26 @@ impl JMAP {
}) })
} }
pub async fn issue_custom_token(
&self,
account_id: u32,
grant_type: &str,
client_id: &str,
expiry_in: u64,
) -> trc::Result<String> {
self.encode_access_token(
grant_type,
account_id,
&self
.password_hash(account_id)
.await
.map_err(|err| trc::StoreEvent::UnexpectedError.into_err().details(err))?,
client_id,
expiry_in,
)
.map_err(|err| trc::StoreEvent::UnexpectedError.into_err().details(err))
}
fn encode_access_token( fn encode_access_token(
&self, &self,
grant_type: &str, grant_type: &str,

View file

@ -7,7 +7,7 @@ homepage = "https://stalw.art"
keywords = ["imap", "jmap", "smtp", "email", "mail", "server"] keywords = ["imap", "jmap", "smtp", "email", "mail", "server"]
categories = ["email"] categories = ["email"]
license = "AGPL-3.0-only OR LicenseRef-SEL" license = "AGPL-3.0-only OR LicenseRef-SEL"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "managesieve" name = "managesieve"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "nlp" name = "nlp"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "pop3" name = "pop3"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -7,7 +7,7 @@ homepage = "https://stalw.art/smtp"
keywords = ["smtp", "email", "mail", "server"] keywords = ["smtp", "email", "mail", "server"]
categories = ["email"] categories = ["email"]
license = "AGPL-3.0-only OR LicenseRef-SEL" license = "AGPL-3.0-only OR LicenseRef-SEL"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -228,7 +228,7 @@ impl Message {
trc::event!( trc::event!(
Queue(match source { Queue(match source {
MessageSource::Authenticated => trc::QueueEvent::QueueMessageSubmission, MessageSource::Authenticated => trc::QueueEvent::QueueMessageAuthenticated,
MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage, MessageSource::Unauthenticated => trc::QueueEvent::QueueMessage,
MessageSource::Dsn => trc::QueueEvent::QueueDsn, MessageSource::Dsn => trc::QueueEvent::QueueDsn,
MessageSource::Report => trc::QueueEvent::QueueReport, MessageSource::Report => trc::QueueEvent::QueueReport,

View file

@ -1,6 +1,6 @@
[package] [package]
name = "store" name = "store"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "trc" name = "trc"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

File diff suppressed because it is too large Load diff

View file

@ -468,7 +468,7 @@ impl EventType {
}, },
EventType::Queue(event) => match event { EventType::Queue(event) => match event {
QueueEvent::QueueMessage QueueEvent::QueueMessage
| QueueEvent::QueueMessageSubmission | QueueEvent::QueueMessageAuthenticated
| QueueEvent::QueueReport | QueueEvent::QueueReport
| QueueEvent::QueueDsn | QueueEvent::QueueDsn
| QueueEvent::QueueAutogenerated | QueueEvent::QueueAutogenerated

View file

@ -692,3 +692,9 @@ impl From<EventType> for usize {
value.id() value.id()
} }
} }
impl AsRef<Event<EventDetails>> for Event<EventDetails> {
fn as_ref(&self) -> &Event<EventDetails> {
self
}
}

View file

@ -232,7 +232,7 @@ impl Collector {
EventType::Queue(QueueEvent::QueueMessage) => { EventType::Queue(QueueEvent::QueueMessage) => {
MESSAGE_INCOMING_SIZE.observe(size); MESSAGE_INCOMING_SIZE.observe(size);
} }
EventType::Queue(QueueEvent::QueueMessageSubmission) => { EventType::Queue(QueueEvent::QueueMessageAuthenticated) => {
MESSAGE_SUBMISSION_SIZE.observe(size); MESSAGE_SUBMISSION_SIZE.observe(size);
} }
EventType::Queue(QueueEvent::QueueReport) => { EventType::Queue(QueueEvent::QueueReport) => {
@ -632,7 +632,7 @@ impl EventType {
EventType::Delivery(_) => true, EventType::Delivery(_) => true,
EventType::Queue( EventType::Queue(
QueueEvent::QueueMessage QueueEvent::QueueMessage
| QueueEvent::QueueMessageSubmission | QueueEvent::QueueMessageAuthenticated
| QueueEvent::QueueReport | QueueEvent::QueueReport
| QueueEvent::QueueDsn | QueueEvent::QueueDsn
| QueueEvent::QueueAutogenerated | QueueEvent::QueueAutogenerated

View file

@ -466,7 +466,7 @@ pub enum DeliveryEvent {
#[event_type] #[event_type]
pub enum QueueEvent { pub enum QueueEvent {
QueueMessage, QueueMessage,
QueueMessageSubmission, QueueMessageAuthenticated,
QueueReport, QueueReport,
QueueDsn, QueueDsn,
QueueAutogenerated, QueueAutogenerated,

View file

@ -687,7 +687,7 @@ impl EventType {
EventType::Queue(QueueEvent::QueueAutogenerated) => 378, EventType::Queue(QueueEvent::QueueAutogenerated) => 378,
EventType::Queue(QueueEvent::QueueDsn) => 379, EventType::Queue(QueueEvent::QueueDsn) => 379,
EventType::Queue(QueueEvent::QueueMessage) => 380, EventType::Queue(QueueEvent::QueueMessage) => 380,
EventType::Queue(QueueEvent::QueueMessageSubmission) => 381, EventType::Queue(QueueEvent::QueueMessageAuthenticated) => 381,
EventType::Queue(QueueEvent::QueueReport) => 382, EventType::Queue(QueueEvent::QueueReport) => 382,
EventType::Queue(QueueEvent::QuotaExceeded) => 383, EventType::Queue(QueueEvent::QuotaExceeded) => 383,
EventType::Queue(QueueEvent::RateLimitExceeded) => 384, EventType::Queue(QueueEvent::RateLimitExceeded) => 384,
@ -1276,7 +1276,7 @@ impl EventType {
378 => Some(EventType::Queue(QueueEvent::QueueAutogenerated)), 378 => Some(EventType::Queue(QueueEvent::QueueAutogenerated)),
379 => Some(EventType::Queue(QueueEvent::QueueDsn)), 379 => Some(EventType::Queue(QueueEvent::QueueDsn)),
380 => Some(EventType::Queue(QueueEvent::QueueMessage)), 380 => Some(EventType::Queue(QueueEvent::QueueMessage)),
381 => Some(EventType::Queue(QueueEvent::QueueMessageSubmission)), 381 => Some(EventType::Queue(QueueEvent::QueueMessageAuthenticated)),
382 => Some(EventType::Queue(QueueEvent::QueueReport)), 382 => Some(EventType::Queue(QueueEvent::QueueReport)),
383 => Some(EventType::Queue(QueueEvent::QuotaExceeded)), 383 => Some(EventType::Queue(QueueEvent::QuotaExceeded)),
384 => Some(EventType::Queue(QueueEvent::RateLimitExceeded)), 384 => Some(EventType::Queue(QueueEvent::RateLimitExceeded)),

View file

@ -4,104 +4,243 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/ */
use ahash::AHashSet; use crate::{Event, EventDetails, EventType, Key, Value};
use mail_parser::DateTime; use ahash::AHashSet;
use serde::{ser::SerializeMap, Serialize, Serializer}; use base64::{engine::general_purpose::STANDARD, Engine};
use mail_parser::DateTime;
use crate::{Event, EventDetails, EventType, Key, Value}; use serde::{
ser::{SerializeMap, SerializeSeq},
struct Keys<'x> { Serialize, Serializer,
keys: &'x [(Key, Value)], };
span_keys: &'x [(Key, Value)],
} struct Keys<'x> {
keys: &'x [(Key, Value)],
impl Serialize for Event<EventDetails> { span_keys: &'x [(Key, Value)],
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> }
where
S: Serializer, pub struct JsonEventSerializer<T> {
{ inner: T,
let mut map = serializer.serialize_map(Some(4))?; with_id: bool,
map.serialize_entry( with_spans: bool,
"id", with_description: bool,
&format!("{}{}", self.inner.timestamp, self.inner.typ.id()), with_explanation: bool,
)?; }
map.serialize_entry(
"createdAt", impl<T> JsonEventSerializer<T> {
&DateTime::from_timestamp(self.inner.timestamp as i64).to_rfc3339(), pub fn new(inner: T) -> Self {
)?; Self {
map.serialize_entry("type", self.inner.typ.name())?; inner,
map.serialize_entry( with_id: false,
"data", with_spans: false,
&Keys { with_description: false,
keys: self.keys.as_slice(), with_explanation: false,
span_keys: self.inner.span.as_ref().map(|s| &s.keys[..]).unwrap_or(&[]), }
}, }
)?;
map.end() pub fn with_id(mut self) -> Self {
} self.with_id = true;
} self
}
impl<'x> Serialize for Keys<'x> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> pub fn with_spans(mut self) -> Self {
where self.with_spans = true;
S: Serializer, self
{ }
let keys_len = self.keys.len() + self.span_keys.len();
let mut seen_keys = AHashSet::with_capacity(keys_len); pub fn with_description(mut self) -> Self {
let mut keys = serializer.serialize_map(Some(keys_len))?; self.with_description = true;
for (key, value) in self.span_keys.iter().chain(self.keys.iter()) { self
if !matches!(value, Value::None) }
&& !matches!(key, Key::SpanId)
&& seen_keys.insert(*key) pub fn with_explanation(mut self) -> Self {
{ self.with_explanation = true;
keys.serialize_entry(key.name(), value)?; self
} }
}
keys.end() pub fn into_inner(self) -> T {
} self.inner
} }
}
impl Serialize for Event<EventType> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> impl<T: AsRef<Event<EventDetails>>> Serialize for JsonEventSerializer<Vec<T>> {
where fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
S: Serializer, where
{ S: Serializer,
let mut map = serializer.serialize_map(Some(4))?; {
map.serialize_entry("type", self.inner.name())?; let mut seq = serializer.serialize_seq(Some(self.inner.len()))?;
map.serialize_entry( for event in &self.inner {
"data", seq.serialize_element(&JsonEventSerializer {
&Keys { inner: event,
keys: self.keys.as_slice(), with_id: self.with_id,
span_keys: &[], with_spans: self.with_spans,
}, with_description: self.with_description,
)?; with_explanation: self.with_explanation,
map.end() })?;
} }
} seq.end()
}
impl Serialize for Value { }
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where impl<T: AsRef<Event<EventDetails>>> Serialize for JsonEventSerializer<T> {
S: Serializer, fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
{ where
match self { S: Serializer,
Value::Static(value) => value.serialize(serializer), {
Value::String(value) => value.serialize(serializer), let event = self.inner.as_ref();
Value::UInt(value) => value.serialize(serializer), let mut map = serializer.serialize_map(None)?;
Value::Int(value) => value.serialize(serializer), if self.with_id {
Value::Float(value) => value.serialize(serializer), map.serialize_entry(
Value::Timestamp(value) => DateTime::from_timestamp(*value as i64) "id",
.to_rfc3339() &format!("{}{}", event.inner.timestamp, event.inner.typ.id()),
.serialize(serializer), )?;
Value::Duration(value) => value.serialize(serializer), }
Value::Bytes(value) => value.serialize(serializer), if self.with_description {
Value::Bool(value) => value.serialize(serializer), map.serialize_entry("text", event.inner.typ.description())?;
Value::Ipv4(value) => value.serialize(serializer), }
Value::Ipv6(value) => value.serialize(serializer), if self.with_explanation {
Value::Event(value) => value.serialize(serializer), map.serialize_entry("details", event.inner.typ.explain())?;
Value::Array(value) => value.serialize(serializer), }
Value::None => unreachable!(), map.serialize_entry(
} "createdAt",
} &DateTime::from_timestamp(event.inner.timestamp as i64).to_rfc3339(),
} )?;
map.serialize_entry("type", event.inner.typ.name())?;
map.serialize_entry(
"data",
&JsonEventSerializer {
inner: Keys {
keys: event.keys.as_slice(),
span_keys: event
.inner
.span
.as_ref()
.map(|s| &s.keys[..])
.unwrap_or(&[]),
},
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
},
)?;
map.end()
}
}
impl<'x> Serialize for JsonEventSerializer<Keys<'x>> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let keys_len = self.inner.keys.len() + self.inner.span_keys.len();
let mut seen_keys = AHashSet::with_capacity(keys_len);
let mut keys = serializer.serialize_map(Some(keys_len))?;
for (key, value) in self.inner.span_keys.iter().chain(self.inner.keys.iter()) {
if !matches!(value, Value::None)
&& (self.with_spans || !matches!(key, Key::SpanId))
&& seen_keys.insert(*key)
{
keys.serialize_entry(
key.name(),
&JsonEventSerializer {
inner: value,
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
},
)?;
}
}
keys.end()
}
}
impl Serialize for JsonEventSerializer<&Event<EventType>> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", self.inner.inner.name())?;
if self.with_description {
map.serialize_entry("text", self.inner.inner.description())?;
}
if self.with_explanation {
map.serialize_entry("details", self.inner.inner.explain())?;
}
map.serialize_entry(
"data",
&JsonEventSerializer {
inner: Keys {
keys: self.inner.keys.as_slice(),
span_keys: &[],
},
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
},
)?;
map.end()
}
}
impl Serialize for JsonEventSerializer<&Value> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match &self.inner {
Value::Static(value) => value.serialize(serializer),
Value::String(value) => value.serialize(serializer),
Value::UInt(value) => value.serialize(serializer),
Value::Int(value) => value.serialize(serializer),
Value::Float(value) => value.serialize(serializer),
Value::Timestamp(value) => DateTime::from_timestamp(*value as i64)
.to_rfc3339()
.serialize(serializer),
Value::Duration(value) => value.serialize(serializer),
Value::Bytes(value) => STANDARD.encode(value).serialize(serializer),
Value::Bool(value) => value.serialize(serializer),
Value::Ipv4(value) => value.serialize(serializer),
Value::Ipv6(value) => value.serialize(serializer),
Value::Event(value) => JsonEventSerializer {
inner: value,
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
}
.serialize(serializer),
Value::Array(value) => JsonEventSerializer {
inner: value,
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
}
.serialize(serializer),
Value::None => unreachable!(),
}
}
}
impl Serialize for JsonEventSerializer<&Vec<Value>> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(self.inner.len()))?;
for value in self.inner {
seq.serialize_element(&JsonEventSerializer {
inner: value,
with_spans: self.with_spans,
with_description: self.with_description,
with_explanation: self.with_explanation,
with_id: self.with_id,
})?;
}
seq.end()
}
}

View file

@ -381,6 +381,7 @@ impl Display for Event<EventType> {
mod tests { mod tests {
use crate::{EventType, Level}; use crate::{EventType, Level};
#[allow(dead_code)]
fn to_camel_case(name: &str) -> String { fn to_camel_case(name: &str) -> String {
let mut out = String::with_capacity(name.len()); let mut out = String::with_capacity(name.len());
let mut upper = true; let mut upper = true;
@ -399,6 +400,7 @@ mod tests {
out out
} }
#[allow(dead_code)]
fn event_to_class(name: &str) -> String { fn event_to_class(name: &str) -> String {
let (group, name) = name.split_once('.').unwrap(); let (group, name) = name.split_once('.').unwrap();
let group = to_camel_case(group); let group = to_camel_case(group);
@ -410,6 +412,12 @@ mod tests {
) )
} }
#[allow(dead_code)]
fn event_to_webadmin_class(name: &str) -> String {
let (group, name) = name.split_once('.').unwrap();
format!("{}{}", to_camel_case(group), to_camel_case(name))
}
#[test] #[test]
fn print_all_events() { fn print_all_events() {
assert!(!Level::Disable.is_contained(Level::Warn)); assert!(!Level::Disable.is_contained(Level::Warn));
@ -435,8 +443,12 @@ mod tests {
for (pos, (name, _, _)) in names.iter().enumerate() { for (pos, (name, _, _)) in names.iter().enumerate() {
//println!("{:?},", name); //println!("{:?},", name);
//println!("{} => Some({}),", pos, event_to_class(name)); println!("{} => Some({}),", pos, event_to_class(name));
println!("{} => {},", event_to_class(name), pos); //println!("{} => {},", event_to_class(name), pos);
/*println!(
"#[serde(rename = \"{name}\")]\n{},",
event_to_webadmin_class(name)
);*/
} }
} }
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "utils" name = "utils"
version = "0.9.1" version = "0.9.2"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"