Added caching logging + Update config properties

This commit is contained in:
mdecimus 2025-05-19 19:11:15 +02:00
parent e34cd950bf
commit c6ef70edd2
13 changed files with 171 additions and 41 deletions

View file

@ -67,10 +67,10 @@ impl GroupwareConfig {
.unwrap_or_default(),
max_ical_instances: config
.property("calendar.max-recurrence-expansions")
.unwrap_or(1000),
.unwrap_or(3000),
max_ical_attendees_per_instance: config
.property("calendar.max-attendees-per-instance")
.unwrap_or(1000),
.unwrap_or(20),
max_file_size: config
.property("file-storage.max-size")
.unwrap_or(25 * 1024 * 1024),

View file

@ -77,6 +77,7 @@ impl Data {
impl Caches {
pub fn parse(config: &mut Config) -> Self {
const MB_50: u64 = 50 * 1024 * 1024;
const MB_10: u64 = 10 * 1024 * 1024;
const MB_5: u64 = 5 * 1024 * 1024;
const MB_1: u64 = 1024 * 1024;
@ -103,7 +104,7 @@ impl Caches {
messages: Cache::from_config(
config,
"message",
MB_10,
MB_50,
(std::mem::size_of::<u32>()
+ std::mem::size_of::<CacheSwap<MessageStoreCache>>()
+ (1024 * std::mem::size_of::<MessageUidCache>())

View file

@ -104,32 +104,26 @@ impl JmapConfig {
pub fn parse(config: &mut Config) -> Self {
// Parse HTTP headers
let mut http_headers = config
.values("server.http.headers")
.values("http.headers")
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
hyper::header::HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
format!("Invalid header found in property \"http.headers\": {}", err)
})?,
hyper::header::HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"server.http.headers\": {}",
err
)
format!("Invalid header found in property \"http.headers\": {}", err)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"server.http.headers\": {}",
"Invalid header found in property \"http.headers\": {}",
v
))
}
})
.collect::<Result<Vec<_>, String>>()
.map_err(|e| config.new_parse_error("server.http.headers", e))
.map_err(|e| config.new_parse_error("http.headers", e))
.unwrap_or_default();
// Parse default folders
@ -200,7 +194,7 @@ impl JmapConfig {
// Add permissive CORS headers
if config
.property::<bool>("server.http.permissive-cors")
.property::<bool>("http.permissive-cors")
.unwrap_or(false)
{
http_headers.push((
@ -222,7 +216,7 @@ impl JmapConfig {
}
// Add HTTP Strict Transport Security
if config.property::<bool>("server.http.hsts").unwrap_or(false) {
if config.property::<bool>("http.hsts").unwrap_or(false) {
http_headers.push((
hyper::header::STRICT_TRANSPORT_SECURITY,
hyper::header::HeaderValue::from_static(
@ -245,7 +239,7 @@ impl JmapConfig {
.property_or_default::<Option<usize>>("jmap.protocol.changes.max-results", "5000")
.unwrap_or_default(),
changes_max_history: config
.property_or_default::<Option<usize>>("changes.max-history", "100000")
.property_or_default::<Option<usize>>("changes.max-history", "10000")
.unwrap_or_default(),
snippet_max_results: config
.property("jmap.protocol.search-snippet.max-results")
@ -301,10 +295,10 @@ impl JmapConfig {
.unwrap_or(256),
capabilities: BaseCapabilities::default(),
rate_authenticated: config
.property_or_default::<Option<Rate>>("jmap.rate-limit.account", "1000/1m")
.property_or_default::<Option<Rate>>("http.rate-limit.account", "1000/1m")
.unwrap_or_default(),
rate_anonymous: config
.property_or_default::<Option<Rate>>("jmap.rate-limit.anonymous", "100/1m")
.property_or_default::<Option<Rate>>("http.rate-limit.anonymous", "100/1m")
.unwrap_or_default(),
event_source_throttle: config
.property_or_default("jmap.event-source.throttle", "1s")
@ -327,9 +321,7 @@ impl JmapConfig {
encrypt_append: config
.property_or_default("storage.encryption.append", "false")
.unwrap_or(false),
http_use_forwarded: config
.property("server.http.use-x-forwarded")
.unwrap_or(false),
http_use_forwarded: config.property("http.use-x-forwarded").unwrap_or(false),
http_headers,
push_attempt_interval: config
.property_or_default("jmap.push.attempts.interval", "1m")

View file

@ -138,15 +138,15 @@ impl Core {
})
.unwrap_or_default();
let pubsub = config
.value("storage.pubsub")
.value("cluster.pubsub")
.map(|id| id.to_string())
.and_then(|id| {
if let Some(store) = stores.pubsub_stores.get(&id) {
store.clone().into()
} else {
config.new_parse_error(
"storage.pubsub",
format!("PubSub store {id:?} not found"),
"cluster.pubsub",
format!("PubSub server {id:?} not found"),
);
None
}

View file

@ -96,11 +96,11 @@ impl Default for Network {
contact_form: None,
node_id: 1,
http_response_url: IfBlock::new::<()>(
"server.http.url",
"http.url",
[],
"protocol + '://' + config_get('server.hostname') + ':' + local_port",
),
http_allowed_endpoint: IfBlock::new::<()>("server.http.allowed-endpoint", [], "200"),
http_allowed_endpoint: IfBlock::new::<()>("http.allowed-endpoint", [], "200"),
asn_geo_lookup: AsnGeoLookupConfig::Disabled,
server_name: Default::default(),
report_domain: Default::default(),
@ -240,11 +240,8 @@ impl Network {
}
for (value, key) in [
(&mut network.http_response_url, "server.http.url"),
(
&mut network.http_allowed_endpoint,
"server.http.allowed-endpoint",
),
(&mut network.http_response_url, "http.url"),
(&mut network.http_allowed_endpoint, "http.allowed-endpoint"),
] {
if let Some(if_block) = IfBlock::try_parse(config, key, token_map) {
*value = if_block;

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{collections::hash_map::Entry, sync::Arc};
use std::{collections::hash_map::Entry, sync::Arc, time::Instant};
use common::{CacheSwap, MessageStoreCache, Server};
use email::{full_email_cache_build, update_email_cache};
@ -15,7 +15,7 @@ use store::{
query::log::{Change, Query},
};
use tokio::sync::Semaphore;
use trc::AddContext;
use trc::{AddContext, StoreEvent};
pub mod email;
pub mod mailbox;
@ -38,19 +38,32 @@ impl MessageCacheFetch for Server {
{
Ok(cache) => cache,
Err(guard) => {
let start_time = Instant::now();
let cache = full_cache_build(self, account_id, Arc::new(Semaphore::new(1))).await?;
if guard.insert(CacheSwap::new(cache.clone())).is_err() {
self.inner
.cache
.messages
.insert(account_id, CacheSwap::new(cache.clone()));
}
trc::event!(
Store(StoreEvent::CacheMiss),
AccountId = account_id,
Collection = SyncCollection::Email.as_str(),
Total = vec![cache.emails.items.len(), cache.mailboxes.items.len()],
ChangeId = cache.last_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
};
// Obtain current state
let cache = cache_.load_full();
let start_time = Instant::now();
let changes = self
.core
.storage
@ -67,11 +80,29 @@ impl MessageCacheFetch for Server {
if changes.is_truncated {
let cache = full_cache_build(self, account_id, cache.update_lock.clone()).await?;
cache_.update(cache.clone());
trc::event!(
Store(StoreEvent::CacheStale),
AccountId = account_id,
Collection = SyncCollection::Email.as_str(),
ChangeId = cache.last_change_id,
Total = vec![cache.emails.items.len(), cache.mailboxes.items.len()],
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
// Verify changes
if changes.changes.is_empty() {
trc::event!(
Store(StoreEvent::CacheHit),
AccountId = account_id,
Collection = SyncCollection::Email.as_str(),
ChangeId = cache.last_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
@ -79,6 +110,14 @@ impl MessageCacheFetch for Server {
let _permit = cache.update_lock.acquire().await;
let cache = cache_.0.load();
let mut cache = if cache.last_change_id >= changes.to_change_id {
trc::event!(
Store(StoreEvent::CacheHit),
AccountId = account_id,
Collection = SyncCollection::Email.as_str(),
ChangeId = cache.last_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache.clone());
} else {
cache.as_ref().clone()
@ -141,6 +180,16 @@ impl MessageCacheFetch for Server {
let cache = Arc::new(cache);
cache_.update(cache.clone());
trc::event!(
Store(StoreEvent::CacheUpdate),
AccountId = account_id,
Collection = SyncCollection::Email.as_str(),
ChangeId = cache.last_change_id,
Details = vec![changed_items.len(), changed_containers.len()],
Total = vec![cache.emails.items.len(), cache.mailboxes.items.len()],
Elapsed = start_time.elapsed(),
);
Ok(cache)
}
}

View file

@ -16,14 +16,14 @@ use calcard::{
use common::{CacheSwap, DavResource, DavResources, Server, auth::AccessToken};
use file::{build_file_resources, build_nested_hierarchy, resource_from_file};
use jmap_proto::types::collection::{Collection, SyncCollection};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use store::{
ahash::AHashMap,
query::log::{Change, Query},
write::{AlignedBytes, Archive, BatchBuilder},
};
use tokio::sync::Semaphore;
use trc::AddContext;
use trc::{AddContext, StoreEvent};
pub mod calcard;
pub mod file;
@ -71,6 +71,7 @@ impl GroupwareCache for Server {
let cache_ = match cache_store.get_value_or_guard_async(&account_id).await {
Ok(cache) => cache,
Err(guard) => {
let start_time = Instant::now();
let cache = full_cache_build(
self,
account_id,
@ -79,15 +80,27 @@ impl GroupwareCache for Server {
access_token,
)
.await?;
if guard.insert(CacheSwap::new(cache.clone())).is_err() {
cache_store.insert(account_id, CacheSwap::new(cache.clone()));
}
trc::event!(
Store(StoreEvent::CacheMiss),
AccountId = account_id,
Collection = collection.as_str(),
Total = cache.resources.len(),
ChangeId = cache.highest_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
};
// Obtain current state
let cache = cache_.load_full();
let start_time = Instant::now();
let changes = self
.core
.storage
@ -111,11 +124,29 @@ impl GroupwareCache for Server {
)
.await?;
cache_.update(cache.clone());
trc::event!(
Store(StoreEvent::CacheStale),
AccountId = account_id,
Collection = collection.as_str(),
ChangeId = cache.highest_change_id,
Total = cache.resources.len(),
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
// Verify changes
if changes.changes.is_empty() {
trc::event!(
Store(StoreEvent::CacheHit),
AccountId = account_id,
Collection = collection.as_str(),
ChangeId = cache.highest_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
@ -123,11 +154,20 @@ impl GroupwareCache for Server {
let _permit = cache.update_lock.acquire().await;
let cache = cache_.load_full();
if cache.highest_change_id >= changes.to_change_id {
trc::event!(
Store(StoreEvent::CacheHit),
AccountId = account_id,
Collection = collection.as_str(),
ChangeId = cache.highest_change_id,
Elapsed = start_time.elapsed(),
);
return Ok(cache);
}
let mut updated_resources = AHashMap::with_capacity(8);
let has_no_children = collection == SyncCollection::FileNode;
let num_changes = changes.changes.len();
for change in changes.changes {
match change {
@ -246,6 +286,16 @@ impl GroupwareCache for Server {
let cache = Arc::new(cache);
cache_.update(cache.clone());
trc::event!(
Store(StoreEvent::CacheUpdate),
AccountId = account_id,
Collection = collection.as_str(),
ChangeId = cache.highest_change_id,
Details = num_changes,
Total = cache.resources.len(),
Elapsed = start_time.elapsed(),
);
Ok(cache)
}

View file

@ -303,3 +303,19 @@ impl BitmapItem for Collection {
!matches!(self, Collection::None)
}
}
impl SyncCollection {
pub fn as_str(&self) -> &'static str {
match self {
SyncCollection::Email => "email",
SyncCollection::Thread => "thread",
SyncCollection::Calendar => "calendar",
SyncCollection::AddressBook => "addressBook",
SyncCollection::FileNode => "fileNode",
SyncCollection::Identity => "identity",
SyncCollection::EmailSubmission => "emailSubmission",
SyncCollection::SieveScript => "sieveScript",
SyncCollection::None => "",
}
}
}

View file

@ -1556,6 +1556,10 @@ impl StoreEvent {
StoreEvent::HttpStoreFetch => "HTTP store updated",
StoreEvent::HttpStoreError => "Error updating HTTP store",
StoreEvent::NatsError => "NATS error",
StoreEvent::CacheMiss => "Cache miss",
StoreEvent::CacheHit => "Cache hit",
StoreEvent::CacheStale => "Cache is stale",
StoreEvent::CacheUpdate => "Cache update",
}
}
@ -1594,6 +1598,10 @@ impl StoreEvent {
StoreEvent::HttpStoreFetch => "The HTTP store was updated",
StoreEvent::HttpStoreError => "An error occurred while updating the HTTP store",
StoreEvent::NatsError => "A NATS error occurred",
StoreEvent::CacheMiss => "No cache entry found for the account",
StoreEvent::CacheHit => "Cache entry found for the account, no update needed",
StoreEvent::CacheStale => "Cache is too old, rebuilding",
StoreEvent::CacheUpdate => "Cache updated with latest database changes",
}
}
}

View file

@ -20,7 +20,12 @@ impl EventType {
| StoreEvent::SqlQuery
| StoreEvent::LdapQuery
| StoreEvent::LdapBind => Level::Trace,
StoreEvent::NotFound | StoreEvent::HttpStoreFetch => Level::Debug,
StoreEvent::CacheMiss
| StoreEvent::CacheHit
| StoreEvent::CacheStale
| StoreEvent::CacheUpdate
| StoreEvent::NotFound
| StoreEvent::HttpStoreFetch => Level::Debug,
StoreEvent::AssertValueFailed
| StoreEvent::FoundationdbError
| StoreEvent::MysqlError

View file

@ -841,6 +841,12 @@ pub enum StoreEvent {
CryptoError,
HttpStoreError,
// Caching
CacheMiss,
CacheHit,
CacheStale,
CacheUpdate,
// Warnings
BlobMissingMarker,

View file

@ -884,6 +884,10 @@ impl EventType {
EventType::WebDav(WebDavEvent::Mkcalendar) => 575,
EventType::Calendar(CalendarEvent::RuleExpansionError) => 576,
EventType::Store(StoreEvent::NatsError) => 577,
EventType::Store(StoreEvent::CacheMiss) => 50,
EventType::Store(StoreEvent::CacheHit) => 51,
EventType::Store(StoreEvent::CacheStale) => 52,
EventType::Store(StoreEvent::CacheUpdate) => 578,
}
}
@ -1504,13 +1508,15 @@ impl EventType {
575 => Some(EventType::WebDav(WebDavEvent::Mkcalendar)),
576 => Some(EventType::Calendar(CalendarEvent::RuleExpansionError)),
577 => Some(EventType::Store(StoreEvent::NatsError)),
50 => Some(EventType::Store(StoreEvent::CacheMiss)),
51 => Some(EventType::Store(StoreEvent::CacheHit)),
52 => Some(EventType::Store(StoreEvent::CacheStale)),
578 => Some(EventType::Store(StoreEvent::CacheUpdate)),
_ => None,
}
}
}
// 50, 51, 52
impl Key {
fn code(&self) -> u64 {
match self {

View file

@ -89,7 +89,7 @@ pub async fn imap_tests() {
imap.assert_read(Type::Tagged, ResponseType::Ok).await;
}
/*mailbox::test(&mut imap, &mut imap_check).await;
mailbox::test(&mut imap, &mut imap_check).await;
append::test(&mut imap, &mut imap_check, &handle).await;
search::test(&mut imap, &mut imap_check).await;
fetch::test(&mut imap, &mut imap_check).await;
@ -98,7 +98,7 @@ pub async fn imap_tests() {
thread::test(&mut imap, &mut imap_check).await;
idle::test(&mut imap, &mut imap_check, false).await;
condstore::test(&mut imap, &mut imap_check).await;
acl::test(&mut imap, &mut imap_check).await;*/
acl::test(&mut imap, &mut imap_check).await;
// Logout
for imap in [&mut imap, &mut imap_check] {