FTS reindex support

This commit is contained in:
mdecimus 2024-09-20 10:54:22 +02:00
parent 58351bdcad
commit 6aa5686cd3
5 changed files with 155 additions and 4 deletions

View file

@ -73,6 +73,7 @@ impl Permission {
Permission::PurgeDataStore => "Clear the data storage",
Permission::PurgeLookupStore => "Clear the lookup storage",
Permission::PurgeAccount => "Completely remove an account and all associated data",
Permission::FtsReindex => "Rebuild the full-text search index",
Permission::Undelete => "Restore deleted items",
Permission::DkimSignatureCreate => "Create DKIM signatures for email authentication",
Permission::DkimSignatureGet => "Retrieve DKIM signature information",

View file

@ -118,6 +118,7 @@ pub enum Permission {
PurgeDataStore,
PurgeLookupStore,
PurgeAccount,
FtsReindex,
Undelete,
DkimSignatureCreate,
DkimSignatureGet,

View file

@ -134,6 +134,35 @@ impl JMAP {
self.housekeeper_request(Event::Purge(PurgeType::Account(account_id)))
.await
}
(Some("reindex"), id, None, &Method::GET) => {
// Validate the access token
access_token.assert_has_permission(Permission::FtsReindex)?;
let account_id = if let Some(id) = id {
self.core
.storage
.data
.get_principal_id(decode_path_element(id).as_ref())
.await?
.ok_or_else(|| trc::ManageEvent::NotFound.into_err())?
.into()
} else {
None
};
let tenant_id = access_token.tenant.map(|t| t.id);
let jmap = self.clone();
tokio::spawn(async move {
if let Err(err) = jmap.reindex(account_id, tenant_id).await {
trc::error!(err.details("Failed to reindex FTS"));
}
});
Ok(JsonResponse::new(json!({
"data": (),
}))
.into_http_response())
}
// SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL

View file

@ -401,7 +401,7 @@ impl JMAP {
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = quotas.tenant {
if let Some(tenant) = quotas.tenant.filter(|tenant| tenant.quota != 0) {
let used_quota = self.get_used_quota(tenant.id).await? as u64;
if used_quota + item_size > tenant.quota {

View file

@ -6,18 +6,24 @@
use std::{sync::Arc, time::Instant};
use directory::{
backend::internal::{manage::ManageDirectory, PrincipalField},
Type,
};
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
ahash::AHashMap,
fts::index::FtsDocument,
roaring::RoaringBitmap,
write::{
key::DeserializeBigEndian, now, BatchBuilder, Bincode, FtsQueueClass, MaybeDynamicId,
ValueClass,
key::DeserializeBigEndian, now, BatchBuilder, Bincode, BlobOp, FtsQueueClass,
MaybeDynamicId, ValueClass,
},
Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN,
};
use tokio::sync::Notify;
use trc::FtsIndexEvent;
use trc::{AddContext, FtsIndexEvent};
use utils::{BlobHash, BLOB_HASH_LEN};
use crate::{
@ -240,6 +246,120 @@ impl JMAP {
/// Wakes up the background full-text indexing task by delegating to the
/// shared `Inner` state; returns immediately without doing any indexing work.
pub fn request_fts_index(&self) {
self.inner.request_fts_index();
}
pub async fn reindex(
&self,
account_id: Option<u32>,
tenant_id: Option<u32>,
) -> trc::Result<()> {
let accounts = if let Some(account_id) = account_id {
RoaringBitmap::from_sorted_iter([account_id]).unwrap()
} else {
let mut accounts = RoaringBitmap::new();
for principal in self
.core
.storage
.data
.list_principals(
None,
tenant_id,
&[Type::Individual, Type::Group],
&[PrincipalField::Name],
0,
0,
)
.await
.caused_by(trc::location!())?
.items
{
accounts.insert(principal.id());
}
accounts
};
// Validate linked blobs
let from_key = ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::Blob(BlobOp::Link {
hash: BlobHash::default(),
}),
};
let to_key = ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::Blob(BlobOp::Link {
hash: BlobHash::new_max(),
}),
};
let mut hashes: AHashMap<u32, Vec<(u32, BlobHash)>> = AHashMap::new();
self.core
.storage
.data
.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(BLOB_HASH_LEN)?;
let collection = *key
.get(BLOB_HASH_LEN + U32_LEN)
.ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?;
if accounts.contains(account_id) && collection == Collection::Email as u8 {
let hash =
BlobHash::try_from_hash_slice(key.get(0..BLOB_HASH_LEN).ok_or_else(
|| trc::Error::corrupted_key(key, None, trc::location!()),
)?)
.unwrap();
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
hashes
.entry(account_id)
.or_default()
.push((document_id, hash));
}
Ok(true)
},
)
.await
.caused_by(trc::location!())?;
let mut seq = self.generate_snowflake_id().caused_by(trc::location!())?;
for (account_id, hashes) in hashes {
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
for (document_id, hash) in hashes {
batch.update_document(document_id).set(
ValueClass::FtsQueue(FtsQueueClass { hash, seq }),
0u64.serialize(),
);
seq += 1;
if batch.ops.len() >= 2000 {
self.core.storage.data.write(batch.build()).await?;
batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
}
}
if !batch.is_empty() {
self.core.storage.data.write(batch.build()).await?;
}
}
// Request indexing
self.request_fts_index();
Ok(())
}
}
impl Inner {