Improved cache memory layout

This commit is contained in:
mdecimus 2025-04-13 08:40:40 +02:00
parent 15651c26a1
commit 0568456d29
51 changed files with 716 additions and 590 deletions

View file

@ -23,7 +23,7 @@ use utils::{
};
use crate::{
CacheSwap, Caches, Data, DavResource, DavResources, MailboxCache, MessageItemCache,
CacheSwap, Caches, Data, DavResource, DavResources, MailboxCache, MailboxStoreCache,
MessageStoreCache, MessageUidCache, TlsConnectors,
auth::{AccessToken, roles::RolePermissions},
config::smtp::resolver::{Policy, Tlsa},
@ -112,7 +112,7 @@ impl Caches {
"mailbox",
MB_10,
(std::mem::size_of::<u32>()
+ std::mem::size_of::<CacheSwap<MessageStoreCache<MailboxCache>>>()
+ std::mem::size_of::<CacheSwap<MailboxStoreCache>>()
+ (15 * (std::mem::size_of::<MailboxCache>() + 60))) as u64,
),
messages: Cache::from_config(
@ -120,7 +120,7 @@ impl Caches {
"message",
MB_10,
(std::mem::size_of::<u32>()
+ std::mem::size_of::<CacheSwap<MessageStoreCache<MessageItemCache>>>()
+ std::mem::size_of::<CacheSwap<MessageStoreCache>>()
+ (1024 * std::mem::size_of::<MessageUidCache>())) as u64,
),
dav: Cache::from_config(

View file

@ -531,11 +531,18 @@ impl Server {
})?;
for collection in [
Collection::Email,
Collection::Mailbox,
Collection::Thread,
Collection::Identity,
Collection::EmailSubmission,
Collection::Email.into(),
Collection::Mailbox.into(),
Collection::Mailbox.as_child_update(),
Collection::Thread.into(),
Collection::Identity.into(),
Collection::EmailSubmission.into(),
Collection::SieveScript.into(),
Collection::FileNode.into(),
Collection::AddressBook.into(),
Collection::ContactCard.into(),
Collection::Calendar.into(),
Collection::CalendarEvent.into(),
] {
self.core
.storage
@ -543,12 +550,12 @@ impl Server {
.delete_range(
LogKey {
account_id,
collection: collection.into(),
collection,
change_id: 0,
},
LogKey {
account_id,
collection: collection.into(),
collection,
change_id: reference_cid,
},
)

View file

@ -34,7 +34,7 @@ use config::{
};
use ipc::{HousekeeperEvent, QueueEvent, ReportingEvent, StateEvent};
use jmap_proto::types::{keyword::Keyword, value::AclGrant};
use jmap_proto::types::value::AclGrant;
use listener::{asn::AsnGeoLookupData, blocked::Security, tls::AcmeProviders};
use mail_auth::{MX, Txt};
@ -145,8 +145,8 @@ pub struct Caches {
pub http_auth: Cache<String, HttpAuthCache>,
pub permissions: Cache<u32, Arc<RolePermissions>>,
pub messages: Cache<u32, CacheSwap<MessageStoreCache<MessageItemCache>>>,
pub mailboxes: Cache<u32, CacheSwap<MessageStoreCache<MailboxCache>>>,
pub messages: Cache<u32, CacheSwap<MessageStoreCache>>,
pub mailboxes: Cache<u32, CacheSwap<MailboxStoreCache>>,
pub dav: Cache<DavResourceId, Arc<DavResources>>,
pub bayes: CacheWithTtl<TokenHash, Weights>,
@ -165,17 +165,29 @@ pub struct Caches {
pub struct CacheSwap<T>(pub Arc<ArcSwap<T>>);
#[derive(Debug, Clone)]
pub struct MessageStoreCache<T> {
pub struct MailboxStoreCache {
pub change_id: u64,
pub items: AHashMap<u32, T>,
pub index: AHashMap<u32, u32>,
pub items: Vec<MailboxCache>,
pub update_lock: Arc<Semaphore>,
pub size: u64,
}
#[derive(Debug, Clone)]
pub struct MessageItemCache {
pub struct MessageStoreCache {
pub change_id: u64,
pub items: Vec<MessageCache>,
pub index: AHashMap<u32, u32>,
pub keywords: Vec<CompactString>,
pub update_lock: Arc<Semaphore>,
pub size: u64,
}
#[derive(Debug, Clone)]
pub struct MessageCache {
pub document_id: u32,
pub mailboxes: TinyVec<[MessageUidCache; 2]>,
pub keywords: TinyVec<[Keyword; 2]>,
pub keywords: u128,
pub thread_id: u32,
pub change_id: u64,
}
@ -188,6 +200,7 @@ pub struct MessageUidCache {
#[derive(Debug, Clone)]
pub struct MailboxCache {
pub document_id: u32,
pub name: CompactString,
pub path: CompactString,
pub role: SpecialUse,
@ -272,7 +285,13 @@ impl<T: CacheItemWeight> CacheItemWeight for CacheSwap<T> {
}
}
impl<T> CacheItemWeight for MessageStoreCache<T> {
impl CacheItemWeight for MessageStoreCache {
fn weight(&self) -> u64 {
self.size
}
}
impl CacheItemWeight for MailboxStoreCache {
fn weight(&self) -> u64 {
self.size
}
@ -515,7 +534,7 @@ impl std::borrow::Borrow<u32> for DavResource {
}
}
impl MessageStoreCache<MessageItemCache> {
impl MessageStoreCache {
pub fn assign_thread_id(&self, thread_name: &[u8], message_id: &[u8]) -> u32 {
let mut bytes = Vec::with_capacity(thread_name.len() + message_id.len());
bytes.extend_from_slice(thread_name);
@ -529,7 +548,7 @@ impl MessageStoreCache<MessageItemCache> {
// Naive pass, assume hash is unique
let mut threads_ids = RoaringBitmap::new();
let mut is_unique_hash = true;
for item in self.items.values() {
for item in self.items.iter() {
if is_unique_hash && item.thread_id != hash {
is_unique_hash = false;
}

View file

@ -5,7 +5,7 @@
*/
use ahash::AHashSet;
use jmap_proto::types::{collection::Collection, property::Property, value::AclGrant};
use jmap_proto::types::{property::Property, value::AclGrant};
use rkyv::{
option::ArchivedOption,
primitive::{ArchivedU32, ArchivedU64},
@ -44,7 +44,7 @@ pub enum IndexValue<'x> {
prefix: Option<u32>,
},
LogParent {
collection: Collection,
collection: u8,
ids: Vec<u32>,
},
Acl {
@ -380,7 +380,7 @@ fn build_index(batch: &mut BatchBuilder, item: IndexValue<'_>, tenant_id: Option
}
IndexValue::LogParent { collection, ids } => {
for parent_id in ids {
batch.log_child_update(collection, parent_id);
batch.log_parent_update(collection, parent_id);
}
}
}
@ -526,12 +526,12 @@ fn merge_index(
) => {
for parent_id in &old_ids {
if !new_ids.contains(parent_id) {
batch.log_child_update(collection, *parent_id);
batch.log_parent_update(collection, *parent_id);
}
}
for parent_id in new_ids {
if !old_ids.contains(&parent_id) {
batch.log_child_update(collection, parent_id);
batch.log_parent_update(collection, parent_id);
}
}
}

View file

@ -7,14 +7,14 @@
use std::sync::Arc;
use common::{
CacheSwap, MailboxCache, MessageStoreCache, Server, auth::AccessToken,
CacheSwap, MailboxCache, MailboxStoreCache, Server, auth::AccessToken,
config::jmap::settings::SpecialUse, sharing::EffectiveAcl,
};
use compact_str::CompactString;
use jmap_proto::types::{acl::Acl, collection::Collection, value::AclGrant};
use std::future::Future;
use store::{
ahash::AHashMap,
ahash::{AHashMap, AHashSet},
query::log::{Change, Query},
roaring::RoaringBitmap,
};
@ -28,14 +28,11 @@ pub trait MessageMailboxCache: Sync + Send {
fn get_cached_mailboxes(
&self,
account_id: u32,
) -> impl Future<Output = trc::Result<Arc<MessageStoreCache<MailboxCache>>>> + Send;
) -> impl Future<Output = trc::Result<Arc<MailboxStoreCache>>> + Send;
}
impl MessageMailboxCache for Server {
async fn get_cached_mailboxes(
&self,
account_id: u32,
) -> trc::Result<Arc<MessageStoreCache<MailboxCache>>> {
async fn get_cached_mailboxes(&self, account_id: u32) -> trc::Result<Arc<MailboxStoreCache>> {
let cache_ = match self
.inner
.cache
@ -98,9 +95,14 @@ impl MessageMailboxCache for Server {
return Ok(cache);
}
let mut has_changes = false;
let mut cache = cache.as_ref().clone();
cache.change_id = changes.to_change_id;
let mut changed_ids = AHashSet::with_capacity(changes.changes.len());
let mut new_cache = MailboxStoreCache {
items: Vec::with_capacity(cache.items.len()),
index: AHashMap::with_capacity(cache.items.len()),
size: 0,
change_id: changes.to_change_id,
update_lock: cache.update_lock.clone(),
};
for change in changes.changes {
match change {
@ -111,24 +113,30 @@ impl MessageMailboxCache for Server {
.await
.caused_by(trc::location!())?
{
insert_item(&mut cache, document_id, archive.unarchive::<Mailbox>()?);
has_changes = true;
insert_item(&mut new_cache, document_id, archive.unarchive::<Mailbox>()?);
changed_ids.insert(document_id);
}
}
Change::Delete(id) => {
if cache.items.remove(&(id as u32)).is_some() {
has_changes = true;
}
changed_ids.insert(id as u32);
}
Change::ChildUpdate(_) => {}
}
}
if has_changes {
build_tree(&mut cache);
for item in cache.items.iter() {
if !changed_ids.contains(&item.document_id) {
new_cache.insert(item.clone());
}
}
let cache = Arc::new(cache);
build_tree(&mut new_cache);
if cache.items.len() > new_cache.items.len() {
new_cache.items.shrink_to_fit();
new_cache.index.shrink_to_fit();
}
let cache = Arc::new(new_cache);
cache_.update(cache.clone());
Ok(cache)
@ -139,10 +147,11 @@ async fn full_cache_build(
server: &Server,
account_id: u32,
update_lock: Arc<Semaphore>,
) -> trc::Result<Arc<MessageStoreCache<MailboxCache>>> {
) -> trc::Result<Arc<MailboxStoreCache>> {
// Build cache
let mut cache = MessageStoreCache {
items: AHashMap::with_capacity(16),
let mut cache = MailboxStoreCache {
items: Default::default(),
index: Default::default(),
size: 0,
change_id: 0,
update_lock,
@ -185,13 +194,10 @@ async fn full_cache_build(
Ok(Arc::new(cache))
}
fn insert_item(
cache: &mut MessageStoreCache<MailboxCache>,
document_id: u32,
mailbox: &ArchivedMailbox,
) {
fn insert_item(cache: &mut MailboxStoreCache, document_id: u32, mailbox: &ArchivedMailbox) {
let parent_id = mailbox.parent_id.to_native();
let item = MailboxCache {
document_id,
name: mailbox.name.as_str().into(),
path: "".into(),
role: (&mailbox.role).into(),
@ -217,21 +223,21 @@ fn insert_item(
.collect(),
};
cache.items.insert(document_id, item);
cache.insert(item);
}
fn build_tree(cache: &mut MessageStoreCache<MailboxCache>) {
fn build_tree(cache: &mut MailboxStoreCache) {
cache.size = 0;
let mut topological_sort = TopologicalSort::with_capacity(cache.items.len());
for (idx, (&document_id, mailbox)) in cache.items.iter_mut().enumerate() {
for (idx, mailbox) in cache.items.iter_mut().enumerate() {
topological_sort.insert(
if mailbox.parent_id == u32::MAX {
0
} else {
mailbox.parent_id + 1
},
document_id + 1,
mailbox.document_id + 1,
);
mailbox.path = if matches!(mailbox.role, SpecialUse::Inbox) {
"INBOX".into()
@ -241,35 +247,28 @@ fn build_tree(cache: &mut MessageStoreCache<MailboxCache>) {
mailbox.name.clone()
};
cache.size += (std::mem::size_of::<MailboxCache>()
+ std::mem::size_of::<u32>()
+ mailbox.name.len()
+ mailbox.path.len()) as u64;
cache.size += item_size(mailbox);
}
for folder_id in topological_sort.into_iterator() {
if folder_id != 0 {
let folder_id = folder_id - 1;
if let Some((path, parent_path)) = cache
.items
.get(&folder_id)
.by_id(&folder_id)
.and_then(|folder| {
folder
.parent_id()
.map(|parent_id| (&folder.path, parent_id))
})
.and_then(|(path, parent_id)| {
cache
.items
.get(&parent_id)
.map(|folder| (path, &folder.path))
cache.by_id(&parent_id).map(|folder| (path, &folder.path))
})
{
let mut new_path = CompactString::with_capacity(parent_path.len() + path.len() + 1);
new_path.push_str(parent_path.as_str());
new_path.push('/');
new_path.push_str(path.as_str());
let folder = cache.items.get_mut(&folder_id).unwrap();
let folder = cache.by_id_mut(&folder_id).unwrap();
folder.path = new_path;
}
}
@ -277,31 +276,35 @@ fn build_tree(cache: &mut MessageStoreCache<MailboxCache>) {
}
pub trait MailboxCacheAccess {
fn by_name(&self, name: &str) -> Option<(&u32, &MailboxCache)>;
fn by_path(&self, name: &str) -> Option<(&u32, &MailboxCache)>;
fn by_role(&self, role: &SpecialUse) -> Option<(&u32, &MailboxCache)>;
fn by_id(&self, id: &u32) -> Option<&MailboxCache>;
fn by_id_mut(&mut self, id: &u32) -> Option<&mut MailboxCache>;
fn insert(&mut self, item: MailboxCache);
fn by_name(&self, name: &str) -> Option<&MailboxCache>;
fn by_path(&self, name: &str) -> Option<&MailboxCache>;
fn by_role(&self, role: &SpecialUse) -> Option<&MailboxCache>;
fn shared_mailboxes(
&self,
access_token: &AccessToken,
check_acls: impl Into<Bitmap<Acl>> + Sync + Send,
) -> RoaringBitmap;
fn has_id(&self, id: &u32) -> bool;
}
impl MailboxCacheAccess for MessageStoreCache<MailboxCache> {
fn by_name(&self, name: &str) -> Option<(&u32, &MailboxCache)> {
impl MailboxCacheAccess for MailboxStoreCache {
fn by_name(&self, name: &str) -> Option<&MailboxCache> {
self.items
.iter()
.find(|(_, m)| m.name.eq_ignore_ascii_case(name))
.find(|m| m.name.eq_ignore_ascii_case(name))
}
fn by_path(&self, path: &str) -> Option<(&u32, &MailboxCache)> {
fn by_path(&self, path: &str) -> Option<&MailboxCache> {
self.items
.iter()
.find(|(_, m)| m.path.eq_ignore_ascii_case(path))
.find(|m| m.path.eq_ignore_ascii_case(path))
}
fn by_role(&self, role: &SpecialUse) -> Option<(&u32, &MailboxCache)> {
self.items.iter().find(|(_, m)| &m.role == role)
fn by_role(&self, role: &SpecialUse) -> Option<&MailboxCache> {
self.items.iter().find(|m| &m.role == role)
}
fn shared_mailboxes(
@ -314,13 +317,55 @@ impl MailboxCacheAccess for MessageStoreCache<MailboxCache> {
RoaringBitmap::from_iter(
self.items
.iter()
.filter(|(_, m)| {
.filter(|m| {
m.acls
.as_slice()
.effective_acl(access_token)
.contains_all(check_acls)
})
.map(|(id, _)| *id),
.map(|m| m.document_id),
)
}
fn by_id(&self, id: &u32) -> Option<&MailboxCache> {
self.index
.get(id)
.and_then(|idx| self.items.get(*idx as usize))
}
fn by_id_mut(&mut self, id: &u32) -> Option<&mut MailboxCache> {
self.index
.get(id)
.and_then(|idx| self.items.get_mut(*idx as usize))
}
fn insert(&mut self, item: MailboxCache) {
let id = item.document_id;
if let Some(idx) = self.index.get(&id) {
self.items[*idx as usize] = item;
} else {
let idx = self.items.len() as u32;
self.items.push(item);
self.index.insert(id, idx);
}
}
fn has_id(&self, id: &u32) -> bool {
self.index.contains_key(id)
}
}
#[inline(always)]
fn item_size(item: &MailboxCache) -> u64 {
(std::mem::size_of::<MailboxCache>()
+ (if item.name.len() > std::mem::size_of::<String>() {
item.name.len()
} else {
0
})
+ (if item.path.len() > std::mem::size_of::<String>() {
item.path.len()
} else {
0
})) as u64
}

View file

@ -16,7 +16,7 @@ use store::{roaring::RoaringBitmap, write::BatchBuilder};
use trc::AddContext;
use crate::message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
delete::EmailDeletion,
metadata::MessageData,
};
@ -66,7 +66,7 @@ impl MailboxDestroy for Server {
.await?
.items
.iter()
.any(|(_, m)| m.parent_id == document_id)
.any(|item| item.parent_id == document_id)
{
return Ok(Err(SetError::new(SetErrorType::MailboxHasChild)
.with_description("Mailbox has at least one children.")));
@ -82,7 +82,7 @@ impl MailboxDestroy for Server {
.await
.caused_by(trc::location!())?
.in_mailbox(document_id)
.map(|(id, _)| id),
.map(|m| m.document_id),
);
if !message_ids.is_empty() {

View file

@ -101,12 +101,12 @@ impl MailboxFnc for Server {
}
}
if let Some((document_id, _)) = folders
if let Some(item) = folders
.items
.iter()
.find(|(_, item)| item.path.to_lowercase() == found_path)
.find(|item| item.path.to_lowercase() == found_path)
{
next_parent_id = *document_id + 1;
next_parent_id = item.document_id + 1;
} else {
create_paths.push(name.to_string());
create_paths.extend(path.map(|v| v.to_string()));

View file

@ -4,16 +4,21 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use super::metadata::{ArchivedMessageData, MessageData};
use common::{
CacheSwap, MailboxCache, MessageItemCache, MessageStoreCache, MessageUidCache, Server,
CacheSwap, MailboxStoreCache, MessageCache, MessageStoreCache, MessageUidCache, Server,
auth::AccessToken, sharing::EffectiveAcl,
};
use jmap_proto::types::{acl::Acl, collection::Collection, keyword::Keyword};
use std::future::Future;
use compact_str::CompactString;
use jmap_proto::types::{
acl::Acl,
collection::Collection,
keyword::{Keyword, OTHER},
};
use std::sync::Arc;
use std::{collections::hash_map::Entry, future::Future};
use store::{
ahash::{AHashMap, AHashSet},
ahash::AHashMap,
query::log::{Change, Query},
roaring::RoaringBitmap,
};
@ -21,20 +26,15 @@ use tokio::sync::Semaphore;
use trc::AddContext;
use utils::map::bitmap::Bitmap;
use super::metadata::{ArchivedMessageData, MessageData};
pub trait MessageCache: Sync + Send {
pub trait MessageCacheFetch: Sync + Send {
fn get_cached_messages(
&self,
account_id: u32,
) -> impl Future<Output = trc::Result<Arc<MessageStoreCache<MessageItemCache>>>> + Send;
) -> impl Future<Output = trc::Result<Arc<MessageStoreCache>>> + Send;
}
impl MessageCache for Server {
async fn get_cached_messages(
&self,
account_id: u32,
) -> trc::Result<Arc<MessageStoreCache<MessageItemCache>>> {
impl MessageCacheFetch for Server {
async fn get_cached_messages(&self, account_id: u32) -> trc::Result<Arc<MessageStoreCache>> {
let cache_ = match self
.inner
.cache
@ -91,51 +91,74 @@ impl MessageCache for Server {
return Ok(cache);
}
let mut cache = cache.as_ref().clone();
cache.change_id = changes.to_change_id;
let mut delete = AHashSet::with_capacity(changes.changes.len() / 2);
let mut update = AHashMap::with_capacity(changes.changes.len());
let mut new_cache = MessageStoreCache {
index: AHashMap::with_capacity(cache.items.len()),
items: Vec::with_capacity(cache.items.len()),
size: 0,
change_id: changes.to_change_id,
update_lock: cache.update_lock.clone(),
keywords: cache.keywords.clone(),
};
let mut changed_ids: AHashMap<u32, bool> = AHashMap::with_capacity(changes.changes.len());
for change in changes.changes {
match change {
Change::Insert(id) => {
if let Some(item) = cache.items.get_mut(&(id as u32)) {
item.thread_id = (id >> 32) as u32;
Change::Insert(id) => match changed_ids.entry(id as u32) {
Entry::Occupied(mut entry) => {
*entry.get_mut() = true;
}
update.insert(id as u32, true);
}
Change::Update(id) | Change::ChildUpdate(id) => {
update.insert(id as u32, false);
Entry::Vacant(entry) => {
entry.insert(true);
}
},
Change::Update(id) => {
changed_ids.insert(id as u32, true);
}
Change::Delete(id) => {
delete.insert(id as u32);
match changed_ids.entry(id as u32) {
Entry::Occupied(mut entry) => {
// Thread reassignment
*entry.get_mut() = true;
}
Entry::Vacant(entry) => {
entry.insert(false);
}
}
}
}
}
for document_id in delete {
if update.remove(&document_id).is_none() {
if let Some(item) = cache.items.remove(&document_id) {
cache.size -= (std::mem::size_of::<MessageItemCache>()
+ std::mem::size_of::<u32>()
+ (item.mailboxes.len() * std::mem::size_of::<MessageUidCache>()))
as u64;
for (document_id, is_update) in &changed_ids {
if *is_update {
if let Some(archive) = self
.get_archive(account_id, Collection::Email, *document_id)
.await
.caused_by(trc::location!())?
{
insert_item(
&mut new_cache,
*document_id,
archive.unarchive::<MessageData>()?,
);
}
}
}
for (document_id, is_insert) in update {
if let Some(archive) = self
.get_archive(account_id, Collection::Email, document_id)
.await
.caused_by(trc::location!())?
{
let message = archive.unarchive::<MessageData>()?;
insert_item(&mut cache, document_id, message, is_insert);
for item in &cache.items {
if !changed_ids.contains_key(&item.document_id) {
new_cache.insert(item.clone());
}
}
let cache = Arc::new(cache);
if cache.items.len() > new_cache.items.len() {
new_cache.items.shrink_to_fit();
new_cache.index.shrink_to_fit();
}
if cache.keywords.len() > new_cache.keywords.len() {
new_cache.keywords.shrink_to_fit();
}
let cache = Arc::new(new_cache);
cache_.update(cache.clone());
Ok(cache)
@ -146,10 +169,12 @@ async fn full_cache_build(
server: &Server,
account_id: u32,
update_lock: Arc<Semaphore>,
) -> trc::Result<Arc<MessageStoreCache<MessageItemCache>>> {
) -> trc::Result<Arc<MessageStoreCache>> {
// Build cache
let mut cache = MessageStoreCache {
items: AHashMap::with_capacity(16),
items: Vec::with_capacity(16),
index: AHashMap::with_capacity(16),
keywords: Vec::new(),
size: 0,
change_id: 0,
update_lock,
@ -164,7 +189,7 @@ async fn full_cache_build(
let message = archive.unarchive::<MessageData>()?;
cache.change_id = std::cmp::max(cache.change_id, message.change_id.to_native());
insert_item(&mut cache, document_id, message, true);
insert_item(&mut cache, document_id, message);
Ok(true)
},
@ -175,13 +200,8 @@ async fn full_cache_build(
Ok(Arc::new(cache))
}
fn insert_item(
cache: &mut MessageStoreCache<MessageItemCache>,
document_id: u32,
message: &ArchivedMessageData,
update_size: bool,
) {
let item = MessageItemCache {
fn insert_item(cache: &mut MessageStoreCache, document_id: u32, message: &ArchivedMessageData) {
let mut item = MessageCache {
mailboxes: message
.mailboxes
.iter()
@ -190,75 +210,107 @@ fn insert_item(
uid: m.uid.to_native(),
})
.collect(),
keywords: message.keywords.iter().map(Into::into).collect(),
keywords: 0,
thread_id: message.thread_id.to_native(),
change_id: message.change_id.to_native(),
document_id,
};
if update_size {
cache.size += (std::mem::size_of::<MessageItemCache>()
+ std::mem::size_of::<u32>()
+ (item.mailboxes.len() * std::mem::size_of::<MessageUidCache>()))
as u64;
for keyword in message.keywords.iter() {
match keyword.id() {
Ok(id) => {
item.keywords |= 1 << id;
}
Err(custom) => {
if let Some(idx) = cache.keywords.iter().position(|k| k == custom) {
item.keywords |= 1 << (OTHER + idx);
} else if cache.keywords.len() < (128 - OTHER) {
cache.keywords.push(CompactString::from(custom));
item.keywords |= 1 << (OTHER + cache.keywords.len() - 1);
}
}
}
}
cache.items.insert(document_id, item);
cache.insert(item);
}
pub trait MessageCacheAccess {
fn in_mailbox(&self, mailbox_id: u32) -> impl Iterator<Item = (&u32, &MessageItemCache)>;
fn by_id(&self, id: &u32) -> Option<&MessageCache>;
fn in_thread(&self, thread_id: u32) -> impl Iterator<Item = (&u32, &MessageItemCache)>;
fn has_id(&self, id: &u32) -> bool;
fn with_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = (&u32, &MessageItemCache)>;
fn by_id_mut(&mut self, id: &u32) -> Option<&mut MessageCache>;
fn insert(&mut self, item: MessageCache);
fn in_mailbox(&self, mailbox_id: u32) -> impl Iterator<Item = &MessageCache>;
fn in_thread(&self, thread_id: u32) -> impl Iterator<Item = &MessageCache>;
fn with_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = &MessageCache>;
fn without_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = &MessageCache>;
fn in_mailbox_with_keyword(
&self,
mailbox_id: u32,
keyword: &Keyword,
) -> impl Iterator<Item = (&u32, &MessageItemCache)>;
) -> impl Iterator<Item = &MessageCache>;
fn in_mailbox_without_keyword(
&self,
mailbox_id: u32,
keyword: &Keyword,
) -> impl Iterator<Item = (&u32, &MessageItemCache)>;
) -> impl Iterator<Item = &MessageCache>;
fn document_ids(&self) -> RoaringBitmap;
fn shared_messages(
&self,
access_token: &AccessToken,
mailboxes: &MessageStoreCache<MailboxCache>,
mailboxes: &MailboxStoreCache,
check_acls: impl Into<Bitmap<Acl>> + Sync + Send,
) -> RoaringBitmap;
fn expand_keywords(&self, message: &MessageCache) -> impl Iterator<Item = Keyword>;
fn has_keyword(&self, message: &MessageCache, keyword: &Keyword) -> bool;
}
impl MessageCacheAccess for MessageStoreCache<MessageItemCache> {
fn in_mailbox(&self, mailbox_id: u32) -> impl Iterator<Item = (&u32, &MessageItemCache)> {
impl MessageCacheAccess for MessageStoreCache {
fn in_mailbox(&self, mailbox_id: u32) -> impl Iterator<Item = &MessageCache> {
self.items
.iter()
.filter(move |(_, m)| m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id))
.filter(move |m| m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id))
}
fn in_thread(&self, thread_id: u32) -> impl Iterator<Item = (&u32, &MessageItemCache)> {
self.items
.iter()
.filter(move |(_, m)| m.thread_id == thread_id)
fn in_thread(&self, thread_id: u32) -> impl Iterator<Item = &MessageCache> {
self.items.iter().filter(move |m| m.thread_id == thread_id)
}
fn with_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = (&u32, &MessageItemCache)> {
fn with_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = &MessageCache> {
let keyword_id = keyword_to_id(self, keyword);
self.items
.iter()
.filter(move |(_, m)| m.keywords.contains(keyword))
.filter(move |m| keyword_id.is_some_and(|id| m.keywords & (1 << id) != 0))
}
fn without_keyword(&self, keyword: &Keyword) -> impl Iterator<Item = &MessageCache> {
let keyword_id = keyword_to_id(self, keyword);
self.items
.iter()
.filter(move |m| keyword_id.is_none_or(|id| m.keywords & (1 << id) == 0))
}
fn in_mailbox_with_keyword(
&self,
mailbox_id: u32,
keyword: &Keyword,
) -> impl Iterator<Item = (&u32, &MessageItemCache)> {
self.items.iter().filter(move |(_, m)| {
m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id) && m.keywords.contains(keyword)
) -> impl Iterator<Item = &MessageCache> {
let keyword_id = keyword_to_id(self, keyword);
self.items.iter().filter(move |m| {
m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id)
&& keyword_id.is_some_and(|id| m.keywords & (1 << id) != 0)
})
}
@ -266,34 +318,111 @@ impl MessageCacheAccess for MessageStoreCache<MessageItemCache> {
&self,
mailbox_id: u32,
keyword: &Keyword,
) -> impl Iterator<Item = (&u32, &MessageItemCache)> {
self.items.iter().filter(move |(_, m)| {
m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id) && !m.keywords.contains(keyword)
) -> impl Iterator<Item = &MessageCache> {
let keyword_id = keyword_to_id(self, keyword);
self.items.iter().filter(move |m| {
m.mailboxes.iter().any(|m| m.mailbox_id == mailbox_id)
&& keyword_id.is_none_or(|id| m.keywords & (1 << id) == 0)
})
}
fn shared_messages(
&self,
access_token: &AccessToken,
mailboxes: &MessageStoreCache<MailboxCache>,
mailboxes: &MailboxStoreCache,
check_acls: impl Into<Bitmap<Acl>> + Sync + Send,
) -> RoaringBitmap {
let check_acls = check_acls.into();
let mut shared_messages = RoaringBitmap::new();
for (mailbox_id, mailbox) in &mailboxes.items {
for mailbox in &mailboxes.items {
if mailbox
.acls
.as_slice()
.effective_acl(access_token)
.contains_all(check_acls)
{
shared_messages.extend(self.in_mailbox(*mailbox_id).map(|(id, _)| *id));
shared_messages.extend(
self.in_mailbox(mailbox.document_id)
.map(|item| item.document_id),
);
}
}
shared_messages
}
fn document_ids(&self) -> RoaringBitmap {
RoaringBitmap::from_iter(self.items.keys())
RoaringBitmap::from_iter(self.index.keys())
}
fn by_id(&self, id: &u32) -> Option<&MessageCache> {
self.index
.get(id)
.and_then(|idx| self.items.get(*idx as usize))
}
fn by_id_mut(&mut self, id: &u32) -> Option<&mut MessageCache> {
self.index
.get(id)
.and_then(|idx| self.items.get_mut(*idx as usize))
}
fn insert(&mut self, item: MessageCache) {
let id = item.document_id;
if let Some(idx) = self.index.get(&id) {
self.items[*idx as usize] = item;
} else {
self.size += (std::mem::size_of::<MessageCache>()
+ (std::mem::size_of::<u32>() * 2)
+ (item.mailboxes.len() * std::mem::size_of::<MessageUidCache>()))
as u64;
let idx = self.items.len() as u32;
self.items.push(item);
self.index.insert(id, idx);
}
}
fn has_id(&self, id: &u32) -> bool {
self.index.contains_key(id)
}
fn expand_keywords(&self, message: &MessageCache) -> impl Iterator<Item = Keyword> {
KeywordsIter(message.keywords).map(move |id| match Keyword::try_from_id(id) {
Ok(keyword) => keyword,
Err(id) => Keyword::Other(self.keywords[id - OTHER].clone()),
})
}
fn has_keyword(&self, message: &MessageCache, keyword: &Keyword) -> bool {
keyword_to_id(self, keyword).is_some_and(|id| message.keywords & (1 << id) != 0)
}
}
#[inline]
fn keyword_to_id(cache: &MessageStoreCache, keyword: &Keyword) -> Option<u32> {
match keyword.id() {
Ok(id) => Some(id),
Err(name) => cache
.keywords
.iter()
.position(|k| k == name)
.map(|idx| (OTHER + idx) as u32),
}
}
#[derive(Clone, Copy, Debug)]
struct KeywordsIter(u128);
impl Iterator for KeywordsIter {
type Item = usize;
fn next(&mut self) -> Option<Self::Item> {
if self.0 != 0 {
let item = 127 - self.0.leading_zeros();
self.0 ^= 1 << item;
Some(item as usize)
} else {
None
}
}
}

View file

@ -22,7 +22,7 @@ use trc::AddContext;
use crate::mailbox::UidMailbox;
use super::{
cache::MessageCache,
cache::MessageCacheFetch,
index::{MAX_ID_LENGTH, MAX_SORT_FIELD_LENGTH, TrimTextValue},
ingest::{EmailIngest, IngestedEmail, ThreadResult},
metadata::{HeaderName, HeaderValue, MessageData, MessageMetadata},

View file

@ -21,7 +21,7 @@ use store::rand::prelude::SliceRandom;
use crate::{
mailbox::*,
message::{cache::MessageCache, metadata::MessageMetadata},
message::{cache::MessageCacheFetch, metadata::MessageMetadata},
};
use super::metadata::MessageData;
@ -186,14 +186,14 @@ impl EmailDeletion for Server {
.caused_by(trc::location!())?
.items
.iter()
.filter(|(_, item)| {
.filter(|item| {
item.change_id < reference_cid
&& item
.mailboxes
.iter()
.any(|id| id.mailbox_id == TRASH_ID || id.mailbox_id == JUNK_ID)
})
.map(|(id, _)| id),
.map(|item| item.document_id),
);
if destroy_ids.is_empty() {
return Ok(());

View file

@ -581,11 +581,11 @@ impl IndexableObject for MessageData {
prefix: self.thread_id.into(),
},
IndexValue::LogParent {
collection: Collection::Thread,
collection: Collection::Thread.into(),
ids: vec![self.thread_id],
},
IndexValue::LogParent {
collection: Collection::Mailbox,
collection: Collection::Mailbox.as_child_update(),
ids: self.mailboxes.iter().map(|m| m.mailbox_id).collect(),
},
]
@ -600,11 +600,11 @@ impl IndexableObject for &ArchivedMessageData {
prefix: self.thread_id.to_native().into(),
},
IndexValue::LogParent {
collection: Collection::Thread,
collection: Collection::Thread.into(),
ids: vec![self.thread_id.to_native()],
},
IndexValue::LogParent {
collection: Collection::Mailbox,
collection: Collection::Mailbox.as_child_update(),
ids: self
.mailboxes
.iter()

View file

@ -45,7 +45,7 @@ use trc::{AddContext, MessageIngestEvent};
use crate::{
mailbox::{INBOX_ID, JUNK_ID, UidMailbox},
message::{
cache::MessageCache,
cache::MessageCacheFetch,
crypto::EncryptionParams,
index::{IndexMessage, MAX_ID_LENGTH, VisitValues},
metadata::MessageData,
@ -733,7 +733,7 @@ impl EmailIngest for Server {
if !found_message_id.is_empty()
&& cache
.in_mailbox(skip_duplicate.unwrap().1)
.any(|(id, _)| found_message_id.contains(id))
.any(|m| found_message_id.contains(&m.document_id))
{
return Ok(ThreadResult::Skip);
}
@ -742,8 +742,8 @@ impl EmailIngest for Server {
let mut thread_counts = AHashMap::<u32, u32>::with_capacity(16);
let mut thread_id = u32::MAX;
let mut thread_count = 0;
for (document_id, item) in &cache.items {
if results.contains(*document_id) {
for item in &cache.items {
if results.contains(item.document_id) {
let tc = thread_counts.entry(item.thread_id).or_default();
*tc += 1;
if *tc > thread_count {
@ -773,12 +773,12 @@ impl EmailIngest for Server {
// Move messages to the new threadId
batch.with_collection(Collection::Email);
for (&document_id, item) in &cache.items {
for item in &cache.items {
if thread_id == item.thread_id || !thread_counts.contains_key(&item.thread_id) {
continue;
}
if let Some(data_) = self
.get_archive(account_id, Collection::Email, document_id)
.get_archive(account_id, Collection::Email, item.document_id)
.await
.caused_by(trc::location!())?
{
@ -791,7 +791,7 @@ impl EmailIngest for Server {
let mut new_data = data.deserialize().caused_by(trc::location!())?;
new_data.thread_id = thread_id;
batch
.update_document(document_id)
.update_document(item.document_id)
.custom(
ObjectIndexBuilder::new()
.with_current(data)

View file

@ -189,9 +189,8 @@ impl SieveScriptIngest for Server {
} else {
let mut mailbox_id = u32::MAX;
if let Ok(role) = SpecialUse::parse_value(&role) {
if let Some((mailbox_id_, _)) = mailbox_cache.by_role(&role)
{
mailbox_id = *mailbox_id_;
if let Some(m) = mailbox_cache.by_role(&role) {
mailbox_id = m.document_id;
}
}
@ -205,8 +204,8 @@ impl SieveScriptIngest for Server {
Mailbox::Name(name) => {
if !matches!(
mailbox_cache.by_path(&name),
Some((document_id, _)) if special_use_ids.is_empty() ||
special_use_ids.contains(document_id)
Some(item) if special_use_ids.is_empty() ||
special_use_ids.contains(&item.document_id)
) {
result = false;
break;
@ -214,7 +213,7 @@ impl SieveScriptIngest for Server {
}
Mailbox::Id(id) => {
if !matches!(Id::from_bytes(id.as_bytes()), Some(id) if
mailbox_cache.items.contains_key(&id.document_id()) &&
mailbox_cache.has_id(&id.document_id()) &&
(special_use_ids.is_empty() ||
special_use_ids.contains(&id.document_id())))
{
@ -310,7 +309,7 @@ impl SieveScriptIngest for Server {
mailbox_id.and_then(|m| Id::from_bytes(m.as_bytes()))
{
let mailbox_id = mailbox_id.document_id();
if mailbox_cache.items.contains_key(&mailbox_id) {
if mailbox_cache.has_id(&mailbox_id) {
target_id = mailbox_id;
}
}
@ -323,8 +322,8 @@ impl SieveScriptIngest for Server {
} else if special_use.eq_ignore_ascii_case("trash") {
target_id = TRASH_ID;
} else if let Ok(role) = SpecialUse::parse_value(&special_use) {
if let Some((mailbox_id_, _)) = mailbox_cache.by_role(&role) {
target_id = *mailbox_id_;
if let Some(item) = mailbox_cache.by_role(&role) {
target_id = item.document_id;
}
}
}
@ -333,8 +332,8 @@ impl SieveScriptIngest for Server {
// Find mailbox by name
if target_id == u32::MAX {
if !create {
if let Some((document_id, _)) = mailbox_cache.by_path(&folder) {
target_id = *document_id;
if let Some(m) = mailbox_cache.by_path(&folder) {
target_id = m.document_id;
}
} else if let Some(document_id) = self
.mailbox_create_path(account_id, &folder)

View file

@ -7,7 +7,7 @@
use common::storage::index::{
IndexItem, IndexValue, IndexableAndSerializableObject, IndexableObject,
};
use jmap_proto::types::{collection::Collection, value::AclGrant};
use jmap_proto::types::value::AclGrant;
use store::SerializeInfallible;
use crate::{IDX_CARD_UID, IDX_NAME};
@ -89,10 +89,6 @@ impl IndexableObject for ContactCard {
+ self.size,
},
IndexValue::LogChild { prefix: None },
IndexValue::LogParent {
collection: Collection::AddressBook,
ids: self.names.iter().map(|v| v.parent_id).collect::<Vec<_>>(),
},
]
.into_iter()
}
@ -120,14 +116,6 @@ impl IndexableObject for &ArchivedContactCard {
+ self.size,
},
IndexValue::LogChild { prefix: None },
IndexValue::LogParent {
collection: Collection::AddressBook,
ids: self
.names
.iter()
.map(|v| v.parent_id.to_native())
.collect::<Vec<_>>(),
},
]
.into_iter()
}

View file

@ -20,7 +20,7 @@ use email::{
INBOX_ID,
cache::{MailboxCacheAccess, MessageMailboxCache},
},
message::cache::{MessageCache, MessageCacheAccess},
message::cache::{MessageCacheAccess, MessageCacheFetch},
};
use imap_proto::protocol::list::Attribute;
use jmap_proto::types::{acl::Acl, collection::Collection, id::Id, keyword::Keyword};
@ -124,13 +124,13 @@ impl<T: SessionStream> SessionData<T> {
// Build special uses
let mut special_uses = AHashMap::new();
for (&mailbox_id, mailbox) in &cached_mailboxes.items {
for mailbox in &cached_mailboxes.items {
if shared_mailbox_ids
.as_ref()
.is_none_or(|ids| ids.contains(mailbox_id))
.is_none_or(|ids| ids.contains(mailbox.document_id))
&& !matches!(mailbox.role, SpecialUse::None)
{
special_uses.insert(mailbox.role, mailbox_id);
special_uses.insert(mailbox.role, mailbox.document_id);
}
}
@ -146,10 +146,10 @@ impl<T: SessionStream> SessionData<T> {
},
};
for (&mailbox_id, mailbox) in &cached_mailboxes.items {
for mailbox in &cached_mailboxes.items {
if shared_mailbox_ids
.as_ref()
.is_some_and(|ids| !ids.contains(mailbox_id))
.is_some_and(|ids| !ids.contains(mailbox.document_id))
{
continue;
}
@ -173,17 +173,17 @@ impl<T: SessionStream> SessionData<T> {
.find(|f| f.name == mailbox_name || f.aliases.iter().any(|a| a == mailbox_name))
.and_then(|f| special_uses.get(&f.special_use))
.copied()
.unwrap_or(mailbox_id);
.unwrap_or(mailbox.document_id);
account
.mailbox_names
.insert(mailbox_name, effective_mailbox_id);
account.mailbox_state.insert(
mailbox_id,
mailbox.document_id,
Mailbox {
has_children: cached_mailboxes
.items
.values()
.any(|child| child.parent_id == mailbox_id),
.iter()
.any(|child| child.parent_id == mailbox.document_id),
is_subscribed: mailbox.subscribers.contains(&access_token.primary_id()),
special_use: match mailbox.role {
SpecialUse::Trash => Some(Attribute::Trash),
@ -194,18 +194,18 @@ impl<T: SessionStream> SessionData<T> {
SpecialUse::Important => Some(Attribute::Important),
_ => None,
},
total_messages: cached_messages.in_mailbox(mailbox_id).count() as u64,
total_messages: cached_messages.in_mailbox(mailbox.document_id).count() as u64,
total_unseen: cached_messages
.in_mailbox_without_keyword(mailbox_id, &Keyword::Seen)
.in_mailbox_without_keyword(mailbox.document_id, &Keyword::Seen)
.count() as u64,
total_deleted: cached_messages
.in_mailbox_with_keyword(mailbox_id, &Keyword::Deleted)
.in_mailbox_with_keyword(mailbox.document_id, &Keyword::Deleted)
.count() as u64,
uid_validity: mailbox.uid_validity as u64,
uid_next: self
.get_uid_next(&MailboxId {
account_id,
mailbox_id,
mailbox_id: mailbox.document_id,
})
.await
.caused_by(trc::location!())? as u64,

View file

@ -6,7 +6,7 @@
use ahash::AHashMap;
use common::listener::SessionStream;
use email::message::cache::MessageCache;
use email::message::cache::MessageCacheFetch;
use imap_proto::protocol::{Sequence, expunge, select::Exists};
use jmap_proto::types::{collection::Collection, property::Property};
use std::collections::BTreeMap;
@ -39,10 +39,10 @@ impl<T: SessionStream> SessionData<T> {
let uid_map = cached_messages
.items
.iter()
.filter_map(|(document_id, item)| {
.filter_map(|item| {
item.mailboxes.iter().find_map(|m| {
if m.mailbox_id == mailbox.mailbox_id {
Some((m.uid, *document_id))
Some((m.uid, item.document_id))
} else {
None
}

View file

@ -9,7 +9,7 @@ use std::{sync::Arc, time::Instant};
use ahash::AHashMap;
use directory::Permission;
use email::message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheFetch, MessageCacheAccess},
delete::EmailDeletion,
metadata::MessageData,
};
@ -121,7 +121,7 @@ impl<T: SessionStream> SessionData<T> {
.await
.caused_by(trc::location!())?
.in_mailbox_with_keyword(mailbox.id.mailbox_id, &Keyword::Deleted)
.map(|(id, _)| id),
.map(|m| m.document_id),
);
// Filter by sequence

View file

@ -14,7 +14,7 @@ use ahash::AHashMap;
use common::{listener::SessionStream, storage::index::ObjectIndexBuilder};
use directory::Permission;
use email::message::{
cache::MessageCache,
cache::{MessageCacheAccess, MessageCacheFetch},
metadata::{
ArchivedAddress, ArchivedGetHeader, ArchivedHeaderName, ArchivedHeaderValue,
ArchivedMessageMetadata, ArchivedMessageMetadataContents, ArchivedMetadataPartType,
@ -35,11 +35,7 @@ use imap_proto::{
receiver::Request,
};
use jmap_proto::types::{
acl::Acl,
collection::Collection,
id::Id,
keyword::{ArchivedKeyword, Keyword},
property::Property,
acl::Acl, collection::Collection, id::Id, keyword::Keyword, property::Property,
};
use store::{
query::log::{Change, Query},
@ -171,7 +167,7 @@ impl<T: SessionStream> SessionData<T> {
for change in changelog.changes {
match change {
Change::Insert(id) | Change::Update(id) | Change::ChildUpdate(id) => {
Change::Insert(id) | Change::Update(id) => {
let id = (id & u32::MAX as u64) as u32;
if let Some(uid) = ids.get(&id) {
changed_ids.insert(id, *uid);
@ -318,7 +314,7 @@ impl<T: SessionStream> SessionData<T> {
)
.await
.imap_ctx(&arguments.tag, trc::location!())?,
message_cache.items.get(&id),
message_cache.by_id(&id),
) {
(email, data)
} else {
@ -369,8 +365,7 @@ impl<T: SessionStream> SessionData<T> {
// Build response
let mut items = Vec::with_capacity(arguments.attributes.len());
let set_seen_flag =
set_seen_flags && !data.keywords.iter().any(|k| k == &ArchivedKeyword::Seen);
let set_seen_flag = set_seen_flags && !message_cache.has_keyword(data, &Keyword::Seen);
for attribute in &arguments.attributes {
match attribute {
@ -380,10 +375,8 @@ impl<T: SessionStream> SessionData<T> {
});
}
Attribute::Flags => {
let mut flags = data
.keywords
.iter()
.cloned()
let mut flags = message_cache
.expand_keywords(data)
.map(Flag::from)
.collect::<Vec<_>>();
if set_seen_flag {
@ -517,10 +510,8 @@ impl<T: SessionStream> SessionData<T> {
// Add flags to the response if the message was unseen
if set_seen_flag && !arguments.attributes.contains(&Attribute::Flags) {
let mut flags = data
.keywords
.iter()
.cloned()
let mut flags = message_cache
.expand_keywords(data)
.map(Flag::from)
.collect::<Vec<_>>();
flags.push(Flag::Seen);

View file

@ -8,7 +8,7 @@ use std::{sync::Arc, time::Instant};
use common::listener::SessionStream;
use directory::Permission;
use email::message::cache::{MessageCache, MessageCacheAccess};
use email::message::cache::{MessageCacheAccess, MessageCacheFetch};
use imap_proto::{
Command, StatusResponse,
protocol::{
@ -266,8 +266,11 @@ impl<T: SessionStream> SessionData<T> {
.get_cached_messages(mailbox.id.account_id)
.await
.caused_by(trc::location!())?;
let message_ids =
RoaringBitmap::from_iter(cache.in_mailbox(mailbox.id.mailbox_id).map(|(id, _)| id));
let message_ids = RoaringBitmap::from_iter(
cache
.in_mailbox(mailbox.id.mailbox_id)
.map(|m| m.document_id),
);
filters.push(query::Filter::is_in_set(message_ids.clone()));
@ -452,7 +455,9 @@ impl<T: SessionStream> SessionData<T> {
}
search::Filter::Answered => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Answered).map(|(id, _)| id),
cache
.with_keyword(&Keyword::Answered)
.map(|m| m.document_id),
)));
}
search::Filter::Before(date) => {
@ -463,24 +468,24 @@ impl<T: SessionStream> SessionData<T> {
}
search::Filter::Deleted => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Deleted).map(|(id, _)| id),
cache.with_keyword(&Keyword::Deleted).map(|m| m.document_id),
)));
}
search::Filter::Draft => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Draft).map(|(id, _)| id),
cache.with_keyword(&Keyword::Draft).map(|m| m.document_id),
)));
}
search::Filter::Flagged => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Flagged).map(|(id, _)| id),
cache.with_keyword(&Keyword::Flagged).map(|m| m.document_id),
)));
}
search::Filter::Keyword(keyword) => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.with_keyword(&Keyword::from(keyword))
.map(|(id, _)| id),
.map(|m| m.document_id),
)));
}
search::Filter::Larger(size) => {
@ -500,7 +505,7 @@ impl<T: SessionStream> SessionData<T> {
}
search::Filter::Seen => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Seen).map(|(id, _)| id),
cache.with_keyword(&Keyword::Seen).map(|m| m.document_id),
)));
}
search::Filter::SentBefore(date) => {
@ -537,48 +542,44 @@ impl<T: SessionStream> SessionData<T> {
filters.push(query::Filter::lt(Property::Size, size.serialize()));
}
search::Filter::Unanswered => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Answered).map(|(id, _)| id),
)));
filters.push(query::Filter::End);
}
search::Filter::Undeleted => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Deleted).map(|(id, _)| id),
)));
filters.push(query::Filter::End);
}
search::Filter::Undraft => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Draft).map(|(id, _)| id),
)));
filters.push(query::Filter::End);
}
search::Filter::Unflagged => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Flagged).map(|(id, _)| id),
)));
filters.push(query::Filter::End);
}
search::Filter::Unkeyword(keyword) => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.with_keyword(&Keyword::from(keyword))
.map(|(id, _)| id),
.without_keyword(&Keyword::Answered)
.map(|m| m.document_id),
)));
}
search::Filter::Undeleted => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.without_keyword(&Keyword::Deleted)
.map(|m| m.document_id),
)));
}
search::Filter::Undraft => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.without_keyword(&Keyword::Draft)
.map(|m| m.document_id),
)));
}
search::Filter::Unflagged => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.without_keyword(&Keyword::Flagged)
.map(|m| m.document_id),
)));
}
search::Filter::Unkeyword(keyword) => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache
.without_keyword(&Keyword::from(keyword))
.map(|m| m.document_id),
)));
filters.push(query::Filter::End);
}
search::Filter::Unseen => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.with_keyword(&Keyword::Seen).map(|(id, _)| id),
cache.without_keyword(&Keyword::Seen).map(|m| m.document_id),
)));
filters.push(query::Filter::End);
}
search::Filter::And => {
filters.push(query::Filter::And);
@ -658,7 +659,7 @@ impl<T: SessionStream> SessionData<T> {
search::Filter::ThreadId(id) => {
if let Some(id) = Id::from_bytes(id.as_bytes()) {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cache.in_thread(id.document_id()).map(|(id, _)| id),
cache.in_thread(id.document_id()).map(|m| m.document_id),
)));
} else {
return Err(trc::ImapEvent::Error

View file

@ -14,7 +14,7 @@ use crate::{
use common::listener::SessionStream;
use compact_str::CompactString;
use directory::Permission;
use email::message::cache::{MessageCache, MessageCacheAccess};
use email::message::cache::{MessageCacheAccess, MessageCacheFetch};
use imap_proto::{
Command, ResponseCode, StatusResponse,
parser::PushUnique,
@ -232,7 +232,7 @@ impl<T: SessionStream> SessionData<T> {
&RoaringBitmap::from_iter(
cache
.in_mailbox_with_keyword(mailbox.mailbox_id, &Keyword::Deleted)
.map(|x| x.0),
.map(|x| x.document_id),
),
)
.await
@ -241,7 +241,7 @@ impl<T: SessionStream> SessionData<T> {
.calculate_mailbox_size(
mailbox.account_id,
&RoaringBitmap::from_iter(
cache.in_mailbox(mailbox.mailbox_id).map(|x| x.0),
cache.in_mailbox(mailbox.mailbox_id).map(|x| x.document_id),
),
)
.await

View file

@ -124,10 +124,7 @@ impl<T: SessionStream> SessionData<T> {
// Add all IDs that changed in this mailbox
for change in changelog.changes {
let (Change::Insert(id)
| Change::Update(id)
| Change::ChildUpdate(id)
| Change::Delete(id)) = change;
let (Change::Insert(id) | Change::Update(id) | Change::Delete(id)) = change;
let id = (id & u32::MAX as u64) as u32;
if let Some(imap_id) = ids.remove(&id) {
if is_uid {
@ -348,7 +345,7 @@ impl<T: SessionStream> SessionData<T> {
// Log mailbox changes
if !changed_mailboxes.is_empty() {
for parent_id in changed_mailboxes {
batch.log_child_update(Collection::Mailbox, parent_id);
batch.log_parent_update(Collection::Mailbox.as_child_update(), parent_id);
}
}

View file

@ -13,7 +13,7 @@ use crate::{
use ahash::AHashMap;
use common::listener::SessionStream;
use directory::Permission;
use email::message::cache::MessageCache;
use email::message::cache::MessageCacheFetch;
use imap_proto::{
Command, StatusResponse,
protocol::{
@ -89,9 +89,9 @@ impl<T: SessionStream> SessionData<T> {
// Group messages by thread
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();
let state = mailbox.state.lock();
for (document_id, item) in &cache.items {
if result_set.results.contains(*document_id) {
if let Some((imap_id, _)) = state.map_result_id(*document_id, is_uid) {
for item in &cache.items {
if result_set.results.contains(item.document_id) {
if let Some((imap_id, _)) = state.map_result_id(item.document_id, is_uid) {
threads.entry(item.thread_id).or_default().push(imap_id);
}
}

View file

@ -14,7 +14,7 @@ use crate::{
types::{
blob::BlobId,
id::Id,
state::{State, StateChange},
state::State,
value::{Object, SetValue, Value},
},
};
@ -52,9 +52,6 @@ pub struct CopyResponse {
#[serde(rename = "notCreated")]
#[serde(skip_serializing_if = "VecMap::is_empty")]
pub not_created: VecMap<Id, SetError>,
#[serde(skip)]
pub state_change: Option<StateChange>,
}
#[derive(Debug, Clone)]

View file

@ -20,7 +20,7 @@ use crate::{
id::Id,
keyword::Keyword,
property::Property,
state::{State, StateChange},
state::State,
value::{Object, SetValueMap, Value},
},
};
@ -59,9 +59,6 @@ pub struct ImportEmailResponse {
#[serde(rename = "notCreated")]
#[serde(skip_serializing_if = "VecMap::is_empty")]
pub not_created: VecMap<String, SetError>,
#[serde(skip)]
pub state_change: Option<StateChange>,
}
impl JsonObjectParser for ImportEmailRequest {

View file

@ -26,12 +26,11 @@ pub enum Collection {
Principal = 7,
Calendar = 8,
CalendarEvent = 9,
CalendarEventNotification = 10,
AddressBook = 11,
ContactCard = 12,
FileNode = 13,
AddressBook = 10,
ContactCard = 11,
FileNode = 12,
#[default]
None = 14,
None = 13,
}
impl Collection {
@ -79,10 +78,9 @@ impl From<u8> for Collection {
7 => Collection::Principal,
8 => Collection::Calendar,
9 => Collection::CalendarEvent,
10 => Collection::CalendarEventNotification,
11 => Collection::AddressBook,
12 => Collection::ContactCard,
13 => Collection::FileNode,
10 => Collection::AddressBook,
11 => Collection::ContactCard,
12 => Collection::FileNode,
_ => Collection::None,
}
}
@ -101,10 +99,9 @@ impl From<u64> for Collection {
7 => Collection::Principal,
8 => Collection::Calendar,
9 => Collection::CalendarEvent,
10 => Collection::CalendarEventNotification,
11 => Collection::AddressBook,
12 => Collection::ContactCard,
13 => Collection::FileNode,
10 => Collection::AddressBook,
11 => Collection::ContactCard,
12 => Collection::FileNode,
_ => Collection::None,
}
}
@ -158,13 +155,16 @@ impl Collection {
Collection::Principal => "principal",
Collection::Calendar => "calendar",
Collection::CalendarEvent => "calendarEvent",
Collection::CalendarEventNotification => "calendarEventNotification",
Collection::AddressBook => "addressBook",
Collection::ContactCard => "contactCard",
Collection::FileNode => "fileNode",
Collection::None => "",
}
}
pub fn as_child_update(&self) -> u8 {
u8::MAX - u8::from(*self)
}
}
impl FromStr for Collection {
@ -182,7 +182,6 @@ impl FromStr for Collection {
"principal" => Collection::Principal,
"calendar" => Collection::Calendar,
"calendarEvent" => Collection::CalendarEvent,
"calendarEventNotification" => Collection::CalendarEventNotification,
"addressBook" => Collection::AddressBook,
"contactCard" => Collection::ContactCard,
"fileNode" => Collection::FileNode,

View file

@ -255,6 +255,24 @@ impl Keyword {
Keyword::Other(string) => Err(string),
}
}
pub fn try_from_id(id: usize) -> Result<Self, usize> {
match id {
SEEN => Ok(Keyword::Seen),
DRAFT => Ok(Keyword::Draft),
FLAGGED => Ok(Keyword::Flagged),
ANSWERED => Ok(Keyword::Answered),
RECENT => Ok(Keyword::Recent),
IMPORTANT => Ok(Keyword::Important),
PHISHING => Ok(Keyword::Phishing),
JUNK => Ok(Keyword::Junk),
NOTJUNK => Ok(Keyword::NotJunk),
DELETED => Ok(Keyword::Deleted),
FORWARDED => Ok(Keyword::Forwarded),
MDN_SENT => Ok(Keyword::MdnSent),
_ => Err(id),
}
}
}
impl ArchivedKeyword {

View file

@ -176,9 +176,10 @@ impl TryFrom<ShortId> for DataType {
type Error = ();
fn try_from(value: ShortId) -> Result<Self, Self::Error> {
const MAILBOX_CHANGE: u8 = u8::MAX - 1;
match value.0 {
0 => Ok(DataType::Email),
1 => Ok(DataType::Mailbox),
1 | MAILBOX_CHANGE => Ok(DataType::Mailbox),
2 => Ok(DataType::Thread),
3 => Ok(DataType::Identity),
4 => Ok(DataType::EmailSubmission),
@ -186,10 +187,9 @@ impl TryFrom<ShortId> for DataType {
6 => Ok(DataType::PushSubscription),
8 => Ok(DataType::Calendar),
9 => Ok(DataType::CalendarEvent),
10 => Ok(DataType::CalendarEventNotification),
11 => Ok(DataType::AddressBook),
12 => Ok(DataType::ContactCard),
13 => Ok(DataType::FileNode),
10 => Ok(DataType::AddressBook),
11 => Ok(DataType::ContactCard),
12 => Ok(DataType::FileNode),
_ => Err(()),
}
}

View file

@ -109,17 +109,6 @@ impl RequestHandler for Server {
ResponseMethod::ImportEmail(import_response) => {
// Add created ids
import_response.update_created_ids(&mut response);
// Publish state changes
if let Some(state_change) = import_response.state_change.take() {
self.broadcast_state_change(state_change).await;
}
}
ResponseMethod::Copy(copy_response) => {
// Publish state changes
if let Some(state_change) = copy_response.state_change.take() {
self.broadcast_state_change(state_change).await;
}
}
ResponseMethod::UploadBlob(upload_response) => {
// Add created blobIds

View file

@ -9,7 +9,7 @@ use std::ops::Range;
use common::{Server, auth::AccessToken};
use email::{
mailbox::cache::MessageMailboxCache,
message::cache::{MessageCache, MessageCacheAccess},
message::cache::{MessageCacheFetch, MessageCacheAccess},
};
use jmap_proto::types::{acl::Acl, blob::BlobId, collection::Collection};
use std::future::Future;

View file

@ -10,7 +10,7 @@ use jmap_proto::{
types::{collection::Collection, property::Property, state::State},
};
use std::future::Future;
use store::query::log::{Change, Query};
use store::query::log::{Change, Changes, Query};
pub trait ChangesLookup: Sync + Send {
fn changes(
@ -80,10 +80,8 @@ impl ChangesLookup for Server {
let (items_sent, mut changelog) = match &request.since_state {
State::Initial => {
let changelog = self
.store()
.changes(account_id, collection, Query::All)
.await?;
let changelog =
changes(self, account_id, collection, Query::All, &mut response).await?;
if changelog.changes.is_empty() && changelog.from_change_id == 0 {
return Ok(response);
}
@ -92,29 +90,35 @@ impl ChangesLookup for Server {
}
State::Exact(change_id) => (
0,
self.store()
.changes(account_id, collection, Query::Since(*change_id))
.await?,
changes(
self,
account_id,
collection,
Query::Since(*change_id),
&mut response,
)
.await?,
),
State::Intermediate(intermediate_state) => {
let mut changelog = self
.store()
.changes(
account_id,
collection,
Query::RangeInclusive(intermediate_state.from_id, intermediate_state.to_id),
)
.await?;
let mut changelog = changes(
self,
account_id,
collection,
Query::RangeInclusive(intermediate_state.from_id, intermediate_state.to_id),
&mut response,
)
.await?;
if intermediate_state.items_sent >= changelog.changes.len() {
(
0,
self.store()
.changes(
account_id,
collection,
Query::Since(intermediate_state.to_id),
)
.await?,
changes(
self,
account_id,
collection,
Query::Since(intermediate_state.to_id),
&mut response,
)
.await?,
)
} else {
changelog.changes.drain(
@ -133,19 +137,13 @@ impl ChangesLookup for Server {
response.has_more_changes = true;
};
let mut items_changed = false;
let total_changes = changelog.changes.len();
if total_changes > 0 {
for change in changelog.changes {
match change {
Change::Insert(item) => response.created.push(item.into()),
Change::Update(item) => {
items_changed = true;
response.updated.push(item.into())
}
Change::Update(item) => response.updated.push(item.into()),
Change::Delete(item) => response.destroyed.push(item.into()),
Change::ChildUpdate(item) => response.updated.push(item.into()),
};
}
}
@ -159,16 +157,53 @@ impl ChangesLookup for Server {
State::new_exact(changelog.to_change_id)
};
if !response.updated.is_empty() && !items_changed && collection == Collection::Mailbox {
response.updated_properties = vec![
Property::TotalEmails,
Property::UnreadEmails,
Property::TotalThreads,
Property::UnreadThreads,
]
.into()
}
Ok(response)
}
}
async fn changes(
server: &Server,
account_id: u32,
collection: Collection,
query: Query,
response: &mut ChangesResponse,
) -> trc::Result<Changes> {
let mut main_changes = server
.store()
.changes(account_id, collection, query)
.await?;
if matches!(collection, Collection::Mailbox) {
let child_changes = server
.store()
.changes(account_id, collection.as_child_update(), query)
.await?;
if !child_changes.changes.is_empty() {
if child_changes.from_change_id < main_changes.from_change_id {
main_changes.from_change_id = child_changes.from_change_id;
}
if child_changes.to_change_id > main_changes.to_change_id {
main_changes.to_change_id = child_changes.to_change_id;
}
let mut has_child_changes = false;
for change in child_changes.changes {
let id = change.id();
if !main_changes.changes.iter().any(|c| c.id() == id) {
main_changes.changes.push(change);
has_child_changes = true;
}
}
if has_child_changes {
response.updated_properties = vec![
Property::TotalEmails,
Property::UnreadEmails,
Property::TotalThreads,
Property::UnreadThreads,
]
.into();
}
}
}
Ok(main_changes)
}

View file

@ -9,7 +9,7 @@ use common::{Server, auth::AccessToken};
use email::{
mailbox::cache::{MailboxCacheAccess, MessageMailboxCache},
message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
copy::EmailCopy,
},
};
@ -30,8 +30,6 @@ use jmap_proto::{
acl::Acl,
collection::Collection,
property::Property,
state::{State, StateChange},
type_state::DataType,
value::{MaybePatchValue, Value},
},
};
@ -77,7 +75,6 @@ impl JmapEmailCopy for Server {
old_state,
created: VecMap::with_capacity(request.create.len()),
not_created: VecMap::new(),
state_change: None,
};
let from_cached_messages = self
@ -208,7 +205,7 @@ impl JmapEmailCopy for Server {
// Verify that the mailboxIds are valid
for mailbox_id in &mailboxes {
if !cached_mailboxes.items.contains_key(mailbox_id) {
if !cached_mailboxes.has_id(mailbox_id) {
response.not_created.append(
id,
SetError::invalid_properties()
@ -257,13 +254,6 @@ impl JmapEmailCopy for Server {
// Update state
if !response.created.is_empty() {
response.new_state = self.get_state(account_id, Collection::Email).await?;
if let State::Exact(change_id) = &response.new_state {
response.state_change = StateChange::new(account_id)
.with_change(DataType::Email, *change_id)
.with_change(DataType::Mailbox, *change_id)
.with_change(DataType::Thread, *change_id)
.into()
}
}
// Destroy ids

View file

@ -9,10 +9,9 @@ use common::{Server, auth::AccessToken};
use email::{
mailbox::cache::MessageMailboxCache,
message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
metadata::{
ArchivedGetHeader, ArchivedHeaderName, ArchivedMetadataPartType, MessageData,
MessageMetadata,
ArchivedGetHeader, ArchivedHeaderName, ArchivedMetadataPartType, MessageMetadata,
},
},
};
@ -126,7 +125,7 @@ impl EmailGet for Server {
.items
.iter()
.take(self.core.jmap.get_max_objects)
.map(|(document_id, item)| Id::from_parts(item.thread_id, *document_id))
.map(|item| Id::from_parts(item.thread_id, item.document_id))
.collect()
};
let mut response = GetResponse {
@ -178,19 +177,13 @@ impl EmailGet for Server {
.caused_by(trc::location!())?;
// Obtain message data
let data_ = match self
.get_archive(account_id, Collection::Email, id.document_id())
.await?
{
let data = match cached_messages.by_id(&id.document_id()) {
Some(data) => data,
None => {
response.not_found.push(id.into());
continue;
}
};
let data = data_
.unarchive::<MessageData>()
.caused_by(trc::location!())?;
// Retrieve raw message if needed
let blob_hash = BlobHash::from(&metadata.blob_hash);
@ -243,17 +236,14 @@ impl EmailGet for Server {
let mut obj = Object::with_capacity(data.mailboxes.len());
for id in data.mailboxes.iter() {
debug_assert!(id.uid != 0);
obj.append(
Property::_T(Id::from(u32::from(id.mailbox_id)).to_string()),
true,
);
obj.append(Property::_T(Id::from(id.mailbox_id).to_string()), true);
}
email.append(property.clone(), Value::Object(obj));
}
Property::Keywords => {
let mut obj = Object::with_capacity(data.keywords.len());
for keyword in data.keywords.iter() {
let mut obj = Object::with_capacity(2);
for keyword in cached_messages.expand_keywords(data) {
obj.append(Property::_T(keyword.to_string()), true);
}
email.append(property.clone(), Value::Object(obj));

View file

@ -13,14 +13,7 @@ use http_proto::HttpSessionData;
use jmap_proto::{
error::set::{SetError, SetErrorType},
method::import::{ImportEmailRequest, ImportEmailResponse},
types::{
acl::Acl,
collection::Collection,
id::Id,
property::Property,
state::{State, StateChange},
type_state::DataType,
},
types::{acl::Acl, collection::Collection, id::Id, property::Property, state::State},
};
use mail_parser::MessageParser;
use utils::map::vec_map::VecMap;
@ -69,7 +62,6 @@ impl EmailImport for Server {
old_state: old_state.into(),
created: VecMap::with_capacity(request.emails.len()),
not_created: VecMap::new(),
state_change: None,
};
let can_train_spam = self.email_bayes_can_train(access_token);
@ -91,7 +83,7 @@ impl EmailImport for Server {
continue;
}
for mailbox_id in &mailbox_ids {
if !cached_mailboxes.items.contains_key(mailbox_id) {
if !cached_mailboxes.has_id(mailbox_id) {
response.not_created.append(
id,
SetError::invalid_properties()
@ -174,13 +166,6 @@ impl EmailImport for Server {
// Update state
if !response.created.is_empty() {
response.new_state = self.get_state(account_id, Collection::Email).await?;
if let State::Exact(change_id) = &response.new_state {
response.state_change = StateChange::new(account_id)
.with_change(DataType::Email, *change_id)
.with_change(DataType::Mailbox, *change_id)
.with_change(DataType::Thread, *change_id)
.into()
}
}
Ok(response)

View file

@ -4,10 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{MessageItemCache, MessageStoreCache, Server, auth::AccessToken};
use common::{MessageStoreCache, Server, auth::AccessToken};
use email::{
mailbox::cache::MessageMailboxCache,
message::cache::{MessageCache, MessageCacheAccess},
message::cache::{MessageCacheAccess, MessageCacheFetch},
};
use jmap_proto::{
method::query::{Comparator, Filter, QueryRequest, QueryResponse, SortProperty},
@ -191,7 +191,7 @@ impl EmailQuery for Server {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cached_messages
.in_mailbox(mailbox.document_id())
.map(|(id, _)| *id),
.map(|item| item.document_id),
)))
}
Filter::InMailboxOtherThan(mailboxes) => {
@ -201,7 +201,7 @@ impl EmailQuery for Server {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cached_messages
.in_mailbox(mailbox.document_id())
.map(|(id, _)| *id),
.map(|item| item.document_id),
)));
}
filters.push(query::Filter::End);
@ -244,13 +244,17 @@ impl EmailQuery for Server {
}
Filter::HasKeyword(keyword) => {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cached_messages.with_keyword(&keyword).map(|(id, _)| *id),
cached_messages
.with_keyword(&keyword)
.map(|item| item.document_id),
)));
}
Filter::NotKeyword(keyword) => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cached_messages.with_keyword(&keyword).map(|(id, _)| *id),
cached_messages
.with_keyword(&keyword)
.map(|item| item.document_id),
)));
filters.push(query::Filter::End);
}
@ -282,7 +286,7 @@ impl EmailQuery for Server {
filters.push(query::Filter::is_in_set(RoaringBitmap::from_iter(
cached_messages
.in_thread(id.document_id())
.map(|(id, _)| *id),
.map(|item| item.document_id),
)))
}
Filter::And | Filter::Or | Filter::Not | Filter::Close => {
@ -344,7 +348,7 @@ impl EmailQuery for Server {
RoaringBitmap::from_iter(
cached_messages
.with_keyword(&comparator.keyword.unwrap_or(Keyword::Seen))
.map(|(id, _)| *id),
.map(|item| item.document_id),
),
comparator.is_ascending,
),
@ -387,7 +391,7 @@ impl EmailQuery for Server {
&cache
.items
.iter()
.map(|(id, item)| (*id, item.thread_id))
.map(|item| (item.document_id, item.thread_id))
.collect(),
)
.with_prefix_unique(request.arguments.collapse_threads.unwrap_or(false)),
@ -400,12 +404,9 @@ impl EmailQuery for Server {
}
}
fn thread_keywords(
cache: &MessageStoreCache<MessageItemCache>,
keyword: Keyword,
match_all: bool,
) -> RoaringBitmap {
let keyword_doc_ids = RoaringBitmap::from_iter(cache.with_keyword(&keyword).map(|(id, _)| *id));
fn thread_keywords(cache: &MessageStoreCache, keyword: Keyword, match_all: bool) -> RoaringBitmap {
let keyword_doc_ids =
RoaringBitmap::from_iter(cache.with_keyword(&keyword).map(|item| item.document_id));
if keyword_doc_ids.is_empty() {
return keyword_doc_ids;
}
@ -414,14 +415,15 @@ fn thread_keywords(
let mut thread_map: AHashMap<u32, RoaringBitmap> = AHashMap::new();
for (&document_id, item) in &cache.items {
for item in &cache.items {
thread_map
.entry(item.thread_id)
.or_default()
.insert(document_id);
.insert(item.document_id);
}
for (&keyword_doc_id, item) in &cache.items {
for item in &cache.items {
let keyword_doc_id = item.document_id;
if !keyword_doc_ids.contains(keyword_doc_id)
|| matched_ids.contains(keyword_doc_id)
|| not_matched_ids.contains(keyword_doc_id)

View file

@ -13,7 +13,7 @@ use email::{
cache::{MailboxCacheAccess, MessageMailboxCache},
},
message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
delete::EmailDeletion,
ingest::{EmailIngest, IngestEmail, IngestSource},
metadata::MessageData,
@ -662,7 +662,7 @@ impl EmailSet for Server {
// Verify that the mailboxIds are valid
for mailbox_id in &mailboxes {
if !cached_mailboxes.items.contains_key(mailbox_id) {
if !cached_mailboxes.has_id(mailbox_id) {
response.not_created.append(
id,
SetError::invalid_properties()
@ -878,7 +878,7 @@ impl EmailSet for Server {
// Make sure all new mailboxIds are valid
for mailbox_id in new_data.added_mailboxes(data.inner) {
if cached_mailboxes.items.contains_key(&mailbox_id.mailbox_id) {
if cached_mailboxes.has_id(&mailbox_id.mailbox_id) {
// Verify permissions on shared accounts
if !matches!(&can_add_mailbox_ids, Some(ids) if !ids.contains(mailbox_id.mailbox_id))
{
@ -958,7 +958,7 @@ impl EmailSet for Server {
if !batch.is_empty() {
// Log mailbox changes
for parent_id in changed_mailboxes {
batch.log_child_update(Collection::Mailbox, parent_id);
batch.log_parent_update(Collection::Mailbox.as_child_update(), parent_id);
}
match self.commit_batch(batch).await {

View file

@ -8,7 +8,7 @@ use common::{Server, auth::AccessToken};
use email::{
mailbox::cache::MessageMailboxCache,
message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheFetch, MessageCacheAccess},
metadata::{
ArchivedGetHeader, ArchivedHeaderName, ArchivedMetadataPartType, DecodedPartContent,
MessageMetadata,

View file

@ -7,7 +7,7 @@
use common::{Server, auth::AccessToken, sharing::EffectiveAcl};
use email::{
mailbox::cache::{MailboxCacheAccess, MessageMailboxCache},
message::cache::{MessageCache, MessageCacheAccess},
message::cache::{MessageCacheAccess, MessageCacheFetch},
};
use jmap_proto::{
method::get::{GetRequest, GetResponse, RequestArguments},
@ -63,7 +63,7 @@ impl MailboxGet for Server {
ids
} else {
mailbox_cache
.items
.index
.keys()
.filter(|id| shared_ids.as_ref().is_none_or(|ids| ids.contains(**id)))
.copied()
@ -82,7 +82,7 @@ impl MailboxGet for Server {
// Obtain the mailbox object
let document_id = id.document_id();
let cached_mailbox = if let Some(mailbox) =
mailbox_cache.items.get(&document_id).filter(|_| {
mailbox_cache.by_id(&document_id).filter(|_| {
shared_ids
.as_ref()
.is_none_or(|ids| ids.contains(document_id))
@ -125,14 +125,14 @@ impl MailboxGet for Server {
Property::TotalThreads => Value::UnsignedInt(
message_cache
.in_mailbox(document_id)
.map(|(_, m)| m.thread_id)
.map(|m| m.thread_id)
.collect::<AHashSet<_>>()
.len() as u64,
),
Property::UnreadThreads => Value::UnsignedInt(
message_cache
.in_mailbox_without_keyword(document_id, &Keyword::Seen)
.map(|(_, m)| m.thread_id)
.map(|m| m.thread_id)
.collect::<AHashSet<_>>()
.len() as u64,
),

View file

@ -50,8 +50,8 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| mailbox.parent_id == parent_id)
.map(|(id, _)| id)
.filter(|mailbox| mailbox.parent_id == parent_id)
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
}
@ -68,8 +68,8 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| mailbox.name.to_lowercase().contains(&name))
.map(|(id, _)| id)
.filter(|mailbox| mailbox.name.to_lowercase().contains(&name))
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
}
@ -79,10 +79,8 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| {
mailbox.role.as_str().is_some_and(|r| r == role)
})
.map(|(id, _)| id)
.filter(|mailbox| mailbox.role.as_str().is_some_and(|r| r == role))
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
} else {
@ -91,8 +89,8 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| matches!(mailbox.role, SpecialUse::None))
.map(|(id, _)| id)
.filter(|mailbox| matches!(mailbox.role, SpecialUse::None))
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
filters.push(query::Filter::End);
@ -106,8 +104,8 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| !matches!(mailbox.role, SpecialUse::None))
.map(|(id, _)| id)
.filter(|mailbox| !matches!(mailbox.role, SpecialUse::None))
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
if !has_role {
@ -122,10 +120,10 @@ impl MailboxQuery for Server {
mailboxes
.items
.iter()
.filter(|(_, mailbox)| {
.filter(|mailbox| {
mailbox.subscribers.contains(&access_token.primary_id)
})
.map(|(id, _)| id)
.map(|m| m.document_id)
.collect::<RoaringBitmap>(),
));
if !is_subscribed {
@ -159,7 +157,7 @@ impl MailboxQuery for Server {
for document_id in &result_set.results {
let mut check_id = document_id;
for _ in 0..self.core.jmap.mailbox_max_depth {
if let Some(mailbox) = mailboxes.items.get(&check_id) {
if let Some(mailbox) = mailboxes.by_id(&check_id) {
if let Some(parent_id) = mailbox.parent_id() {
if result_set.results.contains(parent_id) {
check_id = parent_id;
@ -194,7 +192,7 @@ impl MailboxQuery for Server {
let sorted_list = mailboxes
.items
.iter()
.map(|(id, mailbox)| (mailbox.path.as_str(), *id))
.map(|mailbox| (mailbox.path.as_str(), mailbox.document_id))
.collect::<BTreeMap<_, _>>();
comparators.push(query::Comparator::sorted_list(
sorted_list.into_values().collect(),
@ -213,7 +211,7 @@ impl MailboxQuery for Server {
let sorted_list = mailboxes
.items
.iter()
.map(|(id, mailbox)| (mailbox.name.as_str(), *id))
.map(|mailbox| (mailbox.name.as_str(), mailbox.document_id))
.collect::<BTreeSet<_>>();
query::Comparator::sorted_list(
@ -225,7 +223,7 @@ impl MailboxQuery for Server {
let sorted_list = mailboxes
.items
.iter()
.map(|(id, mailbox)| (mailbox.sort_order, *id))
.map(|mailbox| (mailbox.sort_order, mailbox.document_id))
.collect::<BTreeSet<_>>();
query::Comparator::sorted_list(
@ -237,10 +235,10 @@ impl MailboxQuery for Server {
let sorted_list = mailboxes
.items
.iter()
.map(|(id, mailbox)| {
.map(|mailbox| {
(
mailbox.parent_id().map(|id| id + 1).unwrap_or_default(),
*id,
mailbox.document_id,
)
})
.collect::<BTreeSet<_>>();

View file

@ -83,7 +83,7 @@ impl MailboxSet for Server {
.prepare_set_response(&request, Collection::Mailbox)
.await?,
mailbox_ids: RoaringBitmap::from_iter(
self.get_cached_mailboxes(account_id).await?.items.keys(),
self.get_cached_mailboxes(account_id).await?.index.keys(),
),
will_destroy: request.unwrap_destroy(),
};
@ -469,7 +469,7 @@ impl MailboxSet for Server {
if update
.as_ref()
.is_none_or(|(_, m)| m.inner.name != changes.name)
&& cached_mailboxes.items.iter().any(|(_, m)| {
&& cached_mailboxes.items.iter().any(|m| {
m.name.to_lowercase() == lower_name
&& m.parent_id().map_or(0, |id| id + 1) == changes.parent_id
})

View file

@ -5,7 +5,7 @@
*/
use common::Server;
use email::message::cache::MessageCache;
use email::message::cache::MessageCacheFetch;
use jmap_proto::{
method::get::{GetRequest, GetResponse, RequestArguments},
types::{collection::Collection, id::Id, property::Property, value::Object},
@ -34,7 +34,7 @@ impl ThreadGet for Server {
) -> trc::Result<GetResponse> {
let account_id = request.account_id.document_id();
let mut thread_map: AHashMap<u32, RoaringBitmap> = AHashMap::with_capacity(32);
for (document_id, item) in &self
for item in &self
.get_cached_messages(account_id)
.await
.caused_by(trc::location!())?
@ -43,7 +43,7 @@ impl ThreadGet for Server {
thread_map
.entry(item.thread_id)
.or_default()
.insert(*document_id);
.insert(item.document_id);
}
let ids = if let Some(ids) = request.unwrap_ids(self.core.jmap.get_max_objects)? {

View file

@ -12,7 +12,7 @@ use email::{
INBOX_ID,
cache::{MailboxCacheAccess, MessageMailboxCache},
},
message::cache::MessageCache,
message::cache::MessageCacheFetch,
};
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
@ -59,7 +59,7 @@ impl<T: SessionStream> Session<T> {
.caused_by(trc::location!())?;
let uid_validity = mailbox_cache
.by_role(&SpecialUse::Inbox)
.map(|x| x.1.uid_validity)
.map(|x| x.uid_validity)
.unwrap_or_default();
// Obtain message sizes
@ -88,7 +88,7 @@ impl<T: SessionStream> Session<T> {
.no_values(),
|key, _| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if mailbox_cache.items.contains_key(&document_id) {
if mailbox_cache.has_id(&document_id) {
message_sizes.insert(
document_id,
key.deserialize_be_u32(key.len() - (U32_LEN * 2))?,
@ -105,11 +105,12 @@ impl<T: SessionStream> Session<T> {
let message_map = message_cache
.items
.iter()
.filter_map(|(document_id, m)| {
m.mailboxes
.filter_map(|message| {
message
.mailboxes
.iter()
.find(|m| m.mailbox_id == INBOX_ID)
.map(|m| (m.uid, *document_id))
.map(|m| (m.uid, message.document_id))
})
.collect::<BTreeMap<u32, u32>>();

View file

@ -13,7 +13,6 @@ use crate::{IterateParams, LogKey, Store, U64_LEN, write::key::DeserializeBigEnd
pub enum Change {
Insert(u64),
Update(u64),
ChildUpdate(u64),
Delete(u64),
}
@ -24,7 +23,7 @@ pub struct Changes {
pub to_change_id: u64,
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Query {
All,
Since(u64),
@ -144,7 +143,6 @@ impl Changes {
let mut bytes_it = bytes.iter();
let total_inserts: usize = bytes_it.next_leb128()?;
let total_updates: usize = bytes_it.next_leb128()?;
let total_child_updates: usize = bytes_it.next_leb128()?;
let total_deletes: usize = bytes_it.next_leb128()?;
if total_inserts > 0 {
@ -153,10 +151,9 @@ impl Changes {
}
}
if total_updates > 0 || total_child_updates > 0 {
'update_outer: for change_pos in 0..(total_updates + total_child_updates) {
if total_updates > 0 {
'update_outer: for _ in 0..total_updates {
let id = bytes_it.next_leb128()?;
let mut is_child_update = change_pos >= total_updates;
for (idx, change) in self.changes.iter().enumerate() {
match change {
@ -165,12 +162,6 @@ impl Changes {
continue 'update_outer;
}
Change::Update(update_id) if *update_id == id => {
// Move update to the front
is_child_update = false;
self.changes.remove(idx);
break;
}
Change::ChildUpdate(update_id) if *update_id == id => {
// Move update to the front
self.changes.remove(idx);
break;
@ -179,11 +170,7 @@ impl Changes {
}
}
self.changes.push(if !is_child_update {
Change::Update(id)
} else {
Change::ChildUpdate(id)
});
self.changes.push(Change::Update(id));
}
}
@ -197,9 +184,7 @@ impl Changes {
self.changes.remove(idx);
continue 'delete_outer;
}
Change::Update(update_id) | Change::ChildUpdate(update_id)
if *update_id == id =>
{
Change::Update(update_id) if *update_id == id => {
self.changes.remove(idx);
break 'delete_inner;
}
@ -220,7 +205,6 @@ impl Change {
match self {
Change::Insert(id) => *id,
Change::Update(id) => *id,
Change::ChildUpdate(id) => *id,
Change::Delete(id) => *id,
}
}
@ -229,7 +213,6 @@ impl Change {
match self {
Change::Insert(id) => id,
Change::Update(id) => id,
Change::ChildUpdate(id) => id,
Change::Delete(id) => id,
}
}

View file

@ -301,13 +301,13 @@ impl BatchBuilder {
self
}
pub fn log_child_update(&mut self, collection: impl Into<u8>, parent_id: u32) -> &mut Self {
pub fn log_parent_update(&mut self, collection: impl Into<u8>, parent_id: u32) -> &mut Self {
let collection = collection.into();
if let Some(account_id) = self.current_account_id {
self.changes
.get_mut_or_insert(account_id)
.log_child_update(collection, None, parent_id);
.log_update(collection, None, parent_id);
}
if self.current_change_id.is_none() {
self.generate_change_id();
@ -325,7 +325,11 @@ impl BatchBuilder {
for (collection, set) in changelog.serialize() {
let cc = self.changed_collections.get_mut_or_insert(account_id);
cc.0 = change_id;
cc.1.insert(ShortId(collection));
if collection < 64 {
cc.1.insert(ShortId(collection));
} else {
cc.1.insert(ShortId(u8::MAX - collection));
}
self.ops.push(Operation::Log {
change_id,

View file

@ -19,7 +19,6 @@ pub struct Changes {
pub inserts: AHashSet<u64>,
pub updates: AHashSet<u64>,
pub deletes: AHashSet<u64>,
pub child_updates: AHashSet<u64>,
}
impl ChangeLogBuilder {
@ -49,18 +48,6 @@ impl ChangeLogBuilder {
changes.updates.remove(&id);
changes.deletes.insert(id);
}
pub fn log_child_update(
&mut self,
collection: impl Into<u8>,
prefix: Option<u32>,
document_id: u32,
) {
self.changes
.get_mut_or_insert(collection.into())
.child_updates
.insert(build_id(prefix, document_id));
}
}
#[inline(always)]
@ -95,17 +82,6 @@ impl Changes {
}
}
pub fn child_update<T, I>(id: T) -> Self
where
T: IntoIterator<Item = I>,
I: Into<u64>,
{
Changes {
child_updates: id.into_iter().map(Into::into).collect(),
..Default::default()
}
}
pub fn delete<T, I>(id: T) -> Self
where
T: IntoIterator<Item = I>,
@ -121,25 +97,15 @@ impl Changes {
impl SerializeInfallible for Changes {
fn serialize(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
1 + (self.inserts.len()
+ self.updates.len()
+ self.child_updates.len()
+ self.deletes.len()
+ 4)
1 + (self.inserts.len() + self.updates.len() + self.deletes.len() + 4)
* std::mem::size_of::<usize>(),
);
buf.push_leb128(self.inserts.len());
buf.push_leb128(self.updates.len());
buf.push_leb128(self.child_updates.len());
buf.push_leb128(self.deletes.len());
for list in [
&self.inserts,
&self.updates,
&self.child_updates,
&self.deletes,
] {
for list in [&self.inserts, &self.updates, &self.deletes] {
for id in list {
buf.push_leb128(*id);
}

View file

@ -22,6 +22,7 @@ use std::ops::Deref;
Hash,
)]
#[rkyv(compare(PartialEq), derive(Debug))]
#[repr(transparent)]
pub struct Bitmap<T: BitmapItem> {
pub bitmap: u64,
#[serde(skip)]

View file

@ -8,7 +8,7 @@ use std::time::Duration;
use email::{
mailbox::{INBOX_ID, JUNK_ID},
message::cache::{MessageCache, MessageCacheAccess},
message::cache::{MessageCacheAccess, MessageCacheFetch},
};
use jmap_proto::types::{collection::Collection, id::Id};

View file

@ -148,7 +148,7 @@ pub async fn test(params: &mut JMAPTest) {
batch.update_document(id as u32).log_delete(None);
}
LogAction::UpdateChild(id) => {
batch.log_child_update(Collection::Email, id as u32);
batch.log_parent_update(Collection::Email, id as u32);
}
LogAction::Move(old_id, new_id) => {
batch

View file

@ -11,7 +11,7 @@ use crate::{
store::{deflate_test_resource, query::FIELDS},
};
use ::email::{mailbox::Mailbox, message::cache::MessageCache};
use ::email::{mailbox::Mailbox, message::cache::MessageCacheFetch};
use ahash::AHashSet;
use common::{config::jmap::settings::SpecialUse, storage::index::ObjectIndexBuilder};
use jmap_client::{
@ -19,12 +19,12 @@ use jmap_client::{
core::query::{Comparator, Filter},
email,
};
use jmap_proto::types::{collection::Collection, id::Id, property::Property};
use jmap_proto::types::{collection::Collection, id::Id};
use mail_parser::{DateTime, HeaderName};
use store::{
ahash::AHashMap,
write::{BatchBuilder, ValueClass, now},
write::{BatchBuilder, now},
};
use super::JMAPTest;
@ -73,24 +73,6 @@ pub async fn test(params: &mut JMAPTest, insert: bool) {
println!("Inserting JMAP Mail query test messages...");
create(client).await;
// Remove mailboxes
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Mailbox);
for mailbox_id in 1545..3010 {
batch
.delete_document(mailbox_id)
.clear(ValueClass::Property(Property::EmailIds.into()));
}
server
.core
.storage
.data
.write(batch.build_all())
.await
.unwrap();
assert_eq!(
params
.server
@ -98,7 +80,7 @@ pub async fn test(params: &mut JMAPTest, insert: bool) {
.await
.unwrap()
.items
.values()
.iter()
.map(|m| m.thread_id)
.collect::<AHashSet<_>>()
.len(),
@ -797,6 +779,27 @@ pub async fn create(client: &mut Client) {
total_messages += 1;
let mut keywords = Vec::new();
for keyword in [
values_str["medium"].to_string(),
values_str["artistRole"].to_string(),
values_str["accession_number"][0..1].to_string(),
format!(
"N{}",
&values_str["accession_number"][values_str["accession_number"].len() - 1..]
),
] {
if keyword == "attributed to"
|| keyword == "T"
|| keyword == "N0"
|| keyword == "N"
|| keyword == "artist"
|| keyword == "Bronze"
{
keywords.push(keyword);
}
}
client
.email_import(
format!(
@ -821,15 +824,7 @@ pub async fn create(client: &mut Client) {
Id::new(values_int["year"] as u64).to_string(),
Id::new((values_int["acquisitionYear"] + 1000) as u64).to_string(),
],
[
values_str["medium"].to_string(),
values_str["artistRole"].to_string(),
values_str["accession_number"][0..1].to_string(),
format!(
"N{}",
&values_str["accession_number"][values_str["accession_number"].len() - 1..]
),
]
keywords
.into(),
Some(values_int["year"] as i64),
)

View file

@ -373,8 +373,8 @@ pub async fn jmap_tests() {
)
.await;
webhooks::test(&mut params).await;
//email_query::test(&mut params, delete).await;
/*webhooks::test(&mut params).await;
email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
email_parse::test(&mut params).await;
@ -382,8 +382,8 @@ pub async fn jmap_tests() {
email_changes::test(&mut params).await;
email_query_changes::test(&mut params).await;
email_copy::test(&mut params).await;
thread_get::test(&mut params).await;
//thread_merge::test(&mut params).await;
thread_get::test(&mut params).await;*/
thread_merge::test(&mut params).await;
mailbox::test(&mut params).await;
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;

View file

@ -10,7 +10,7 @@ use directory::{QueryBy, backend::internal::manage::ManageDirectory};
use email::{
mailbox::{INBOX_ID, JUNK_ID, TRASH_ID},
message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
delete::EmailDeletion,
},
};

View file

@ -10,7 +10,7 @@ use crate::jmap::{mailbox::destroy_all_mailboxes_no_wait, wait_for_index};
use common::Server;
use directory::backend::internal::manage::ManageDirectory;
use email::message::{
cache::{MessageCache, MessageCacheAccess},
cache::{MessageCacheAccess, MessageCacheFetch},
metadata::MessageData,
};
use futures::future::join_all;
@ -223,7 +223,7 @@ async fn email_tests(server: Server, client: Arc<Client>) {
.await
.unwrap()
.in_mailbox(mailbox_id)
.map(|(id, _)| id),
.map(|m| m.document_id),
);
let mut email_ids_check = email_ids_in_mailbox.clone();
email_ids_check &= &email_ids;