From b75e3a80125844defc477357731ea60d8bb4b33e Mon Sep 17 00:00:00 2001 From: mdecimus Date: Wed, 12 Mar 2025 16:47:42 +0100 Subject: [PATCH] Improved threadId management --- crates/email/src/mailbox/destroy.rs | 76 +++---- crates/email/src/mailbox/manage.rs | 18 +- crates/email/src/message/copy.rs | 40 +--- crates/email/src/message/delete.rs | 245 ++++++++++------------ crates/email/src/message/index.rs | 10 + crates/email/src/message/ingest.rs | 285 +++++++++++++++++--------- crates/email/src/message/metadata.rs | 1 + crates/email/src/thread/cache.rs | 100 ++++++--- crates/imap/src/op/copy_move.rs | 35 ++-- crates/imap/src/op/expunge.rs | 13 +- crates/imap/src/op/fetch.rs | 23 +-- crates/imap/src/op/store.rs | 31 ++- crates/imap/src/op/thread.rs | 12 +- crates/jmap/src/blob/get.rs | 65 +++--- crates/jmap/src/email/get.rs | 14 +- crates/jmap/src/email/query.rs | 23 +-- crates/jmap/src/lib.rs | 2 +- crates/store/src/query/sort.rs | 55 ++--- crates/store/src/write/key.rs | 8 +- tests/src/jmap/email_query_changes.rs | 30 ++- tests/src/jmap/mod.rs | 8 +- 21 files changed, 559 insertions(+), 535 deletions(-) diff --git a/crates/email/src/mailbox/destroy.rs b/crates/email/src/mailbox/destroy.rs index bdfe8265..06211971 100644 --- a/crates/email/src/mailbox/destroy.rs +++ b/crates/email/src/mailbox/destroy.rs @@ -16,9 +16,7 @@ use store::{ SerializeInfallible, query::Filter, roaring::RoaringBitmap, - write::{ - AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder, serialize::rkyv_deserialize, - }, + write::{AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder}, }; use trc::AddContext; @@ -131,59 +129,39 @@ impl MailboxDestroy for Server { continue; } - let mut new_message_data = - rkyv_deserialize(prev_message_data.inner).caused_by(trc::location!())?; + let mut new_message_data = prev_message_data + .deserialize() + .caused_by(trc::location!())?; + let thread_id = new_message_data.thread_id; new_message_data .mailboxes .retain(|id| id.mailbox_id != document_id); - // Obtain threadId - if let Some(thread_id) = self - .get_property::( - account_id, - Collection::Email, - message_id, - Property::ThreadId, + // Untag message from mailbox + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email) + .update_document(message_id) + .custom( + ObjectIndexBuilder::new() + .with_changes(new_message_data) + .with_current(prev_message_data), ) - .await? - { - // Untag message from mailbox - let mut batch = BatchBuilder::new(); - batch - .with_account_id(account_id) - .with_collection(Collection::Email) - .update_document(message_id) - .custom( - ObjectIndexBuilder::new() - .with_changes(new_message_data) - .with_current(prev_message_data), - ) - .caused_by(trc::location!())?; - match self.core.storage.data.write(batch.build()).await { - Ok(_) => changes.log_update( - Collection::Email, - Id::from_parts(thread_id, message_id), - ), - Err(err) if err.is_assertion_failure() => { - return Ok(Err(SetError::forbidden().with_description(concat!( - "Another process modified a message in this mailbox ", - "while deleting it, please try again." - )))); - } - Err(err) => { - return Err(err.caused_by(trc::location!())); - } + .caused_by(trc::location!())?; + match self.core.storage.data.write(batch.build()).await { + Ok(_) => changes + .log_update(Collection::Email, Id::from_parts(thread_id, message_id)), + Err(err) if err.is_assertion_failure() => { + return Ok(Err(SetError::forbidden().with_description(concat!( + "Another process modified a message in this mailbox ", + "while deleting it, please try again." + )))); + } + Err(err) => { + return Err(err.caused_by(trc::location!())); } - } else { - trc::event!( - Store(trc::StoreEvent::NotFound), - AccountId = account_id, - MessageId = message_id, - MailboxId = document_id, - Details = "Message does not have a threadId.", - CausedBy = trc::location!(), - ); } } diff --git a/crates/email/src/mailbox/manage.rs b/crates/email/src/mailbox/manage.rs index 655e4b41..ea354bb8 100644 --- a/crates/email/src/mailbox/manage.rs +++ b/crates/email/src/mailbox/manage.rs @@ -214,14 +214,20 @@ impl MailboxFnc for Server { document_ids: Option, ) -> trc::Result { if let Some(document_ids) = document_ids { - let mut thread_ids = AHashSet::default(); - self.get_cached_thread_ids(account_id, document_ids.into_iter()) + let thread_ids = self + .get_cached_thread_ids(account_id) .await .caused_by(trc::location!())? - .into_iter() - .for_each(|(_, thread_id)| { - thread_ids.insert(thread_id); - }); + .threads + .iter() + .filter_map(|(document_id, thread_id)| { + if document_ids.contains(*document_id) { + Some(*thread_id) + } else { + None + } + }) + .collect::>(); Ok(thread_ids.len()) } else { Ok(0) diff --git a/crates/email/src/message/copy.rs b/crates/email/src/message/copy.rs index a80f7db6..b32dcc8e 100644 --- a/crates/email/src/message/copy.rs +++ b/crates/email/src/message/copy.rs @@ -15,10 +15,7 @@ use jmap_proto::{ use mail_parser::parsers::fields::thread::thread_name; use store::{ BlobClass, - write::{ - AlignedBytes, Archive, BatchBuilder, MaybeDynamicId, TagValue, TaskQueueClass, ValueClass, - log::{Changes, LogInsert}, - }, + write::{AlignedBytes, Archive, BatchBuilder, TaskQueueClass, ValueClass, log::Changes}, }; use trc::AddContext; @@ -129,13 +126,11 @@ impl EmailCopy for Server { } } - let thread_id = if !references.is_empty() { - self.find_or_merge_thread(account_id, subject, &references) - .await - .caused_by(trc::location!())? - } else { - None - }; + // Obtain threadId + let thread_id = self + .find_or_merge_thread(account_id, subject, &references) + .await + .caused_by(trc::location!())?; // Assign id let mut email = IngestedEmail { @@ -162,30 +157,19 @@ impl EmailCopy for Server { batch .with_account_id(account_id) .with_change_id(change_id) - .with_collection(Collection::Thread); - if let Some(thread_id) = thread_id { - batch.log(Changes::update([thread_id])); - } else { - batch.create_document().log(LogInsert()); - }; - - // Build batch - let maybe_thread_id = thread_id - .map(MaybeDynamicId::Static) - .unwrap_or(MaybeDynamicId::Dynamic(0)); - batch + .with_collection(Collection::Thread) + .log(Changes::update([thread_id])) .with_collection(Collection::Mailbox) .log(Changes::child_update(mailboxes.iter().copied())) .with_collection(Collection::Email) .create_document() - .log(LogEmailInsert::new(thread_id)) - .set(Property::ThreadId, maybe_thread_id) - .tag(Property::ThreadId, TagValue::Id(maybe_thread_id)) + .log(LogEmailInsert::new(thread_id.into())) .custom( ObjectIndexBuilder::<(), _>::new().with_changes(MessageData { mailboxes: mailbox_ids, keywords, change_id, + thread_id, }), ) .caused_by(trc::location!())? @@ -213,10 +197,6 @@ impl EmailCopy for Server { .write(batch.build()) .await .caused_by(trc::location!())?; - let thread_id = match thread_id { - Some(thread_id) => thread_id, - None => ids.first_document_id().caused_by(trc::location!())?, - }; let document_id = ids.last_document_id().caused_by(trc::location!())?; // Request FTS index diff --git a/crates/email/src/message/delete.rs b/crates/email/src/message/delete.rs index ea28a1ba..6ec30e44 100644 --- a/crates/email/src/message/delete.rs +++ b/crates/email/src/message/delete.rs @@ -12,14 +12,13 @@ use jmap_proto::types::{ }; use store::{ BitmapKey, IterateParams, U32_LEN, ValueKey, - ahash::AHashMap, roaring::RoaringBitmap, write::{ AlignedBytes, Archive, BatchBuilder, BitmapClass, MaybeDynamicId, TagValue, ValueClass, - log::ChangeLogBuilder, + log::{ChangeLogBuilder, Changes}, }, }; -use trc::{AddContext, StoreEvent}; +use trc::AddContext; use utils::{BlobHash, codec::leb128::Leb128Reader}; use std::future::Future; @@ -50,6 +49,9 @@ pub trait EmailDeletion: Sync + Send { &self, account_id: u32, ) -> impl Future> + Send; + + fn emails_purge_threads(&self, account_id: u32) + -> impl Future> + Send; } impl EmailDeletion for Server { @@ -60,11 +62,14 @@ impl EmailDeletion for Server { ) -> trc::Result<(ChangeLogBuilder, RoaringBitmap)> { // Create batch let mut changes = ChangeLogBuilder::with_change_id(0); - let mut delete_properties = AHashMap::new(); - // Fetch mailboxes and threadIds - let mut thread_ids: AHashMap = AHashMap::new(); - for (document_id, data) in self + // Tombstone message and untag it from the mailboxes + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email); + + for (document_id, data_) in self .get_properties::, _>( account_id, Collection::Email, @@ -73,126 +78,31 @@ impl EmailDeletion for Server { ) .await? { - delete_properties.insert( - document_id, - DeleteProperties { - archive: Some(data), - thread_id: None, - }, - ); - } - for (document_id, thread_id) in self - .get_properties::( - account_id, - Collection::Email, - &document_ids, - Property::ThreadId, - ) - .await? - { - *thread_ids.entry(thread_id).or_default() += 1; - delete_properties - .entry(document_id) - .or_insert_with(DeleteProperties::default) - .thread_id = Some(thread_id); - } + let data = data_ + .to_unarchived::() + .caused_by(trc::location!())?; + let thread_id = u32::from(data.inner.thread_id); - // Obtain all threadIds - self.core - .storage - .data - .iterate( - IterateParams::new( - BitmapKey { - account_id, - collection: Collection::Email.into(), - class: BitmapClass::Tag { - field: Property::ThreadId.into(), - value: TagValue::Id(0), - }, - document_id: 0, - }, - BitmapKey { - account_id, - collection: Collection::Email.into(), - class: BitmapClass::Tag { - field: Property::ThreadId.into(), - value: TagValue::Id(u32::MAX), - }, - document_id: u32::MAX, - }, - ) - .no_values(), - |key, _| { - let (thread_id, _) = key - .get(U32_LEN + 2..) - .and_then(|bytes| bytes.read_leb128::()) - .ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?; - if let Some(thread_count) = thread_ids.get_mut(&thread_id) { - *thread_count -= 1; - } - - Ok(true) - }, - ) - .await - .caused_by(trc::location!())?; - - // Tombstone message and untag it from the mailboxes - let mut batch = BatchBuilder::new(); - batch - .with_account_id(account_id) - .with_collection(Collection::Email); - - for (document_id, delete_properties) in delete_properties { - batch.update_document(document_id); - - if let Some(data_) = delete_properties.archive { - let data = data_ - .to_unarchived::() - .caused_by(trc::location!())?; - - for mailbox in data.inner.mailboxes.iter() { - changes.log_child_update(Collection::Mailbox, u32::from(mailbox.mailbox_id)); - } - - batch - .custom(ObjectIndexBuilder::<_, ()>::new().with_current(data)) - .caused_by(trc::location!())?; - } else { - trc::event!( - Store(StoreEvent::NotFound), - AccountId = account_id, - DocumentId = document_id, - Details = "Failed to fetch mailboxIds.", - CausedBy = trc::location!(), - ); + for mailbox in data.inner.mailboxes.iter() { + changes.log_child_update(Collection::Mailbox, u32::from(mailbox.mailbox_id)); } - if let Some(thread_id) = delete_properties.thread_id { - batch - .untag(Property::ThreadId, thread_id) - .clear(Property::ThreadId); - // Log message deletion - changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id)); + // Log message deletion + changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id)); - // Log thread changes - if thread_ids[&thread_id] < 0 { - changes.log_child_update(Collection::Thread, thread_id); - } - } else { - trc::event!( - Store(StoreEvent::NotFound), - AccountId = account_id, - DocumentId = document_id, - Details = "Failed to fetch threadId.", - CausedBy = trc::location!(), + // Log thread changes + changes.log_child_update(Collection::Thread, thread_id); + + // Add changes to batch + batch + .update_document(document_id) + .custom(ObjectIndexBuilder::<_, ()>::new().with_current(data)) + .caused_by(trc::location!())? + .tag( + Property::MailboxIds, + TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)), ); - } - batch.tag( - Property::MailboxIds, - TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)), - ); + document_ids.remove(document_id); if batch.ops.len() >= 1000 { @@ -210,16 +120,6 @@ impl EmailDeletion for Server { } } - // Delete threadIds - for (thread_id, thread_count) in thread_ids { - if thread_count == 0 { - batch - .with_collection(Collection::Thread) - .delete_document(thread_id); - changes.log_delete(Collection::Thread, thread_id); - } - } - if !batch.ops.is_empty() { self.core .storage @@ -409,6 +309,9 @@ impl EmailDeletion for Server { Total = tombstoned_ids.len(), ); + // Delete threadIds + self.emails_purge_threads(account_id).await?; + // Delete full-text index self.core .storage @@ -489,10 +392,78 @@ impl EmailDeletion for Server { Ok(()) } -} -#[derive(Default, Debug)] -struct DeleteProperties { - archive: Option>, - thread_id: Option, + async fn emails_purge_threads(&self, account_id: u32) -> trc::Result<()> { + // Delete threadIs without documents + let mut thread_ids = self + .get_document_ids(account_id, Collection::Thread) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + if thread_ids.is_empty() { + return Ok(()); + } + + self.core + .storage + .data + .iterate( + IterateParams::new( + BitmapKey { + account_id, + collection: Collection::Email.into(), + class: BitmapClass::Tag { + field: Property::ThreadId.into(), + value: TagValue::Id(0), + }, + document_id: 0, + }, + BitmapKey { + account_id, + collection: Collection::Email.into(), + class: BitmapClass::Tag { + field: Property::ThreadId.into(), + value: TagValue::Id(u32::MAX), + }, + document_id: u32::MAX, + }, + ) + .no_values(), + |key, _| { + let (thread_id, _) = key + .get(U32_LEN + 2..) + .and_then(|bytes| bytes.read_leb128::()) + .ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?; + thread_ids.remove(thread_id); + + Ok(!thread_ids.is_empty()) + }, + ) + .await + .caused_by(trc::location!())?; + + if thread_ids.is_empty() { + return Ok(()); + } + + // Create batch + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Thread) + .with_change_id(self.generate_snowflake_id().caused_by(trc::location!())?) + .log(Changes::delete(thread_ids.iter().map(|id| id as u64))); + for thread_id in thread_ids { + batch.delete_document(thread_id); + } + self.core + .storage + .data + .write(batch.build()) + .await + .caused_by(trc::location!())?; + + Ok(()) + } } diff --git a/crates/email/src/message/index.rs b/crates/email/src/message/index.rs index 87bb45ee..1ceccb00 100644 --- a/crates/email/src/message/index.rs +++ b/crates/email/src/message/index.rs @@ -597,6 +597,10 @@ impl IndexableObject for MessageData { }) .collect(), }, + IndexValue::Tag { + field: Property::ThreadId.into(), + value: vec![TagValue::Id(MaybeDynamicId::Static(self.thread_id))], + }, ] .into_iter() } @@ -624,6 +628,12 @@ impl IndexableObject for &ArchivedMessageData { }) .collect(), }, + IndexValue::Tag { + field: Property::ThreadId.into(), + value: vec![TagValue::Id(MaybeDynamicId::Static(u32::from( + self.thread_id, + )))], + }, ] .into_iter() } diff --git a/crates/email/src/message/ingest.rs b/crates/email/src/message/ingest.rs index 894830f7..86970ab7 100644 --- a/crates/email/src/message/ingest.rs +++ b/crates/email/src/message/ingest.rs @@ -6,6 +6,7 @@ use std::{ borrow::Cow, + collections::BTreeSet, fmt::Write, time::{Duration, Instant}, }; @@ -13,6 +14,7 @@ use std::{ use common::{ Server, auth::{AccessToken, ResourceToken}, + storage::index::ObjectIndexBuilder, }; use directory::Permission; use jmap_proto::types::{ @@ -33,19 +35,20 @@ use spam_filter::{ }; use std::future::Future; use store::{ - BitmapKey, BlobClass, - ahash::AHashSet, + BlobClass, IndexKey, IndexKeyPrefix, IterateParams, U32_LEN, + ahash::AHashMap, query::Filter, + roaring::RoaringBitmap, write::{ - AlignedBytes, Archive, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId, - MaybeDynamicValue, SerializeWithId, TagValue, TaskQueueClass, ValueClass, + AlignedBytes, Archive, AssignedIds, BatchBuilder, MaybeDynamicValue, SerializeWithId, + TaskQueueClass, ValueClass, + key::DeserializeBigEndian, log::{ChangeLogBuilder, Changes, LogInsert}, now, }, }; use store::{SerializeInfallible, rand::Rng}; use trc::{AddContext, MessageIngestEvent}; -use utils::map::vec_map::VecMap; use crate::{ mailbox::{INBOX_ID, JUNK_ID, UidMailbox}, @@ -104,13 +107,14 @@ pub trait EmailIngest: Sync + Send { account_id: u32, thread_name: &str, references: &[&str], - ) -> impl Future>> + Send; + ) -> impl Future> + Send; fn assign_imap_uid( &self, account_id: u32, mailbox_id: u32, ) -> impl Future> + Send; fn email_bayes_can_train(&self, access_token: &AccessToken) -> bool; + fn create_thread_id(&self, account_id: u32) -> impl Future> + Send; } impl EmailIngest for Server { @@ -323,12 +327,8 @@ impl EmailIngest for Server { }); } - if !references.is_empty() { - self.find_or_merge_thread(account_id, subject, &references) - .await? - } else { - None - } + self.find_or_merge_thread(account_id, subject, &references) + .await? }; // Add additional headers to message @@ -474,27 +474,19 @@ impl EmailIngest for Server { batch .with_change_id(change_id) .with_account_id(account_id) - .with_collection(Collection::Thread); - if let Some(thread_id) = thread_id { - batch.log(Changes::update([thread_id])); - } else { - batch.create_document().log(LogInsert()); - } - + .with_collection(Collection::Thread) + .log(Changes::update([thread_id])); // Build write batch let mailbox_ids_event = mailbox_ids .iter() .map(|m| trc::Value::from(m.mailbox_id)) .collect::>(); - let maybe_thread_id = thread_id - .map(MaybeDynamicId::Static) - .unwrap_or(MaybeDynamicId::Dynamic(0)); batch .with_collection(Collection::Mailbox) .log(Changes::child_update(params.mailbox_ids.iter().copied())) .with_collection(Collection::Email) .create_document() - .log(LogEmailInsert(thread_id)) + .log(LogEmailInsert(thread_id.into())) .index_message( account_id, tenant_id, @@ -504,12 +496,11 @@ impl EmailIngest for Server { mailboxes: mailbox_ids, keywords: params.keywords, change_id, + thread_id, }, params.received_at.unwrap_or_else(now), ) .caused_by(trc::location!())? - .set(Property::ThreadId, maybe_thread_id) - .tag(Property::ThreadId, TagValue::Id(maybe_thread_id)) .set( ValueClass::TaskQueue(TaskQueueClass::IndexEmail { seq: self.generate_snowflake_id().caused_by(trc::location!())?, @@ -538,10 +529,7 @@ impl EmailIngest for Server { .write(batch.build()) .await .caused_by(trc::location!())?; - let thread_id = match thread_id { - Some(thread_id) => thread_id, - None => ids.first_document_id().caused_by(trc::location!())?, - }; + let document_id = ids.last_document_id().caused_by(trc::location!())?; let id = Id::from_parts(thread_id, document_id); @@ -592,7 +580,11 @@ impl EmailIngest for Server { account_id: u32, thread_name: &str, references: &[&str], - ) -> trc::Result> { + ) -> trc::Result { + if references.is_empty() { + return self.create_thread_id(account_id).await; + } + let mut try_count = 0; let thread_name = if !thread_name.is_empty() { thread_name @@ -600,59 +592,118 @@ impl EmailIngest for Server { "!" } .serialize(); + let references = references + .iter() + .map(|r| r.as_bytes()) + .collect::>(); loop { - // Find messages with matching references - let mut filters = Vec::with_capacity(references.len() + 3); - filters.push(Filter::eq(Property::Subject, thread_name.clone())); - filters.push(Filter::Or); - for reference in references { - filters.push(Filter::eq(Property::References, reference.serialize())); - } - filters.push(Filter::End); - let results = self - .core - .storage - .data - .filter(account_id, Collection::Email, filters) - .await - .caused_by(trc::location!())? - .results; + // Find messages with a matching subject + let mut subj_results = RoaringBitmap::new(); + self.store() + .iterate( + IterateParams::new( + IndexKey { + account_id, + collection: Collection::Email.into(), + document_id: 0, + field: Property::Subject.into(), + key: thread_name.clone(), + }, + IndexKey { + account_id, + collection: Collection::Email.into(), + document_id: u32::MAX, + field: Property::Subject.into(), + key: thread_name.clone(), + }, + ) + .no_values() + .ascending(), + |key, _| { + let id_pos = key.len() - U32_LEN; + let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| { + trc::Error::corrupted_key(key, None, trc::location!()) + })?; - if results.is_empty() { - return Ok(None); - } + if value == thread_name { + subj_results.insert(key.deserialize_be_u32(id_pos)?); + } - // Obtain threadIds for matching messages - let thread_ids = self - .get_cached_thread_ids(account_id, results.iter()) + Ok(true) + }, + ) .await .caused_by(trc::location!())?; + if subj_results.is_empty() { + return self.create_thread_id(account_id).await; + } - if thread_ids.len() == 1 { - return Ok(thread_ids - .into_iter() - .next() - .map(|(_, thread_id)| thread_id)); + // Find messages with matching references + let mut results = RoaringBitmap::new(); + self.store() + .iterate( + IterateParams::new( + IndexKey { + account_id, + collection: Collection::Email.into(), + document_id: 0, + field: Property::References.into(), + key: references.first().unwrap().to_vec(), + }, + IndexKey { + account_id, + collection: Collection::Email.into(), + document_id: u32::MAX, + field: Property::References.into(), + key: references.last().unwrap().to_vec(), + }, + ) + .no_values() + .ascending(), + |key, _| { + let id_pos = key.len() - U32_LEN; + let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| { + trc::Error::corrupted_key(key, None, trc::location!()) + })?; + let document_id = key.deserialize_be_u32(id_pos)?; + + if subj_results.contains(document_id) && references.contains(value) { + results.insert(document_id); + } + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + if results.is_empty() { + return self.create_thread_id(account_id).await; } // Find the most common threadId - let mut thread_counts = VecMap::::with_capacity(thread_ids.len()); + let mut thread_counts = AHashMap::::with_capacity(16); let mut thread_id = u32::MAX; let mut thread_count = 0; - for (_, thread_id_) in thread_ids.iter() { - let tc = thread_counts.get_mut_or_insert(*thread_id_); - *tc += 1; - if *tc > thread_count { - thread_count = *tc; - thread_id = *thread_id_; + let thread_cache = self + .get_cached_thread_ids(account_id) + .await + .caused_by(trc::location!())?; + for (document_id, thread_id_) in thread_cache.threads.iter() { + if results.contains(*document_id) { + let tc = thread_counts.entry(*thread_id_).or_default(); + *tc += 1; + if *tc > thread_count { + thread_count = *tc; + thread_id = *thread_id_; + } } } if thread_id == u32::MAX { - return Ok(None); // This should never happen + return self.create_thread_id(account_id).await; } else if thread_counts.len() == 1 { - return Ok(Some(thread_id)); + return Ok(thread_id); } // Delete all but the most common threadId @@ -673,47 +724,49 @@ impl EmailIngest for Server { // Move messages to the new threadId batch.with_collection(Collection::Email); - for old_thread_id in thread_ids - .into_iter() - .map(|(_, thread_id)| thread_id) - .collect::>() - { - if thread_id != old_thread_id { - for document_id in self - .core - .storage - .data - .get_bitmap(BitmapKey { - account_id, - collection: Collection::Email.into(), - class: BitmapClass::Tag { - field: Property::ThreadId.into(), - value: TagValue::Id(old_thread_id), - }, - document_id: 0, - }) - .await - .caused_by(trc::location!())? - .unwrap_or_default() - { - batch - .update_document(document_id) - .assert_value(Property::ThreadId, old_thread_id) - .untag(Property::ThreadId, old_thread_id) - .tag(Property::ThreadId, thread_id) - .set(Property::ThreadId, thread_id.serialize()); - changes.log_move( - Collection::Email, - Id::from_parts(old_thread_id, document_id), - Id::from_parts(thread_id, document_id), - ); + + for (&document_id, &old_thread_id) in &thread_cache.threads { + if thread_id == old_thread_id || !thread_counts.contains_key(&old_thread_id) { + continue; + } + if let Some(data_) = self + .get_property::>( + account_id, + Collection::Email, + document_id, + Property::Value, + ) + .await + .caused_by(trc::location!())? + { + let data = data_ + .to_unarchived::() + .caused_by(trc::location!())?; + if data.inner.thread_id != old_thread_id { + continue; } + let mut new_data = data.deserialize().caused_by(trc::location!())?; + new_data.thread_id = thread_id; + batch + .update_document(document_id) + .custom( + ObjectIndexBuilder::new() + .with_current(data) + .with_changes(new_data), + ) + .caused_by(trc::location!())?; + changes.log_move( + Collection::Email, + Id::from_parts(old_thread_id, document_id), + Id::from_parts(thread_id, document_id), + ); } } + batch.custom(changes).caused_by(trc::location!())?; match self.core.storage.data.write(batch.build()).await { - Ok(_) => return Ok(Some(thread_id)), + Ok(_) => return Ok(thread_id), Err(err) if err.is_assertion_failure() && try_count < MAX_RETRIES => { let backoff = store::rand::rng().random_range(50..=300); tokio::time::sleep(Duration::from_millis(backoff)).await; @@ -747,6 +800,20 @@ impl EmailIngest for Server { bayes.account_classify && access_token.has_permission(Permission::SpamFilterTrain) }) } + + async fn create_thread_id(&self, account_id: u32) -> trc::Result { + let mut batch = BatchBuilder::new(); + batch + .with_change_id(self.generate_snowflake_id().caused_by(trc::location!())?) + .with_account_id(account_id) + .with_collection(Collection::Thread) + .create_document() + .log(LogInsert()); + self.store() + .write_expect_id(batch) + .await + .caused_by(trc::location!()) + } } pub struct LogEmailInsert(Option); @@ -790,3 +857,19 @@ impl From for Object { .with_property(Property::Size, email.size) } } + +/* + + let thread_id = match thread_id { + Some(thread_id) => thread_id, + None => ids.first_document_id().caused_by(trc::location!())?, + }; + + .with_collection(Collection::Thread); + if let Some(thread_id) = thread_id { + batch.log(Changes::update([thread_id])); + } else { + batch.create_document().log(LogInsert()); + + +*/ diff --git a/crates/email/src/message/metadata.rs b/crates/email/src/message/metadata.rs index 4d37a16f..18b317bc 100644 --- a/crates/email/src/message/metadata.rs +++ b/crates/email/src/message/metadata.rs @@ -30,6 +30,7 @@ pub struct MessageData { pub mailboxes: Vec, pub keywords: Vec, pub change_id: u64, + pub thread_id: u32, } #[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)] diff --git a/crates/email/src/thread/cache.rs b/crates/email/src/thread/cache.rs index 7d79a348..2baad407 100644 --- a/crates/email/src/thread/cache.rs +++ b/crates/email/src/thread/cache.rs @@ -9,22 +9,23 @@ use std::sync::Arc; use common::{Server, Threads}; use jmap_proto::types::{collection::Collection, property::Property}; use std::future::Future; +use store::{ + BitmapKey, IterateParams, U32_LEN, + ahash::AHashMap, + write::{BitmapClass, TagValue, key::DeserializeBigEndian}, +}; use trc::AddContext; +use utils::codec::leb128::Leb128Reader; pub trait ThreadCache: Sync + Send { fn get_cached_thread_ids( &self, account_id: u32, - message_ids: impl Iterator + Send, - ) -> impl Future>> + Send; + ) -> impl Future>> + Send; } impl ThreadCache for Server { - async fn get_cached_thread_ids( - &self, - account_id: u32, - message_ids: impl Iterator + Send, - ) -> trc::Result> { + async fn get_cached_thread_ids(&self, account_id: u32) -> trc::Result> { // Obtain current state let modseq = self .core @@ -35,35 +36,70 @@ impl ThreadCache for Server { .caused_by(trc::location!())?; // Lock the cache - let thread_cache = if let Some(thread_cache) = - self.inner.cache.threads.get(&account_id).and_then(|t| { - if t.modseq.unwrap_or(0) >= modseq.unwrap_or(0) { - Some(t) - } else { - None - } - }) { - thread_cache + if let Some(thread_cache) = self.inner.cache.threads.get(&account_id).and_then(|t| { + if t.modseq.unwrap_or(0) >= modseq.unwrap_or(0) { + Some(t) + } else { + None + } + }) { + Ok(thread_cache) } else { - let thread_cache = Arc::new(Threads { - threads: self - .get_properties::( - account_id, - Collection::Email, - &(), - Property::ThreadId, + let mut threads = AHashMap::new(); + self.core + .storage + .data + .iterate( + IterateParams::new( + BitmapKey { + account_id, + collection: Collection::Email.into(), + class: BitmapClass::Tag { + field: Property::ThreadId.into(), + value: TagValue::Id(0), + }, + document_id: 0, + }, + BitmapKey { + account_id, + collection: Collection::Email.into(), + class: BitmapClass::Tag { + field: Property::ThreadId.into(), + value: TagValue::Id(u32::MAX), + }, + document_id: u32::MAX, + }, ) - .await? - .into_iter() - .collect(), - modseq, - }); + .no_values(), + |key, _| { + let (thread_id, _) = key + .get(U32_LEN + 2..) + .and_then(|bytes| bytes.read_leb128::()) + .ok_or_else(|| { + trc::Error::corrupted_key(key, None, trc::location!()) + })?; + let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?; + + threads.insert(document_id, thread_id); + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + + let thread_cache = Arc::new(Threads { threads, modseq }); self.inner .cache .threads .insert(account_id, thread_cache.clone()); - thread_cache - }; + Ok(thread_cache) + } + } +} + +/* + // Obtain threadIds for matching messages let mut thread_ids = Vec::with_capacity(message_ids.size_hint().0); @@ -74,5 +110,5 @@ impl ThreadCache for Server { } Ok(thread_ids) - } -} + +*/ diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs index c2e1da99..1d19edae 100644 --- a/crates/imap/src/op/copy_move.rs +++ b/crates/imap/src/op/copy_move.rs @@ -189,8 +189,8 @@ impl SessionData { for (id, imap_id) in ids { // Obtain mailbox tags - let (data_, thread_id) = if let Some(result) = self - .get_mailbox_tags(account_id, id) + let data_ = if let Some(result) = self + .get_message_data(account_id, id) .await .imap_ctx(&arguments.tag, trc::location!())? { @@ -230,6 +230,7 @@ impl SessionData { .imap_ctx(&arguments.tag, trc::location!())?; } new_data.change_id = changelog.change_id; + let thread_id = new_data.thread_id; // Add destination folder new_data.add_mailbox(dest_mailbox_id); @@ -485,26 +486,22 @@ impl SessionData { self.write_bytes(response).await } - pub async fn get_mailbox_tags( + pub async fn get_message_data( &self, account_id: u32, id: u32, - ) -> trc::Result, u32)>> { - // Obtain mailbox tags - if let (Some(mailboxes), Some(thread_id)) = ( - self.server - .get_property::>( - account_id, - Collection::Email, - id, - Property::Value, - ) - .await?, - self.server - .get_property::(account_id, Collection::Email, id, Property::ThreadId) - .await?, - ) { - Ok(Some((mailboxes, thread_id))) + ) -> trc::Result>> { + if let Some(data) = self + .server + .get_property::>( + account_id, + Collection::Email, + id, + Property::Value, + ) + .await? + { + Ok(Some(data)) } else { trc::event!( Store(trc::StoreEvent::NotFound), diff --git a/crates/imap/src/op/expunge.rs b/crates/imap/src/op/expunge.rs index 5f9fe3c6..d8d7a60f 100644 --- a/crates/imap/src/op/expunge.rs +++ b/crates/imap/src/op/expunge.rs @@ -212,24 +212,13 @@ impl SessionData { continue; } - // Remove deleted flag - let thread_id = if let Some(thread_id) = self - .server - .get_property::(account_id, Collection::Email, id, Property::ThreadId) - .await - .caused_by(trc::location!())? - { - thread_id - } else { - continue; - }; - // Prepare changes let mut new_data = data.deserialize().caused_by(trc::location!())?; if changelog.change_id == u64::MAX { changelog.change_id = self.server.assign_change_id(account_id)? } new_data.change_id = changelog.change_id; + let thread_id = new_data.thread_id; // Untag message from this mailbox and remove Deleted flag new_data.remove_mailbox(mailbox_id); diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs index 2e4c0037..c6860c7a 100644 --- a/crates/imap/src/op/fetch.rs +++ b/crates/imap/src/op/fetch.rs @@ -233,7 +233,6 @@ impl SessionData { // Build properties list let mut set_seen_flags = false; - let mut needs_thread_id = false; let mut needs_blobs = false; for attribute in &arguments.attributes { @@ -264,9 +263,6 @@ impl SessionData { } needs_blobs = true; } - Attribute::ThreadId => { - needs_thread_id = true; - } _ => (), } } @@ -389,20 +385,7 @@ impl SessionData { .keywords .iter() .any(|k| k == &ArchivedKeyword::Seen); - let thread_id = if needs_thread_id || set_seen_flag { - if let Some(thread_id) = self - .server - .get_property::(account_id, Collection::Email, id, Property::ThreadId) - .await - .imap_ctx(&arguments.tag, trc::location!())? - { - thread_id - } else { - continue; - } - } else { - 0 - }; + for attribute in &arguments.attributes { match attribute { Attribute::Envelope => { @@ -538,7 +521,8 @@ impl SessionData { } Attribute::ThreadId => { items.push(DataItem::ThreadId { - thread_id: Id::from_parts(account_id, thread_id).to_string(), + thread_id: Id::from_parts(account_id, u32::from(data.inner.thread_id)) + .to_string(), }); } } @@ -568,6 +552,7 @@ impl SessionData { .imap_ctx(&arguments.tag, trc::location!())?; new_data.keywords.push(Keyword::Seen); new_data.change_id = change_id; + let thread_id = new_data.thread_id; let mut batch = BatchBuilder::new(); batch diff --git a/crates/imap/src/op/store.rs b/crates/imap/src/op/store.rs index 41cdc13d..6dc15b67 100644 --- a/crates/imap/src/op/store.rs +++ b/crates/imap/src/op/store.rs @@ -200,23 +200,19 @@ impl SessionData { 'outer: for (id, imap_id) in &ids { let mut try_count = 0; loop { - // Obtain current keywords - let (data_, thread_id) = if let (Some(data), Some(thread_id)) = ( - self.server - .get_property::>( - account_id, - Collection::Email, - *id, - Property::Value, - ) - .await - .imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?, - self.server - .get_property::(account_id, Collection::Email, *id, Property::ThreadId) - .await - .imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?, - ) { - (data, thread_id) + // Obtain message data + let data_ = if let Some(data) = self + .server + .get_property::>( + account_id, + Collection::Email, + *id, + Property::Value, + ) + .await + .imap_ctx(response.tag.as_ref().unwrap(), trc::location!())? + { + data } else { continue 'outer; }; @@ -228,6 +224,7 @@ impl SessionData { let mut new_data = data .deserialize() .imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?; + let thread_id = new_data.thread_id; // Apply changes let mut seen_changed = false; diff --git a/crates/imap/src/op/thread.rs b/crates/imap/src/op/thread.rs index 3dabbe7b..52d94da7 100644 --- a/crates/imap/src/op/thread.rs +++ b/crates/imap/src/op/thread.rs @@ -80,18 +80,20 @@ impl SessionData { } // Lock the cache - let thread_ids = self + let thread_cache = self .server - .get_cached_thread_ids(mailbox.id.account_id, result_set.results.iter()) + .get_cached_thread_ids(mailbox.id.account_id) .await .caused_by(trc::location!())?; // Group messages by thread let mut threads: AHashMap> = AHashMap::new(); let state = mailbox.state.lock(); - for (document_id, thread_id) in thread_ids { - if let Some((imap_id, _)) = state.map_result_id(document_id, is_uid) { - threads.entry(thread_id).or_default().push(imap_id); + for (document_id, thread_id) in &thread_cache.threads { + if result_set.results.contains(*document_id) { + if let Some((imap_id, _)) = state.map_result_id(*document_id, is_uid) { + threads.entry(*thread_id).or_default().push(imap_id); + } } } diff --git a/crates/jmap/src/blob/get.rs b/crates/jmap/src/blob/get.rs index bcf14430..20af554d 100644 --- a/crates/jmap/src/blob/get.rs +++ b/crates/jmap/src/blob/get.rs @@ -216,46 +216,37 @@ impl BlobOperations for Server { } if *account_id == req_account_id => { let collection = Collection::from(*collection); if collection == Collection::Email { - if include_email || include_thread { - if let Some(thread_id) = self - .get_property::( - req_account_id, - Collection::Email, - *document_id, - Property::ThreadId, - ) - .await? - { - if include_email { - matched_ids.append( - DataType::Email, - vec![Id::from_parts(thread_id, *document_id)], - ); - } - if include_thread { - matched_ids.append( - DataType::Thread, - vec![Id::from(thread_id)], - ); - } + if let Some(data_) = self + .get_property::>( + req_account_id, + Collection::Email, + *document_id, + Property::Value, + ) + .await? + { + let data = data_ + .unarchive::() + .caused_by(trc::location!())?; + if include_email { + matched_ids.append( + DataType::Email, + vec![Id::from_parts( + u32::from(data.thread_id), + *document_id, + )], + ); } - } - if include_mailbox { - if let Some(mailboxes) = self - .get_property::>( - req_account_id, - Collection::Email, - *document_id, - Property::Value, - ) - .await? - { + if include_thread { + matched_ids.append( + DataType::Thread, + vec![Id::from(u32::from(data.thread_id))], + ); + } + if include_mailbox { matched_ids.append( DataType::Mailbox, - mailboxes - .unarchive::() - .caused_by(trc::location!())? - .mailboxes + data.mailboxes .iter() .map(|m| { debug_assert!(m.uid != 0); diff --git a/crates/jmap/src/email/get.rs b/crates/jmap/src/email/get.rs index e9a92d5d..1539eadf 100644 --- a/crates/jmap/src/email/get.rs +++ b/crates/jmap/src/email/get.rs @@ -118,17 +118,13 @@ impl EmailGet for Server { let ids = if let Some(ids) = ids { ids } else { - let document_ids = message_ids - .iter() - .take(self.core.jmap.get_max_objects) - .collect::>(); - self.get_cached_thread_ids(account_id, document_ids.iter().copied()) + self.get_cached_thread_ids(account_id) .await .caused_by(trc::location!())? - .into_iter() - .filter_map(|(document_id, thread_id)| { - Id::from_parts(thread_id, document_id).into() - }) + .threads + .iter() + .take(self.core.jmap.get_max_objects) + .map(|(document_id, thread_id)| Id::from_parts(*thread_id, *document_id)) .collect() }; let mut response = GetResponse { diff --git a/crates/jmap/src/email/query.rs b/crates/jmap/src/email/query.rs index 89372413..e1dc5641 100644 --- a/crates/jmap/src/email/query.rs +++ b/crates/jmap/src/email/query.rs @@ -15,11 +15,10 @@ use mail_parser::HeaderName; use nlp::language::Language; use std::future::Future; use store::{ - SerializeInfallible, ValueKey, + SerializeInfallible, fts::{Field, FilterGroup, FtsFilter, IntoFilterGroup}, query::{self}, roaring::RoaringBitmap, - write::ValueClass, }; use crate::JmapMethods; @@ -367,16 +366,12 @@ impl EmailQuery for Server { } // Sort results + let thread_cache = self.get_cached_thread_ids(account_id).await?; self.sort( result_set, comparators, paginate - .with_prefix_key(ValueKey { - account_id, - collection: Collection::Email.into(), - document_id: 0, - class: ValueClass::Property(Property::ThreadId.into()), - }) + .with_prefix_map(&thread_cache.threads) .with_prefix_unique(request.arguments.collapse_threads.unwrap_or(false)), response, ) @@ -399,15 +394,15 @@ impl EmailQuery for Server { if keyword_doc_ids.is_empty() { return Ok(keyword_doc_ids); } - let keyword_thread_ids = self - .get_cached_thread_ids(account_id, keyword_doc_ids.iter()) - .await?; - + let thread_cache = self.get_cached_thread_ids(account_id).await?; let mut not_matched_ids = RoaringBitmap::new(); let mut matched_ids = RoaringBitmap::new(); - for (keyword_doc_id, thread_id) in keyword_thread_ids { - if matched_ids.contains(keyword_doc_id) || not_matched_ids.contains(keyword_doc_id) { + for (&keyword_doc_id, &thread_id) in thread_cache.threads.iter() { + if !keyword_doc_ids.contains(keyword_doc_id) + || matched_ids.contains(keyword_doc_id) + || not_matched_ids.contains(keyword_doc_id) + { continue; } diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index 76e0e121..f5a3f1e2 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -145,7 +145,7 @@ impl JmapMethods for Server { &self, result_set: ResultSet, comparators: Vec, - paginate: Pagination, + paginate: Pagination<'_>, mut response: QueryResponse, ) -> trc::Result { // Sort results diff --git a/crates/store/src/query/sort.rs b/crates/store/src/query/sort.rs index 95c092a5..6fdc0961 100644 --- a/crates/store/src/query/sort.rs +++ b/crates/store/src/query/sort.rs @@ -9,14 +9,11 @@ use std::cmp::Ordering; use ahash::{AHashMap, AHashSet}; use trc::AddContext; -use crate::{ - write::{key::DeserializeBigEndian, ValueClass}, - IndexKeyPrefix, IterateParams, Store, ValueKey, U32_LEN, -}; +use crate::{IndexKeyPrefix, IterateParams, Store, U32_LEN, write::key::DeserializeBigEndian}; use super::{Comparator, ResultSet, SortedResultSet}; -pub struct Pagination { +pub struct Pagination<'x> { requested_position: i32, position: i32, pub limit: usize, @@ -25,7 +22,7 @@ pub struct Pagination { has_anchor: bool, anchor_found: bool, pub ids: Vec, - prefix_key: Option>>, + prefix_map: Option<&'x AHashMap>, prefix_unique: bool, } @@ -34,7 +31,7 @@ impl Store { &self, result_set: ResultSet, mut comparators: Vec, - mut paginate: Pagination, + mut paginate: Pagination<'_>, ) -> trc::Result { paginate.limit = match (result_set.results.len(), paginate.limit) { (0, _) => { @@ -105,16 +102,12 @@ impl Store { } // Obtain prefixes - let prefix_key = paginate.prefix_key.take(); + let prefix_map = paginate.prefix_map.take(); let mut sorted_results = paginate.build(); - if let Some(prefix_key) = prefix_key { + if let Some(prefix_map) = prefix_map { for id in sorted_results.ids.iter_mut() { - if let Some(prefix_id) = self - .get_value::(prefix_key.clone().with_document_id(*id as u32)) - .await - .caused_by(trc::location!())? - { - *id |= (prefix_id as u64) << 32; + if let Some(prefix_id) = prefix_map.get(&(*id as u32)) { + *id |= (*prefix_id as u64) << 32; } } } @@ -213,16 +206,12 @@ impl Store { }); for (document_id, _) in sorted_ids { // Obtain document prefixId - let prefix_id = if let Some(prefix_key) = &paginate.prefix_key { - if let Some(prefix_id) = self - .get_value(prefix_key.clone().with_document_id(document_id)) - .await - .caused_by(trc::location!())? - { - if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) { + let prefix_id = if let Some(prefix_map) = paginate.prefix_map { + if let Some(prefix_id) = prefix_map.get(&document_id) { + if paginate.prefix_unique && !seen_prefixes.insert(*prefix_id) { continue; } - prefix_id + *prefix_id } else { // Document no longer exists? continue; @@ -242,16 +231,12 @@ impl Store { let mut seen_prefixes = AHashSet::new(); for document_id in result_set.results { // Obtain document prefixId - let prefix_id = if let Some(prefix_key) = &paginate.prefix_key { - if let Some(prefix_id) = self - .get_value(prefix_key.clone().with_document_id(document_id)) - .await - .caused_by(trc::location!())? - { - if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) { + let prefix_id = if let Some(prefix_map) = paginate.prefix_map { + if let Some(prefix_id) = prefix_map.get(&document_id) { + if paginate.prefix_unique && !seen_prefixes.insert(*prefix_id) { continue; } - prefix_id + *prefix_id } else { // Document no longer exists? continue; @@ -270,7 +255,7 @@ impl Store { } } -impl Pagination { +impl<'x> Pagination<'x> { pub fn new(limit: usize, position: i32, anchor: Option, anchor_offset: i32) -> Self { let (has_anchor, anchor) = anchor.map(|anchor| (true, anchor)).unwrap_or((false, 0)); @@ -283,13 +268,13 @@ impl Pagination { has_anchor, anchor_found: false, ids: Vec::with_capacity(limit), - prefix_key: None, + prefix_map: None, prefix_unique: false, } } - pub fn with_prefix_key(mut self, prefix_key: ValueKey>) -> Self { - self.prefix_key = Some(prefix_key); + pub fn with_prefix_map(mut self, prefix_map: &'x AHashMap) -> Self { + self.prefix_map = Some(prefix_map); self } diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index 34bd4bb5..cfa17064 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -5,17 +5,17 @@ */ use std::convert::TryInto; -use utils::{codec::leb128::Leb128_, BLOB_HASH_LEN}; +use utils::{BLOB_HASH_LEN, codec::leb128::Leb128_}; use crate::{ - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_ACL, + BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, SUBSPACE_ACL, SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_BLOB_LINK, SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX, - SUBSPACE_INDEXES, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_LOGS, + SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TASK_QUEUE, SUBSPACE_TELEMETRY_INDEX, SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN, - WITH_SUBSPACE, + ValueKey, WITH_SUBSPACE, }; use super::{ diff --git a/tests/src/jmap/email_query_changes.rs b/tests/src/jmap/email_query_changes.rs index addbfc73..e4ef1496 100644 --- a/tests/src/jmap/email_query_changes.rs +++ b/tests/src/jmap/email_query_changes.rs @@ -4,6 +4,8 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use ::email::message::{ingest::EmailIngest, metadata::MessageData}; +use common::storage::index::ObjectIndexBuilder; use jmap_client::{ core::query::{Comparator, Filter}, email, @@ -13,7 +15,7 @@ use jmap_proto::types::{collection::Collection, id::Id, property::Property, stat use store::{ ahash::{AHashMap, AHashSet}, - write::{BatchBuilder, MaybeDynamicId, TagValue, log::ChangeLogBuilder}, + write::{AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder}, }; use crate::jmap::{ @@ -131,6 +133,23 @@ pub async fn test(params: &mut JMAPTest) { LogAction::Move(from, to) => { let id = *id_map.get(from).unwrap(); let new_id = Id::from_parts(thread_id, id.document_id()); + + let new_thread_id = server.create_thread_id(1).await.unwrap(); + + let old_message_ = server + .get_property::>( + 1, + Collection::Email, + id.document_id(), + Property::Value, + ) + .await + .unwrap() + .unwrap(); + let old_message = old_message_.to_unarchived::().unwrap(); + let mut new_message = old_message.deserialize().unwrap(); + new_message.thread_id = new_thread_id; + server .core .storage @@ -142,9 +161,12 @@ pub async fn test(params: &mut JMAPTest) { .create_document() .with_collection(Collection::Email) .update_document(id.document_id()) - .untag(Property::ThreadId, id.prefix_id()) - .set(Property::ThreadId, MaybeDynamicId::Dynamic(0)) - .tag(Property::ThreadId, TagValue::Id(MaybeDynamicId::Dynamic(0))) + .custom( + ObjectIndexBuilder::new() + .with_current(old_message) + .with_changes(new_message), + ) + .unwrap() .custom(server.begin_changes(1).unwrap().with_log_move( Collection::Email, id, diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index b6a2c817..5c6bac96 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -374,8 +374,8 @@ pub async fn jmap_tests() { .await; webhooks::test(&mut params).await; - /*email_query::test(&mut params, delete).await; - email_get::test(&mut params).await; + email_query::test(&mut params, delete).await; + /*email_get::test(&mut params).await; email_set::test(&mut params).await; email_parse::test(&mut params).await; email_search_snippet::test(&mut params).await; @@ -386,7 +386,7 @@ pub async fn jmap_tests() { thread_merge::test(&mut params).await; mailbox::test(&mut params).await; delivery::test(&mut params).await; - auth_acl::test(&mut params).await; + auth_acl::test(&mut params).await;*/ auth_limits::test(&mut params).await; auth_oauth::test(&mut params).await; event_source::test(&mut params).await; @@ -397,7 +397,7 @@ pub async fn jmap_tests() { websocket::test(&mut params).await; quota::test(&mut params).await; crypto::test(&mut params).await; - blob::test(&mut params).await;*/ + blob::test(&mut params).await; permissions::test(¶ms).await; purge::test(&mut params).await; enterprise::test(&mut params).await;