ThreadID caching

This commit is contained in:
mdecimus 2024-03-04 17:43:02 +01:00
parent cf9a16c462
commit 31bc716a5f
6 changed files with 203 additions and 77 deletions

View file

@ -1,7 +1,4 @@
use std::{
collections::BTreeMap,
sync::{atomic::Ordering, Arc},
};
use std::{collections::BTreeMap, sync::atomic::Ordering};
use ahash::AHashMap;
use directory::QueryBy;
@ -18,7 +15,9 @@ use parking_lot::Mutex;
use store::query::log::{Change, Query};
use utils::listener::{limiter::InFlight, SessionStream};
use super::{Account, AccountId, Mailbox, MailboxId, MailboxSync, Session, SessionData};
use super::{
Account, AccountId, CachedItem, Mailbox, MailboxId, MailboxSync, Session, SessionData,
};
impl<T: SessionStream> SessionData<T> {
pub async fn new(
@ -105,12 +104,12 @@ impl<T: SessionStream> SessionData<T> {
primary_id: access_token.primary_id(),
})
.or_insert_with(|| {
Arc::new(tokio::sync::Mutex::new(Account {
CachedItem::new(Account {
account_id: u32::MAX,
..Default::default()
}))
})
});
let mut cached_account = cached_account_.lock().await;
let mut cached_account = cached_account_.get().await;
if cached_account.state_mailbox == state_mailbox
&& cached_account.state_email == state_email
&& cached_account.account_id != u32::MAX
@ -417,6 +416,7 @@ impl<T: SessionStream> SessionData<T> {
StatusResponse::database_failure()
},
)?;
let state_mailbox = Some(changelog.to_change_id);
for account in self.mailboxes.lock().iter_mut() {
if account.account_id == account_id {
account.mailbox_state.values_mut().for_each(|v| {
@ -426,11 +426,32 @@ impl<T: SessionStream> SessionData<T> {
v.size = None;
v.uid_next = None;
});
account.state_mailbox = changelog.to_change_id.into();
account.state_mailbox = state_mailbox;
account.state_email = state_email;
break;
}
}
// Update cache
if let Some(cached_account) = self.imap.cache_account.get(&AccountId {
account_id,
primary_id: access_token.primary_id(),
}) {
let mut cached_account = cached_account.get().await;
if cached_account.state_mailbox != state_mailbox
|| cached_account.state_email != state_email
{
cached_account.mailbox_state.values_mut().for_each(|v| {
v.total_deleted = None;
v.total_unseen = None;
v.total_messages = None;
v.size = None;
v.uid_next = None;
});
cached_account.state_mailbox = state_mailbox;
cached_account.state_email = state_email;
}
}
} else {
// Refresh mailboxes for changed account
let mailbox_prefix = if !access_token.is_primary_id(account_id) {

View file

@ -21,7 +21,7 @@
* for more details.
*/
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeMap;
use ahash::AHashMap;
use imap_proto::{
@ -41,7 +41,9 @@ use utils::listener::SessionStream;
use crate::core::ImapId;
use super::{Mailbox, MailboxId, MailboxState, NextMailboxState, SelectedMailbox, SessionData};
use super::{
CachedItem, Mailbox, MailboxId, MailboxState, NextMailboxState, SelectedMailbox, SessionData,
};
pub(crate) const MAX_RETRIES: usize = 10;
@ -374,10 +376,9 @@ impl<T: SessionStream> SessionData<T> {
current_state.id_to_imap = id_to_imap;
// Update cache
self.imap.cache_mailbox.insert(
mailbox.id,
Arc::new(tokio::sync::Mutex::new(new_state.clone())),
);
self.imap
.cache_mailbox
.insert(mailbox.id, CachedItem::new(new_state.clone()));
// Update state
current_state.modseq = new_state.modseq;

View file

@ -24,11 +24,14 @@
use std::{
collections::BTreeMap,
net::IpAddr,
sync::{atomic::AtomicU32, Arc},
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
time::Duration,
};
use ahash::AHashMap;
use ahash::{AHashMap, AHashSet};
use dashmap::DashMap;
use imap_proto::{
protocol::{list::Attribute, ProtocolVersion},
@ -39,7 +42,7 @@ use jmap::{
auth::{rate_limit::ConcurrencyLimiters, AccessToken},
JMAP,
};
use store::roaring::RoaringBitmap;
use store::{roaring::RoaringBitmap, write::now};
use tokio::{
io::{ReadHalf, WriteHalf},
sync::watch,
@ -84,8 +87,18 @@ pub struct IMAP {
pub rate_requests: Rate,
pub rate_concurrent: u64,
pub cache_mailbox: DashMap<MailboxId, Arc<tokio::sync::Mutex<MailboxState>>>,
pub cache_account: DashMap<AccountId, Arc<tokio::sync::Mutex<Account>>>,
pub cache_account: DashMap<AccountId, CachedItem<Account>>,
pub cache_account_expiry: u64,
pub cache_mailbox: DashMap<MailboxId, CachedItem<MailboxState>>,
pub cache_mailbox_expiry: u64,
pub cache_threads: DashMap<u32, CachedItem<Threads>>,
pub cache_threads_expiry: u64,
}
#[derive(Clone)]
pub struct CachedItem<T> {
last_access: Arc<AtomicU64>,
item: Arc<tokio::sync::Mutex<T>>,
}
pub struct Session<T: SessionStream> {
@ -160,7 +173,7 @@ pub struct AccountId {
pub primary_id: u32,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct MailboxState {
pub uid_next: u32,
pub uid_validity: u32,
@ -185,6 +198,12 @@ pub struct MailboxSync {
pub deleted: Vec<String>,
}
#[derive(Debug, Default)]
pub struct Threads {
pub threads: AHashMap<u32, u32>,
pub modseq: Option<u64>,
}
pub enum SavedSearch {
InFlight {
rx: watch::Receiver<Arc<Vec<ImapId>>>,
@ -265,3 +284,23 @@ impl<T: SessionStream> SessionData<T> {
}
}
}
impl<T> CachedItem<T> {
pub fn new(item: T) -> Self {
Self {
last_access: Arc::new(AtomicU64::new(now())),
item: Arc::new(tokio::sync::Mutex::new(item)),
}
}
pub async fn get(&self) -> tokio::sync::MutexGuard<'_, T> {
let lock = self.item.lock().await;
self.last_access
.store(now(), std::sync::atomic::Ordering::Relaxed);
lock
}
pub fn last_access(&self) -> u64 {
self.last_access.load(std::sync::atomic::Ordering::Relaxed)
}
}

View file

@ -21,12 +21,14 @@
* for more details.
*/
use std::{collections::hash_map::RandomState, sync::Arc};
use core::mailbox;
use std::{collections::hash_map::RandomState, sync::Arc, time::Duration};
use crate::core::IMAP;
use dashmap::DashMap;
use imap_proto::{protocol::capability::Capability, ResponseCode, StatusResponse};
use store::write::now;
use utils::config::Config;
pub mod core;
@ -88,8 +90,34 @@ impl IMAP {
RandomState::default(),
shard_amount,
),
cache_threads: DashMap::with_capacity_and_hasher_and_shard_amount(
config.property("imap.cache.thread.size")?.unwrap_or(2048),
RandomState::default(),
shard_amount,
),
cache_account_expiry: config
.property_or_static::<Duration>("imap.cache.account.expiry", "1h")?
.as_secs(),
cache_mailbox_expiry: config
.property_or_static::<Duration>("imap.cache.mailbox.expiry", "1h")?
.as_secs(),
cache_threads_expiry: config
.property_or_static::<Duration>("imap.cache.thread.expiry", "1h")?
.as_secs(),
}))
}
pub fn purge(&self) {
let account_expiry = now() - self.cache_account_expiry;
let mailbox_expiry = now() - self.cache_mailbox_expiry;
let thread_expiry = now() - self.cache_threads_expiry;
self.cache_account
.retain(|_, item| item.last_access() > account_expiry);
self.cache_mailbox
.retain(|_, item| item.last_access() > mailbox_expiry);
self.cache_threads
.retain(|_, item| item.last_access() > thread_expiry);
}
}
pub struct ImapError;

View file

@ -37,7 +37,7 @@ use imap_proto::{
use jmap_proto::types::id::Id;
use utils::listener::SessionStream;
use crate::core::{SavedSearch, SelectedMailbox, Session, State};
use crate::core::{CachedItem, MailboxState, SavedSearch, SelectedMailbox, Session, State};
use super::ToModSeq;
@ -58,21 +58,21 @@ impl<T: SessionStream> Session<T> {
if let Some(mailbox) = data.get_mailbox_by_name(&arguments.mailbox_name) {
// Try obtaining the mailbox from the cache
let state = if let Some(cached_state) = self.imap.cache_mailbox.get(&mailbox) {
let modseq = match data.get_modseq(mailbox.account_id).await {
Ok(modseq) => modseq,
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
}
};
let state = {
let cached_state_ = self
.imap
.cache_mailbox
.entry(mailbox)
.or_insert_with(|| CachedItem::new(MailboxState::default()));
let mut cached_state = cached_state_.get().await;
let is_cache_miss =
cached_state.uid_validity == 0 && cached_state.uid_max == 0;
let mut modseq = None;
// Refresh the mailbox if the modseq has changed
let mut cached_state_ = cached_state.lock().await;
if cached_state_.modseq != modseq {
match data.fetch_messages(&mailbox).await {
Ok(new_state) => {
*cached_state_ = new_state;
if !is_cache_miss {
match data.get_modseq(mailbox.account_id).await {
Ok(modseq_) => {
modseq = modseq_;
}
Err(mut response) => {
response.tag = arguments.tag.into();
@ -81,23 +81,20 @@ impl<T: SessionStream> Session<T> {
}
}
(*cached_state_).clone()
} else {
// Synchronize messages
match data.fetch_messages(&mailbox).await {
Ok(state) => {
let todo = "cache cleanup";
self.imap.cache_mailbox.insert(
mailbox,
Arc::new(tokio::sync::Mutex::new(state.clone())),
);
state
}
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
// Refresh the mailbox if the modseq has changed or if it's a cache miss
if is_cache_miss || cached_state.modseq != modseq {
match data.fetch_messages(&mailbox).await {
Ok(new_state) => {
*cached_state = new_state;
}
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
}
}
}
(*cached_state).clone()
};
// Synchronize messages

View file

@ -37,7 +37,7 @@ use jmap_proto::types::{collection::Collection, property::Property};
use store::{write::ValueClass, ValueKey};
use utils::listener::SessionStream;
use crate::core::{SelectedMailbox, Session, SessionData};
use crate::core::{CachedItem, SelectedMailbox, Session, SessionData, Threads};
impl<T: SessionStream> Session<T> {
pub async fn handle_thread(
@ -80,33 +80,73 @@ impl<T: SessionStream> SessionData<T> {
// Synchronize mailbox
if !result_set.results.is_empty() {
self.synchronize_messages(&mailbox).await?;
} else {
return Ok(Response {
is_uid,
threads: vec![],
});
}
// Obtain current state
let modseq = self
.jmap
.store
.get_last_change_id(mailbox.id.account_id, Collection::Thread)
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = mailbox.id.account_id,
collection = ?Collection::Thread,
error = ?err,
"Failed to obtain state");
StatusResponse::database_failure()
})?;
// Lock the cache
let thread_cache_ = self
.imap
.cache_threads
.entry(mailbox.id.account_id)
.or_insert_with(|| CachedItem::new(Threads::default()));
let mut thread_cache = thread_cache_.get().await;
// Invalidate cache if the modseq has changed
if thread_cache.modseq != modseq {
thread_cache.threads.clear();
}
// Obtain threadIds for matching messages
let thread_ids = self
.jmap
.store
.get_values::<u32>(
result_set
.results
.iter()
.map(|document_id| ValueKey {
account_id: mailbox.id.account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::ThreadId.into()),
})
.collect(),
)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "thread_query",
error = ?err,
"Failed to obtain threadIds.");
StatusResponse::database_failure()
})?;
let mut thread_ids = Vec::with_capacity(result_set.results.len() as usize);
for document_id in &result_set.results {
if let Some(thread_id) = thread_cache.threads.get(&document_id) {
thread_ids.push((*thread_id).into());
} else if let Some(thread_id) = self
.jmap
.store
.get_value::<u32>(ValueKey {
account_id: mailbox.id.account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::ThreadId.into()),
})
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "thread_query",
error = ?err,
"Failed to obtain threadId.");
StatusResponse::database_failure()
})?
{
thread_ids.push(thread_id.into());
thread_cache.threads.insert(document_id, thread_id);
} else {
thread_ids.push(None);
}
}
thread_cache.modseq = modseq;
// Group messages by thread
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();