IMAP4 folder caching

This commit is contained in:
mdecimus 2024-03-04 16:03:17 +01:00
parent 2b9eacb584
commit cf9a16c462
7 changed files with 212 additions and 118 deletions

View file

@ -69,6 +69,8 @@ impl ConfigDirectory for Config {
stores.get_lookup_store(self, "storage.lookup")?,
));
let todo = "store quota as u64";
for id in self.sub_keys("directory", ".type") {
if id.ends_with(".columns") || id.ends_with(".attributes") || id.contains(".principals")
{

View file

@ -1,4 +1,7 @@
use std::{collections::BTreeMap, sync::atomic::Ordering};
use std::{
collections::BTreeMap,
sync::{atomic::Ordering, Arc},
};
use ahash::AHashMap;
use directory::QueryBy;
@ -15,7 +18,7 @@ use parking_lot::Mutex;
use store::query::log::{Change, Query};
use utils::listener::{limiter::InFlight, SessionStream};
use super::{Account, Mailbox, MailboxId, MailboxSync, Session, SessionData};
use super::{Account, AccountId, Mailbox, MailboxId, MailboxSync, Session, SessionData};
impl<T: SessionStream> SessionData<T> {
pub async fn new(
@ -82,6 +85,39 @@ impl<T: SessionStream> SessionData<T> {
mailbox_prefix: Option<String>,
access_token: &AccessToken,
) -> crate::Result<Account> {
let state_mailbox = self
.jmap
.store
.get_last_change_id(account_id, Collection::Mailbox)
.await
.map_err(|_| {})?;
let state_email = self
.jmap
.store
.get_last_change_id(account_id, Collection::Email)
.await
.map_err(|_| {})?;
let cached_account_ = self
.imap
.cache_account
.entry(AccountId {
account_id,
primary_id: access_token.primary_id(),
})
.or_insert_with(|| {
Arc::new(tokio::sync::Mutex::new(Account {
account_id: u32::MAX,
..Default::default()
}))
});
let mut cached_account = cached_account_.lock().await;
if cached_account.state_mailbox == state_mailbox
&& cached_account.state_email == state_email
&& cached_account.account_id != u32::MAX
{
return Ok((*cached_account).clone());
}
let mailbox_ids = if access_token.is_primary_id(account_id)
|| access_token.member_of.contains(&account_id)
{
@ -150,18 +186,8 @@ impl<T: SessionStream> SessionData<T> {
prefix: mailbox_prefix,
mailbox_names: BTreeMap::new(),
mailbox_state: AHashMap::with_capacity(mailboxes.len()),
state_mailbox: self
.jmap
.store
.get_last_change_id(account_id, Collection::Mailbox)
.await
.map_err(|_| {})?,
state_email: self
.jmap
.store
.get_last_change_id(account_id, Collection::Email)
.await
.map_err(|_| {})?,
state_mailbox,
state_email,
};
loop {
@ -229,7 +255,7 @@ impl<T: SessionStream> SessionData<T> {
let mut mailbox_name = mailbox_path.join("/");
if mailbox_name.eq_ignore_ascii_case("inbox") && *mailbox_id != INBOX_ID {
// If there is another mailbox called Inbox, renamed it to avoid conflicts
// If there is another mailbox called Inbox, rename it to avoid conflicts
mailbox_name = format!("{mailbox_name} 2");
}
account.mailbox_names.insert(mailbox_name, *mailbox_id);
@ -252,6 +278,9 @@ impl<T: SessionStream> SessionData<T> {
}
}
// Update cache
*cached_account = account.clone();
Ok(account)
}

View file

@ -21,7 +21,7 @@
* for more details.
*/
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};
use ahash::AHashMap;
use imap_proto::{
@ -373,6 +373,12 @@ impl<T: SessionStream> SessionData<T> {
}
current_state.id_to_imap = id_to_imap;
// Update cache
self.imap.cache_mailbox.insert(
mailbox.id,
Arc::new(tokio::sync::Mutex::new(new_state.clone())),
);
// Update state
current_state.modseq = new_state.modseq;
current_state.next_state = Some(Box::new(NextMailboxState {

View file

@ -83,6 +83,9 @@ pub struct IMAP {
pub rate_limiter: DashMap<u32, Arc<ConcurrencyLimiters>>,
pub rate_requests: Rate,
pub rate_concurrent: u64,
pub cache_mailbox: DashMap<MailboxId, Arc<tokio::sync::Mutex<MailboxState>>>,
pub cache_account: DashMap<AccountId, Arc<tokio::sync::Mutex<Account>>>,
}
pub struct Session<T: SessionStream> {
@ -113,7 +116,7 @@ pub struct SessionData<T: SessionStream> {
pub in_flight: InFlight,
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct Mailbox {
pub has_children: bool,
pub is_subscribed: bool,
@ -127,7 +130,7 @@ pub struct Mailbox {
pub recent_messages: RoaringBitmap,
}
#[derive(Debug)]
#[derive(Debug, Clone, Default)]
pub struct Account {
pub account_id: u32,
pub prefix: Option<String>,
@ -145,13 +148,19 @@ pub struct SelectedMailbox {
pub is_condstore: bool,
}
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct MailboxId {
pub account_id: u32,
pub mailbox_id: u32,
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct AccountId {
pub account_id: u32,
pub primary_id: u32,
}
#[derive(Debug, Clone)]
pub struct MailboxState {
pub uid_next: u32,
pub uid_validity: u32,
@ -163,7 +172,7 @@ pub struct MailboxState {
pub next_state: Option<Box<NextMailboxState>>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NextMailboxState {
pub next_state: MailboxState,
pub deletions: Vec<ImapId>,

View file

@ -40,6 +40,13 @@ static SERVER_GREETING: &str = concat!(
impl IMAP {
pub async fn init(config: &Config) -> utils::config::Result<Arc<Self>> {
let shard_amount = config
.property::<u64>("global.shared-map.shard")?
.unwrap_or(32)
.next_power_of_two() as usize;
let todo = "document imap.cache.rate-limit.size and imap.cache.mailbox.size";
Ok(Arc::new(IMAP {
max_request_size: config.property_or_static("imap.request.max-size", "52428800")?,
max_auth_failures: config.property_or_static("imap.auth.max-failures", "3")?,
@ -62,18 +69,25 @@ impl IMAP {
.into_bytes(),
rate_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
config
.property("imap.rate-limit.cache.size")?
.property("imap.cache.rate-limit.size")?
.unwrap_or(2048),
RandomState::default(),
config
.property::<u64>("global.shared-map.shard")?
.unwrap_or(32)
.next_power_of_two() as usize,
shard_amount,
),
rate_requests: config.property_or_static("imap.rate-limit.requests", "2000/1m")?,
rate_concurrent: config.property("imap.rate-limit.concurrent")?.unwrap_or(4),
allow_plain_auth: config.property_or_static("imap.auth.allow-plain-text", "false")?,
enable_uidplus: config.property_or_static("imap.protocol.uidplus", "false")?,
cache_account: DashMap::with_capacity_and_hasher_and_shard_amount(
config.property("imap.cache.account.size")?.unwrap_or(2048),
RandomState::default(),
shard_amount,
),
cache_mailbox: DashMap::with_capacity_and_hasher_and_shard_amount(
config.property("imap.cache.mailbox.size")?.unwrap_or(2048),
RandomState::default(),
shard_amount,
),
}))
}
}

View file

@ -57,105 +57,137 @@ impl<T: SessionStream> Session<T> {
}
if let Some(mailbox) = data.get_mailbox_by_name(&arguments.mailbox_name) {
// Synchronize messages
match data.fetch_messages(&mailbox).await {
Ok(state) => {
let closed_previous = self.state.close_mailbox();
let is_condstore = self.is_condstore || arguments.condstore;
// Try obtaining the mailbox from the cache
let state = if let Some(cached_state) = self.imap.cache_mailbox.get(&mailbox) {
let modseq = match data.get_modseq(mailbox.account_id).await {
Ok(modseq) => modseq,
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
}
};
// Build new state
let is_rev2 = self.version.is_rev2();
let uid_validity = state.uid_validity;
let uid_next = state.uid_next;
let total_messages = state.total_messages;
let highest_modseq = if is_condstore {
HighestModSeq::new(state.modseq.to_modseq()).into()
} else {
None
};
let mailbox = Arc::new(SelectedMailbox {
id: mailbox,
state: parking_lot::Mutex::new(state),
saved_search: parking_lot::Mutex::new(SavedSearch::None),
is_select,
is_condstore,
});
// Validate QRESYNC arguments
if let Some(qresync) = arguments.qresync {
if !self.is_qresync {
return self
.write_bytes(
StatusResponse::no("QRESYNC is not enabled.")
.with_tag(arguments.tag)
.into_bytes(),
)
.await;
// Refresh the mailbox if the modseq has changed
let mut cached_state_ = cached_state.lock().await;
if cached_state_.modseq != modseq {
match data.fetch_messages(&mailbox).await {
Ok(new_state) => {
*cached_state_ = new_state;
}
if qresync.uid_validity == uid_validity {
// Send flags for changed messages
data.fetch(
fetch::Arguments {
tag: String::new(),
sequence_set: qresync
.known_uids
.or_else(|| qresync.seq_match.map(|(_, s)| s))
.unwrap_or(Sequence::Range {
start: 1.into(),
end: None,
}),
attributes: vec![fetch::Attribute::Flags],
changed_since: qresync.modseq.into(),
include_vanished: true,
},
mailbox.clone(),
true,
true,
is_rev2,
false,
)
.await;
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
}
}
// Build response
let response = Response {
mailbox: ListItem::new(arguments.mailbox_name),
total_messages,
recent_messages: data.get_recent_count(&mailbox.id),
unseen_seq: 0,
uid_validity,
uid_next,
closed_previous,
is_rev2,
highest_modseq,
mailbox_id: Id::from_parts(
mailbox.id.account_id,
mailbox.id.mailbox_id,
)
.to_string(),
};
// Update state
self.state = State::Selected { data, mailbox };
self.write_bytes(
StatusResponse::completed(command)
.with_tag(arguments.tag)
.with_code(if is_select {
ResponseCode::ReadWrite
} else {
ResponseCode::ReadOnly
})
.serialize(response.serialize()),
)
.await
}
Err(mut response) => {
response.tag = arguments.tag.into();
self.write_bytes(response.into_bytes()).await
(*cached_state_).clone()
} else {
// Synchronize messages
match data.fetch_messages(&mailbox).await {
Ok(state) => {
let todo = "cache cleanup";
self.imap.cache_mailbox.insert(
mailbox,
Arc::new(tokio::sync::Mutex::new(state.clone())),
);
state
}
Err(mut response) => {
response.tag = arguments.tag.into();
return self.write_bytes(response.into_bytes()).await;
}
}
};
// Synchronize messages
let closed_previous = self.state.close_mailbox();
let is_condstore = self.is_condstore || arguments.condstore;
// Build new state
let is_rev2 = self.version.is_rev2();
let uid_validity = state.uid_validity;
let uid_next = state.uid_next;
let total_messages = state.total_messages;
let highest_modseq = if is_condstore {
HighestModSeq::new(state.modseq.to_modseq()).into()
} else {
None
};
let mailbox = Arc::new(SelectedMailbox {
id: mailbox,
state: parking_lot::Mutex::new(state),
saved_search: parking_lot::Mutex::new(SavedSearch::None),
is_select,
is_condstore,
});
// Validate QRESYNC arguments
if let Some(qresync) = arguments.qresync {
if !self.is_qresync {
return self
.write_bytes(
StatusResponse::no("QRESYNC is not enabled.")
.with_tag(arguments.tag)
.into_bytes(),
)
.await;
}
if qresync.uid_validity == uid_validity {
// Send flags for changed messages
data.fetch(
fetch::Arguments {
tag: String::new(),
sequence_set: qresync
.known_uids
.or_else(|| qresync.seq_match.map(|(_, s)| s))
.unwrap_or(Sequence::Range {
start: 1.into(),
end: None,
}),
attributes: vec![fetch::Attribute::Flags],
changed_since: qresync.modseq.into(),
include_vanished: true,
},
mailbox.clone(),
true,
true,
is_rev2,
false,
)
.await;
}
}
// Build response
let response = Response {
mailbox: ListItem::new(arguments.mailbox_name),
total_messages,
recent_messages: data.get_recent_count(&mailbox.id),
unseen_seq: 0,
uid_validity,
uid_next,
closed_previous,
is_rev2,
highest_modseq,
mailbox_id: Id::from_parts(mailbox.id.account_id, mailbox.id.mailbox_id)
.to_string(),
};
// Update state
self.state = State::Selected { data, mailbox };
self.write_bytes(
StatusResponse::completed(command)
.with_tag(arguments.tag)
.with_code(if is_select {
ResponseCode::ReadWrite
} else {
ResponseCode::ReadOnly
})
.serialize(response.serialize()),
)
.await
} else {
self.write_bytes(
StatusResponse::no("Mailbox does not exist.")

View file

@ -81,6 +81,8 @@ pub struct OAuthCode {
pub redirect_uri: Option<String>,
}
struct TodoStoreOAuthCodeInDb {}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeviceAuthGet {
code: Option<String>,