From 4f3406d44969003c082451b90a719529ef1a1f37 Mon Sep 17 00:00:00 2001
From: mdecimus
Date: Sun, 13 Jul 2025 19:21:56 +0200
Subject: [PATCH] Cache invalidation cluster broadcast (closes #1741)

---
 crates/common/src/auth/access_token.rs      | 117 +++++--------
 crates/common/src/auth/roles.rs             |  50 +-----
 crates/common/src/ipc.rs                    |   2 +
 crates/common/src/lib.rs                    |   1 -
 crates/common/src/manager/boot.rs           |   2 +-
 crates/common/src/sharing/acl.rs            |   4 +-
 .../directory/src/backend/internal/manage.rs|  17 +-
 crates/groupware/src/cache/mod.rs           |  29 +---
 crates/http/src/management/principal.rs     |  12 +-
 crates/imap/src/op/acl.rs                   |   2 +-
 crates/services/src/broadcast/mod.rs        | 129 ++++++++------
 crates/services/src/broadcast/subscriber.rs | 159 +++++++++++-------
 tests/src/cluster/broadcast.rs              |  68 +++++++-
 tests/src/cluster/mod.rs                    |  16 +-
 tests/src/jmap/auth_acl.rs                  |   4 +-
 tests/src/webdav/basic.rs                   |  27 +++
 16 files changed, 348 insertions(+), 291 deletions(-)

diff --git a/crates/common/src/auth/access_token.rs b/crates/common/src/auth/access_token.rs
index 59cf6b67..106828c8 100644
--- a/crates/common/src/auth/access_token.rs
+++ b/crates/common/src/auth/access_token.rs
@@ -4,6 +4,12 @@
  * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
  */
 
+use super::{AccessToken, ResourceToken, TenantInfo, roles::RolePermissions};
+use crate::{
+    Server,
+    ipc::BroadcastEvent,
+    listener::limiter::{ConcurrencyLimiter, LimiterResult},
+};
 use ahash::AHashSet;
 use directory::{
     Permission, Principal, PrincipalData, QueryBy, Type,
@@ -20,20 +26,13 @@ use std::{
     hash::{DefaultHasher, Hash, Hasher},
     sync::Arc,
 };
-use store::{dispatch::lookup::KeyValue, query::acl::AclQuery};
+use store::{query::acl::AclQuery, rand};
 use trc::AddContext;
 use utils::map::{
     bitmap::{Bitmap, BitmapItem},
     vec_map::VecMap,
 };
 
-use crate::{
-    KV_TOKEN_REVISION, Server,
-    listener::limiter::{ConcurrencyLimiter, LimiterResult},
-};
-
-use super::{AccessToken, ResourceToken, TenantInfo, roles::RolePermissions};
-
 pub enum PrincipalOrId {
     Principal(Principal),
     Id(u32),
@@ -209,7 +208,6 @@
 
         // Obtain current revision
         let principal_id = principal.id();
-        let revision = self.fetch_token_revision(principal_id).await;
 
         match self
             .inner
@@ -218,32 +216,9 @@
             .get_value_or_guard_async(&principal_id)
             .await
         {
-            Ok(token) => {
-                if revision == Some(token.revision) {
-                    Ok(token)
-                } else {
-                    let revision = revision.unwrap_or(u64::MAX);
-                    let token: Arc<AccessToken> = match principal {
-                        PrincipalOrId::Principal(principal) => {
-                            self.build_access_token_from_principal(principal, revision)
-                                .await?
-                        }
-                        PrincipalOrId::Id(account_id) => {
-                            self.build_access_token(account_id, revision).await?
-                        }
-                    }
-                    .into();
-
-                    self.inner
-                        .cache
-                        .access_tokens
-                        .insert(token.primary_id(), token.clone());
-
-                    Ok(token)
-                }
-            }
+            Ok(token) => Ok(token),
             Err(guard) => {
-                let revision = revision.unwrap_or(u64::MAX);
+                let revision = rand::random::<u64>();
                 let token: Arc<AccessToken> = match principal {
                     PrincipalOrId::Principal(principal) => {
                         self.build_access_token_from_principal(principal, revision)
                             .await?
@@ -260,11 +235,21 @@
         }
     }
 
-    pub async fn increment_token_revision(&self, changed_principals: ChangedPrincipals) {
+    pub async fn invalidate_principal_caches(&self, changed_principals: ChangedPrincipals) {
         let mut nested_principals = Vec::new();
+        let mut changed_ids = AHashSet::new();
+        let mut changed_names = Vec::new();
 
         for (id, changed_principal) in changed_principals.iter() {
-            self.increment_revision(*id).await;
+            changed_ids.insert(*id);
+
+            if changed_principal.name_change {
+                self.inner.cache.files.remove(id);
+                self.inner.cache.contacts.remove(id);
+                self.inner.cache.events.remove(id);
+                self.inner.cache.scheduling.remove(id);
+                changed_names.push(*id);
+            }
 
             if changed_principal.member_change {
                 if changed_principal.typ == Type::Tenant {
@@ -282,9 +267,7 @@
                     {
                         Ok(principals) => {
                             for principal in principals.items {
-                                if !changed_principals.contains(principal.id()) {
-                                    self.increment_revision(principal.id()).await;
-                                }
+                                changed_ids.insert(principal.id());
                             }
                         }
                         Err(err) => {
@@ -302,22 +285,16 @@
         }
 
         if !nested_principals.is_empty() {
-            let mut fetched_ids = AHashSet::new();
            let mut ids = nested_principals.into_iter();
            let mut ids_stack = vec![];
 
            loop {
                if let Some(id) = ids.next() {
                    // Skip if already fetched
-                    if !fetched_ids.insert(id) {
+                    if !changed_ids.insert(id) {
                        continue;
                    }
 
-                    // Increment revision
-                    if !changed_principals.contains(id) {
-                        self.increment_revision(id).await;
-                    }
-
                    // Obtain principal
                    match self.store().get_members(id).await {
                        Ok(members) => {
@@ -339,41 +316,23 @@
                 }
             }
         }
-    }
 
-    async fn increment_revision(&self, id: u32) {
-        if let Err(err) = self
-            .in_memory_store()
-            .counter_incr(
-                KeyValue::with_prefix(KV_TOKEN_REVISION, id.to_be_bytes(), 1).expires(30 * 86400),
-                false,
-            )
-            .await
-        {
-            trc::error!(
-                err.details("Failed to increment principal revision")
-                    .account_id(id)
-            );
-        }
-    }
-
-    pub async fn fetch_token_revision(&self, id: u32) -> Option<u64> {
-        match self
-            .in_memory_store()
-            .counter_get(KeyValue::<()>::build_key(
-                KV_TOKEN_REVISION,
-                id.to_be_bytes(),
-            ))
-            .await
-        {
-            Ok(revision) => (revision as u64).into(),
-            Err(err) => {
-                trc::error!(
-                    err.details("Failed to obtain principal revision")
-                        .account_id(id)
-                );
-                None
+        // Invalidate access tokens in cluster
+        if !changed_ids.is_empty() {
+            let mut ids = Vec::with_capacity(changed_ids.len());
+            for id in changed_ids {
+                self.inner.cache.permissions.remove(&id);
+                self.inner.cache.access_tokens.remove(&id);
+                ids.push(id);
             }
+            self.cluster_broadcast(BroadcastEvent::InvalidateAccessTokens(ids))
+                .await;
+        }
+
+        // Invalidate DAV caches
+        if !changed_names.is_empty() {
+            self.cluster_broadcast(BroadcastEvent::InvalidateDavCache(changed_names))
+                .await;
         }
     }
 }
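Taken together, the `access_token.rs` changes replace the stored revision counter with direct eviction: a token is cached under a random revision and stays valid until an invalidation event removes it, both locally and on every other node. A minimal standalone sketch of the fan-out decision follows; the `BroadcastEvent` enum here is a reduced, hypothetical stand-in for the real cache and IPC types:

```rust
use std::collections::HashSet;

// Hypothetical, reduced stand-in for common::ipc::BroadcastEvent.
#[derive(Debug, PartialEq)]
enum BroadcastEvent {
    InvalidateAccessTokens(Vec<u32>),
    InvalidateDavCache(Vec<u32>),
}

// Mirrors the shape of invalidate_principal_caches(): collect every
// affected principal id, track which of them were renamed, and emit one
// broadcast per non-empty set.
fn plan_invalidation(changed: &[(u32, bool)]) -> Vec<BroadcastEvent> {
    let mut changed_ids = HashSet::new();
    let mut changed_names = Vec::new();
    for &(id, name_change) in changed {
        changed_ids.insert(id);
        if name_change {
            // A rename moves the principal's DAV base path, so the DAV
            // caches must be rebuilt, not just the access token.
            changed_names.push(id);
        }
    }
    let mut events = Vec::new();
    if !changed_ids.is_empty() {
        events.push(BroadcastEvent::InvalidateAccessTokens(
            changed_ids.into_iter().collect(),
        ));
    }
    if !changed_names.is_empty() {
        events.push(BroadcastEvent::InvalidateDavCache(changed_names));
    }
    events
}

fn main() {
    // Principal 3 was renamed; principal 7 only changed membership.
    let events = plan_invalidation(&[(3, true), (7, false)]);
    assert_eq!(events.len(), 2);
    println!("{events:?}");
}
```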
diff --git a/crates/common/src/auth/roles.rs b/crates/common/src/auth/roles.rs
index 6f2d86ce..1da9da1f 100644
--- a/crates/common/src/auth/roles.rs
+++ b/crates/common/src/auth/roles.rs
@@ -20,7 +20,6 @@ use crate::Server;
 pub struct RolePermissions {
     pub enabled: Permissions,
     pub disabled: Permissions,
-    pub revision: u64,
 }
 
 static USER_PERMISSIONS: LazyLock<Arc<RolePermissions>> = LazyLock::new(user_permissions);
@@ -35,8 +34,6 @@ impl Server {
             ROLE_ADMIN => Ok(ADMIN_PERMISSIONS.clone()),
             ROLE_TENANT_ADMIN => Ok(TENANT_ADMIN_PERMISSIONS.clone()),
             role_id => {
-                let revision = self.fetch_token_revision(role_id).await;
-
                 match self
                     .inner
                     .cache
@@ -44,26 +41,9 @@ impl Server {
                     .get_value_or_guard_async(&role_id)
                     .await
                 {
-                    Ok(permissions) => {
-                        if Some(permissions.revision) == revision {
-                            Ok(permissions)
-                        } else {
-                            let permissions = self
-                                .build_role_permissions(role_id, revision.unwrap_or(u64::MAX))
-                                .await?;
-
-                            self.inner
-                                .cache
-                                .permissions
-                                .insert(role_id, permissions.clone());
-
-                            Ok(permissions)
-                        }
-                    }
+                    Ok(permissions) => Ok(permissions),
                     Err(guard) => {
-                        let permissions = self
-                            .build_role_permissions(role_id, revision.unwrap_or(u64::MAX))
-                            .await?;
+                        let permissions = self.build_role_permissions(role_id).await?;
                         let _ = guard.insert(permissions.clone());
                         Ok(permissions)
                     }
@@ -72,18 +52,11 @@ impl Server {
         }
     }
 
-    async fn build_role_permissions(
-        &self,
-        role_id: u32,
-        revision: u64,
-    ) -> trc::Result<Arc<RolePermissions>> {
+    async fn build_role_permissions(&self, role_id: u32) -> trc::Result<Arc<RolePermissions>> {
         let mut role_ids = vec![role_id].into_iter();
         let mut role_ids_stack = vec![];
         let mut fetched_role_ids = AHashSet::new();
-        let mut return_permissions = RolePermissions {
-            revision,
-            ..Default::default()
-        };
+        let mut return_permissions = RolePermissions::default();
 
         'outer: loop {
             if let Some(role_id) = role_ids.next() {
@@ -116,20 +89,10 @@ impl Server {
                     }
                     role_id => {
                         // Try with the cache
-                        let revision = self.fetch_token_revision(role_id).await;
-                        if let Some(role_permissions) = self
-                            .inner
-                            .cache
-                            .permissions
-                            .get(&role_id)
-                            .filter(|p| Some(p.revision) == revision)
-                        {
+                        if let Some(role_permissions) = self.inner.cache.permissions.get(&role_id) {
                             return_permissions.union(role_permissions.as_ref());
                         } else {
-                            let mut role_permissions = RolePermissions {
-                                revision: revision.unwrap_or(u64::MAX),
-                                ..Default::default()
-                            };
+                            let mut role_permissions = RolePermissions::default();
 
                             // Obtain principal
                             let mut principal = self
@@ -233,7 +196,6 @@ fn admin_permissions() -> Arc<RolePermissions> {
     Arc::new(RolePermissions {
         enabled: Permissions::all(),
         disabled: Permissions::new(),
-        revision: 0,
     })
 }
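Both `invalidate_principal_caches` and the slimmed-down `build_role_permissions` lean on the same traversal idiom: an iterator plus an explicit stack, with a visited set that doubles as the result and guards against cycles in the role/member graph. A generic sketch of that idiom; the `children` closure is a hypothetical stand-in for the store's member/role lookup:

```rust
use std::collections::HashSet;

// Minimal sketch of the visited-set traversal: `children` stands in for
// the async store call that returns the principals a given id points to.
fn collect_ids(root: u32, children: impl Fn(u32) -> Vec<u32>) -> HashSet<u32> {
    let mut visited = HashSet::new();
    let mut stack = vec![root];
    while let Some(id) = stack.pop() {
        // Skip ids we already merged, so cycles cannot loop forever.
        if visited.insert(id) {
            stack.extend(children(id));
        }
    }
    visited
}

fn main() {
    // 1 inherits 2 and 3; 3 inherits 1 again (a cycle) and 4.
    let ids = collect_ids(1, |id| match id {
        1 => vec![2, 3],
        3 => vec![1, 4],
        _ => vec![],
    });
    assert_eq!(ids, HashSet::from([1, 2, 3, 4]));
}
```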
diff --git a/crates/common/src/ipc.rs b/crates/common/src/ipc.rs
index 09c3dc9d..53c33668 100644
--- a/crates/common/src/ipc.rs
+++ b/crates/common/src/ipc.rs
@@ -70,6 +70,8 @@ pub enum StateEvent {
 #[derive(Debug)]
 pub enum BroadcastEvent {
     StateChange(StateChange),
+    InvalidateAccessTokens(Vec<u32>),
+    InvalidateDavCache(Vec<u32>),
     ReloadSettings,
     ReloadBlockedIps,
 }
diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs
index 2f3af0ca..36b35e14 100644
--- a/crates/common/src/lib.rs
+++ b/crates/common/src/lib.rs
@@ -100,7 +100,6 @@ pub const KV_RATE_LIMIT_CONTACT: u8 = 7;
 pub const KV_RATE_LIMIT_HTTP_AUTHENTICATED: u8 = 8;
 pub const KV_RATE_LIMIT_HTTP_ANONYMOUS: u8 = 9;
 pub const KV_RATE_LIMIT_IMAP: u8 = 10;
-pub const KV_TOKEN_REVISION: u8 = 11;
 pub const KV_REPUTATION_IP: u8 = 12;
 pub const KV_REPUTATION_FROM: u8 = 13;
 pub const KV_REPUTATION_DOMAIN: u8 = 14;
diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs
index f765fb95..b25cadee 100644
--- a/crates/common/src/manager/boot.rs
+++ b/crates/common/src/manager/boot.rs
@@ -470,7 +470,7 @@ impl BootManager {
                 core.network.asn_geo_lookup,
                 AsnGeoLookupConfig::Resource { .. }
             );
-        let (ipc, ipc_rxs) = build_ipc(!core.storage.pubsub.is_none());
+        let (ipc, ipc_rxs) = build_ipc(core.storage.pubsub.is_none());
         let inner = Arc::new(Inner {
             shared_core: ArcSwap::from_pointee(core),
             data,
diff --git a/crates/common/src/sharing/acl.rs b/crates/common/src/sharing/acl.rs
index 7f4e355d..24402f39 100644
--- a/crates/common/src/sharing/acl.rs
+++ b/crates/common/src/sharing/acl.rs
@@ -161,7 +161,7 @@ impl Server {
             }
         }
 
-        self.increment_token_revision(changed_principals).await;
+        self.invalidate_principal_caches(changed_principals).await;
     }
 
     pub async fn refresh_archived_acls(
@@ -204,7 +204,7 @@ impl Server {
             }
         }
 
-        self.increment_token_revision(changed_principals).await;
+        self.invalidate_principal_caches(changed_principals).await;
     }
 
     pub async fn map_acl_set(&self, acl_set: Vec<Value>) -> Result<Vec<AclGrant>, SetError> {
diff --git a/crates/directory/src/backend/internal/manage.rs b/crates/directory/src/backend/internal/manage.rs
index 469ed5b8..721ad388 100644
--- a/crates/directory/src/backend/internal/manage.rs
+++ b/crates/directory/src/backend/internal/manage.rs
@@ -50,6 +50,7 @@ pub struct ChangedPrincipals(AHashMap<u32, ChangedPrincipal>);
 #[derive(Debug, Default, PartialEq, Eq)]
 pub struct ChangedPrincipal {
     pub typ: Type,
+    pub name_change: bool,
     pub member_change: bool,
 }
 
@@ -2538,7 +2539,8 @@ impl ChangedPrincipals {
                         PrincipalField::EnabledPermissions | PrincipalField::DisabledPermissions,
                         Type::Role | Type::Tenant
                     )
-                ));
+                ))
+                .update_name_change(matches!(field, PrincipalField::Name));
         }
     }
 
@@ -2615,13 +2617,18 @@ impl ChangedPrincipal {
         Self {
             typ,
             member_change: false,
+            name_change: false,
         }
     }
 
-    pub fn update_member_change(&mut self, member_change: bool) {
-        if !self.member_change && member_change {
-            self.member_change = true;
-        }
+    pub fn update_member_change(&mut self, member_change: bool) -> &mut Self {
+        self.member_change |= member_change;
+        self
+    }
+
+    pub fn update_name_change(&mut self, name_change: bool) -> &mut Self {
+        self.name_change |= name_change;
+        self
     }
 }
diff --git a/crates/groupware/src/cache/mod.rs b/crates/groupware/src/cache/mod.rs
index 1ae217bc..48b702e4 100644
--- a/crates/groupware/src/cache/mod.rs
+++ b/crates/groupware/src/cache/mod.rs
@@ -5,7 +5,6 @@
  */
 
 use crate::{
-    DavResourceName, RFC_3986,
     cache::calcard::{build_scheduling_resources, path_from_scheduling, resource_from_scheduling},
     calendar::{Calendar, CalendarEvent, CalendarPreferences},
     contact::{AddressBook, ContactCard},
@@ -17,7 +16,6 @@ use calcard::{
     resource_from_calendar, resource_from_card, resource_from_event,
 };
 use common::{CacheSwap, DavResource, DavResources, Server, auth::AccessToken};
-use directory::backend::internal::manage::ManageDirectory;
 use file::{build_file_resources, build_nested_hierarchy, resource_from_file};
 use jmap_proto::types::collection::{Collection, SyncCollection};
 use std::{sync::Arc, time::Instant};
@@ -179,27 +177,6 @@ impl GroupwareCache for Server {
             return Ok(cache);
         }
 
-        // Build base path
-        let base_path = if access_token.primary_id() == account_id {
-            format!(
-                "{}/{}/",
-                DavResourceName::from(collection).base_path(),
-                percent_encoding::utf8_percent_encode(&access_token.name, RFC_3986)
-            )
-        } else {
-            let name = self
-                .store()
-                .get_principal_name(account_id)
-                .await
-                .caused_by(trc::location!())?
-                .unwrap_or_else(|| format!("_{account_id}"));
-            format!(
-                "{}/{}/",
-                DavResourceName::from(collection).base_path(),
-                percent_encoding::utf8_percent_encode(&name, RFC_3986)
-            )
-        };
-
         let num_changes = changes.changes.len();
         let cache = if !matches!(collection, SyncCollection::CalendarScheduling) {
             let mut updated_resources = AHashMap::with_capacity(8);
@@ -244,7 +221,7 @@ impl GroupwareCache for Server {
 
             if rebuild_hierarchy {
                 let mut cache = DavResources {
-                    base_path,
+                    base_path: cache.base_path.clone(),
                     paths: Default::default(),
                     resources,
                     item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),
@@ -264,7 +241,7 @@ impl GroupwareCache for Server {
                 cache
             } else {
                 DavResources {
-                    base_path,
+                    base_path: cache.base_path.clone(),
                     paths: cache.paths.clone(),
                     resources,
                     item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),
@@ -307,7 +284,7 @@ impl GroupwareCache for Server {
             }
 
             DavResources {
-                base_path,
+                base_path: cache.base_path.clone(),
                 paths,
                 resources,
                 item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),
diff --git a/crates/http/src/management/principal.rs b/crates/http/src/management/principal.rs
index 6d669423..be2f3d5e 100644
--- a/crates/http/src/management/principal.rs
+++ b/crates/http/src/management/principal.rs
@@ -181,7 +181,7 @@ impl PrincipalManager for Server {
         }
 
         // Increment revision
-        self.increment_token_revision(result.changed_principals)
+        self.invalidate_principal_caches(result.changed_principals)
             .await;
 
         Ok(JsonResponse::new(json!({
@@ -390,7 +390,7 @@ impl PrincipalManager for Server {
                     {
                         Ok(changed_principals) => {
                             // Increment revision
-                            server.increment_token_revision(changed_principals).await;
+                            server.invalidate_principal_caches(changed_principals).await;
                         }
                         Err(err) => {
                             trc::error!(err.details("Failed to delete principal"));
@@ -539,7 +539,7 @@ impl PrincipalManager for Server {
         }
 
         // Increment revision
-        self.increment_token_revision(changed_principals).await;
+        self.invalidate_principal_caches(changed_principals).await;
 
         Ok(JsonResponse::new(json!({
             "data": (),
@@ -669,7 +669,7 @@ impl PrincipalManager for Server {
             .await?;
 
         // Increment revision
-        self.increment_token_revision(changed_principals).await;
+        self.invalidate_principal_caches(changed_principals).await;
 
         Ok(JsonResponse::new(json!({
             "data": (),
@@ -770,7 +770,7 @@ impl PrincipalManager for Server {
             .await?;
 
         // Increment revision
-        self.increment_token_revision(ChangedPrincipals::from_change(
+        self.invalidate_principal_caches(ChangedPrincipals::from_change(
             access_token.primary_id(),
             Type::Individual,
             PrincipalField::Secrets,
@@ -841,7 +841,7 @@ impl PrincipalManager for Server {
             .await?;
 
         // Increment revision
-        self.increment_token_revision(changed_principals).await;
+        self.invalidate_principal_caches(changed_principals).await;
 
         Ok(JsonResponse::new(json!({
             "data": (),
diff --git a/crates/imap/src/op/acl.rs b/crates/imap/src/op/acl.rs
index 4beb61cb..baaa6947 100644
--- a/crates/imap/src/op/acl.rs
+++ b/crates/imap/src/op/acl.rs
@@ -341,7 +341,7 @@ impl<T: SessionStream> Session<T> {
 
                 // Invalidate ACLs
                 data.server
-                    .increment_token_revision(ChangedPrincipals::from_change(
+                    .invalidate_principal_caches(ChangedPrincipals::from_change(
                         acl_account_id,
                         Type::Individual,
                         PrincipalField::EnabledPermissions,
diff --git a/crates/services/src/broadcast/mod.rs b/crates/services/src/broadcast/mod.rs
index 1f805b97..3e87699a 100644
--- a/crates/services/src/broadcast/mod.rs
+++ b/crates/services/src/broadcast/mod.rs
@@ -4,9 +4,14 @@
  * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
  */
 
+use std::borrow::Borrow;
+
 use common::ipc::BroadcastEvent;
 use jmap_proto::types::state::StateChange;
-use utils::map::bitmap::Bitmap;
+use utils::{
+    codec::leb128::{Leb128Iterator, Leb128Writer},
+    map::bitmap::Bitmap,
+};
 
 pub mod publisher;
 pub mod subscriber;
@@ -17,7 +22,6 @@ pub(crate) struct BroadcastBatch {
 }
 
 const MAX_BATCH_SIZE: usize = 100;
-const MESSAGE_SIZE: usize = std::mem::size_of::<u32>() + (std::mem::size_of::<u64>() * 2);
 pub(crate) const BROADCAST_TOPIC: &str = "stwt.agora";
 
 impl BroadcastBatch<Vec<BroadcastEvent>> {
@@ -34,23 +38,37 @@ impl BroadcastBatch<Vec<BroadcastEvent>> {
 
     pub fn serialize(&self, node_id: u16) -> Vec<u8> {
         let mut serialized =
-            Vec::with_capacity((self.messages.len() * MESSAGE_SIZE) + std::mem::size_of::<u16>());
-        serialized.extend_from_slice(&node_id.to_le_bytes());
+            Vec::with_capacity((self.messages.len() * 10) + std::mem::size_of::<u16>());
+        let _ = serialized.write_leb128(node_id);
         for message in &self.messages {
-            let msg_id: u32 = match message {
+            match message {
                 BroadcastEvent::StateChange(state_change) => {
-                    serialized.extend_from_slice(&state_change.change_id.to_le_bytes());
-                    serialized.extend_from_slice(&state_change.types.as_ref().to_le_bytes());
-                    serialized.extend_from_slice(&state_change.account_id.to_le_bytes());
-                    continue;
+                    serialized.push(0u8);
+                    let _ = serialized.write_leb128(state_change.change_id);
+                    let _ = serialized.write_leb128(*state_change.types.as_ref());
+                    let _ = serialized.write_leb128(state_change.account_id);
                 }
-                BroadcastEvent::ReloadSettings => 0,
-                BroadcastEvent::ReloadBlockedIps => 1,
-            };
-
-            serialized.extend_from_slice(&u64::MAX.to_le_bytes());
-            serialized.extend_from_slice(&u64::MAX.to_le_bytes());
-            serialized.extend_from_slice(&msg_id.to_le_bytes());
+                BroadcastEvent::InvalidateAccessTokens(items) => {
+                    serialized.push(1u8);
+                    let _ = serialized.write_leb128(items.len());
+                    for item in items {
+                        let _ = serialized.write_leb128(*item);
+                    }
+                }
+                BroadcastEvent::InvalidateDavCache(items) => {
+                    serialized.push(2u8);
+                    let _ = serialized.write_leb128(items.len());
+                    for item in items {
+                        let _ = serialized.write_leb128(*item);
+                    }
+                }
+                BroadcastEvent::ReloadSettings => {
+                    serialized.push(3u8);
+                }
+                BroadcastEvent::ReloadBlockedIps => {
+                    serialized.push(4u8);
+                }
+            }
         }
         serialized
     }
@@ -60,49 +78,48 @@ impl BroadcastBatch<Vec<BroadcastEvent>> {
     }
 }
 
-impl<T: AsRef<[u8]>> BroadcastBatch<T> {
-    pub fn node_id(&self) -> Option<u16> {
-        self.messages
-            .as_ref()
-            .get(0..std::mem::size_of::<u16>())
-            .and_then(|bytes| bytes.try_into().ok())
-            .map(u16::from_le_bytes)
+impl<T, I> BroadcastBatch<T>
+where
+    T: Iterator<Item = I> + Leb128Iterator<I>,
+    I: Borrow<u8>,
+{
+    pub fn node_id(&mut self) -> Option<u16> {
+        self.messages.next_leb128::<u16>()
     }
 
-    pub fn events(&self) -> impl Iterator<Item = Option<BroadcastEvent>> + '_ {
-        self.messages
-            .as_ref()
-            .get(std::mem::size_of::<u16>()..)
-            .unwrap_or_default()
-            .chunks_exact(MESSAGE_SIZE)
-            .map(|chunk| {
-                let change_id =
-                    u64::from_le_bytes(chunk[0..std::mem::size_of::<u64>()].try_into().unwrap());
-                let types = u64::from_le_bytes(
-                    chunk[std::mem::size_of::<u64>()..std::mem::size_of::<u64>() * 2]
-                        .try_into()
-                        .unwrap(),
-                );
-                let account_id = u32::from_le_bytes(
-                    chunk[std::mem::size_of::<u64>() * 2..20]
-                        .try_into()
-                        .unwrap(),
-                );
-
-                Some(if change_id != u64::MAX {
-                    BroadcastEvent::StateChange(StateChange {
-                        change_id,
-                        types: Bitmap::from(types),
-                        account_id,
-                    })
-                } else {
-                    match account_id {
-                        0 => BroadcastEvent::ReloadSettings,
-                        1 => BroadcastEvent::ReloadBlockedIps,
-                        _ => return None,
-                    }
-                })
-            })
+    pub fn next_event(&mut self) -> Result<Option<BroadcastEvent>, ()> {
+        if let Some(id) = self.messages.next() {
+            match id.borrow() {
+                0 => Ok(Some(BroadcastEvent::StateChange(StateChange {
+                    change_id: self.messages.next_leb128().ok_or(())?,
+                    types: Bitmap::from(self.messages.next_leb128::<u64>().ok_or(())?),
+                    account_id: self.messages.next_leb128().ok_or(())?,
+                }))),
+                1 => {
+                    let count = self.messages.next_leb128::<usize>().ok_or(())?;
+                    let mut items = Vec::with_capacity(count);
+                    for _ in 0..count {
+                        items.push(self.messages.next_leb128().ok_or(())?);
+                    }
+                    Ok(Some(BroadcastEvent::InvalidateAccessTokens(items)))
+                }
+                2 => {
+                    let count = self.messages.next_leb128::<usize>().ok_or(())?;
+                    let mut items = Vec::with_capacity(count);
+                    for _ in 0..count {
+                        items.push(self.messages.next_leb128().ok_or(())?);
+                    }
+                    Ok(Some(BroadcastEvent::InvalidateDavCache(items)))
+                }
+                3 => Ok(Some(BroadcastEvent::ReloadSettings)),
+                4 => Ok(Some(BroadcastEvent::ReloadBlockedIps)),
+                _ => Err(()),
+            }
+        } else {
+            Ok(None)
+        }
+    }
 }
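The new frame layout is self-describing: a LEB128-encoded node id, then one tagged record per event (0 = StateChange, 1 = InvalidateAccessTokens, 2 = InvalidateDavCache, 3 = ReloadSettings, 4 = ReloadBlockedIps) with every integer field LEB128-encoded. This replaces the old fixed 20-byte records (two `u64` words plus a `u32`, with `u64::MAX` sentinels for non-state events). For reference, a self-contained unsigned-LEB128 roundtrip written against plain `std`, not the crate's `Leb128Writer`/`Leb128Iterator` helpers:

```rust
// Unsigned LEB128: 7 payload bits per byte, high bit = continuation.
// A u64 needs at most 10 bytes, which is where the `len * 10` capacity
// estimate in serialize() comes from.
fn write_leb128(out: &mut Vec<u8>, mut value: u64) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
}

fn read_leb128(bytes: &mut impl Iterator<Item = u8>) -> Option<u64> {
    let mut value = 0u64;
    let mut shift = 0;
    loop {
        let byte = bytes.next()?;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
        shift += 7;
        if shift >= 64 {
            return None; // overlong or corrupt encoding
        }
    }
}

fn main() {
    let mut buf = Vec::new();
    for v in [0u64, 127, 128, 300, u64::MAX] {
        write_leb128(&mut buf, v);
    }
    let mut iter = buf.into_iter();
    for v in [0u64, 127, 128, 300, u64::MAX] {
        assert_eq!(read_leb128(&mut iter), Some(v));
    }
}
```

Small values (the common case for account ids and counts) now take one or two bytes instead of a fixed twenty, at the cost of variable-length decoding.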
diff --git a/crates/services/src/broadcast/subscriber.rs b/crates/services/src/broadcast/subscriber.rs
index 57e8104f..41fd3567 100644
--- a/crates/services/src/broadcast/subscriber.rs
+++ b/crates/services/src/broadcast/subscriber.rs
@@ -72,7 +72,7 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
                 message = stream.next() => {
                     match message {
                         Some(message) => {
-                            let batch = BroadcastBatch::new(message.payload());
+                            let mut batch = BroadcastBatch::new(message.payload().iter());
                             let node_id = match batch.node_id() {
                                 Some(node_id) => {
                                     if node_id != this_node_id {
@@ -95,79 +95,101 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
                             };
 
                             let mut max_timestamp = 0;
-                            let mut has_errors = false;
 
-                            for event in batch.events() {
-                                if let Some(event) = event {
-                                    match event {
-                                        BroadcastEvent::StateChange(state_change) => {
-                                            max_timestamp = std::cmp::max(
-                                                max_timestamp,
-                                                state_change.change_id,
-                                            );
-                                            if inner.ipc.state_tx.send(StateEvent::Publish { state_change, broadcast: false }).await.is_err() {
-                                                trc::event!(
-                                                    Server(ServerEvent::ThreadError),
-                                                    Details = "Error sending state change.",
-                                                    CausedBy = trc::location!()
-                                                );
-                                            }
-                                        },
-                                        BroadcastEvent::ReloadSettings => {
-                                            match inner.build_server().reload().await {
-                                                Ok(result) => {
-                                                    if let Some(new_core) = result.new_core {
-                                                        // Update core
-                                                        inner.shared_core.store(new_core.into());
-
-                                                        if inner
-                                                            .ipc
-                                                            .housekeeper_tx
-                                                            .send(HousekeeperEvent::ReloadSettings)
-                                                            .await
-                                                            .is_err()
-                                                        {
-                                                            trc::event!(
-                                                                Server(trc::ServerEvent::ThreadError),
-                                                                Details = "Failed to send setting reload event to housekeeper",
-                                                                CausedBy = trc::location!(),
-                                                            );
-                                                        }
-                                                    }
-                                                }
-                                                Err(err) => {
-                                                    trc::error!(
-                                                        err.details("Failed to reload settings")
-                                                            .caused_by(trc::location!())
-                                                    );
-                                                }
-                                            }
-                                        },
-                                        BroadcastEvent::ReloadBlockedIps => {
-                                            if let Err(err) = inner.build_server().reload_blocked_ips().await {
-                                                trc::error!(
-                                                    err.details("Failed to reload settings")
-                                                        .caused_by(trc::location!())
-                                                );
-                                            }
-                                        },
-                                    }
-                                } else if !has_errors {
-                                    trc::event!(
-                                        Cluster(ClusterEvent::MessageInvalid),
-                                        Details = message.payload()
-                                    );
-                                    has_errors = true;
-                                }
-                            }
-
-                            trc::event!(
-                                Cluster(ClusterEvent::MessageReceived),
-                                From = node_id,
-                                To = this_node_id,
-                                Details = batch.events().flatten().map(log_event).collect::<Vec<_>>(),
-                            );
+                            loop {
+                                match batch.next_event() {
+                                    Ok(Some(event)) => {
+                                        trc::event!(
+                                            Cluster(ClusterEvent::MessageReceived),
+                                            From = node_id,
+                                            To = this_node_id,
+                                            Details = log_event(&event),
+                                        );
+                                        match event {
+                                            BroadcastEvent::StateChange(state_change) => {
+                                                max_timestamp =
+                                                    std::cmp::max(max_timestamp, state_change.change_id);
+                                                if inner
+                                                    .ipc
+                                                    .state_tx
+                                                    .send(StateEvent::Publish {
+                                                        state_change,
+                                                        broadcast: false,
+                                                    })
+                                                    .await
+                                                    .is_err()
+                                                {
+                                                    trc::event!(
+                                                        Server(ServerEvent::ThreadError),
+                                                        Details = "Error sending state change.",
+                                                        CausedBy = trc::location!()
+                                                    );
+                                                }
+                                            }
+                                            BroadcastEvent::InvalidateAccessTokens(ids) => {
+                                                for id in &ids {
+                                                    inner.cache.permissions.remove(id);
+                                                    inner.cache.access_tokens.remove(id);
+                                                }
+                                            }
+                                            BroadcastEvent::InvalidateDavCache(ids) => {
+                                                for id in &ids {
+                                                    inner.cache.files.remove(id);
+                                                    inner.cache.contacts.remove(id);
+                                                    inner.cache.events.remove(id);
+                                                    inner.cache.scheduling.remove(id);
+                                                }
+                                            }
+                                            BroadcastEvent::ReloadSettings => {
+                                                match inner.build_server().reload().await {
+                                                    Ok(result) => {
+                                                        if let Some(new_core) = result.new_core {
+                                                            // Update core
+                                                            inner.shared_core.store(new_core.into());
+
+                                                            if inner
+                                                                .ipc
+                                                                .housekeeper_tx
+                                                                .send(HousekeeperEvent::ReloadSettings)
+                                                                .await
+                                                                .is_err()
+                                                            {
+                                                                trc::event!(
+                                                                    Server(trc::ServerEvent::ThreadError),
+                                                                    Details = "Failed to send setting reload event to housekeeper",
+                                                                    CausedBy = trc::location!(),
+                                                                );
+                                                            }
+                                                        }
+                                                    }
+                                                    Err(err) => {
+                                                        trc::error!(
+                                                            err.details("Failed to reload settings")
+                                                                .caused_by(trc::location!())
+                                                        );
+                                                    }
+                                                }
+                                            }
+                                            BroadcastEvent::ReloadBlockedIps => {
+                                                if let Err(err) = inner.build_server().reload_blocked_ips().await {
+                                                    trc::error!(
+                                                        err.details("Failed to reload settings")
+                                                            .caused_by(trc::location!())
+                                                    );
+                                                }
+                                            }
+                                        }
+                                    }
+                                    Ok(None) => break,
+                                    Err(_) => {
+                                        trc::event!(
+                                            Cluster(ClusterEvent::MessageInvalid),
+                                            Details = message.payload()
+                                        );
+                                        break;
+                                    }
+                                }
+                            }
                         }
                         None => {
                             trc::event!(
@@ -186,14 +208,31 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
     });
 }
 
-fn log_event(event: BroadcastEvent) -> trc::Value {
+fn log_event(event: &BroadcastEvent) -> trc::Value {
     match event {
         BroadcastEvent::StateChange(state_change) => trc::Value::Array(vec![
+            "StateChange".into(),
             state_change.account_id.into(),
             state_change.change_id.into(),
             (*state_change.types.as_ref()).into(),
         ]),
         BroadcastEvent::ReloadSettings => CompactString::const_new("ReloadSettings").into(),
         BroadcastEvent::ReloadBlockedIps => CompactString::const_new("ReloadBlockedIps").into(),
+        BroadcastEvent::InvalidateAccessTokens(items) => {
+            let mut array = Vec::with_capacity(items.len() + 1);
+            array.push("InvalidateAccessTokens".into());
+            for item in items {
+                array.push((*item).into());
+            }
+            trc::Value::Array(array)
+        }
+        BroadcastEvent::InvalidateDavCache(items) => {
+            let mut array = Vec::with_capacity(items.len() + 1);
+            array.push("InvalidateDavCache".into());
+            for item in items {
+                array.push((*item).into());
+            }
+            trc::Value::Array(array)
+        }
     }
 }
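The subscriber now consumes a batch as a pull parser: `next_event()` yields `Ok(Some(event))` per record, `Ok(None)` at a clean end of frame, and `Err(())` on an unknown tag or truncated field, at which point the loop logs `MessageInvalid` and abandons the rest of the frame (the old format could skip a bad fixed-size record and keep going; a corrupt varint stream cannot be resynchronized). The control flow reduces to this shape, sketched with hypothetical `next`/`handle` closures:

```rust
// Drives a pull parser whose `next` returns Result<Option<E>, ()>:
// Some(event) per decoded event, None at end of frame, Err(()) on
// corruption. Returns whether the frame ended cleanly.
fn drain<E>(
    mut next: impl FnMut() -> Result<Option<E>, ()>,
    mut handle: impl FnMut(E),
) -> bool {
    loop {
        match next() {
            Ok(Some(event)) => handle(event),
            Ok(None) => return true,   // clean end of frame
            Err(()) => return false,   // caller logs MessageInvalid
        }
    }
}

fn main() {
    let mut frame = vec![Ok(Some(1u32)), Ok(Some(2)), Ok(None)].into_iter();
    let clean = drain(|| frame.next().unwrap(), |event| println!("event {event}"));
    assert!(clean);
}
```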
diff --git a/tests/src/cluster/broadcast.rs b/tests/src/cluster/broadcast.rs
index a30feef2..d8d97061 100644
--- a/tests/src/cluster/broadcast.rs
+++ b/tests/src/cluster/broadcast.rs
@@ -4,23 +4,27 @@
  * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
  */
 
-use std::net::IpAddr;
-
-use crate::imap::idle;
-
 use super::ClusterTest;
+use crate::imap::idle;
+use directory::backend::internal::{
+    PrincipalAction, PrincipalField, PrincipalUpdate, PrincipalValue,
+    manage::{ManageDirectory, UpdatePrincipal},
+};
+use groupware::cache::GroupwareCache;
+use jmap_proto::types::collection::SyncCollection;
+use std::net::IpAddr;
 
 pub async fn test(cluster: &ClusterTest) {
     println!("Running cluster broadcast tests...");
 
     // Run IMAP idle tests across nodes
+    let server1 = cluster.server(1);
+    let server2 = cluster.server(2);
     let mut node1_client = cluster.imap_client("john", 1).await;
     let mut node2_client = cluster.imap_client("john", 2).await;
     idle::test(&mut node1_client, &mut node2_client, true).await;
 
     // Test event broadcast
-    let server1 = cluster.server(1);
-    let server2 = cluster.server(2);
     let test_ip: IpAddr = "8.8.8.8".parse().unwrap();
     assert!(!server1.is_ip_blocked(&test_ip));
     assert!(!server2.is_ip_blocked(&test_ip));
@@ -28,4 +32,56 @@ pub async fn test(cluster: &ClusterTest) {
     tokio::time::sleep(std::time::Duration::from_millis(200)).await;
     assert!(server1.is_ip_blocked(&test_ip));
     assert!(server2.is_ip_blocked(&test_ip));
+
+    // Change John's password and expect it to propagate
+    let account_id = cluster.account_id("john");
+    assert!(server1.inner.cache.access_tokens.get(&account_id).is_some());
+    assert!(server2.inner.cache.access_tokens.get(&account_id).is_some());
+    let changes = server1
+        .core
+        .storage
+        .data
+        .update_principal(
+            UpdatePrincipal::by_id(account_id).with_updates(vec![PrincipalUpdate {
+                action: PrincipalAction::AddItem,
+                field: PrincipalField::Secrets,
+                value: PrincipalValue::String("hello".into()),
+            }]),
+        )
+        .await
+        .unwrap();
+    server1.invalidate_principal_caches(changes).await;
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+    assert!(server1.inner.cache.access_tokens.get(&account_id).is_none());
+    assert!(server2.inner.cache.access_tokens.get(&account_id).is_none());
+
+    // Rename John to Juan and expect DAV caches to be invalidated
+    let access_token = server1.get_access_token(account_id).await.unwrap();
+    server1
+        .fetch_dav_resources(&access_token, account_id, SyncCollection::Calendar)
+        .await
+        .unwrap();
+    server2
+        .fetch_dav_resources(&access_token, account_id, SyncCollection::Calendar)
+        .await
+        .unwrap();
+    assert!(server1.inner.cache.events.get(&account_id).is_some());
+    assert!(server2.inner.cache.events.get(&account_id).is_some());
+    let changes = server1
+        .core
+        .storage
+        .data
+        .update_principal(
+            UpdatePrincipal::by_id(account_id).with_updates(vec![PrincipalUpdate {
+                action: PrincipalAction::Set,
+                field: PrincipalField::Name,
+                value: PrincipalValue::String("juan".into()),
+            }]),
+        )
+        .await
+        .unwrap();
+    server1.invalidate_principal_caches(changes).await;
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+    assert!(server1.inner.cache.events.get(&account_id).is_none());
+    assert!(server2.inner.cache.events.get(&account_id).is_none());
 }
diff --git a/tests/src/cluster/mod.rs b/tests/src/cluster/mod.rs
index 497b9eba..b07e079c 100644
--- a/tests/src/cluster/mod.rs
+++ b/tests/src/cluster/mod.rs
@@ -6,6 +6,7 @@
 
 use std::{path::PathBuf, sync::Arc, time::Duration};
 
+use ahash::AHashMap;
 use common::{
     Caches, Core, Data, Inner, Server,
     config::{
@@ -52,6 +53,7 @@ pub async fn cluster_tests() {
 #[allow(dead_code)]
 pub struct ClusterTest {
     servers: Vec<Server>,
+    account_ids: AHashMap<String, u32>,
     shutdown_txs: Vec<watch::Sender<bool>>,
 }
@@ -107,15 +109,18 @@ async fn init_cluster_tests(delete_if_exists: bool) -> ClusterTest {
     }
 
     // Create test users
+    let mut account_ids = AHashMap::new();
     for (account, secret, name, email) in TEST_USERS {
-        let _account_id = store
+        let account_id = store
             .create_test_user(account, secret, name, &[email])
             .await;
+        account_ids.insert(account.to_string(), account_id);
     }
 
     ClusterTest {
         servers,
         shutdown_txs,
+        account_ids,
     }
 }
 
@@ -142,6 +147,13 @@ impl ClusterTest {
             .get(node_id)
             .unwrap_or_else(|| panic!("No server found for node ID: {}", node_id))
     }
+
+    pub fn account_id(&self, login: &str) -> u32 {
+        self.account_ids
+            .get(login)
+            .cloned()
+            .unwrap_or_else(|| panic!("No account ID found for login: {}", login))
+    }
 }
 
 fn find_account_secret(login: &str) -> &str {
@@ -246,6 +258,7 @@ url = "'https://127.0.0.1:800{NODE_ID}'"
 
 [cluster]
 node-id = {NODE_ID}
+coordinator = "{PUBSUB}"
 
 [server.listener.http]
 bind = ["127.0.0.1:1800{NODE_ID}"]
@@ -323,7 +336,6 @@ fts = "{STORE}"
 blob = "{STORE}"
 lookup = "{STORE}"
 directory = "{STORE}"
-pubsub = "{PUBSUB}"
 
 [directory."{STORE}"]
 type = "internal"
diff --git a/tests/src/jmap/auth_acl.rs b/tests/src/jmap/auth_acl.rs
index 3f9f5d76..b525d01a 100644
--- a/tests/src/jmap/auth_acl.rs
+++ b/tests/src/jmap/auth_acl.rs
@@ -668,7 +668,7 @@ pub async fn test(params: &mut JMAPTest) {
     // Add John and Jane to the Sales group
     for name in ["jdoe@example.com", "jane.smith@example.com"] {
         server
-            .increment_token_revision(
+            .invalidate_principal_caches(
                 server
                     .core
                     .storage
@@ -776,7 +776,7 @@ pub async fn test(params: &mut JMAPTest) {
 
     // Remove John from the sales group
     server
-        .increment_token_revision(
+        .invalidate_principal_caches(
             server
                 .core
                 .storage
diff --git a/tests/src/webdav/basic.rs b/tests/src/webdav/basic.rs
index 112a4f0d..3f529e4a 100644
--- a/tests/src/webdav/basic.rs
+++ b/tests/src/webdav/basic.rs
@@ -4,6 +4,9 @@
  * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
  */
 
+use dav_proto::Depth;
+use hyper::StatusCode;
+
 use super::WebDavTest;
 
 pub async fn test(test: &WebDavTest) {
@@ -43,6 +46,30 @@ pub async fn test(test: &WebDavTest) {
         ["/dav/cal/", "/dav/cal/jane/", "/dav/cal/support/"],
     );
 
+    // Test 404 responses
+    jane.sync_collection(
+        "/dav/cal/jane/default/",
+        "",
+        Depth::Infinity,
+        None,
+        ["D:getetag"],
+    )
+    .await;
+    jane.sync_collection(
+        "/dav/cal/jane/test-404/",
+        "",
+        Depth::Infinity,
+        None,
+        ["D:getetag"],
+    )
+    .await;
+    jane.request("PROPFIND", "/dav/cal/jane/default/", "")
+        .await
+        .with_status(StatusCode::MULTI_STATUS);
+    jane.request("PROPFIND", "/dav/cal/jane/test-404/", "")
+        .await
+        .with_status(StatusCode::NOT_FOUND);
+
     john.delete_default_containers().await;
     jane.delete_default_containers().await;
     jane.delete_default_containers_by_account("support").await;