From 6aa5686cd3cde108229866b73ac47fc07cc8282c Mon Sep 17 00:00:00 2001 From: mdecimus Date: Fri, 20 Sep 2024 10:54:22 +0200 Subject: [PATCH] FTS reindex support --- crates/directory/src/core/mod.rs | 1 + crates/directory/src/lib.rs | 1 + crates/jmap/src/api/management/stores.rs | 29 ++++++ crates/jmap/src/lib.rs | 2 +- crates/jmap/src/services/index.rs | 126 ++++++++++++++++++++++- 5 files changed, 155 insertions(+), 4 deletions(-) diff --git a/crates/directory/src/core/mod.rs b/crates/directory/src/core/mod.rs index 36c21dc1..50383ae9 100644 --- a/crates/directory/src/core/mod.rs +++ b/crates/directory/src/core/mod.rs @@ -73,6 +73,7 @@ impl Permission { Permission::PurgeDataStore => "Clear the data storage", Permission::PurgeLookupStore => "Clear the lookup storage", Permission::PurgeAccount => "Completely remove an account and all associated data", + Permission::FtsReindex => "Rebuild the full-text search index", Permission::Undelete => "Restore deleted items", Permission::DkimSignatureCreate => "Create DKIM signatures for email authentication", Permission::DkimSignatureGet => "Retrieve DKIM signature information", diff --git a/crates/directory/src/lib.rs b/crates/directory/src/lib.rs index e197b1c2..9d767e69 100644 --- a/crates/directory/src/lib.rs +++ b/crates/directory/src/lib.rs @@ -118,6 +118,7 @@ pub enum Permission { PurgeDataStore, PurgeLookupStore, PurgeAccount, + FtsReindex, Undelete, DkimSignatureCreate, DkimSignatureGet, diff --git a/crates/jmap/src/api/management/stores.rs b/crates/jmap/src/api/management/stores.rs index 78148a93..b86c88be 100644 --- a/crates/jmap/src/api/management/stores.rs +++ b/crates/jmap/src/api/management/stores.rs @@ -134,6 +134,35 @@ impl JMAP { self.housekeeper_request(Event::Purge(PurgeType::Account(account_id))) .await } + (Some("reindex"), id, None, &Method::GET) => { + // Validate the access token + access_token.assert_has_permission(Permission::FtsReindex)?; + + let account_id = if let Some(id) = id { + self.core + .storage + .data + .get_principal_id(decode_path_element(id).as_ref()) + .await? + .ok_or_else(|| trc::ManageEvent::NotFound.into_err())? + .into() + } else { + None + }; + let tenant_id = access_token.tenant.map(|t| t.id); + + let jmap = self.clone(); + tokio::spawn(async move { + if let Err(err) = jmap.reindex(account_id, tenant_id).await { + trc::error!(err.details("Failed to reindex FTS")); + } + }); + + Ok(JsonResponse::new(json!({ + "data": (), + })) + .into_http_response()) + } // SPDX-SnippetBegin // SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd // SPDX-License-Identifier: LicenseRef-SEL diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index e9b70463..84d2b9ef 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -401,7 +401,7 @@ impl JMAP { #[cfg(feature = "enterprise")] if self.core.is_enterprise_edition() { - if let Some(tenant) = quotas.tenant { + if let Some(tenant) = quotas.tenant.filter(|tenant| tenant.quota != 0) { let used_quota = self.get_used_quota(tenant.id).await? as u64; if used_quota + item_size > tenant.quota { diff --git a/crates/jmap/src/services/index.rs b/crates/jmap/src/services/index.rs index a8501771..5a17fa3e 100644 --- a/crates/jmap/src/services/index.rs +++ b/crates/jmap/src/services/index.rs @@ -6,18 +6,24 @@ use std::{sync::Arc, time::Instant}; +use directory::{ + backend::internal::{manage::ManageDirectory, PrincipalField}, + Type, +}; use jmap_proto::types::{collection::Collection, property::Property}; use store::{ + ahash::AHashMap, fts::index::FtsDocument, + roaring::RoaringBitmap, write::{ - key::DeserializeBigEndian, now, BatchBuilder, Bincode, FtsQueueClass, MaybeDynamicId, - ValueClass, + key::DeserializeBigEndian, now, BatchBuilder, Bincode, BlobOp, FtsQueueClass, + MaybeDynamicId, ValueClass, }, Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN, }; use tokio::sync::Notify; -use trc::FtsIndexEvent; +use trc::{AddContext, FtsIndexEvent}; use utils::{BlobHash, BLOB_HASH_LEN}; use crate::{ @@ -240,6 +246,120 @@ impl JMAP { pub fn request_fts_index(&self) { self.inner.request_fts_index(); } + + pub async fn reindex( + &self, + account_id: Option, + tenant_id: Option, + ) -> trc::Result<()> { + let accounts = if let Some(account_id) = account_id { + RoaringBitmap::from_sorted_iter([account_id]).unwrap() + } else { + let mut accounts = RoaringBitmap::new(); + for principal in self + .core + .storage + .data + .list_principals( + None, + tenant_id, + &[Type::Individual, Type::Group], + &[PrincipalField::Name], + 0, + 0, + ) + .await + .caused_by(trc::location!())? + .items + { + accounts.insert(principal.id()); + } + accounts + }; + + // Validate linked blobs + let from_key = ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::default(), + }), + }; + let to_key = ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::new_max(), + }), + }; + let mut hashes: AHashMap> = AHashMap::new(); + self.core + .storage + .data + .iterate( + IterateParams::new(from_key, to_key).ascending().no_values(), + |key, _| { + let account_id = key.deserialize_be_u32(BLOB_HASH_LEN)?; + let collection = *key + .get(BLOB_HASH_LEN + U32_LEN) + .ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?; + + if accounts.contains(account_id) && collection == Collection::Email as u8 { + let hash = + BlobHash::try_from_hash_slice(key.get(0..BLOB_HASH_LEN).ok_or_else( + || trc::Error::corrupted_key(key, None, trc::location!()), + )?) + .unwrap(); + let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?; + + hashes + .entry(account_id) + .or_default() + .push((document_id, hash)); + } + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + + let mut seq = self.generate_snowflake_id().caused_by(trc::location!())?; + + for (account_id, hashes) in hashes { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email); + + for (document_id, hash) in hashes { + batch.update_document(document_id).set( + ValueClass::FtsQueue(FtsQueueClass { hash, seq }), + 0u64.serialize(), + ); + seq += 1; + + if batch.ops.len() >= 2000 { + self.core.storage.data.write(batch.build()).await?; + batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email); + } + } + + if !batch.is_empty() { + self.core.storage.data.write(batch.build()).await?; + } + } + + // Request indexing + self.request_fts_index(); + + Ok(()) + } } impl Inner {