From cf9a16c462965a95b1b1bb3bca33d6597c327e59 Mon Sep 17 00:00:00 2001 From: mdecimus Date: Mon, 4 Mar 2024 16:03:17 +0100 Subject: [PATCH] IMAP4 folder caching --- crates/directory/src/core/config.rs | 2 + crates/imap/src/core/mailbox.rs | 59 ++++++-- crates/imap/src/core/message.rs | 8 +- crates/imap/src/core/mod.rs | 19 ++- crates/imap/src/lib.rs | 24 +++- crates/imap/src/op/select.rs | 216 ++++++++++++++++------------ crates/jmap/src/auth/oauth/mod.rs | 2 + 7 files changed, 212 insertions(+), 118 deletions(-) diff --git a/crates/directory/src/core/config.rs b/crates/directory/src/core/config.rs index 1b8744b3..06a81035 100644 --- a/crates/directory/src/core/config.rs +++ b/crates/directory/src/core/config.rs @@ -69,6 +69,8 @@ impl ConfigDirectory for Config { stores.get_lookup_store(self, "storage.lookup")?, )); + let todo = "store quota as u64"; + for id in self.sub_keys("directory", ".type") { if id.ends_with(".columns") || id.ends_with(".attributes") || id.contains(".principals") { diff --git a/crates/imap/src/core/mailbox.rs b/crates/imap/src/core/mailbox.rs index 25f41839..82cd6afa 100644 --- a/crates/imap/src/core/mailbox.rs +++ b/crates/imap/src/core/mailbox.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeMap, sync::atomic::Ordering}; +use std::{ + collections::BTreeMap, + sync::{atomic::Ordering, Arc}, +}; use ahash::AHashMap; use directory::QueryBy; @@ -15,7 +18,7 @@ use parking_lot::Mutex; use store::query::log::{Change, Query}; use utils::listener::{limiter::InFlight, SessionStream}; -use super::{Account, Mailbox, MailboxId, MailboxSync, Session, SessionData}; +use super::{Account, AccountId, Mailbox, MailboxId, MailboxSync, Session, SessionData}; impl SessionData { pub async fn new( @@ -82,6 +85,39 @@ impl SessionData { mailbox_prefix: Option, access_token: &AccessToken, ) -> crate::Result { + let state_mailbox = self + .jmap + .store + .get_last_change_id(account_id, Collection::Mailbox) + .await + .map_err(|_| {})?; + let state_email = self + .jmap + .store + .get_last_change_id(account_id, Collection::Email) + .await + .map_err(|_| {})?; + let cached_account_ = self + .imap + .cache_account + .entry(AccountId { + account_id, + primary_id: access_token.primary_id(), + }) + .or_insert_with(|| { + Arc::new(tokio::sync::Mutex::new(Account { + account_id: u32::MAX, + ..Default::default() + })) + }); + let mut cached_account = cached_account_.lock().await; + if cached_account.state_mailbox == state_mailbox + && cached_account.state_email == state_email + && cached_account.account_id != u32::MAX + { + return Ok((*cached_account).clone()); + } + let mailbox_ids = if access_token.is_primary_id(account_id) || access_token.member_of.contains(&account_id) { @@ -150,18 +186,8 @@ impl SessionData { prefix: mailbox_prefix, mailbox_names: BTreeMap::new(), mailbox_state: AHashMap::with_capacity(mailboxes.len()), - state_mailbox: self - .jmap - .store - .get_last_change_id(account_id, Collection::Mailbox) - .await - .map_err(|_| {})?, - state_email: self - .jmap - .store - .get_last_change_id(account_id, Collection::Email) - .await - .map_err(|_| {})?, + state_mailbox, + state_email, }; loop { @@ -229,7 +255,7 @@ impl SessionData { let mut mailbox_name = mailbox_path.join("/"); if mailbox_name.eq_ignore_ascii_case("inbox") && *mailbox_id != INBOX_ID { - // If there is another mailbox called Inbox, renamed it to avoid conflicts + // If there is another mailbox called Inbox, rename it to avoid conflicts mailbox_name = format!("{mailbox_name} 2"); } account.mailbox_names.insert(mailbox_name, *mailbox_id); @@ -252,6 +278,9 @@ impl SessionData { } } + // Update cache + *cached_account = account.clone(); + Ok(account) } diff --git a/crates/imap/src/core/message.rs b/crates/imap/src/core/message.rs index ecaae59e..c1675215 100644 --- a/crates/imap/src/core/message.rs +++ b/crates/imap/src/core/message.rs @@ -21,7 +21,7 @@ * for more details. */ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use ahash::AHashMap; use imap_proto::{ @@ -373,6 +373,12 @@ impl SessionData { } current_state.id_to_imap = id_to_imap; + // Update cache + self.imap.cache_mailbox.insert( + mailbox.id, + Arc::new(tokio::sync::Mutex::new(new_state.clone())), + ); + // Update state current_state.modseq = new_state.modseq; current_state.next_state = Some(Box::new(NextMailboxState { diff --git a/crates/imap/src/core/mod.rs b/crates/imap/src/core/mod.rs index 8a8c751f..2d5d5a20 100644 --- a/crates/imap/src/core/mod.rs +++ b/crates/imap/src/core/mod.rs @@ -83,6 +83,9 @@ pub struct IMAP { pub rate_limiter: DashMap>, pub rate_requests: Rate, pub rate_concurrent: u64, + + pub cache_mailbox: DashMap>>, + pub cache_account: DashMap>>, } pub struct Session { @@ -113,7 +116,7 @@ pub struct SessionData { pub in_flight: InFlight, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Mailbox { pub has_children: bool, pub is_subscribed: bool, @@ -127,7 +130,7 @@ pub struct Mailbox { pub recent_messages: RoaringBitmap, } -#[derive(Debug)] +#[derive(Debug, Clone, Default)] pub struct Account { pub account_id: u32, pub prefix: Option, @@ -145,13 +148,19 @@ pub struct SelectedMailbox { pub is_condstore: bool, } -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] pub struct MailboxId { pub account_id: u32, pub mailbox_id: u32, } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct AccountId { + pub account_id: u32, + pub primary_id: u32, +} + +#[derive(Debug, Clone)] pub struct MailboxState { pub uid_next: u32, pub uid_validity: u32, @@ -163,7 +172,7 @@ pub struct MailboxState { pub next_state: Option>, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NextMailboxState { pub next_state: MailboxState, pub deletions: Vec, diff --git a/crates/imap/src/lib.rs b/crates/imap/src/lib.rs index d7d444b2..25a6aea9 100644 --- a/crates/imap/src/lib.rs +++ b/crates/imap/src/lib.rs @@ -40,6 +40,13 @@ static SERVER_GREETING: &str = concat!( impl IMAP { pub async fn init(config: &Config) -> utils::config::Result> { + let shard_amount = config + .property::("global.shared-map.shard")? + .unwrap_or(32) + .next_power_of_two() as usize; + + let todo = "document imap.cache.rate-limit.size and imap.cache.mailbox.size"; + Ok(Arc::new(IMAP { max_request_size: config.property_or_static("imap.request.max-size", "52428800")?, max_auth_failures: config.property_or_static("imap.auth.max-failures", "3")?, @@ -62,18 +69,25 @@ impl IMAP { .into_bytes(), rate_limiter: DashMap::with_capacity_and_hasher_and_shard_amount( config - .property("imap.rate-limit.cache.size")? + .property("imap.cache.rate-limit.size")? .unwrap_or(2048), RandomState::default(), - config - .property::("global.shared-map.shard")? - .unwrap_or(32) - .next_power_of_two() as usize, + shard_amount, ), rate_requests: config.property_or_static("imap.rate-limit.requests", "2000/1m")?, rate_concurrent: config.property("imap.rate-limit.concurrent")?.unwrap_or(4), allow_plain_auth: config.property_or_static("imap.auth.allow-plain-text", "false")?, enable_uidplus: config.property_or_static("imap.protocol.uidplus", "false")?, + cache_account: DashMap::with_capacity_and_hasher_and_shard_amount( + config.property("imap.cache.account.size")?.unwrap_or(2048), + RandomState::default(), + shard_amount, + ), + cache_mailbox: DashMap::with_capacity_and_hasher_and_shard_amount( + config.property("imap.cache.mailbox.size")?.unwrap_or(2048), + RandomState::default(), + shard_amount, + ), })) } } diff --git a/crates/imap/src/op/select.rs b/crates/imap/src/op/select.rs index 435dc436..6006c3fd 100644 --- a/crates/imap/src/op/select.rs +++ b/crates/imap/src/op/select.rs @@ -57,105 +57,137 @@ impl Session { } if let Some(mailbox) = data.get_mailbox_by_name(&arguments.mailbox_name) { - // Synchronize messages - match data.fetch_messages(&mailbox).await { - Ok(state) => { - let closed_previous = self.state.close_mailbox(); - let is_condstore = self.is_condstore || arguments.condstore; + // Try obtaining the mailbox from the cache + let state = if let Some(cached_state) = self.imap.cache_mailbox.get(&mailbox) { + let modseq = match data.get_modseq(mailbox.account_id).await { + Ok(modseq) => modseq, + Err(mut response) => { + response.tag = arguments.tag.into(); + return self.write_bytes(response.into_bytes()).await; + } + }; - // Build new state - let is_rev2 = self.version.is_rev2(); - let uid_validity = state.uid_validity; - let uid_next = state.uid_next; - let total_messages = state.total_messages; - let highest_modseq = if is_condstore { - HighestModSeq::new(state.modseq.to_modseq()).into() - } else { - None - }; - let mailbox = Arc::new(SelectedMailbox { - id: mailbox, - state: parking_lot::Mutex::new(state), - saved_search: parking_lot::Mutex::new(SavedSearch::None), - is_select, - is_condstore, - }); - - // Validate QRESYNC arguments - if let Some(qresync) = arguments.qresync { - if !self.is_qresync { - return self - .write_bytes( - StatusResponse::no("QRESYNC is not enabled.") - .with_tag(arguments.tag) - .into_bytes(), - ) - .await; + // Refresh the mailbox if the modseq has changed + let mut cached_state_ = cached_state.lock().await; + if cached_state_.modseq != modseq { + match data.fetch_messages(&mailbox).await { + Ok(new_state) => { + *cached_state_ = new_state; } - if qresync.uid_validity == uid_validity { - // Send flags for changed messages - data.fetch( - fetch::Arguments { - tag: String::new(), - sequence_set: qresync - .known_uids - .or_else(|| qresync.seq_match.map(|(_, s)| s)) - .unwrap_or(Sequence::Range { - start: 1.into(), - end: None, - }), - attributes: vec![fetch::Attribute::Flags], - changed_since: qresync.modseq.into(), - include_vanished: true, - }, - mailbox.clone(), - true, - true, - is_rev2, - false, - ) - .await; + Err(mut response) => { + response.tag = arguments.tag.into(); + return self.write_bytes(response.into_bytes()).await; } } - - // Build response - let response = Response { - mailbox: ListItem::new(arguments.mailbox_name), - total_messages, - recent_messages: data.get_recent_count(&mailbox.id), - unseen_seq: 0, - uid_validity, - uid_next, - closed_previous, - is_rev2, - highest_modseq, - mailbox_id: Id::from_parts( - mailbox.id.account_id, - mailbox.id.mailbox_id, - ) - .to_string(), - }; - - // Update state - self.state = State::Selected { data, mailbox }; - - self.write_bytes( - StatusResponse::completed(command) - .with_tag(arguments.tag) - .with_code(if is_select { - ResponseCode::ReadWrite - } else { - ResponseCode::ReadOnly - }) - .serialize(response.serialize()), - ) - .await } - Err(mut response) => { - response.tag = arguments.tag.into(); - self.write_bytes(response.into_bytes()).await + + (*cached_state_).clone() + } else { + // Synchronize messages + match data.fetch_messages(&mailbox).await { + Ok(state) => { + let todo = "cache cleanup"; + self.imap.cache_mailbox.insert( + mailbox, + Arc::new(tokio::sync::Mutex::new(state.clone())), + ); + state + } + Err(mut response) => { + response.tag = arguments.tag.into(); + return self.write_bytes(response.into_bytes()).await; + } + } + }; + + // Synchronize messages + let closed_previous = self.state.close_mailbox(); + let is_condstore = self.is_condstore || arguments.condstore; + + // Build new state + let is_rev2 = self.version.is_rev2(); + let uid_validity = state.uid_validity; + let uid_next = state.uid_next; + let total_messages = state.total_messages; + let highest_modseq = if is_condstore { + HighestModSeq::new(state.modseq.to_modseq()).into() + } else { + None + }; + let mailbox = Arc::new(SelectedMailbox { + id: mailbox, + state: parking_lot::Mutex::new(state), + saved_search: parking_lot::Mutex::new(SavedSearch::None), + is_select, + is_condstore, + }); + + // Validate QRESYNC arguments + if let Some(qresync) = arguments.qresync { + if !self.is_qresync { + return self + .write_bytes( + StatusResponse::no("QRESYNC is not enabled.") + .with_tag(arguments.tag) + .into_bytes(), + ) + .await; + } + if qresync.uid_validity == uid_validity { + // Send flags for changed messages + data.fetch( + fetch::Arguments { + tag: String::new(), + sequence_set: qresync + .known_uids + .or_else(|| qresync.seq_match.map(|(_, s)| s)) + .unwrap_or(Sequence::Range { + start: 1.into(), + end: None, + }), + attributes: vec![fetch::Attribute::Flags], + changed_since: qresync.modseq.into(), + include_vanished: true, + }, + mailbox.clone(), + true, + true, + is_rev2, + false, + ) + .await; } } + + // Build response + let response = Response { + mailbox: ListItem::new(arguments.mailbox_name), + total_messages, + recent_messages: data.get_recent_count(&mailbox.id), + unseen_seq: 0, + uid_validity, + uid_next, + closed_previous, + is_rev2, + highest_modseq, + mailbox_id: Id::from_parts(mailbox.id.account_id, mailbox.id.mailbox_id) + .to_string(), + }; + + // Update state + self.state = State::Selected { data, mailbox }; + + self.write_bytes( + StatusResponse::completed(command) + .with_tag(arguments.tag) + .with_code(if is_select { + ResponseCode::ReadWrite + } else { + ResponseCode::ReadOnly + }) + .serialize(response.serialize()), + ) + .await } else { self.write_bytes( StatusResponse::no("Mailbox does not exist.") diff --git a/crates/jmap/src/auth/oauth/mod.rs b/crates/jmap/src/auth/oauth/mod.rs index f0632c79..02a55501 100644 --- a/crates/jmap/src/auth/oauth/mod.rs +++ b/crates/jmap/src/auth/oauth/mod.rs @@ -81,6 +81,8 @@ pub struct OAuthCode { pub redirect_uri: Option, } +struct TodoStoreOAuthCodeInDb {} + #[derive(Debug, Serialize, Deserialize)] pub struct DeviceAuthGet { code: Option,