Cache invalidation cluster broadcast (closes #1741)

This commit is contained in:
mdecimus 2025-07-13 19:21:56 +02:00
parent 6e62f306ab
commit 4f3406d449
16 changed files with 348 additions and 291 deletions

View file

@ -4,6 +4,12 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::{AccessToken, ResourceToken, TenantInfo, roles::RolePermissions};
use crate::{
Server,
ipc::BroadcastEvent,
listener::limiter::{ConcurrencyLimiter, LimiterResult},
};
use ahash::AHashSet;
use directory::{
Permission, Principal, PrincipalData, QueryBy, Type,
@ -20,20 +26,13 @@ use std::{
hash::{DefaultHasher, Hash, Hasher},
sync::Arc,
};
use store::{dispatch::lookup::KeyValue, query::acl::AclQuery};
use store::{query::acl::AclQuery, rand};
use trc::AddContext;
use utils::map::{
bitmap::{Bitmap, BitmapItem},
vec_map::VecMap,
};
use crate::{
KV_TOKEN_REVISION, Server,
listener::limiter::{ConcurrencyLimiter, LimiterResult},
};
use super::{AccessToken, ResourceToken, TenantInfo, roles::RolePermissions};
pub enum PrincipalOrId {
Principal(Principal),
Id(u32),
@ -209,7 +208,6 @@ impl Server {
// Obtain current revision
let principal_id = principal.id();
let revision = self.fetch_token_revision(principal_id).await;
match self
.inner
@ -218,32 +216,9 @@ impl Server {
.get_value_or_guard_async(&principal_id)
.await
{
Ok(token) => {
if revision == Some(token.revision) {
Ok(token)
} else {
let revision = revision.unwrap_or(u64::MAX);
let token: Arc<AccessToken> = match principal {
PrincipalOrId::Principal(principal) => {
self.build_access_token_from_principal(principal, revision)
.await?
}
PrincipalOrId::Id(account_id) => {
self.build_access_token(account_id, revision).await?
}
}
.into();
self.inner
.cache
.access_tokens
.insert(token.primary_id(), token.clone());
Ok(token)
}
}
Ok(token) => Ok(token),
Err(guard) => {
let revision = revision.unwrap_or(u64::MAX);
let revision = rand::random::<u64>();
let token: Arc<AccessToken> = match principal {
PrincipalOrId::Principal(principal) => {
self.build_access_token_from_principal(principal, revision)
@ -260,11 +235,21 @@ impl Server {
}
}
pub async fn increment_token_revision(&self, changed_principals: ChangedPrincipals) {
pub async fn invalidate_principal_caches(&self, changed_principals: ChangedPrincipals) {
let mut nested_principals = Vec::new();
let mut changed_ids = AHashSet::new();
let mut changed_names = Vec::new();
for (id, changed_principal) in changed_principals.iter() {
self.increment_revision(*id).await;
changed_ids.insert(*id);
if changed_principal.name_change {
self.inner.cache.files.remove(id);
self.inner.cache.contacts.remove(id);
self.inner.cache.events.remove(id);
self.inner.cache.scheduling.remove(id);
changed_names.push(*id);
}
if changed_principal.member_change {
if changed_principal.typ == Type::Tenant {
@ -282,9 +267,7 @@ impl Server {
{
Ok(principals) => {
for principal in principals.items {
if !changed_principals.contains(principal.id()) {
self.increment_revision(principal.id()).await;
}
changed_ids.insert(principal.id());
}
}
Err(err) => {
@ -302,22 +285,16 @@ impl Server {
}
if !nested_principals.is_empty() {
let mut fetched_ids = AHashSet::new();
let mut ids = nested_principals.into_iter();
let mut ids_stack = vec![];
loop {
if let Some(id) = ids.next() {
// Skip if already fetched
if !fetched_ids.insert(id) {
if !changed_ids.insert(id) {
continue;
}
// Increment revision
if !changed_principals.contains(id) {
self.increment_revision(id).await;
}
// Obtain principal
match self.store().get_members(id).await {
Ok(members) => {
@ -339,41 +316,23 @@ impl Server {
}
}
}
}
async fn increment_revision(&self, id: u32) {
if let Err(err) = self
.in_memory_store()
.counter_incr(
KeyValue::with_prefix(KV_TOKEN_REVISION, id.to_be_bytes(), 1).expires(30 * 86400),
false,
)
.await
{
trc::error!(
err.details("Failed to increment principal revision")
.account_id(id)
);
}
}
pub async fn fetch_token_revision(&self, id: u32) -> Option<u64> {
match self
.in_memory_store()
.counter_get(KeyValue::<()>::build_key(
KV_TOKEN_REVISION,
id.to_be_bytes(),
))
.await
{
Ok(revision) => (revision as u64).into(),
Err(err) => {
trc::error!(
err.details("Failed to obtain principal revision")
.account_id(id)
);
None
// Invalidate access tokens in cluster
if !changed_ids.is_empty() {
let mut ids = Vec::with_capacity(changed_ids.len());
for id in changed_ids {
self.inner.cache.permissions.remove(&id);
self.inner.cache.access_tokens.remove(&id);
ids.push(id);
}
self.cluster_broadcast(BroadcastEvent::InvalidateAccessTokens(ids))
.await;
}
// Invalidate DAV caches
if !changed_names.is_empty() {
self.cluster_broadcast(BroadcastEvent::InvalidateDavCache(changed_names))
.await;
}
}
}

View file

@ -20,7 +20,6 @@ use crate::Server;
pub struct RolePermissions {
pub enabled: Permissions,
pub disabled: Permissions,
pub revision: u64,
}
static USER_PERMISSIONS: LazyLock<Arc<RolePermissions>> = LazyLock::new(user_permissions);
@ -35,8 +34,6 @@ impl Server {
ROLE_ADMIN => Ok(ADMIN_PERMISSIONS.clone()),
ROLE_TENANT_ADMIN => Ok(TENANT_ADMIN_PERMISSIONS.clone()),
role_id => {
let revision = self.fetch_token_revision(role_id).await;
match self
.inner
.cache
@ -44,26 +41,9 @@ impl Server {
.get_value_or_guard_async(&role_id)
.await
{
Ok(permissions) => {
if Some(permissions.revision) == revision {
Ok(permissions)
} else {
let permissions = self
.build_role_permissions(role_id, revision.unwrap_or(u64::MAX))
.await?;
self.inner
.cache
.permissions
.insert(role_id, permissions.clone());
Ok(permissions)
}
}
Ok(permissions) => Ok(permissions),
Err(guard) => {
let permissions = self
.build_role_permissions(role_id, revision.unwrap_or(u64::MAX))
.await?;
let permissions = self.build_role_permissions(role_id).await?;
let _ = guard.insert(permissions.clone());
Ok(permissions)
}
@ -72,18 +52,11 @@ impl Server {
}
}
async fn build_role_permissions(
&self,
role_id: u32,
revision: u64,
) -> trc::Result<Arc<RolePermissions>> {
async fn build_role_permissions(&self, role_id: u32) -> trc::Result<Arc<RolePermissions>> {
let mut role_ids = vec![role_id].into_iter();
let mut role_ids_stack = vec![];
let mut fetched_role_ids = AHashSet::new();
let mut return_permissions = RolePermissions {
revision,
..Default::default()
};
let mut return_permissions = RolePermissions::default();
'outer: loop {
if let Some(role_id) = role_ids.next() {
@ -116,20 +89,10 @@ impl Server {
}
role_id => {
// Try with the cache
let revision = self.fetch_token_revision(role_id).await;
if let Some(role_permissions) = self
.inner
.cache
.permissions
.get(&role_id)
.filter(|p| Some(p.revision) == revision)
{
if let Some(role_permissions) = self.inner.cache.permissions.get(&role_id) {
return_permissions.union(role_permissions.as_ref());
} else {
let mut role_permissions = RolePermissions {
revision: revision.unwrap_or(u64::MAX),
..Default::default()
};
let mut role_permissions = RolePermissions::default();
// Obtain principal
let mut principal = self
@ -233,7 +196,6 @@ fn admin_permissions() -> Arc<RolePermissions> {
Arc::new(RolePermissions {
enabled: Permissions::all(),
disabled: Permissions::new(),
revision: 0,
})
}

View file

@ -70,6 +70,8 @@ pub enum StateEvent {
#[derive(Debug)]
pub enum BroadcastEvent {
StateChange(StateChange),
InvalidateAccessTokens(Vec<u32>),
InvalidateDavCache(Vec<u32>),
ReloadSettings,
ReloadBlockedIps,
}

View file

@ -100,7 +100,6 @@ pub const KV_RATE_LIMIT_CONTACT: u8 = 7;
pub const KV_RATE_LIMIT_HTTP_AUTHENTICATED: u8 = 8;
pub const KV_RATE_LIMIT_HTTP_ANONYMOUS: u8 = 9;
pub const KV_RATE_LIMIT_IMAP: u8 = 10;
pub const KV_TOKEN_REVISION: u8 = 11;
pub const KV_REPUTATION_IP: u8 = 12;
pub const KV_REPUTATION_FROM: u8 = 13;
pub const KV_REPUTATION_DOMAIN: u8 = 14;

View file

@ -470,7 +470,7 @@ impl BootManager {
core.network.asn_geo_lookup,
AsnGeoLookupConfig::Resource { .. }
);
let (ipc, ipc_rxs) = build_ipc(!core.storage.pubsub.is_none());
let (ipc, ipc_rxs) = build_ipc(core.storage.pubsub.is_none());
let inner = Arc::new(Inner {
shared_core: ArcSwap::from_pointee(core),
data,

View file

@ -161,7 +161,7 @@ impl Server {
}
}
self.increment_token_revision(changed_principals).await;
self.invalidate_principal_caches(changed_principals).await;
}
pub async fn refresh_archived_acls(
@ -204,7 +204,7 @@ impl Server {
}
}
self.increment_token_revision(changed_principals).await;
self.invalidate_principal_caches(changed_principals).await;
}
pub async fn map_acl_set(&self, acl_set: Vec<Value>) -> Result<Vec<AclGrant>, SetError> {

View file

@ -50,6 +50,7 @@ pub struct ChangedPrincipals(AHashMap<u32, ChangedPrincipal>);
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ChangedPrincipal {
pub typ: Type,
pub name_change: bool,
pub member_change: bool,
}
@ -2538,7 +2539,8 @@ impl ChangedPrincipals {
PrincipalField::EnabledPermissions | PrincipalField::DisabledPermissions,
Type::Role | Type::Tenant
)
));
))
.update_name_change(matches!(field, PrincipalField::Name));
}
}
@ -2615,13 +2617,18 @@ impl ChangedPrincipal {
Self {
typ,
member_change: false,
name_change: false,
}
}
pub fn update_member_change(&mut self, member_change: bool) {
if !self.member_change && member_change {
self.member_change = true;
}
pub fn update_member_change(&mut self, member_change: bool) -> &mut Self {
self.member_change |= member_change;
self
}
pub fn update_name_change(&mut self, name_change: bool) -> &mut Self {
self.name_change |= name_change;
self
}
}

View file

@ -5,7 +5,6 @@
*/
use crate::{
DavResourceName, RFC_3986,
cache::calcard::{build_scheduling_resources, path_from_scheduling, resource_from_scheduling},
calendar::{Calendar, CalendarEvent, CalendarPreferences},
contact::{AddressBook, ContactCard},
@ -17,7 +16,6 @@ use calcard::{
resource_from_calendar, resource_from_card, resource_from_event,
};
use common::{CacheSwap, DavResource, DavResources, Server, auth::AccessToken};
use directory::backend::internal::manage::ManageDirectory;
use file::{build_file_resources, build_nested_hierarchy, resource_from_file};
use jmap_proto::types::collection::{Collection, SyncCollection};
use std::{sync::Arc, time::Instant};
@ -179,27 +177,6 @@ impl GroupwareCache for Server {
return Ok(cache);
}
// Build base path
let base_path = if access_token.primary_id() == account_id {
format!(
"{}/{}/",
DavResourceName::from(collection).base_path(),
percent_encoding::utf8_percent_encode(&access_token.name, RFC_3986)
)
} else {
let name = self
.store()
.get_principal_name(account_id)
.await
.caused_by(trc::location!())?
.unwrap_or_else(|| format!("_{account_id}"));
format!(
"{}/{}/",
DavResourceName::from(collection).base_path(),
percent_encoding::utf8_percent_encode(&name, RFC_3986)
)
};
let num_changes = changes.changes.len();
let cache = if !matches!(collection, SyncCollection::CalendarScheduling) {
let mut updated_resources = AHashMap::with_capacity(8);
@ -244,7 +221,7 @@ impl GroupwareCache for Server {
if rebuild_hierarchy {
let mut cache = DavResources {
base_path,
base_path: cache.base_path.clone(),
paths: Default::default(),
resources,
item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),
@ -264,7 +241,7 @@ impl GroupwareCache for Server {
cache
} else {
DavResources {
base_path,
base_path: cache.base_path.clone(),
paths: cache.paths.clone(),
resources,
item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),
@ -307,7 +284,7 @@ impl GroupwareCache for Server {
}
DavResources {
base_path,
base_path: cache.base_path.clone(),
paths,
resources,
item_change_id: changes.item_change_id.unwrap_or(cache.item_change_id),

View file

@ -181,7 +181,7 @@ impl PrincipalManager for Server {
}
// Increment revision
self.increment_token_revision(result.changed_principals)
self.invalidate_principal_caches(result.changed_principals)
.await;
Ok(JsonResponse::new(json!({
@ -390,7 +390,7 @@ impl PrincipalManager for Server {
{
Ok(changed_principals) => {
// Increment revision
server.increment_token_revision(changed_principals).await;
server.invalidate_principal_caches(changed_principals).await;
}
Err(err) => {
trc::error!(err.details("Failed to delete principal"));
@ -539,7 +539,7 @@ impl PrincipalManager for Server {
}
// Increment revision
self.increment_token_revision(changed_principals).await;
self.invalidate_principal_caches(changed_principals).await;
Ok(JsonResponse::new(json!({
"data": (),
@ -669,7 +669,7 @@ impl PrincipalManager for Server {
.await?;
// Increment revision
self.increment_token_revision(changed_principals).await;
self.invalidate_principal_caches(changed_principals).await;
Ok(JsonResponse::new(json!({
"data": (),
@ -770,7 +770,7 @@ impl PrincipalManager for Server {
.await?;
// Increment revision
self.increment_token_revision(ChangedPrincipals::from_change(
self.invalidate_principal_caches(ChangedPrincipals::from_change(
access_token.primary_id(),
Type::Individual,
PrincipalField::Secrets,
@ -841,7 +841,7 @@ impl PrincipalManager for Server {
.await?;
// Increment revision
self.increment_token_revision(changed_principals).await;
self.invalidate_principal_caches(changed_principals).await;
Ok(JsonResponse::new(json!({
"data": (),

View file

@ -341,7 +341,7 @@ impl<T: SessionStream> Session<T> {
// Invalidate ACLs
data.server
.increment_token_revision(ChangedPrincipals::from_change(
.invalidate_principal_caches(ChangedPrincipals::from_change(
acl_account_id,
Type::Individual,
PrincipalField::EnabledPermissions,

View file

@ -4,9 +4,14 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::borrow::Borrow;
use common::ipc::BroadcastEvent;
use jmap_proto::types::state::StateChange;
use utils::map::bitmap::Bitmap;
use utils::{
codec::leb128::{Leb128Iterator, Leb128Writer},
map::bitmap::Bitmap,
};
pub mod publisher;
pub mod subscriber;
@ -17,7 +22,6 @@ pub(crate) struct BroadcastBatch<T> {
}
const MAX_BATCH_SIZE: usize = 100;
const MESSAGE_SIZE: usize = std::mem::size_of::<u32>() + (std::mem::size_of::<u64>() * 2);
pub(crate) const BROADCAST_TOPIC: &str = "stwt.agora";
impl BroadcastBatch<Vec<BroadcastEvent>> {
@ -34,23 +38,37 @@ impl BroadcastBatch<Vec<BroadcastEvent>> {
pub fn serialize(&self, node_id: u16) -> Vec<u8> {
let mut serialized =
Vec::with_capacity((self.messages.len() * MESSAGE_SIZE) + std::mem::size_of::<u16>());
serialized.extend_from_slice(&node_id.to_le_bytes());
Vec::with_capacity((self.messages.len() * 10) + std::mem::size_of::<u16>());
let _ = serialized.write_leb128(node_id);
for message in &self.messages {
let msg_id: u32 = match message {
match message {
BroadcastEvent::StateChange(state_change) => {
serialized.extend_from_slice(&state_change.change_id.to_le_bytes());
serialized.extend_from_slice(&state_change.types.as_ref().to_le_bytes());
serialized.extend_from_slice(&state_change.account_id.to_le_bytes());
continue;
serialized.push(0u8);
let _ = serialized.write_leb128(state_change.change_id);
let _ = serialized.write_leb128(*state_change.types.as_ref());
let _ = serialized.write_leb128(state_change.account_id);
}
BroadcastEvent::ReloadSettings => 0,
BroadcastEvent::ReloadBlockedIps => 1,
};
serialized.extend_from_slice(&u64::MAX.to_le_bytes());
serialized.extend_from_slice(&u64::MAX.to_le_bytes());
serialized.extend_from_slice(&msg_id.to_le_bytes());
BroadcastEvent::InvalidateAccessTokens(items) => {
serialized.push(1u8);
let _ = serialized.write_leb128(items.len());
for item in items {
let _ = serialized.write_leb128(*item);
}
}
BroadcastEvent::InvalidateDavCache(items) => {
serialized.push(2u8);
let _ = serialized.write_leb128(items.len());
for item in items {
let _ = serialized.write_leb128(*item);
}
}
BroadcastEvent::ReloadSettings => {
serialized.push(3u8);
}
BroadcastEvent::ReloadBlockedIps => {
serialized.push(4u8);
}
}
}
serialized
}
@ -60,49 +78,48 @@ impl BroadcastBatch<Vec<BroadcastEvent>> {
}
}
impl<T: AsRef<[u8]>> BroadcastBatch<T> {
pub fn node_id(&self) -> Option<u16> {
self.messages
.as_ref()
.get(0..std::mem::size_of::<u16>())
.and_then(|bytes| bytes.try_into().ok())
.map(u16::from_le_bytes)
impl<T, I> BroadcastBatch<T>
where
T: Iterator<Item = I> + Leb128Iterator<I>,
I: Borrow<u8>,
{
pub fn node_id(&mut self) -> Option<u16> {
self.messages.next_leb128::<u16>()
}
pub fn events(&self) -> impl Iterator<Item = Option<BroadcastEvent>> {
self.messages
.as_ref()
.get(std::mem::size_of::<u16>()..)
.unwrap_or_default()
.chunks_exact(MESSAGE_SIZE)
.map(|chunk| {
let change_id =
u64::from_le_bytes(chunk[0..std::mem::size_of::<u64>()].try_into().unwrap());
let types = u64::from_le_bytes(
chunk[std::mem::size_of::<u64>()..std::mem::size_of::<u64>() * 2]
.try_into()
.unwrap(),
);
let account_id = u32::from_le_bytes(
chunk[std::mem::size_of::<u64>() * 2..20]
.try_into()
.unwrap(),
);
Some(if change_id != u64::MAX {
BroadcastEvent::StateChange(StateChange {
change_id,
types: Bitmap::from(types),
account_id,
})
} else {
match account_id {
0 => BroadcastEvent::ReloadSettings,
1 => BroadcastEvent::ReloadBlockedIps,
_ => return None,
pub fn next_event(&mut self) -> Result<Option<BroadcastEvent>, ()> {
if let Some(id) = self.messages.next() {
match id.borrow() {
0 => Ok(Some(BroadcastEvent::StateChange(StateChange {
change_id: self.messages.next_leb128().ok_or(())?,
types: Bitmap::from(self.messages.next_leb128::<u64>().ok_or(())?),
account_id: self.messages.next_leb128().ok_or(())?,
}))),
1 => {
let count = self.messages.next_leb128::<usize>().ok_or(())?;
let mut items = Vec::with_capacity(count);
for _ in 0..count {
items.push(self.messages.next_leb128().ok_or(())?);
}
})
})
Ok(Some(BroadcastEvent::InvalidateAccessTokens(items)))
}
2 => {
let count = self.messages.next_leb128::<usize>().ok_or(())?;
let mut items = Vec::with_capacity(count);
for _ in 0..count {
items.push(self.messages.next_leb128().ok_or(())?);
}
Ok(Some(BroadcastEvent::InvalidateDavCache(items)))
}
3 => Ok(Some(BroadcastEvent::ReloadSettings)),
4 => Ok(Some(BroadcastEvent::ReloadBlockedIps)),
_ => Err(()),
}
} else {
Ok(None)
}
}
}

View file

@ -72,7 +72,7 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
message = stream.next() => {
match message {
Some(message) => {
let batch = BroadcastBatch::new(message.payload());
let mut batch = BroadcastBatch::new(message.payload().iter());
let node_id = match batch.node_id() {
Some(node_id) => {
if node_id != this_node_id {
@ -95,79 +95,101 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
};
let mut max_timestamp = 0;
let mut has_errors = false;
for event in batch.events() {
if let Some(event) = event {
match event {
BroadcastEvent::StateChange(state_change) => {
max_timestamp = std::cmp::max(
max_timestamp,
state_change.change_id,
);
if inner.ipc.state_tx.send(StateEvent::Publish { state_change, broadcast: false }).await.is_err() {
trc::event!(
Server(ServerEvent::ThreadError),
Details = "Error sending state change.",
CausedBy = trc::location!()
);
loop {
match batch.next_event() {
Ok(Some(event)) => {
trc::event!(
Cluster(ClusterEvent::MessageReceived),
From = node_id,
To = this_node_id,
Details = log_event(&event),
);
match event {
BroadcastEvent::StateChange(state_change) => {
max_timestamp =
std::cmp::max(max_timestamp, state_change.change_id);
if inner
.ipc
.state_tx
.send(StateEvent::Publish {
state_change,
broadcast: false,
})
.await
.is_err()
{
trc::event!(
Server(ServerEvent::ThreadError),
Details = "Error sending state change.",
CausedBy = trc::location!()
);
}
}
},
BroadcastEvent::ReloadSettings => {
match inner.build_server().reload().await {
Ok(result) => {
if let Some(new_core) = result.new_core {
// Update core
inner.shared_core.store(new_core.into());
BroadcastEvent::InvalidateAccessTokens(ids) => {
for id in &ids {
inner.cache.permissions.remove(id);
inner.cache.access_tokens.remove(id);
}
}
BroadcastEvent::InvalidateDavCache(ids) => {
for id in &ids {
inner.cache.files.remove(id);
inner.cache.contacts.remove(id);
inner.cache.events.remove(id);
inner.cache.scheduling.remove(id);
}
}
BroadcastEvent::ReloadSettings => {
match inner.build_server().reload().await {
Ok(result) => {
if let Some(new_core) = result.new_core {
// Update core
inner.shared_core.store(new_core.into());
if inner
.ipc
.housekeeper_tx
.send(HousekeeperEvent::ReloadSettings)
.await
.is_err()
{
trc::event!(
Server(trc::ServerEvent::ThreadError),
Details = "Failed to send setting reload event to housekeeper",
CausedBy = trc::location!(),
);
if inner
.ipc
.housekeeper_tx
.send(HousekeeperEvent::ReloadSettings)
.await
.is_err()
{
trc::event!(
Server(trc::ServerEvent::ThreadError),
Details = "Failed to send setting reload event to housekeeper",
CausedBy = trc::location!(),
);
}
}
}
Err(err) => {
trc::error!(
err.details("Failed to reload settings")
.caused_by(trc::location!())
);
}
}
Err(err) => {
}
BroadcastEvent::ReloadBlockedIps => {
if let Err(err) = inner.build_server().reload_blocked_ips().await {
trc::error!(
err.details("Failed to reload settings")
.caused_by(trc::location!())
);
}
}
},
BroadcastEvent::ReloadBlockedIps => {
if let Err(err) = inner.build_server().reload_blocked_ips().await {
trc::error!(
err.details("Failed to reload settings")
.caused_by(trc::location!())
);
}
},
}
}
Ok(None) => break,
Err(_) => {
trc::event!(
Cluster(ClusterEvent::MessageInvalid),
Details = message.payload()
);
break;
}
} else if !has_errors {
trc::event!(
Cluster(ClusterEvent::MessageInvalid),
Details = message.payload()
);
has_errors = true;
}
}
trc::event!(
Cluster(ClusterEvent::MessageReceived),
From = node_id,
To = this_node_id,
Details = batch.events().flatten().map(log_event).collect::<Vec<_>>(),
);
}
None => {
trc::event!(
@ -186,14 +208,31 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
});
}
fn log_event(event: BroadcastEvent) -> trc::Value {
fn log_event(event: &BroadcastEvent) -> trc::Value {
match event {
BroadcastEvent::StateChange(state_change) => trc::Value::Array(vec![
"StateChange".into(),
state_change.account_id.into(),
state_change.change_id.into(),
(*state_change.types.as_ref()).into(),
]),
BroadcastEvent::ReloadSettings => CompactString::const_new("ReloadSettings").into(),
BroadcastEvent::ReloadBlockedIps => CompactString::const_new("ReloadBlockedIps").into(),
BroadcastEvent::InvalidateAccessTokens(items) => {
let mut array = Vec::with_capacity(items.len() + 1);
array.push("InvalidateAccessTokens".into());
for item in items {
array.push((*item).into());
}
trc::Value::Array(array)
}
BroadcastEvent::InvalidateDavCache(items) => {
let mut array = Vec::with_capacity(items.len() + 1);
array.push("InvalidateDavCache".into());
for item in items {
array.push((*item).into());
}
trc::Value::Array(array)
}
}
}

View file

@ -4,23 +4,27 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::net::IpAddr;
use crate::imap::idle;
use super::ClusterTest;
use crate::imap::idle;
use directory::backend::internal::{
PrincipalAction, PrincipalField, PrincipalUpdate, PrincipalValue,
manage::{ManageDirectory, UpdatePrincipal},
};
use groupware::cache::GroupwareCache;
use jmap_proto::types::collection::SyncCollection;
use std::net::IpAddr;
pub async fn test(cluster: &ClusterTest) {
println!("Running cluster broadcast tests...");
// Run IMAP idle tests across nodes
let server1 = cluster.server(1);
let server2 = cluster.server(2);
let mut node1_client = cluster.imap_client("john", 1).await;
let mut node2_client = cluster.imap_client("john", 2).await;
idle::test(&mut node1_client, &mut node2_client, true).await;
// Test event broadcast
let server1 = cluster.server(1);
let server2 = cluster.server(2);
let test_ip: IpAddr = "8.8.8.8".parse().unwrap();
assert!(!server1.is_ip_blocked(&test_ip));
assert!(!server2.is_ip_blocked(&test_ip));
@ -28,4 +32,56 @@ pub async fn test(cluster: &ClusterTest) {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert!(server1.is_ip_blocked(&test_ip));
assert!(server2.is_ip_blocked(&test_ip));
// Change John's password and expect it to propagate
let account_id = cluster.account_id("john");
assert!(server1.inner.cache.access_tokens.get(&account_id).is_some());
assert!(server2.inner.cache.access_tokens.get(&account_id).is_some());
let changes = server1
.core
.storage
.data
.update_principal(
UpdatePrincipal::by_id(account_id).with_updates(vec![PrincipalUpdate {
action: PrincipalAction::AddItem,
field: PrincipalField::Secrets,
value: PrincipalValue::String("hello".into()),
}]),
)
.await
.unwrap();
server1.invalidate_principal_caches(changes).await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert!(server1.inner.cache.access_tokens.get(&account_id).is_none());
assert!(server2.inner.cache.access_tokens.get(&account_id).is_none());
// Rename John to Juan and expect DAV caches to be invalidated
let access_token = server1.get_access_token(account_id).await.unwrap();
server1
.fetch_dav_resources(&access_token, account_id, SyncCollection::Calendar)
.await
.unwrap();
server2
.fetch_dav_resources(&access_token, account_id, SyncCollection::Calendar)
.await
.unwrap();
assert!(server1.inner.cache.events.get(&account_id).is_some());
assert!(server2.inner.cache.events.get(&account_id).is_some());
let changes = server1
.core
.storage
.data
.update_principal(
UpdatePrincipal::by_id(account_id).with_updates(vec![PrincipalUpdate {
action: PrincipalAction::Set,
field: PrincipalField::Name,
value: PrincipalValue::String("juan".into()),
}]),
)
.await
.unwrap();
server1.invalidate_principal_caches(changes).await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert!(server1.inner.cache.events.get(&account_id).is_none());
assert!(server2.inner.cache.events.get(&account_id).is_none());
}

View file

@ -6,6 +6,7 @@
use std::{path::PathBuf, sync::Arc, time::Duration};
use ahash::AHashMap;
use common::{
Caches, Core, Data, Inner, Server,
config::{
@ -52,6 +53,7 @@ pub async fn cluster_tests() {
#[allow(dead_code)]
pub struct ClusterTest {
servers: Vec<Server>,
account_ids: AHashMap<String, u32>,
shutdown_txs: Vec<watch::Sender<bool>>,
}
@ -107,15 +109,18 @@ async fn init_cluster_tests(delete_if_exists: bool) -> ClusterTest {
}
// Create test users
let mut account_ids = AHashMap::new();
for (account, secret, name, email) in TEST_USERS {
let _account_id = store
let account_id = store
.create_test_user(account, secret, name, &[email])
.await;
account_ids.insert(account.to_string(), account_id);
}
ClusterTest {
servers,
shutdown_txs,
account_ids,
}
}
@ -142,6 +147,13 @@ impl ClusterTest {
.get(node_id)
.unwrap_or_else(|| panic!("No server found for node ID: {}", node_id))
}
pub fn account_id(&self, login: &str) -> u32 {
self.account_ids
.get(login)
.cloned()
.unwrap_or_else(|| panic!("No account ID found for login: {}", login))
}
}
fn find_account_secret(login: &str) -> &str {
@ -246,6 +258,7 @@ url = "'https://127.0.0.1:800{NODE_ID}'"
[cluster]
node-id = {NODE_ID}
coordinator = "{PUBSUB}"
[server.listener.http]
bind = ["127.0.0.1:1800{NODE_ID}"]
@ -323,7 +336,6 @@ fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"
directory = "{STORE}"
pubsub = "{PUBSUB}"
[directory."{STORE}"]
type = "internal"

View file

@ -668,7 +668,7 @@ pub async fn test(params: &mut JMAPTest) {
// Add John and Jane to the Sales group
for name in ["jdoe@example.com", "jane.smith@example.com"] {
server
.increment_token_revision(
.invalidate_principal_caches(
server
.core
.storage
@ -776,7 +776,7 @@ pub async fn test(params: &mut JMAPTest) {
// Remove John from the sales group
server
.increment_token_revision(
.invalidate_principal_caches(
server
.core
.storage

View file

@ -4,6 +4,9 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use dav_proto::Depth;
use hyper::StatusCode;
use super::WebDavTest;
pub async fn test(test: &WebDavTest) {
@ -43,6 +46,30 @@ pub async fn test(test: &WebDavTest) {
["/dav/cal/", "/dav/cal/jane/", "/dav/cal/support/"],
);
// Test 404 responses
jane.sync_collection(
"/dav/cal/jane/default/",
"",
Depth::Infinity,
None,
["D:getetag"],
)
.await;
jane.sync_collection(
"/dav/cal/jane/test-404/",
"",
Depth::Infinity,
None,
["D:getetag"],
)
.await;
jane.request("PROPFIND", "/dav/cal/jane/default/", "")
.await
.with_status(StatusCode::MULTI_STATUS);
jane.request("PROPFIND", "/dav/cal/jane/test-404/", "")
.await
.with_status(StatusCode::NOT_FOUND);
john.delete_default_containers().await;
jane.delete_default_containers().await;
jane.delete_default_containers_by_account("support").await;