mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-10-24 11:26:05 +08:00
Improved threadId management
This commit is contained in:
parent
9491071731
commit
b75e3a8012
21 changed files with 559 additions and 535 deletions
|
|
@ -16,9 +16,7 @@ use store::{
|
|||
SerializeInfallible,
|
||||
query::Filter,
|
||||
roaring::RoaringBitmap,
|
||||
write::{
|
||||
AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder, serialize::rkyv_deserialize,
|
||||
},
|
||||
write::{AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder},
|
||||
};
|
||||
use trc::AddContext;
|
||||
|
||||
|
|
@ -131,59 +129,39 @@ impl MailboxDestroy for Server {
|
|||
continue;
|
||||
}
|
||||
|
||||
let mut new_message_data =
|
||||
rkyv_deserialize(prev_message_data.inner).caused_by(trc::location!())?;
|
||||
let mut new_message_data = prev_message_data
|
||||
.deserialize()
|
||||
.caused_by(trc::location!())?;
|
||||
let thread_id = new_message_data.thread_id;
|
||||
|
||||
new_message_data
|
||||
.mailboxes
|
||||
.retain(|id| id.mailbox_id != document_id);
|
||||
|
||||
// Obtain threadId
|
||||
if let Some(thread_id) = self
|
||||
.get_property::<u32>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
message_id,
|
||||
Property::ThreadId,
|
||||
// Untag message from mailbox
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Email)
|
||||
.update_document(message_id)
|
||||
.custom(
|
||||
ObjectIndexBuilder::new()
|
||||
.with_changes(new_message_data)
|
||||
.with_current(prev_message_data),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
// Untag message from mailbox
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Email)
|
||||
.update_document(message_id)
|
||||
.custom(
|
||||
ObjectIndexBuilder::new()
|
||||
.with_changes(new_message_data)
|
||||
.with_current(prev_message_data),
|
||||
)
|
||||
.caused_by(trc::location!())?;
|
||||
match self.core.storage.data.write(batch.build()).await {
|
||||
Ok(_) => changes.log_update(
|
||||
Collection::Email,
|
||||
Id::from_parts(thread_id, message_id),
|
||||
),
|
||||
Err(err) if err.is_assertion_failure() => {
|
||||
return Ok(Err(SetError::forbidden().with_description(concat!(
|
||||
"Another process modified a message in this mailbox ",
|
||||
"while deleting it, please try again."
|
||||
))));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(err.caused_by(trc::location!()));
|
||||
}
|
||||
.caused_by(trc::location!())?;
|
||||
match self.core.storage.data.write(batch.build()).await {
|
||||
Ok(_) => changes
|
||||
.log_update(Collection::Email, Id::from_parts(thread_id, message_id)),
|
||||
Err(err) if err.is_assertion_failure() => {
|
||||
return Ok(Err(SetError::forbidden().with_description(concat!(
|
||||
"Another process modified a message in this mailbox ",
|
||||
"while deleting it, please try again."
|
||||
))));
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(err.caused_by(trc::location!()));
|
||||
}
|
||||
} else {
|
||||
trc::event!(
|
||||
Store(trc::StoreEvent::NotFound),
|
||||
AccountId = account_id,
|
||||
MessageId = message_id,
|
||||
MailboxId = document_id,
|
||||
Details = "Message does not have a threadId.",
|
||||
CausedBy = trc::location!(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -214,14 +214,20 @@ impl MailboxFnc for Server {
|
|||
document_ids: Option<RoaringBitmap>,
|
||||
) -> trc::Result<usize> {
|
||||
if let Some(document_ids) = document_ids {
|
||||
let mut thread_ids = AHashSet::default();
|
||||
self.get_cached_thread_ids(account_id, document_ids.into_iter())
|
||||
let thread_ids = self
|
||||
.get_cached_thread_ids(account_id)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.into_iter()
|
||||
.for_each(|(_, thread_id)| {
|
||||
thread_ids.insert(thread_id);
|
||||
});
|
||||
.threads
|
||||
.iter()
|
||||
.filter_map(|(document_id, thread_id)| {
|
||||
if document_ids.contains(*document_id) {
|
||||
Some(*thread_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<AHashSet<_>>();
|
||||
Ok(thread_ids.len())
|
||||
} else {
|
||||
Ok(0)
|
||||
|
|
|
|||
|
|
@ -15,10 +15,7 @@ use jmap_proto::{
|
|||
use mail_parser::parsers::fields::thread::thread_name;
|
||||
use store::{
|
||||
BlobClass,
|
||||
write::{
|
||||
AlignedBytes, Archive, BatchBuilder, MaybeDynamicId, TagValue, TaskQueueClass, ValueClass,
|
||||
log::{Changes, LogInsert},
|
||||
},
|
||||
write::{AlignedBytes, Archive, BatchBuilder, TaskQueueClass, ValueClass, log::Changes},
|
||||
};
|
||||
use trc::AddContext;
|
||||
|
||||
|
|
@ -129,13 +126,11 @@ impl EmailCopy for Server {
|
|||
}
|
||||
}
|
||||
|
||||
let thread_id = if !references.is_empty() {
|
||||
self.find_or_merge_thread(account_id, subject, &references)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Obtain threadId
|
||||
let thread_id = self
|
||||
.find_or_merge_thread(account_id, subject, &references)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
// Assign id
|
||||
let mut email = IngestedEmail {
|
||||
|
|
@ -162,30 +157,19 @@ impl EmailCopy for Server {
|
|||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_change_id(change_id)
|
||||
.with_collection(Collection::Thread);
|
||||
if let Some(thread_id) = thread_id {
|
||||
batch.log(Changes::update([thread_id]));
|
||||
} else {
|
||||
batch.create_document().log(LogInsert());
|
||||
};
|
||||
|
||||
// Build batch
|
||||
let maybe_thread_id = thread_id
|
||||
.map(MaybeDynamicId::Static)
|
||||
.unwrap_or(MaybeDynamicId::Dynamic(0));
|
||||
batch
|
||||
.with_collection(Collection::Thread)
|
||||
.log(Changes::update([thread_id]))
|
||||
.with_collection(Collection::Mailbox)
|
||||
.log(Changes::child_update(mailboxes.iter().copied()))
|
||||
.with_collection(Collection::Email)
|
||||
.create_document()
|
||||
.log(LogEmailInsert::new(thread_id))
|
||||
.set(Property::ThreadId, maybe_thread_id)
|
||||
.tag(Property::ThreadId, TagValue::Id(maybe_thread_id))
|
||||
.log(LogEmailInsert::new(thread_id.into()))
|
||||
.custom(
|
||||
ObjectIndexBuilder::<(), _>::new().with_changes(MessageData {
|
||||
mailboxes: mailbox_ids,
|
||||
keywords,
|
||||
change_id,
|
||||
thread_id,
|
||||
}),
|
||||
)
|
||||
.caused_by(trc::location!())?
|
||||
|
|
@ -213,10 +197,6 @@ impl EmailCopy for Server {
|
|||
.write(batch.build())
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
let thread_id = match thread_id {
|
||||
Some(thread_id) => thread_id,
|
||||
None => ids.first_document_id().caused_by(trc::location!())?,
|
||||
};
|
||||
let document_id = ids.last_document_id().caused_by(trc::location!())?;
|
||||
|
||||
// Request FTS index
|
||||
|
|
|
|||
|
|
@ -12,14 +12,13 @@ use jmap_proto::types::{
|
|||
};
|
||||
use store::{
|
||||
BitmapKey, IterateParams, U32_LEN, ValueKey,
|
||||
ahash::AHashMap,
|
||||
roaring::RoaringBitmap,
|
||||
write::{
|
||||
AlignedBytes, Archive, BatchBuilder, BitmapClass, MaybeDynamicId, TagValue, ValueClass,
|
||||
log::ChangeLogBuilder,
|
||||
log::{ChangeLogBuilder, Changes},
|
||||
},
|
||||
};
|
||||
use trc::{AddContext, StoreEvent};
|
||||
use trc::AddContext;
|
||||
use utils::{BlobHash, codec::leb128::Leb128Reader};
|
||||
|
||||
use std::future::Future;
|
||||
|
|
@ -50,6 +49,9 @@ pub trait EmailDeletion: Sync + Send {
|
|||
&self,
|
||||
account_id: u32,
|
||||
) -> impl Future<Output = trc::Result<()>> + Send;
|
||||
|
||||
fn emails_purge_threads(&self, account_id: u32)
|
||||
-> impl Future<Output = trc::Result<()>> + Send;
|
||||
}
|
||||
|
||||
impl EmailDeletion for Server {
|
||||
|
|
@ -60,11 +62,14 @@ impl EmailDeletion for Server {
|
|||
) -> trc::Result<(ChangeLogBuilder, RoaringBitmap)> {
|
||||
// Create batch
|
||||
let mut changes = ChangeLogBuilder::with_change_id(0);
|
||||
let mut delete_properties = AHashMap::new();
|
||||
|
||||
// Fetch mailboxes and threadIds
|
||||
let mut thread_ids: AHashMap<u32, i32> = AHashMap::new();
|
||||
for (document_id, data) in self
|
||||
// Tombstone message and untag it from the mailboxes
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Email);
|
||||
|
||||
for (document_id, data_) in self
|
||||
.get_properties::<Archive<AlignedBytes>, _>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
|
|
@ -73,126 +78,31 @@ impl EmailDeletion for Server {
|
|||
)
|
||||
.await?
|
||||
{
|
||||
delete_properties.insert(
|
||||
document_id,
|
||||
DeleteProperties {
|
||||
archive: Some(data),
|
||||
thread_id: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
for (document_id, thread_id) in self
|
||||
.get_properties::<u32, _>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
&document_ids,
|
||||
Property::ThreadId,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
*thread_ids.entry(thread_id).or_default() += 1;
|
||||
delete_properties
|
||||
.entry(document_id)
|
||||
.or_insert_with(DeleteProperties::default)
|
||||
.thread_id = Some(thread_id);
|
||||
}
|
||||
let data = data_
|
||||
.to_unarchived::<MessageData>()
|
||||
.caused_by(trc::location!())?;
|
||||
let thread_id = u32::from(data.inner.thread_id);
|
||||
|
||||
// Obtain all threadIds
|
||||
self.core
|
||||
.storage
|
||||
.data
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(0),
|
||||
},
|
||||
document_id: 0,
|
||||
},
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(u32::MAX),
|
||||
},
|
||||
document_id: u32::MAX,
|
||||
},
|
||||
)
|
||||
.no_values(),
|
||||
|key, _| {
|
||||
let (thread_id, _) = key
|
||||
.get(U32_LEN + 2..)
|
||||
.and_then(|bytes| bytes.read_leb128::<u32>())
|
||||
.ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?;
|
||||
if let Some(thread_count) = thread_ids.get_mut(&thread_id) {
|
||||
*thread_count -= 1;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
// Tombstone message and untag it from the mailboxes
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Email);
|
||||
|
||||
for (document_id, delete_properties) in delete_properties {
|
||||
batch.update_document(document_id);
|
||||
|
||||
if let Some(data_) = delete_properties.archive {
|
||||
let data = data_
|
||||
.to_unarchived::<MessageData>()
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
for mailbox in data.inner.mailboxes.iter() {
|
||||
changes.log_child_update(Collection::Mailbox, u32::from(mailbox.mailbox_id));
|
||||
}
|
||||
|
||||
batch
|
||||
.custom(ObjectIndexBuilder::<_, ()>::new().with_current(data))
|
||||
.caused_by(trc::location!())?;
|
||||
} else {
|
||||
trc::event!(
|
||||
Store(StoreEvent::NotFound),
|
||||
AccountId = account_id,
|
||||
DocumentId = document_id,
|
||||
Details = "Failed to fetch mailboxIds.",
|
||||
CausedBy = trc::location!(),
|
||||
);
|
||||
for mailbox in data.inner.mailboxes.iter() {
|
||||
changes.log_child_update(Collection::Mailbox, u32::from(mailbox.mailbox_id));
|
||||
}
|
||||
if let Some(thread_id) = delete_properties.thread_id {
|
||||
batch
|
||||
.untag(Property::ThreadId, thread_id)
|
||||
.clear(Property::ThreadId);
|
||||
|
||||
// Log message deletion
|
||||
changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id));
|
||||
// Log message deletion
|
||||
changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id));
|
||||
|
||||
// Log thread changes
|
||||
if thread_ids[&thread_id] < 0 {
|
||||
changes.log_child_update(Collection::Thread, thread_id);
|
||||
}
|
||||
} else {
|
||||
trc::event!(
|
||||
Store(StoreEvent::NotFound),
|
||||
AccountId = account_id,
|
||||
DocumentId = document_id,
|
||||
Details = "Failed to fetch threadId.",
|
||||
CausedBy = trc::location!(),
|
||||
// Log thread changes
|
||||
changes.log_child_update(Collection::Thread, thread_id);
|
||||
|
||||
// Add changes to batch
|
||||
batch
|
||||
.update_document(document_id)
|
||||
.custom(ObjectIndexBuilder::<_, ()>::new().with_current(data))
|
||||
.caused_by(trc::location!())?
|
||||
.tag(
|
||||
Property::MailboxIds,
|
||||
TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)),
|
||||
);
|
||||
}
|
||||
batch.tag(
|
||||
Property::MailboxIds,
|
||||
TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)),
|
||||
);
|
||||
|
||||
document_ids.remove(document_id);
|
||||
|
||||
if batch.ops.len() >= 1000 {
|
||||
|
|
@ -210,16 +120,6 @@ impl EmailDeletion for Server {
|
|||
}
|
||||
}
|
||||
|
||||
// Delete threadIds
|
||||
for (thread_id, thread_count) in thread_ids {
|
||||
if thread_count == 0 {
|
||||
batch
|
||||
.with_collection(Collection::Thread)
|
||||
.delete_document(thread_id);
|
||||
changes.log_delete(Collection::Thread, thread_id);
|
||||
}
|
||||
}
|
||||
|
||||
if !batch.ops.is_empty() {
|
||||
self.core
|
||||
.storage
|
||||
|
|
@ -409,6 +309,9 @@ impl EmailDeletion for Server {
|
|||
Total = tombstoned_ids.len(),
|
||||
);
|
||||
|
||||
// Delete threadIds
|
||||
self.emails_purge_threads(account_id).await?;
|
||||
|
||||
// Delete full-text index
|
||||
self.core
|
||||
.storage
|
||||
|
|
@ -489,10 +392,78 @@ impl EmailDeletion for Server {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct DeleteProperties {
|
||||
archive: Option<Archive<AlignedBytes>>,
|
||||
thread_id: Option<u32>,
|
||||
async fn emails_purge_threads(&self, account_id: u32) -> trc::Result<()> {
|
||||
// Delete threadIs without documents
|
||||
let mut thread_ids = self
|
||||
.get_document_ids(account_id, Collection::Thread)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.unwrap_or_default();
|
||||
|
||||
if thread_ids.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.core
|
||||
.storage
|
||||
.data
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(0),
|
||||
},
|
||||
document_id: 0,
|
||||
},
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(u32::MAX),
|
||||
},
|
||||
document_id: u32::MAX,
|
||||
},
|
||||
)
|
||||
.no_values(),
|
||||
|key, _| {
|
||||
let (thread_id, _) = key
|
||||
.get(U32_LEN + 2..)
|
||||
.and_then(|bytes| bytes.read_leb128::<u32>())
|
||||
.ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))?;
|
||||
thread_ids.remove(thread_id);
|
||||
|
||||
Ok(!thread_ids.is_empty())
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
if thread_ids.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Create batch
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Thread)
|
||||
.with_change_id(self.generate_snowflake_id().caused_by(trc::location!())?)
|
||||
.log(Changes::delete(thread_ids.iter().map(|id| id as u64)));
|
||||
for thread_id in thread_ids {
|
||||
batch.delete_document(thread_id);
|
||||
}
|
||||
self.core
|
||||
.storage
|
||||
.data
|
||||
.write(batch.build())
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -597,6 +597,10 @@ impl IndexableObject for MessageData {
|
|||
})
|
||||
.collect(),
|
||||
},
|
||||
IndexValue::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: vec![TagValue::Id(MaybeDynamicId::Static(self.thread_id))],
|
||||
},
|
||||
]
|
||||
.into_iter()
|
||||
}
|
||||
|
|
@ -624,6 +628,12 @@ impl IndexableObject for &ArchivedMessageData {
|
|||
})
|
||||
.collect(),
|
||||
},
|
||||
IndexValue::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: vec![TagValue::Id(MaybeDynamicId::Static(u32::from(
|
||||
self.thread_id,
|
||||
)))],
|
||||
},
|
||||
]
|
||||
.into_iter()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::BTreeSet,
|
||||
fmt::Write,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
|
@ -13,6 +14,7 @@ use std::{
|
|||
use common::{
|
||||
Server,
|
||||
auth::{AccessToken, ResourceToken},
|
||||
storage::index::ObjectIndexBuilder,
|
||||
};
|
||||
use directory::Permission;
|
||||
use jmap_proto::types::{
|
||||
|
|
@ -33,19 +35,20 @@ use spam_filter::{
|
|||
};
|
||||
use std::future::Future;
|
||||
use store::{
|
||||
BitmapKey, BlobClass,
|
||||
ahash::AHashSet,
|
||||
BlobClass, IndexKey, IndexKeyPrefix, IterateParams, U32_LEN,
|
||||
ahash::AHashMap,
|
||||
query::Filter,
|
||||
roaring::RoaringBitmap,
|
||||
write::{
|
||||
AlignedBytes, Archive, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId,
|
||||
MaybeDynamicValue, SerializeWithId, TagValue, TaskQueueClass, ValueClass,
|
||||
AlignedBytes, Archive, AssignedIds, BatchBuilder, MaybeDynamicValue, SerializeWithId,
|
||||
TaskQueueClass, ValueClass,
|
||||
key::DeserializeBigEndian,
|
||||
log::{ChangeLogBuilder, Changes, LogInsert},
|
||||
now,
|
||||
},
|
||||
};
|
||||
use store::{SerializeInfallible, rand::Rng};
|
||||
use trc::{AddContext, MessageIngestEvent};
|
||||
use utils::map::vec_map::VecMap;
|
||||
|
||||
use crate::{
|
||||
mailbox::{INBOX_ID, JUNK_ID, UidMailbox},
|
||||
|
|
@ -104,13 +107,14 @@ pub trait EmailIngest: Sync + Send {
|
|||
account_id: u32,
|
||||
thread_name: &str,
|
||||
references: &[&str],
|
||||
) -> impl Future<Output = trc::Result<Option<u32>>> + Send;
|
||||
) -> impl Future<Output = trc::Result<u32>> + Send;
|
||||
fn assign_imap_uid(
|
||||
&self,
|
||||
account_id: u32,
|
||||
mailbox_id: u32,
|
||||
) -> impl Future<Output = trc::Result<u32>> + Send;
|
||||
fn email_bayes_can_train(&self, access_token: &AccessToken) -> bool;
|
||||
fn create_thread_id(&self, account_id: u32) -> impl Future<Output = trc::Result<u32>> + Send;
|
||||
}
|
||||
|
||||
impl EmailIngest for Server {
|
||||
|
|
@ -323,12 +327,8 @@ impl EmailIngest for Server {
|
|||
});
|
||||
}
|
||||
|
||||
if !references.is_empty() {
|
||||
self.find_or_merge_thread(account_id, subject, &references)
|
||||
.await?
|
||||
} else {
|
||||
None
|
||||
}
|
||||
self.find_or_merge_thread(account_id, subject, &references)
|
||||
.await?
|
||||
};
|
||||
|
||||
// Add additional headers to message
|
||||
|
|
@ -474,27 +474,19 @@ impl EmailIngest for Server {
|
|||
batch
|
||||
.with_change_id(change_id)
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Thread);
|
||||
if let Some(thread_id) = thread_id {
|
||||
batch.log(Changes::update([thread_id]));
|
||||
} else {
|
||||
batch.create_document().log(LogInsert());
|
||||
}
|
||||
|
||||
.with_collection(Collection::Thread)
|
||||
.log(Changes::update([thread_id]));
|
||||
// Build write batch
|
||||
let mailbox_ids_event = mailbox_ids
|
||||
.iter()
|
||||
.map(|m| trc::Value::from(m.mailbox_id))
|
||||
.collect::<Vec<_>>();
|
||||
let maybe_thread_id = thread_id
|
||||
.map(MaybeDynamicId::Static)
|
||||
.unwrap_or(MaybeDynamicId::Dynamic(0));
|
||||
batch
|
||||
.with_collection(Collection::Mailbox)
|
||||
.log(Changes::child_update(params.mailbox_ids.iter().copied()))
|
||||
.with_collection(Collection::Email)
|
||||
.create_document()
|
||||
.log(LogEmailInsert(thread_id))
|
||||
.log(LogEmailInsert(thread_id.into()))
|
||||
.index_message(
|
||||
account_id,
|
||||
tenant_id,
|
||||
|
|
@ -504,12 +496,11 @@ impl EmailIngest for Server {
|
|||
mailboxes: mailbox_ids,
|
||||
keywords: params.keywords,
|
||||
change_id,
|
||||
thread_id,
|
||||
},
|
||||
params.received_at.unwrap_or_else(now),
|
||||
)
|
||||
.caused_by(trc::location!())?
|
||||
.set(Property::ThreadId, maybe_thread_id)
|
||||
.tag(Property::ThreadId, TagValue::Id(maybe_thread_id))
|
||||
.set(
|
||||
ValueClass::TaskQueue(TaskQueueClass::IndexEmail {
|
||||
seq: self.generate_snowflake_id().caused_by(trc::location!())?,
|
||||
|
|
@ -538,10 +529,7 @@ impl EmailIngest for Server {
|
|||
.write(batch.build())
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
let thread_id = match thread_id {
|
||||
Some(thread_id) => thread_id,
|
||||
None => ids.first_document_id().caused_by(trc::location!())?,
|
||||
};
|
||||
|
||||
let document_id = ids.last_document_id().caused_by(trc::location!())?;
|
||||
let id = Id::from_parts(thread_id, document_id);
|
||||
|
||||
|
|
@ -592,7 +580,11 @@ impl EmailIngest for Server {
|
|||
account_id: u32,
|
||||
thread_name: &str,
|
||||
references: &[&str],
|
||||
) -> trc::Result<Option<u32>> {
|
||||
) -> trc::Result<u32> {
|
||||
if references.is_empty() {
|
||||
return self.create_thread_id(account_id).await;
|
||||
}
|
||||
|
||||
let mut try_count = 0;
|
||||
let thread_name = if !thread_name.is_empty() {
|
||||
thread_name
|
||||
|
|
@ -600,59 +592,118 @@ impl EmailIngest for Server {
|
|||
"!"
|
||||
}
|
||||
.serialize();
|
||||
let references = references
|
||||
.iter()
|
||||
.map(|r| r.as_bytes())
|
||||
.collect::<BTreeSet<_>>();
|
||||
|
||||
loop {
|
||||
// Find messages with matching references
|
||||
let mut filters = Vec::with_capacity(references.len() + 3);
|
||||
filters.push(Filter::eq(Property::Subject, thread_name.clone()));
|
||||
filters.push(Filter::Or);
|
||||
for reference in references {
|
||||
filters.push(Filter::eq(Property::References, reference.serialize()));
|
||||
}
|
||||
filters.push(Filter::End);
|
||||
let results = self
|
||||
.core
|
||||
.storage
|
||||
.data
|
||||
.filter(account_id, Collection::Email, filters)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.results;
|
||||
// Find messages with a matching subject
|
||||
let mut subj_results = RoaringBitmap::new();
|
||||
self.store()
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
IndexKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
document_id: 0,
|
||||
field: Property::Subject.into(),
|
||||
key: thread_name.clone(),
|
||||
},
|
||||
IndexKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
document_id: u32::MAX,
|
||||
field: Property::Subject.into(),
|
||||
key: thread_name.clone(),
|
||||
},
|
||||
)
|
||||
.no_values()
|
||||
.ascending(),
|
||||
|key, _| {
|
||||
let id_pos = key.len() - U32_LEN;
|
||||
let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, None, trc::location!())
|
||||
})?;
|
||||
|
||||
if results.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
if value == thread_name {
|
||||
subj_results.insert(key.deserialize_be_u32(id_pos)?);
|
||||
}
|
||||
|
||||
// Obtain threadIds for matching messages
|
||||
let thread_ids = self
|
||||
.get_cached_thread_ids(account_id, results.iter())
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
if subj_results.is_empty() {
|
||||
return self.create_thread_id(account_id).await;
|
||||
}
|
||||
|
||||
if thread_ids.len() == 1 {
|
||||
return Ok(thread_ids
|
||||
.into_iter()
|
||||
.next()
|
||||
.map(|(_, thread_id)| thread_id));
|
||||
// Find messages with matching references
|
||||
let mut results = RoaringBitmap::new();
|
||||
self.store()
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
IndexKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
document_id: 0,
|
||||
field: Property::References.into(),
|
||||
key: references.first().unwrap().to_vec(),
|
||||
},
|
||||
IndexKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
document_id: u32::MAX,
|
||||
field: Property::References.into(),
|
||||
key: references.last().unwrap().to_vec(),
|
||||
},
|
||||
)
|
||||
.no_values()
|
||||
.ascending(),
|
||||
|key, _| {
|
||||
let id_pos = key.len() - U32_LEN;
|
||||
let value = key.get(IndexKeyPrefix::len()..id_pos).ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, None, trc::location!())
|
||||
})?;
|
||||
let document_id = key.deserialize_be_u32(id_pos)?;
|
||||
|
||||
if subj_results.contains(document_id) && references.contains(value) {
|
||||
results.insert(document_id);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
if results.is_empty() {
|
||||
return self.create_thread_id(account_id).await;
|
||||
}
|
||||
|
||||
// Find the most common threadId
|
||||
let mut thread_counts = VecMap::<u32, u32>::with_capacity(thread_ids.len());
|
||||
let mut thread_counts = AHashMap::<u32, u32>::with_capacity(16);
|
||||
let mut thread_id = u32::MAX;
|
||||
let mut thread_count = 0;
|
||||
for (_, thread_id_) in thread_ids.iter() {
|
||||
let tc = thread_counts.get_mut_or_insert(*thread_id_);
|
||||
*tc += 1;
|
||||
if *tc > thread_count {
|
||||
thread_count = *tc;
|
||||
thread_id = *thread_id_;
|
||||
let thread_cache = self
|
||||
.get_cached_thread_ids(account_id)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
for (document_id, thread_id_) in thread_cache.threads.iter() {
|
||||
if results.contains(*document_id) {
|
||||
let tc = thread_counts.entry(*thread_id_).or_default();
|
||||
*tc += 1;
|
||||
if *tc > thread_count {
|
||||
thread_count = *tc;
|
||||
thread_id = *thread_id_;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if thread_id == u32::MAX {
|
||||
return Ok(None); // This should never happen
|
||||
return self.create_thread_id(account_id).await;
|
||||
} else if thread_counts.len() == 1 {
|
||||
return Ok(Some(thread_id));
|
||||
return Ok(thread_id);
|
||||
}
|
||||
|
||||
// Delete all but the most common threadId
|
||||
|
|
@ -673,47 +724,49 @@ impl EmailIngest for Server {
|
|||
|
||||
// Move messages to the new threadId
|
||||
batch.with_collection(Collection::Email);
|
||||
for old_thread_id in thread_ids
|
||||
.into_iter()
|
||||
.map(|(_, thread_id)| thread_id)
|
||||
.collect::<AHashSet<_>>()
|
||||
{
|
||||
if thread_id != old_thread_id {
|
||||
for document_id in self
|
||||
.core
|
||||
.storage
|
||||
.data
|
||||
.get_bitmap(BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(old_thread_id),
|
||||
},
|
||||
document_id: 0,
|
||||
})
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.unwrap_or_default()
|
||||
{
|
||||
batch
|
||||
.update_document(document_id)
|
||||
.assert_value(Property::ThreadId, old_thread_id)
|
||||
.untag(Property::ThreadId, old_thread_id)
|
||||
.tag(Property::ThreadId, thread_id)
|
||||
.set(Property::ThreadId, thread_id.serialize());
|
||||
changes.log_move(
|
||||
Collection::Email,
|
||||
Id::from_parts(old_thread_id, document_id),
|
||||
Id::from_parts(thread_id, document_id),
|
||||
);
|
||||
|
||||
for (&document_id, &old_thread_id) in &thread_cache.threads {
|
||||
if thread_id == old_thread_id || !thread_counts.contains_key(&old_thread_id) {
|
||||
continue;
|
||||
}
|
||||
if let Some(data_) = self
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
document_id,
|
||||
Property::Value,
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
{
|
||||
let data = data_
|
||||
.to_unarchived::<MessageData>()
|
||||
.caused_by(trc::location!())?;
|
||||
if data.inner.thread_id != old_thread_id {
|
||||
continue;
|
||||
}
|
||||
let mut new_data = data.deserialize().caused_by(trc::location!())?;
|
||||
new_data.thread_id = thread_id;
|
||||
batch
|
||||
.update_document(document_id)
|
||||
.custom(
|
||||
ObjectIndexBuilder::new()
|
||||
.with_current(data)
|
||||
.with_changes(new_data),
|
||||
)
|
||||
.caused_by(trc::location!())?;
|
||||
changes.log_move(
|
||||
Collection::Email,
|
||||
Id::from_parts(old_thread_id, document_id),
|
||||
Id::from_parts(thread_id, document_id),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
batch.custom(changes).caused_by(trc::location!())?;
|
||||
|
||||
match self.core.storage.data.write(batch.build()).await {
|
||||
Ok(_) => return Ok(Some(thread_id)),
|
||||
Ok(_) => return Ok(thread_id),
|
||||
Err(err) if err.is_assertion_failure() && try_count < MAX_RETRIES => {
|
||||
let backoff = store::rand::rng().random_range(50..=300);
|
||||
tokio::time::sleep(Duration::from_millis(backoff)).await;
|
||||
|
|
@ -747,6 +800,20 @@ impl EmailIngest for Server {
|
|||
bayes.account_classify && access_token.has_permission(Permission::SpamFilterTrain)
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_thread_id(&self, account_id: u32) -> trc::Result<u32> {
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_change_id(self.generate_snowflake_id().caused_by(trc::location!())?)
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Thread)
|
||||
.create_document()
|
||||
.log(LogInsert());
|
||||
self.store()
|
||||
.write_expect_id(batch)
|
||||
.await
|
||||
.caused_by(trc::location!())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LogEmailInsert(Option<u32>);
|
||||
|
|
@ -790,3 +857,19 @@ impl From<IngestedEmail> for Object<Value> {
|
|||
.with_property(Property::Size, email.size)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
let thread_id = match thread_id {
|
||||
Some(thread_id) => thread_id,
|
||||
None => ids.first_document_id().caused_by(trc::location!())?,
|
||||
};
|
||||
|
||||
.with_collection(Collection::Thread);
|
||||
if let Some(thread_id) = thread_id {
|
||||
batch.log(Changes::update([thread_id]));
|
||||
} else {
|
||||
batch.create_document().log(LogInsert());
|
||||
|
||||
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ pub struct MessageData {
|
|||
pub mailboxes: Vec<UidMailbox>,
|
||||
pub keywords: Vec<Keyword>,
|
||||
pub change_id: u64,
|
||||
pub thread_id: u32,
|
||||
}
|
||||
|
||||
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)]
|
||||
|
|
|
|||
|
|
@ -9,22 +9,23 @@ use std::sync::Arc;
|
|||
use common::{Server, Threads};
|
||||
use jmap_proto::types::{collection::Collection, property::Property};
|
||||
use std::future::Future;
|
||||
use store::{
|
||||
BitmapKey, IterateParams, U32_LEN,
|
||||
ahash::AHashMap,
|
||||
write::{BitmapClass, TagValue, key::DeserializeBigEndian},
|
||||
};
|
||||
use trc::AddContext;
|
||||
use utils::codec::leb128::Leb128Reader;
|
||||
|
||||
pub trait ThreadCache: Sync + Send {
|
||||
fn get_cached_thread_ids(
|
||||
&self,
|
||||
account_id: u32,
|
||||
message_ids: impl Iterator<Item = u32> + Send,
|
||||
) -> impl Future<Output = trc::Result<Vec<(u32, u32)>>> + Send;
|
||||
) -> impl Future<Output = trc::Result<Arc<Threads>>> + Send;
|
||||
}
|
||||
|
||||
impl ThreadCache for Server {
|
||||
async fn get_cached_thread_ids(
|
||||
&self,
|
||||
account_id: u32,
|
||||
message_ids: impl Iterator<Item = u32> + Send,
|
||||
) -> trc::Result<Vec<(u32, u32)>> {
|
||||
async fn get_cached_thread_ids(&self, account_id: u32) -> trc::Result<Arc<Threads>> {
|
||||
// Obtain current state
|
||||
let modseq = self
|
||||
.core
|
||||
|
|
@ -35,35 +36,70 @@ impl ThreadCache for Server {
|
|||
.caused_by(trc::location!())?;
|
||||
|
||||
// Lock the cache
|
||||
let thread_cache = if let Some(thread_cache) =
|
||||
self.inner.cache.threads.get(&account_id).and_then(|t| {
|
||||
if t.modseq.unwrap_or(0) >= modseq.unwrap_or(0) {
|
||||
Some(t)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
thread_cache
|
||||
if let Some(thread_cache) = self.inner.cache.threads.get(&account_id).and_then(|t| {
|
||||
if t.modseq.unwrap_or(0) >= modseq.unwrap_or(0) {
|
||||
Some(t)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
Ok(thread_cache)
|
||||
} else {
|
||||
let thread_cache = Arc::new(Threads {
|
||||
threads: self
|
||||
.get_properties::<u32, _>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
&(),
|
||||
Property::ThreadId,
|
||||
let mut threads = AHashMap::new();
|
||||
self.core
|
||||
.storage
|
||||
.data
|
||||
.iterate(
|
||||
IterateParams::new(
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(0),
|
||||
},
|
||||
document_id: 0,
|
||||
},
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
class: BitmapClass::Tag {
|
||||
field: Property::ThreadId.into(),
|
||||
value: TagValue::Id(u32::MAX),
|
||||
},
|
||||
document_id: u32::MAX,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect(),
|
||||
modseq,
|
||||
});
|
||||
.no_values(),
|
||||
|key, _| {
|
||||
let (thread_id, _) = key
|
||||
.get(U32_LEN + 2..)
|
||||
.and_then(|bytes| bytes.read_leb128::<u32>())
|
||||
.ok_or_else(|| {
|
||||
trc::Error::corrupted_key(key, None, trc::location!())
|
||||
})?;
|
||||
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
|
||||
|
||||
threads.insert(document_id, thread_id);
|
||||
|
||||
Ok(true)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
let thread_cache = Arc::new(Threads { threads, modseq });
|
||||
self.inner
|
||||
.cache
|
||||
.threads
|
||||
.insert(account_id, thread_cache.clone());
|
||||
thread_cache
|
||||
};
|
||||
Ok(thread_cache)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
|
||||
// Obtain threadIds for matching messages
|
||||
let mut thread_ids = Vec::with_capacity(message_ids.size_hint().0);
|
||||
|
|
@ -74,5 +110,5 @@ impl ThreadCache for Server {
|
|||
}
|
||||
|
||||
Ok(thread_ids)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -189,8 +189,8 @@ impl<T: SessionStream> SessionData<T> {
|
|||
|
||||
for (id, imap_id) in ids {
|
||||
// Obtain mailbox tags
|
||||
let (data_, thread_id) = if let Some(result) = self
|
||||
.get_mailbox_tags(account_id, id)
|
||||
let data_ = if let Some(result) = self
|
||||
.get_message_data(account_id, id)
|
||||
.await
|
||||
.imap_ctx(&arguments.tag, trc::location!())?
|
||||
{
|
||||
|
|
@ -230,6 +230,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
.imap_ctx(&arguments.tag, trc::location!())?;
|
||||
}
|
||||
new_data.change_id = changelog.change_id;
|
||||
let thread_id = new_data.thread_id;
|
||||
|
||||
// Add destination folder
|
||||
new_data.add_mailbox(dest_mailbox_id);
|
||||
|
|
@ -485,26 +486,22 @@ impl<T: SessionStream> SessionData<T> {
|
|||
self.write_bytes(response).await
|
||||
}
|
||||
|
||||
pub async fn get_mailbox_tags(
|
||||
pub async fn get_message_data(
|
||||
&self,
|
||||
account_id: u32,
|
||||
id: u32,
|
||||
) -> trc::Result<Option<(Archive<AlignedBytes>, u32)>> {
|
||||
// Obtain mailbox tags
|
||||
if let (Some(mailboxes), Some(thread_id)) = (
|
||||
self.server
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
id,
|
||||
Property::Value,
|
||||
)
|
||||
.await?,
|
||||
self.server
|
||||
.get_property::<u32>(account_id, Collection::Email, id, Property::ThreadId)
|
||||
.await?,
|
||||
) {
|
||||
Ok(Some((mailboxes, thread_id)))
|
||||
) -> trc::Result<Option<Archive<AlignedBytes>>> {
|
||||
if let Some(data) = self
|
||||
.server
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
id,
|
||||
Property::Value,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(Some(data))
|
||||
} else {
|
||||
trc::event!(
|
||||
Store(trc::StoreEvent::NotFound),
|
||||
|
|
|
|||
|
|
@ -212,24 +212,13 @@ impl<T: SessionStream> SessionData<T> {
|
|||
continue;
|
||||
}
|
||||
|
||||
// Remove deleted flag
|
||||
let thread_id = if let Some(thread_id) = self
|
||||
.server
|
||||
.get_property::<u32>(account_id, Collection::Email, id, Property::ThreadId)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
{
|
||||
thread_id
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Prepare changes
|
||||
let mut new_data = data.deserialize().caused_by(trc::location!())?;
|
||||
if changelog.change_id == u64::MAX {
|
||||
changelog.change_id = self.server.assign_change_id(account_id)?
|
||||
}
|
||||
new_data.change_id = changelog.change_id;
|
||||
let thread_id = new_data.thread_id;
|
||||
|
||||
// Untag message from this mailbox and remove Deleted flag
|
||||
new_data.remove_mailbox(mailbox_id);
|
||||
|
|
|
|||
|
|
@ -233,7 +233,6 @@ impl<T: SessionStream> SessionData<T> {
|
|||
|
||||
// Build properties list
|
||||
let mut set_seen_flags = false;
|
||||
let mut needs_thread_id = false;
|
||||
let mut needs_blobs = false;
|
||||
|
||||
for attribute in &arguments.attributes {
|
||||
|
|
@ -264,9 +263,6 @@ impl<T: SessionStream> SessionData<T> {
|
|||
}
|
||||
needs_blobs = true;
|
||||
}
|
||||
Attribute::ThreadId => {
|
||||
needs_thread_id = true;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
|
@ -389,20 +385,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
.keywords
|
||||
.iter()
|
||||
.any(|k| k == &ArchivedKeyword::Seen);
|
||||
let thread_id = if needs_thread_id || set_seen_flag {
|
||||
if let Some(thread_id) = self
|
||||
.server
|
||||
.get_property::<u32>(account_id, Collection::Email, id, Property::ThreadId)
|
||||
.await
|
||||
.imap_ctx(&arguments.tag, trc::location!())?
|
||||
{
|
||||
thread_id
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
for attribute in &arguments.attributes {
|
||||
match attribute {
|
||||
Attribute::Envelope => {
|
||||
|
|
@ -538,7 +521,8 @@ impl<T: SessionStream> SessionData<T> {
|
|||
}
|
||||
Attribute::ThreadId => {
|
||||
items.push(DataItem::ThreadId {
|
||||
thread_id: Id::from_parts(account_id, thread_id).to_string(),
|
||||
thread_id: Id::from_parts(account_id, u32::from(data.inner.thread_id))
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -568,6 +552,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
.imap_ctx(&arguments.tag, trc::location!())?;
|
||||
new_data.keywords.push(Keyword::Seen);
|
||||
new_data.change_id = change_id;
|
||||
let thread_id = new_data.thread_id;
|
||||
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
|
|
|
|||
|
|
@ -200,23 +200,19 @@ impl<T: SessionStream> SessionData<T> {
|
|||
'outer: for (id, imap_id) in &ids {
|
||||
let mut try_count = 0;
|
||||
loop {
|
||||
// Obtain current keywords
|
||||
let (data_, thread_id) = if let (Some(data), Some(thread_id)) = (
|
||||
self.server
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
*id,
|
||||
Property::Value,
|
||||
)
|
||||
.await
|
||||
.imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?,
|
||||
self.server
|
||||
.get_property::<u32>(account_id, Collection::Email, *id, Property::ThreadId)
|
||||
.await
|
||||
.imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?,
|
||||
) {
|
||||
(data, thread_id)
|
||||
// Obtain message data
|
||||
let data_ = if let Some(data) = self
|
||||
.server
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
*id,
|
||||
Property::Value,
|
||||
)
|
||||
.await
|
||||
.imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?
|
||||
{
|
||||
data
|
||||
} else {
|
||||
continue 'outer;
|
||||
};
|
||||
|
|
@ -228,6 +224,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
let mut new_data = data
|
||||
.deserialize()
|
||||
.imap_ctx(response.tag.as_ref().unwrap(), trc::location!())?;
|
||||
let thread_id = new_data.thread_id;
|
||||
|
||||
// Apply changes
|
||||
let mut seen_changed = false;
|
||||
|
|
|
|||
|
|
@ -80,18 +80,20 @@ impl<T: SessionStream> SessionData<T> {
|
|||
}
|
||||
|
||||
// Lock the cache
|
||||
let thread_ids = self
|
||||
let thread_cache = self
|
||||
.server
|
||||
.get_cached_thread_ids(mailbox.id.account_id, result_set.results.iter())
|
||||
.get_cached_thread_ids(mailbox.id.account_id)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
|
||||
// Group messages by thread
|
||||
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();
|
||||
let state = mailbox.state.lock();
|
||||
for (document_id, thread_id) in thread_ids {
|
||||
if let Some((imap_id, _)) = state.map_result_id(document_id, is_uid) {
|
||||
threads.entry(thread_id).or_default().push(imap_id);
|
||||
for (document_id, thread_id) in &thread_cache.threads {
|
||||
if result_set.results.contains(*document_id) {
|
||||
if let Some((imap_id, _)) = state.map_result_id(*document_id, is_uid) {
|
||||
threads.entry(*thread_id).or_default().push(imap_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -216,46 +216,37 @@ impl BlobOperations for Server {
|
|||
} if *account_id == req_account_id => {
|
||||
let collection = Collection::from(*collection);
|
||||
if collection == Collection::Email {
|
||||
if include_email || include_thread {
|
||||
if let Some(thread_id) = self
|
||||
.get_property::<u32>(
|
||||
req_account_id,
|
||||
Collection::Email,
|
||||
*document_id,
|
||||
Property::ThreadId,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
if include_email {
|
||||
matched_ids.append(
|
||||
DataType::Email,
|
||||
vec![Id::from_parts(thread_id, *document_id)],
|
||||
);
|
||||
}
|
||||
if include_thread {
|
||||
matched_ids.append(
|
||||
DataType::Thread,
|
||||
vec![Id::from(thread_id)],
|
||||
);
|
||||
}
|
||||
if let Some(data_) = self
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
req_account_id,
|
||||
Collection::Email,
|
||||
*document_id,
|
||||
Property::Value,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let data = data_
|
||||
.unarchive::<MessageData>()
|
||||
.caused_by(trc::location!())?;
|
||||
if include_email {
|
||||
matched_ids.append(
|
||||
DataType::Email,
|
||||
vec![Id::from_parts(
|
||||
u32::from(data.thread_id),
|
||||
*document_id,
|
||||
)],
|
||||
);
|
||||
}
|
||||
}
|
||||
if include_mailbox {
|
||||
if let Some(mailboxes) = self
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
req_account_id,
|
||||
Collection::Email,
|
||||
*document_id,
|
||||
Property::Value,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
if include_thread {
|
||||
matched_ids.append(
|
||||
DataType::Thread,
|
||||
vec![Id::from(u32::from(data.thread_id))],
|
||||
);
|
||||
}
|
||||
if include_mailbox {
|
||||
matched_ids.append(
|
||||
DataType::Mailbox,
|
||||
mailboxes
|
||||
.unarchive::<MessageData>()
|
||||
.caused_by(trc::location!())?
|
||||
.mailboxes
|
||||
data.mailboxes
|
||||
.iter()
|
||||
.map(|m| {
|
||||
debug_assert!(m.uid != 0);
|
||||
|
|
|
|||
|
|
@ -118,17 +118,13 @@ impl EmailGet for Server {
|
|||
let ids = if let Some(ids) = ids {
|
||||
ids
|
||||
} else {
|
||||
let document_ids = message_ids
|
||||
.iter()
|
||||
.take(self.core.jmap.get_max_objects)
|
||||
.collect::<Vec<_>>();
|
||||
self.get_cached_thread_ids(account_id, document_ids.iter().copied())
|
||||
self.get_cached_thread_ids(account_id)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.into_iter()
|
||||
.filter_map(|(document_id, thread_id)| {
|
||||
Id::from_parts(thread_id, document_id).into()
|
||||
})
|
||||
.threads
|
||||
.iter()
|
||||
.take(self.core.jmap.get_max_objects)
|
||||
.map(|(document_id, thread_id)| Id::from_parts(*thread_id, *document_id))
|
||||
.collect()
|
||||
};
|
||||
let mut response = GetResponse {
|
||||
|
|
|
|||
|
|
@ -15,11 +15,10 @@ use mail_parser::HeaderName;
|
|||
use nlp::language::Language;
|
||||
use std::future::Future;
|
||||
use store::{
|
||||
SerializeInfallible, ValueKey,
|
||||
SerializeInfallible,
|
||||
fts::{Field, FilterGroup, FtsFilter, IntoFilterGroup},
|
||||
query::{self},
|
||||
roaring::RoaringBitmap,
|
||||
write::ValueClass,
|
||||
};
|
||||
|
||||
use crate::JmapMethods;
|
||||
|
|
@ -367,16 +366,12 @@ impl EmailQuery for Server {
|
|||
}
|
||||
|
||||
// Sort results
|
||||
let thread_cache = self.get_cached_thread_ids(account_id).await?;
|
||||
self.sort(
|
||||
result_set,
|
||||
comparators,
|
||||
paginate
|
||||
.with_prefix_key(ValueKey {
|
||||
account_id,
|
||||
collection: Collection::Email.into(),
|
||||
document_id: 0,
|
||||
class: ValueClass::Property(Property::ThreadId.into()),
|
||||
})
|
||||
.with_prefix_map(&thread_cache.threads)
|
||||
.with_prefix_unique(request.arguments.collapse_threads.unwrap_or(false)),
|
||||
response,
|
||||
)
|
||||
|
|
@ -399,15 +394,15 @@ impl EmailQuery for Server {
|
|||
if keyword_doc_ids.is_empty() {
|
||||
return Ok(keyword_doc_ids);
|
||||
}
|
||||
let keyword_thread_ids = self
|
||||
.get_cached_thread_ids(account_id, keyword_doc_ids.iter())
|
||||
.await?;
|
||||
|
||||
let thread_cache = self.get_cached_thread_ids(account_id).await?;
|
||||
let mut not_matched_ids = RoaringBitmap::new();
|
||||
let mut matched_ids = RoaringBitmap::new();
|
||||
|
||||
for (keyword_doc_id, thread_id) in keyword_thread_ids {
|
||||
if matched_ids.contains(keyword_doc_id) || not_matched_ids.contains(keyword_doc_id) {
|
||||
for (&keyword_doc_id, &thread_id) in thread_cache.threads.iter() {
|
||||
if !keyword_doc_ids.contains(keyword_doc_id)
|
||||
|| matched_ids.contains(keyword_doc_id)
|
||||
|| not_matched_ids.contains(keyword_doc_id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ impl JmapMethods for Server {
|
|||
&self,
|
||||
result_set: ResultSet,
|
||||
comparators: Vec<Comparator>,
|
||||
paginate: Pagination,
|
||||
paginate: Pagination<'_>,
|
||||
mut response: QueryResponse,
|
||||
) -> trc::Result<QueryResponse> {
|
||||
// Sort results
|
||||
|
|
|
|||
|
|
@ -9,14 +9,11 @@ use std::cmp::Ordering;
|
|||
use ahash::{AHashMap, AHashSet};
|
||||
use trc::AddContext;
|
||||
|
||||
use crate::{
|
||||
write::{key::DeserializeBigEndian, ValueClass},
|
||||
IndexKeyPrefix, IterateParams, Store, ValueKey, U32_LEN,
|
||||
};
|
||||
use crate::{IndexKeyPrefix, IterateParams, Store, U32_LEN, write::key::DeserializeBigEndian};
|
||||
|
||||
use super::{Comparator, ResultSet, SortedResultSet};
|
||||
|
||||
pub struct Pagination {
|
||||
pub struct Pagination<'x> {
|
||||
requested_position: i32,
|
||||
position: i32,
|
||||
pub limit: usize,
|
||||
|
|
@ -25,7 +22,7 @@ pub struct Pagination {
|
|||
has_anchor: bool,
|
||||
anchor_found: bool,
|
||||
pub ids: Vec<u64>,
|
||||
prefix_key: Option<ValueKey<ValueClass<u32>>>,
|
||||
prefix_map: Option<&'x AHashMap<u32, u32>>,
|
||||
prefix_unique: bool,
|
||||
}
|
||||
|
||||
|
|
@ -34,7 +31,7 @@ impl Store {
|
|||
&self,
|
||||
result_set: ResultSet,
|
||||
mut comparators: Vec<Comparator>,
|
||||
mut paginate: Pagination,
|
||||
mut paginate: Pagination<'_>,
|
||||
) -> trc::Result<SortedResultSet> {
|
||||
paginate.limit = match (result_set.results.len(), paginate.limit) {
|
||||
(0, _) => {
|
||||
|
|
@ -105,16 +102,12 @@ impl Store {
|
|||
}
|
||||
|
||||
// Obtain prefixes
|
||||
let prefix_key = paginate.prefix_key.take();
|
||||
let prefix_map = paginate.prefix_map.take();
|
||||
let mut sorted_results = paginate.build();
|
||||
if let Some(prefix_key) = prefix_key {
|
||||
if let Some(prefix_map) = prefix_map {
|
||||
for id in sorted_results.ids.iter_mut() {
|
||||
if let Some(prefix_id) = self
|
||||
.get_value::<u32>(prefix_key.clone().with_document_id(*id as u32))
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
{
|
||||
*id |= (prefix_id as u64) << 32;
|
||||
if let Some(prefix_id) = prefix_map.get(&(*id as u32)) {
|
||||
*id |= (*prefix_id as u64) << 32;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -213,16 +206,12 @@ impl Store {
|
|||
});
|
||||
for (document_id, _) in sorted_ids {
|
||||
// Obtain document prefixId
|
||||
let prefix_id = if let Some(prefix_key) = &paginate.prefix_key {
|
||||
if let Some(prefix_id) = self
|
||||
.get_value(prefix_key.clone().with_document_id(document_id))
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
{
|
||||
if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) {
|
||||
let prefix_id = if let Some(prefix_map) = paginate.prefix_map {
|
||||
if let Some(prefix_id) = prefix_map.get(&document_id) {
|
||||
if paginate.prefix_unique && !seen_prefixes.insert(*prefix_id) {
|
||||
continue;
|
||||
}
|
||||
prefix_id
|
||||
*prefix_id
|
||||
} else {
|
||||
// Document no longer exists?
|
||||
continue;
|
||||
|
|
@ -242,16 +231,12 @@ impl Store {
|
|||
let mut seen_prefixes = AHashSet::new();
|
||||
for document_id in result_set.results {
|
||||
// Obtain document prefixId
|
||||
let prefix_id = if let Some(prefix_key) = &paginate.prefix_key {
|
||||
if let Some(prefix_id) = self
|
||||
.get_value(prefix_key.clone().with_document_id(document_id))
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
{
|
||||
if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) {
|
||||
let prefix_id = if let Some(prefix_map) = paginate.prefix_map {
|
||||
if let Some(prefix_id) = prefix_map.get(&document_id) {
|
||||
if paginate.prefix_unique && !seen_prefixes.insert(*prefix_id) {
|
||||
continue;
|
||||
}
|
||||
prefix_id
|
||||
*prefix_id
|
||||
} else {
|
||||
// Document no longer exists?
|
||||
continue;
|
||||
|
|
@ -270,7 +255,7 @@ impl Store {
|
|||
}
|
||||
}
|
||||
|
||||
impl Pagination {
|
||||
impl<'x> Pagination<'x> {
|
||||
pub fn new(limit: usize, position: i32, anchor: Option<u32>, anchor_offset: i32) -> Self {
|
||||
let (has_anchor, anchor) = anchor.map(|anchor| (true, anchor)).unwrap_or((false, 0));
|
||||
|
||||
|
|
@ -283,13 +268,13 @@ impl Pagination {
|
|||
has_anchor,
|
||||
anchor_found: false,
|
||||
ids: Vec::with_capacity(limit),
|
||||
prefix_key: None,
|
||||
prefix_map: None,
|
||||
prefix_unique: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_prefix_key(mut self, prefix_key: ValueKey<ValueClass<u32>>) -> Self {
|
||||
self.prefix_key = Some(prefix_key);
|
||||
pub fn with_prefix_map(mut self, prefix_map: &'x AHashMap<u32, u32>) -> Self {
|
||||
self.prefix_map = Some(prefix_map);
|
||||
self
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,17 +5,17 @@
|
|||
*/
|
||||
|
||||
use std::convert::TryInto;
|
||||
use utils::{codec::leb128::Leb128_, BLOB_HASH_LEN};
|
||||
use utils::{BLOB_HASH_LEN, codec::leb128::Leb128_};
|
||||
|
||||
use crate::{
|
||||
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_ACL,
|
||||
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, SUBSPACE_ACL,
|
||||
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX,
|
||||
SUBSPACE_INDEXES, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_LOGS,
|
||||
SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_INDEXES, SUBSPACE_LOGS,
|
||||
SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA,
|
||||
SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_TELEMETRY_INDEX, SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN,
|
||||
WITH_SUBSPACE,
|
||||
ValueKey, WITH_SUBSPACE,
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
use ::email::message::{ingest::EmailIngest, metadata::MessageData};
|
||||
use common::storage::index::ObjectIndexBuilder;
|
||||
use jmap_client::{
|
||||
core::query::{Comparator, Filter},
|
||||
email,
|
||||
|
|
@ -13,7 +15,7 @@ use jmap_proto::types::{collection::Collection, id::Id, property::Property, stat
|
|||
|
||||
use store::{
|
||||
ahash::{AHashMap, AHashSet},
|
||||
write::{BatchBuilder, MaybeDynamicId, TagValue, log::ChangeLogBuilder},
|
||||
write::{AlignedBytes, Archive, BatchBuilder, log::ChangeLogBuilder},
|
||||
};
|
||||
|
||||
use crate::jmap::{
|
||||
|
|
@ -131,6 +133,23 @@ pub async fn test(params: &mut JMAPTest) {
|
|||
LogAction::Move(from, to) => {
|
||||
let id = *id_map.get(from).unwrap();
|
||||
let new_id = Id::from_parts(thread_id, id.document_id());
|
||||
|
||||
let new_thread_id = server.create_thread_id(1).await.unwrap();
|
||||
|
||||
let old_message_ = server
|
||||
.get_property::<Archive<AlignedBytes>>(
|
||||
1,
|
||||
Collection::Email,
|
||||
id.document_id(),
|
||||
Property::Value,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let old_message = old_message_.to_unarchived::<MessageData>().unwrap();
|
||||
let mut new_message = old_message.deserialize().unwrap();
|
||||
new_message.thread_id = new_thread_id;
|
||||
|
||||
server
|
||||
.core
|
||||
.storage
|
||||
|
|
@ -142,9 +161,12 @@ pub async fn test(params: &mut JMAPTest) {
|
|||
.create_document()
|
||||
.with_collection(Collection::Email)
|
||||
.update_document(id.document_id())
|
||||
.untag(Property::ThreadId, id.prefix_id())
|
||||
.set(Property::ThreadId, MaybeDynamicId::Dynamic(0))
|
||||
.tag(Property::ThreadId, TagValue::Id(MaybeDynamicId::Dynamic(0)))
|
||||
.custom(
|
||||
ObjectIndexBuilder::new()
|
||||
.with_current(old_message)
|
||||
.with_changes(new_message),
|
||||
)
|
||||
.unwrap()
|
||||
.custom(server.begin_changes(1).unwrap().with_log_move(
|
||||
Collection::Email,
|
||||
id,
|
||||
|
|
|
|||
|
|
@ -374,8 +374,8 @@ pub async fn jmap_tests() {
|
|||
.await;
|
||||
|
||||
webhooks::test(&mut params).await;
|
||||
/*email_query::test(&mut params, delete).await;
|
||||
email_get::test(&mut params).await;
|
||||
email_query::test(&mut params, delete).await;
|
||||
/*email_get::test(&mut params).await;
|
||||
email_set::test(&mut params).await;
|
||||
email_parse::test(&mut params).await;
|
||||
email_search_snippet::test(&mut params).await;
|
||||
|
|
@ -386,7 +386,7 @@ pub async fn jmap_tests() {
|
|||
thread_merge::test(&mut params).await;
|
||||
mailbox::test(&mut params).await;
|
||||
delivery::test(&mut params).await;
|
||||
auth_acl::test(&mut params).await;
|
||||
auth_acl::test(&mut params).await;*/
|
||||
auth_limits::test(&mut params).await;
|
||||
auth_oauth::test(&mut params).await;
|
||||
event_source::test(&mut params).await;
|
||||
|
|
@ -397,7 +397,7 @@ pub async fn jmap_tests() {
|
|||
websocket::test(&mut params).await;
|
||||
quota::test(&mut params).await;
|
||||
crypto::test(&mut params).await;
|
||||
blob::test(&mut params).await;*/
|
||||
blob::test(&mut params).await;
|
||||
permissions::test(¶ms).await;
|
||||
purge::test(&mut params).await;
|
||||
enterprise::test(&mut params).await;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue