Principal search index (closes #1368)

This commit is contained in:
mdecimus 2025-04-16 13:07:22 +02:00
parent c5596fb656
commit 3bd255eb09
9 changed files with 245 additions and 104 deletions

1
Cargo.lock generated
View file

@ -1910,6 +1910,7 @@ dependencies = [
"mail-parser",
"mail-send",
"md5",
"nlp",
"password-hash",
"pbkdf2",
"proc_macros",

View file

@ -9,6 +9,7 @@ utils = { path = "../utils" }
proc_macros = { path = "../utils/proc-macros" }
store = { path = "../store" }
trc = { path = "../trc" }
nlp = { path = "../nlp" }
jmap_proto = { path = "../jmap-proto" }
smtp-proto = { version = "0.1" }
mail-parser = { version = "0.10", features = ["full_encoding", "serde_support"] }

View file

@ -11,13 +11,16 @@ use super::{
use crate::{
MemberOf, Permission, PermissionGrant, Permissions, Principal, PrincipalData, PrincipalQuota,
QueryBy, ROLE_ADMIN, ROLE_TENANT_ADMIN, ROLE_USER, Type, backend::RcptType,
core::principal::build_search_index,
};
use ahash::{AHashMap, AHashSet};
use compact_str::CompactString;
use jmap_proto::types::collection::Collection;
use nlp::tokenizers::word::WordTokenizer;
use store::{
Deserialize, IterateParams, Serialize, SerializeInfallible, Store, U32_LEN, ValueKey,
backend::MAX_TOKEN_LENGTH,
roaring::RoaringBitmap,
write::{
AlignedBytes, Archive, Archiver, BatchBuilder, DirectoryClass, ValueClass,
key::DeserializeBigEndian,
@ -148,7 +151,6 @@ impl ManageDirectory for Store {
self.get_principal_info(name).await.map(|v| v.map(|v| v.id))
}
async fn get_principal_info(&self, name: &str) -> trc::Result<Option<PrincipalInfo>> {
let todo = "cache";
self.get_value::<PrincipalInfo>(ValueKey::from(ValueClass::Directory(
DirectoryClass::NameToId(name.as_bytes().to_vec()),
)))
@ -195,7 +197,9 @@ impl ManageDirectory for Store {
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.assert_value(name_key.clone(), ())
.create_document(principal_id)
.create_document(principal_id);
build_search_index(&mut batch, principal_id, None, Some(&principal));
batch
.set(
name_key,
PrincipalInfo::new(principal_id, typ, None).serialize(),
@ -545,7 +549,9 @@ impl ManageDirectory for Store {
principal_create.name().as_bytes().to_vec(),
)),
(),
)
);
build_search_index(&mut batch, principal_id, None, Some(&principal_create));
batch
.set(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
principal_bytes,
@ -618,23 +624,33 @@ impl ManageDirectory for Store {
QueryBy::Id(principal_id) => principal_id,
QueryBy::Credentials(_) => unreachable!(),
};
let principal = self
.get_principal(principal_id)
let principal_ = self
.get_value::<Archive<AlignedBytes>>(ValueKey::from(ValueClass::Directory(
DirectoryClass::Principal(principal_id),
)))
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(principal_id.to_string()))?;
let principal = principal_
.unarchive::<Principal>()
.caused_by(trc::location!())?;
let typ = Type::from(&principal.typ);
let mut batch = BatchBuilder::new();
batch.with_account_id(u32::MAX);
// SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL
// Make sure tenant has no data
let tenant = principal.tenant.as_ref().map(|t| t.to_native());
#[cfg(feature = "enterprise")]
match principal.typ {
match typ {
Type::Individual | Type::Group => {
// Update tenant quota
if let Some(tenant_id) = principal.tenant() {
if let Some(tenant_id) = tenant {
let quota = self
.get_counter(DirectoryClass::UsedQuota(principal_id))
.await
@ -648,7 +664,7 @@ impl ManageDirectory for Store {
let tenant_members = self
.list_principals(
None,
principal.id().into(),
principal_id.into(),
&[
Type::Individual,
Type::Group,
@ -688,8 +704,8 @@ impl ManageDirectory for Store {
}
}
Type::Domain => {
if let Some(tenant_id) = principal.tenant() {
let name = principal.name();
if let Some(tenant_id) = tenant {
let name = principal.name.as_str();
let tenant_members = self
.list_principals(
None,
@ -773,15 +789,17 @@ impl ManageDirectory for Store {
// Delete principal
batch
.with_account_id(principal_id)
.delete_document(principal_id)
.clear(DirectoryClass::NameToId(principal.name.as_bytes().to_vec()))
.clear(DirectoryClass::Principal(principal_id))
.clear(DirectoryClass::UsedQuota(principal_id));
for email in principal.emails {
for email in principal.emails.iter() {
batch.clear(DirectoryClass::EmailToId(email.as_bytes().to_vec()));
}
build_search_index(&mut batch, principal_id, Some(principal), None);
for member in self
.get_member_of(principal_id)
.await
@ -790,7 +808,7 @@ impl ManageDirectory for Store {
// Update changed principals
changed_principals.add_member_change(
principal_id,
principal.typ,
typ,
member.principal_id,
member.typ,
);
@ -817,12 +835,7 @@ impl ManageDirectory for Store {
.await
.caused_by(trc::location!())?
{
changed_principals.add_member_change(
member_id,
member_info.typ,
principal_id,
principal.typ,
);
changed_principals.add_member_change(member_id, member_info.typ, principal_id, typ);
}
// Remove members
@ -840,7 +853,7 @@ impl ManageDirectory for Store {
.await
.caused_by(trc::location!())?;
changed_principals.add_deletion(principal_id, principal.typ);
changed_principals.add_deletion(principal_id, typ);
Ok(changed_principals)
}
@ -908,13 +921,6 @@ impl ManageDirectory for Store {
)
});
if update_principal {
batch.assert_value(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
prev_principal,
);
}
let mut used_quota: Option<i64> = None;
// SPDX-SnippetBegin
@ -1860,12 +1866,24 @@ impl ManageDirectory for Store {
}
if update_principal {
batch.set(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
Archiver::new(principal)
.serialize()
.caused_by(trc::location!())?,
build_search_index(
&mut batch,
principal_id,
Some(prev_principal.inner),
Some(&principal),
);
batch
.assert_value(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
prev_principal,
)
.set(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
Archiver::new(principal)
.serialize()
.caused_by(trc::location!())?,
);
}
self.write(batch.build_all())
@ -1884,24 +1902,91 @@ impl ManageDirectory for Store {
page: usize,
limit: usize,
) -> trc::Result<PrincipalList<Principal>> {
let filter = if let Some(filter) = filter.filter(|f| !f.trim().is_empty()) {
let mut matches = RoaringBitmap::new();
for token in WordTokenizer::new(filter, MAX_TOKEN_LENGTH) {
let word_bytes = token.word.as_bytes();
let from_key = ValueKey::from(ValueClass::Directory(DirectoryClass::Index {
word: word_bytes.to_vec(),
principal_id: 0,
}));
let to_key = ValueKey::from(ValueClass::Directory(DirectoryClass::Index {
word: word_bytes.to_vec(),
principal_id: u32::MAX,
}));
let mut word_matches = RoaringBitmap::new();
self.iterate(
IterateParams::new(from_key, to_key).no_values(),
|key, _| {
let id_pos = key.len() - U32_LEN;
if key.get(1..id_pos).is_some_and(|v| v == word_bytes) {
word_matches.insert(key.deserialize_be_u32(id_pos)?);
Ok(true)
} else {
Ok(false)
}
},
)
.await
.caused_by(trc::location!())?;
if matches.is_empty() {
matches = word_matches;
} else {
matches &= word_matches;
if matches.is_empty() {
break;
}
}
}
if !matches.is_empty() {
Some(matches)
} else {
return Ok(PrincipalList {
total: 0,
items: vec![],
});
}
} else {
None
};
let from_key = ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![])));
let to_key = ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![
u8::MAX;
10
])));
let mut results = Vec::new();
let max_items = if limit > 0 { limit } else { usize::MAX };
let mut offset = page.saturating_sub(1) * limit;
let mut result = PrincipalList {
items: Vec::new(),
total: 0,
};
self.iterate(
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let pt = PrincipalInfo::deserialize(value).caused_by(trc::location!())?;
if (types.is_empty() || types.contains(&pt.typ)) && pt.has_tenant_access(tenant_id)
if (types.is_empty() || types.contains(&pt.typ))
&& pt.has_tenant_access(tenant_id)
&& filter.as_ref().is_none_or(|filter| filter.contains(pt.id))
{
let mut principal = Principal::new(pt.id, pt.typ);
principal.name =
String::from_utf8_lossy(key.get(1..).unwrap_or_default()).into_owned();
results.push(principal);
result.total += 1;
if offset == 0 {
if result.items.len() < max_items {
let mut principal = Principal::new(pt.id, pt.typ);
principal.name =
String::from_utf8_lossy(key.get(1..).unwrap_or_default())
.into_owned();
result.items.push(principal);
}
} else {
offset -= 1;
}
}
Ok(true)
@ -1910,69 +1995,23 @@ impl ManageDirectory for Store {
.await
.caused_by(trc::location!())?;
if filter.is_none() && !fetch {
return Ok(PrincipalList {
total: results.len() as u64,
items: results
.into_iter()
.skip(page.saturating_sub(1) * limit)
.take(if limit > 0 { limit } else { usize::MAX })
.collect(),
});
if fetch && !result.items.is_empty() {
let mut items = Vec::with_capacity(result.items.len());
for principal in result.items {
items.push(
self.query(QueryBy::Id(principal.id), fetch)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(principal.name().to_string()))?,
);
}
result.items = items;
Ok(result)
} else {
Ok(result)
}
let mut result = PrincipalList {
items: vec![],
total: 0,
};
let filters = filter.and_then(|filter| {
let filters = filter
.split_whitespace()
.map(|r| r.to_lowercase())
.collect::<Vec<_>>();
if !filters.is_empty() {
Some(filters)
} else {
None
}
});
let todo = "fix search";
let mut offset = limit * page.saturating_sub(1);
let mut is_done = false;
for mut principal in results {
if !is_done || filters.is_some() {
principal = self
.query(QueryBy::Id(principal.id), fetch)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(principal.name().to_string()))?;
}
if filters.as_ref().is_none_or(|filters| {
filters.iter().all(|f| {
principal.name.contains(f)
|| principal
.description
.as_ref()
.is_some_and(|n| n.contains(f))
})
}) {
result.total += 1;
if offset == 0 {
if !is_done {
result.items.push(principal);
is_done = limit != 0 && result.items.len() >= limit;
}
} else {
offset -= 1;
}
}
}
Ok(result)
}
async fn count_principals(
@ -2236,7 +2275,7 @@ impl ManageDirectory for Store {
if let Some(tenant_id) = principal.tenant {
if fields.is_empty() || fields.contains(&PrincipalField::Tenant) {
if let Some(name) = self
.get_principal_name(tenant_id as u32)
.get_principal_name(tenant_id)
.await
.caused_by(trc::location!())?
{

View file

@ -6,15 +6,21 @@
use std::{collections::hash_map::Entry, fmt, str::FromStr};
use ahash::AHashSet;
use nlp::tokenizers::word::WordTokenizer;
use serde::{
Deserializer, Serializer,
de::{self, IgnoredAny, Visitor},
ser::SerializeMap,
};
use store::U64_LEN;
use store::{
U64_LEN,
backend::MAX_TOKEN_LENGTH,
write::{BatchBuilder, DirectoryClass},
};
use crate::{
Permission, PermissionGrant, Principal, PrincipalData, ROLE_ADMIN, Type,
ArchivedPrincipal, Permission, PermissionGrant, Principal, PrincipalData, ROLE_ADMIN, Type,
backend::internal::{PrincipalField, PrincipalSet, PrincipalUpdate, PrincipalValue},
};
@ -801,6 +807,53 @@ impl From<Vec<u32>> for PrincipalValue {
}
}
/// Updates the principal full-text search index inside `batch`.
///
/// Tokenizes the name, description and e-mail addresses of both the
/// previous (`current`) and the updated (`new`) principal, then adds
/// index entries for words that only appear in the new version and
/// clears entries for words that only appeared in the old one.
/// Passing `None` for either side covers creation (`current == None`)
/// and deletion (`new == None`).
pub(crate) fn build_search_index(
    batch: &mut BatchBuilder,
    principal_id: u32,
    current: Option<&ArchivedPrincipal>,
    new: Option<&Principal>,
) {
    // Words indexed for the previous version of the principal, if any.
    let mut old_words = AHashSet::new();
    if let Some(principal) = current {
        let texts = [
            Some(principal.name.as_str()),
            principal.description.as_deref(),
        ];
        for text in texts
            .into_iter()
            .flatten()
            .chain(principal.emails.iter().map(|email| email.as_str()))
        {
            for token in WordTokenizer::new(text, MAX_TOKEN_LENGTH) {
                old_words.insert(token.word);
            }
        }
    }

    // Words to be indexed for the updated version, if any.
    let mut updated_words = AHashSet::new();
    if let Some(principal) = new {
        let texts = [
            Some(principal.name.as_str()),
            principal.description.as_deref(),
        ];
        for text in texts
            .into_iter()
            .flatten()
            .chain(principal.emails.iter().map(|email| email.as_str()))
        {
            for token in WordTokenizer::new(text, MAX_TOKEN_LENGTH) {
                updated_words.insert(token.word);
            }
        }
    }

    // Index entries for words introduced by the update.
    for word in updated_words.difference(&old_words) {
        batch.set(
            DirectoryClass::Index {
                word: word.as_bytes().to_vec(),
                principal_id,
            },
            vec![],
        );
    }

    // Drop index entries for words that no longer occur.
    for word in old_words.difference(&updated_words) {
        batch.clear(DirectoryClass::Index {
            word: word.as_bytes().to_vec(),
            principal_id,
        });
    }
}
impl Type {
pub fn to_jmap(&self) -> &'static str {
match self {

View file

@ -439,3 +439,21 @@ impl IntoError for LdapError {
}
}
}
/// Converts an rkyv-archived `Type` discriminant back into the native
/// `Type` enum. Every archived variant maps one-to-one onto its native
/// counterpart; no data is carried by any variant.
impl From<&ArchivedType> for Type {
    fn from(value: &ArchivedType) -> Self {
        match value {
            ArchivedType::Domain => Type::Domain,
            ArchivedType::Tenant => Type::Tenant,
            ArchivedType::Individual => Type::Individual,
            ArchivedType::Group => Type::Group,
            ArchivedType::List => Type::List,
            ArchivedType::Resource => Type::Resource,
            ArchivedType::Location => Type::Location,
            ArchivedType::Role => Type::Role,
            ArchivedType::ApiKey => Type::ApiKey,
            ArchivedType::OauthClient => Type::OauthClient,
            ArchivedType::Other => Type::Other,
        }
    }
}

View file

@ -322,6 +322,10 @@ impl ValueClass {
.write(6u8)
.write(*principal_id)
.write(*has_member),
DirectoryClass::Index { word, principal_id } => serializer
.write(7u8)
.write(word.as_slice())
.write(*principal_id),
},
ValueClass::Queue(queue) => match queue {
QueueClass::Message(queue_id) => serializer.write(*queue_id),
@ -534,6 +538,7 @@ impl ValueClass {
DirectoryClass::NameToId(v) | DirectoryClass::EmailToId(v) => v.len(),
DirectoryClass::Principal(_) | DirectoryClass::UsedQuota(_) => U32_LEN,
DirectoryClass::Members { .. } | DirectoryClass::MemberOf { .. } => U32_LEN * 2,
DirectoryClass::Index { word, .. } => word.len() + U32_LEN,
},
ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1,

View file

@ -203,6 +203,7 @@ pub enum InMemoryClass {
pub enum DirectoryClass {
NameToId(Vec<u8>),
EmailToId(Vec<u8>),
Index { word: Vec<u8>, principal_id: u32 },
MemberOf { principal_id: u32, member_of: u32 },
Members { principal_id: u32, has_member: u32 },
Principal(u32),

View file

@ -618,6 +618,28 @@ async fn internal_directory() {
.collect::<Vec<_>>(),
vec!["list"]
);
assert_eq!(
store
.list_principals("example.org".into(), None, &[], true, 0, 0)
.await
.unwrap()
.items
.into_iter()
.map(|p| p.name)
.collect::<Vec<_>>(),
vec!["example.org", "jane", "john.doe", "list"]
);
assert_eq!(
store
.list_principals("johnny doe".into(), None, &[], true, 0, 0)
.await
.unwrap()
.items
.into_iter()
.map(|p| p.name)
.collect::<Vec<_>>(),
vec!["john.doe"]
);
// Write records on John's and Jane's accounts
let mut document_id = u32::MAX;

View file

@ -362,6 +362,7 @@ impl DirectoryTest {
true,
)
.await;
config.assert_no_errors();
// Enable catch-all and subaddressing