diff --git a/crates/jmap/src/email/copy.rs b/crates/jmap/src/email/copy.rs index 145eaefa..aea2686a 100644 --- a/crates/jmap/src/email/copy.rs +++ b/crates/jmap/src/email/copy.rs @@ -50,9 +50,10 @@ use mail_parser::{parsers::fields::thread::thread_name, HeaderName, HeaderValue} use store::{ write::{ log::{Changes, LogInsert}, - BatchBuilder, Bincode, MaybeDynamicId, TagValue, ValueClass, F_BITMAP, F_VALUE, + BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId, TagValue, ValueClass, F_BITMAP, + F_VALUE, }, - BlobClass, + BlobClass, Serialize, }; use utils::map::vec_map::VecMap; @@ -416,8 +417,11 @@ impl JMAP { .value(Property::Keywords, keywords, F_VALUE | F_BITMAP) .value(Property::Cid, change_id, F_VALUE) .set( - ValueClass::IndexEmail(self.generate_snowflake_id()?), - metadata.blob_hash.as_ref(), + ValueClass::IndexEmail(IndexEmailClass::Insert { + seq: self.generate_snowflake_id()?, + hash: metadata.blob_hash.clone(), + }), + 0u64.serialize(), ) .custom(EmailIndexBuilder::set(metadata)); diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 71583eef..e9c353a6 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -40,8 +40,8 @@ use store::{ query::Filter, write::{ log::{ChangeLogBuilder, Changes, LogInsert}, - now, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId, MaybeDynamicValue, - SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE, + now, AssignedIds, BatchBuilder, BitmapClass, IndexEmailClass, MaybeDynamicId, + MaybeDynamicValue, SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE, }, BitmapKey, BlobClass, Serialize, }; @@ -340,11 +340,13 @@ impl JMAP { .set(Property::ThreadId, maybe_thread_id) .tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0) .set( - ValueClass::IndexEmail( - self.generate_snowflake_id() + ValueClass::IndexEmail(IndexEmailClass::Insert { + seq: self + .generate_snowflake_id() .map_err(|_| IngestError::Temporary)?, - ), - blob_id.hash.as_ref(), + hash: blob_id.hash.clone(), + }), + 0u64.serialize(), ); // Insert and obtain ids diff --git a/crates/jmap/src/email/set.rs b/crates/jmap/src/email/set.rs index 20a793af..c4ce3aea 100644 --- a/crates/jmap/src/email/set.rs +++ b/crates/jmap/src/email/set.rs @@ -54,7 +54,7 @@ use store::{ ahash::AHashSet, write::{ assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, Bincode, DeserializeFrom, - SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE, + IndexEmailClass, SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE, }, Serialize, }; @@ -1091,8 +1091,10 @@ impl JMAP { .with_collection(Collection::Email) .delete_document(document_id) .set( - ValueClass::IndexEmail(self.generate_snowflake_id()?), - vec![], + ValueClass::IndexEmail(IndexEmailClass::Delete { + seq: self.generate_snowflake_id()?, + }), + 0u64.serialize(), ); // Remove last changeId diff --git a/crates/jmap/src/services/index.rs b/crates/jmap/src/services/index.rs index eef9014c..83539807 100644 --- a/crates/jmap/src/services/index.rs +++ b/crates/jmap/src/services/index.rs @@ -23,10 +23,15 @@ use jmap_proto::types::{collection::Collection, property::Property}; use store::{ + ahash::AHashSet, fts::index::FtsDocument, - write::{key::DeserializeBigEndian, BatchBuilder, Bincode, ValueClass}, - Deserialize, IterateParams, ValueKey, U32_LEN, U64_LEN, + write::{ + key::DeserializeBigEndian, now, BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId, + ValueClass, + }, + Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN, }; +use utils::{BlobHash, BLOB_HASH_LEN}; use crate::{ email::{index::IndexMessageText, metadata::MessageMetadata}, @@ -40,26 +45,31 @@ struct IndexEmail { account_id: u32, document_id: u32, seq: u64, + lock_expiry: u64, + insert_hash: Option, } +const INDEX_LOCK_EXPIRY: u64 = 60 * 5; + impl JMAP { pub async fn fts_index_queued(&self) { let from_key = ValueKey::> { account_id: 0, collection: 0, document_id: 0, - class: ValueClass::IndexEmail(0), + class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: 0 }), }; let to_key = ValueKey::> { account_id: u32::MAX, collection: u8::MAX, document_id: u32::MAX, - class: ValueClass::IndexEmail(u64::MAX), + class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: u64::MAX }), }; // Retrieve entries pending to be indexed - // TODO: Support indexing from multiple nodes let mut entries = Vec::new(); + let mut skipped_documents = AHashSet::new(); + let now = now(); let _ = self .core .storage @@ -67,7 +77,32 @@ impl JMAP { .iterate( IterateParams::new(from_key, to_key).ascending(), |key, value| { - entries.push((IndexEmail::deserialize(key)?, value.to_vec())); + let event = IndexEmail::deserialize(key, value)?; + + if event.lock_expiry < now { + if !skipped_documents.contains(&(event.account_id, event.document_id)) { + entries.push(event); + } else { + tracing::trace!( + context = "queue", + event = "skipped", + account_id = event.account_id, + document_id = event.document_id, + "DocumentId already locked by another process." + ); + } + } else { + skipped_documents.insert((event.account_id, event.document_id)); + tracing::trace!( + context = "queue", + event = "locked", + account_id = event.account_id, + document_id = event.document_id, + expiry = event.lock_expiry - now, + "Index event locked by another process." + ); + } + Ok(true) }, ) @@ -82,19 +117,24 @@ impl JMAP { }); // Index entries - for (key, blob_hash) in entries { - if !blob_hash.is_empty() { + for event in entries { + // Lock index + if !self.try_lock_index(&event).await { + continue; + } + + if let Some(hash) = &event.insert_hash { match self .get_property::>( - key.account_id, + event.account_id, Collection::Email, - key.document_id, + event.document_id, Property::BodyStructure, ) .await { Ok(Some(metadata)) - if metadata.inner.blob_hash.as_slice() == blob_hash.as_slice() => + if metadata.inner.blob_hash.as_slice() == hash.as_slice() => { // Obtain raw message let raw_message = if let Ok(Some(raw_message)) = self @@ -106,8 +146,8 @@ impl JMAP { tracing::warn!( context = "fts_index_queued", event = "error", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, blob_hash = ?metadata.inner.blob_hash, "Message blob not found" ); @@ -118,16 +158,16 @@ impl JMAP { // Index message let document = FtsDocument::with_default_language(self.core.jmap.default_language) - .with_account_id(key.account_id) + .with_account_id(event.account_id) .with_collection(Collection::Email) - .with_document_id(key.document_id) + .with_document_id(event.document_id) .index_message(&message); if let Err(err) = self.core.storage.fts.index(document).await { tracing::error!( context = "fts_index_queued", event = "error", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, reason = ?err, "Failed to index email in FTS index" ); @@ -137,8 +177,8 @@ impl JMAP { tracing::debug!( context = "fts_index_queued", event = "index", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, "Indexed document in FTS index" ); } @@ -147,8 +187,8 @@ impl JMAP { tracing::error!( context = "fts_index_queued", event = "error", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, reason = ?err, "Failed to retrieve email metadata" ); @@ -159,8 +199,8 @@ impl JMAP { tracing::debug!( context = "fts_index_queued", event = "error", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, "Email metadata not found" ); } @@ -170,14 +210,18 @@ impl JMAP { .core .storage .fts - .remove(key.account_id, Collection::Email.into(), key.document_id) + .remove( + event.account_id, + Collection::Email.into(), + event.document_id, + ) .await { tracing::error!( context = "fts_index_queued", event = "error", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, reason = ?err, "Failed to remove document from FTS index" ); @@ -187,8 +231,8 @@ impl JMAP { tracing::debug!( context = "fts_index_queued", event = "delete", - account_id = key.account_id, - document_id = key.document_id, + account_id = event.account_id, + document_id = event.document_id, "Deleted document from FTS index" ); } @@ -200,10 +244,10 @@ impl JMAP { .data .write( BatchBuilder::new() - .with_account_id(key.account_id) + .with_account_id(event.account_id) .with_collection(Collection::Email) - .update_document(key.document_id) - .clear(ValueClass::IndexEmail(key.seq)) + .update_document(event.document_id) + .clear(event.value_class()) .build_batch(), ) .await @@ -222,15 +266,59 @@ impl JMAP { tracing::warn!("Failed to send index done event to housekeeper: {}", err); } } + + async fn try_lock_index(&self, event: &IndexEmail) -> bool { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(event.account_id) + .update_document(event.document_id) + .assert_value(event.value_class(), event.lock_expiry) + .set(event.value_class(), (now() + INDEX_LOCK_EXPIRY).serialize()); + match self.core.storage.data.write(batch.build()).await { + Ok(_) => true, + Err(store::Error::AssertValueFailed) => { + tracing::trace!( + context = "queue", + event = "locked", + account_id = event.account_id, + document_id = event.document_id, + "Failed to lock index: Index already locked." + ); + false + } + Err(err) => { + tracing::error!( + context = "queue", + event = "error", + "Failed to lock index: {}", + err + ); + false + } + } + } } -impl Deserialize for IndexEmail { - fn deserialize(bytes: &[u8]) -> store::Result { - let len = bytes.len(); +impl IndexEmail { + fn value_class(&self) -> ValueClass { + match &self.insert_hash { + Some(hash) => ValueClass::IndexEmail(IndexEmailClass::Insert { + hash: hash.clone(), + seq: self.seq, + }), + None => ValueClass::IndexEmail(IndexEmailClass::Delete { seq: self.seq }), + } + } + + fn deserialize(key: &[u8], value: &[u8]) -> store::Result { Ok(IndexEmail { - seq: bytes.deserialize_be_u64(len - U64_LEN - (U32_LEN * 2))?, - account_id: bytes.deserialize_be_u32(len - U32_LEN * 2)?, - document_id: bytes.deserialize_be_u32(len - U32_LEN)?, + seq: key.deserialize_be_u64(0)?, + account_id: key.deserialize_be_u32(U64_LEN)?, + document_id: key.deserialize_be_u32(U64_LEN + U32_LEN)?, + lock_expiry: u64::deserialize(value)?, + insert_hash: key + .get(U64_LEN + U32_LEN + U32_LEN..U64_LEN + U32_LEN + U32_LEN + BLOB_HASH_LEN) + .and_then(|bytes| BlobHash::try_from_hash_slice(bytes).ok()), }) } } diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 07449d93..70efbf2b 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -101,7 +101,7 @@ impl SMTP { if event.lock_expiry < now { events.push(event); } else { - tracing::debug!( + tracing::trace!( context = "queue", event = "locked", id = event.queue_id, diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index 98bad51d..26b74d44 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -34,8 +34,8 @@ use crate::{ }; use super::{ - AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass, - ReportEvent, ResolveId, TagValue, ValueClass, + AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, IndexEmailClass, LookupClass, + QueueClass, ReportClass, ReportEvent, ResolveId, TagValue, ValueClass, }; pub struct KeySerializer { @@ -271,9 +271,16 @@ impl ValueClass { .write(account_id) .write(collection) .write(document_id), - ValueClass::IndexEmail(seq) => { - serializer.write(*seq).write(account_id).write(document_id) - } + ValueClass::IndexEmail(index) => match index { + IndexEmailClass::Insert { seq, hash } => serializer + .write(*seq) + .write(account_id) + .write(document_id) + .write::<&[u8]>(hash.as_ref()), + IndexEmailClass::Delete { seq } => { + serializer.write(*seq).write(account_id).write(document_id) + } + }, ValueClass::Blob(op) => match op { BlobOp::Reserve { hash, until } => serializer .write(account_id) @@ -527,7 +534,7 @@ impl ValueClass { BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1, BlobOp::Commit { .. } | BlobOp::Link { .. } => BLOB_HASH_LEN + U32_LEN * 2 + 2, }, - ValueClass::IndexEmail { .. } => U64_LEN * 2, + ValueClass::IndexEmail { .. } => BLOB_HASH_LEN + U64_LEN * 2, ValueClass::Queue(q) => match q { QueueClass::Message(_) => U64_LEN, QueueClass::MessageEvent(_) => U64_LEN * 2, @@ -554,7 +561,7 @@ impl ValueClass { } ValueClass::TermIndex => SUBSPACE_TERM_INDEX, ValueClass::Acl(_) => SUBSPACE_ACL, - ValueClass::IndexEmail(_) => SUBSPACE_FTS_INDEX, + ValueClass::IndexEmail { .. } => SUBSPACE_FTS_INDEX, ValueClass::Blob(op) => match op { BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE, BlobOp::Commit { .. } | BlobOp::Link { .. } => SUBSPACE_BLOB_LINK, diff --git a/crates/store/src/write/mod.rs b/crates/store/src/write/mod.rs index 7990c9fe..f62db0c5 100644 --- a/crates/store/src/write/mod.rs +++ b/crates/store/src/write/mod.rs @@ -167,12 +167,18 @@ pub enum ValueClass { TermIndex, Directory(DirectoryClass), Blob(BlobOp), - IndexEmail(u64), + IndexEmail(IndexEmailClass), Config(Vec), Queue(QueueClass), Report(ReportClass), } +#[derive(Debug, PartialEq, Clone, Eq, Hash)] +pub enum IndexEmailClass { + Insert { seq: u64, hash: BlobHash }, + Delete { seq: u64 }, +} + #[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum LookupClass { Key(Vec),