Tracing: Include all events in OTEL traces + Include spanId in webhooks

This commit is contained in:
mdecimus 2024-12-04 09:32:50 +01:00
parent a6f24d23b4
commit 74a931322a
2 changed files with 26 additions and 12 deletions

View file

@ -9,6 +9,7 @@ use std::{
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use ahash::AHashMap;
use mail_parser::DateTime;
use opentelemetry::{
logs::{AnyValue, Severity},
@ -25,6 +26,8 @@ use trc::{ipc::subscriber::SubscriberBuilder, Event, EventDetails, Level, Teleme
use crate::{config::telemetry::OtelTracer, telemetry::LONG_SLUMBER};
const MAX_EVENTS: usize = 2048;
pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer) {
let (_, mut rx) = builder.register();
tokio::spawn(async move {
@ -45,6 +48,8 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
let mut pending_logs = Vec::new();
let mut pending_spans = Vec::new();
let mut active_spans = AHashMap::new();
loop {
// Wait for the next event or timeout
let event_or_timeout = tokio::time::timeout(wakeup_time, rx.recv()).await;
@ -52,19 +57,28 @@ pub(crate) fn spawn_otel_tracer(builder: SubscriberBuilder, mut otel: OtelTracer
match event_or_timeout {
Ok(Some(events)) => {
for event in events {
if otel.span_exporter_enable && event.inner.typ.is_span_end() {
if let Some(start_span) = event.inner.span.as_ref() {
if otel.log_exporter_enable {
pending_logs.push(build_log_record(&event));
}
if otel.span_exporter_enable {
if let Some(span) = event.inner.span.as_ref() {
let span_id = span.span_id().unwrap();
if !event.inner.typ.is_span_end() {
let events =
active_spans.entry(span_id).or_insert_with(Vec::new);
if events.len() < MAX_EVENTS {
events.push(event);
}
} else if let Some(events) = active_spans.remove(&span_id) {
pending_spans.push(build_span_data(
start_span,
span,
&event,
[&event].into_iter(),
events.iter().chain(std::iter::once(&event)),
&instrumentation,
));
}
}
if otel.log_exporter_enable {
pending_logs.push(build_log_record(&event));
}
}
}

View file

@ -110,7 +110,7 @@ fn spawn_webhook_handler(
tokio::spawn(async move {
in_flight.store(true, Ordering::Relaxed);
let wrapper = EventWrapper {
events: JsonEventSerializer::new(events).with_id(),
events: JsonEventSerializer::new(events).with_id().with_spans(),
};
if let Err(err) = post_webhook_events(&settings, &wrapper).await {