IMAP message synchronization.

This commit is contained in:
Mauro D 2023-06-23 17:28:00 +00:00
parent e1c3190b48
commit 3581255fbc
9 changed files with 882 additions and 74 deletions

View file

@ -318,14 +318,14 @@ impl State {
}
}
pub fn mailbox_data(&self) -> (Arc<SessionData>, Arc<SelectedMailbox>) {
pub fn mailbox_state(&self) -> (Arc<SessionData>, Arc<SelectedMailbox>) {
match self {
State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone()),
_ => unreachable!(),
}
}
pub fn session_mailbox_data(&self) -> (Arc<SessionData>, Option<Arc<SelectedMailbox>>) {
pub fn session_mailbox_state(&self) -> (Arc<SessionData>, Option<Arc<SelectedMailbox>>) {
match self {
State::Authenticated { data } => (data.clone(), None),
State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone().into()),

View file

@ -142,13 +142,19 @@ impl SessionData {
account_id,
prefix: mailbox_prefix,
mailbox_names: BTreeMap::new(),
mailbox_data: AHashMap::with_capacity(mailboxes.len()),
state: self
mailbox_state: AHashMap::with_capacity(mailboxes.len()),
state_mailbox: self
.jmap
.store
.get_last_change_id(account_id, Collection::Mailbox)
.await
.map_err(|_| {})?,
state_email: self
.jmap
.store
.get_last_change_id(account_id, Collection::Email)
.await
.map_err(|_| {})?,
};
loop {
@ -170,7 +176,7 @@ impl SessionData {
.iter()
.any(|(_, child_parent_id, _)| child_parent_id == mailbox_id);
account.mailbox_data.insert(
account.mailbox_state.insert(
*mailbox_id,
Mailbox {
has_children,
@ -249,7 +255,7 @@ impl SessionData {
pub async fn synchronize_mailboxes(
&self,
return_changes: bool,
) -> crate::Result<Option<MailboxSync>> {
) -> crate::op::Result<Option<MailboxSync>> {
let mut changes = if return_changes {
MailboxSync::default().into()
} else {
@ -261,7 +267,7 @@ impl SessionData {
.jmap
.get_cached_access_token(self.account_id)
.await
.ok_or(())?;
.ok_or(StatusResponse::no("Account not found"))?;
let state = access_token.state();
// Shared mailboxes might have changed
@ -338,7 +344,7 @@ impl SessionData {
.mailboxes
.lock()
.iter()
.map(|m| (m.account_id, m.state))
.map(|m| (m.account_id, m.state_mailbox))
.collect::<Vec<_>>();
for (account_id, last_state) in account_states {
let changelog = self
@ -348,8 +354,7 @@ impl SessionData {
Collection::Mailbox,
last_state.map(Query::Since).unwrap_or(Query::All),
)
.await
.map_err(|_| {})?;
.await?;
if !changelog.changes.is_empty() {
let mut has_changes = false;
let mut has_child_changes = false;
@ -365,16 +370,27 @@ impl SessionData {
if has_child_changes && !has_changes && changes.is_none() {
// Only child changes, no need to re-fetch mailboxes
let state_email = self
.jmap
.store
.get_last_change_id(account_id, Collection::Email)
.await.map_err(
|e| {
tracing::warn!(parent: &self.span, "Failed to get last change id for email collection: {}", e);
StatusResponse::database_failure()
},
)?;
for account in self.mailboxes.lock().iter_mut() {
if account.account_id == account_id {
account.mailbox_data.values_mut().for_each(|v| {
account.mailbox_state.values_mut().for_each(|v| {
v.total_deleted = None;
v.total_unseen = None;
v.total_messages = None;
v.size = None;
v.uid_next = None;
});
account.state = changelog.to_change_id.into();
account.state_mailbox = changelog.to_change_id.into();
account.state_email = state_email;
break;
}
}
@ -421,8 +437,8 @@ impl SessionData {
// Add new mailboxes
for (mailbox_name, mailbox_id) in new_account.mailbox_names.iter() {
if let Some(old_mailbox) = old_account.mailbox_data.get(mailbox_id) {
if let Some(mailbox) = new_account.mailbox_data.get(mailbox_id) {
if let Some(old_mailbox) = old_account.mailbox_state.get(mailbox_id) {
if let Some(mailbox) = new_account.mailbox_state.get(mailbox_id) {
if mailbox.total_messages.unwrap_or(0)
!= old_mailbox.total_messages.unwrap_or(0)
|| mailbox.total_unseen.unwrap_or(0)
@ -438,7 +454,7 @@ impl SessionData {
// Add deleted mailboxes
for (mailbox_name, mailbox_id) in &old_account.mailbox_names {
if !new_account.mailbox_data.contains_key(mailbox_id) {
if !new_account.mailbox_state.contains_key(mailbox_id) {
changes.deleted.push(mailbox_name.to_string());
}
}
@ -473,25 +489,6 @@ impl SessionData {
Ok(changes)
}
pub async fn try_synchronize_mailboxes(&self, tag: &str) -> bool {
if self.synchronize_mailboxes(false).await.is_ok() {
true
} else {
tracing::warn!(parent: &self.span,
event = "error",
context = "synchronize_mailboxes",
account_id = self.account_id,
"Failed to synchronize mailboxes.");
self.write_bytes(
StatusResponse::database_failure()
.with_tag(tag)
.into_bytes(),
)
.await;
false
}
}
pub fn get_mailbox_by_name(&self, mailbox_name: &str) -> Option<MailboxId> {
if !self.is_all_mailbox(mailbox_name) {
for account in self.mailboxes.lock().iter() {

View file

@ -0,0 +1,487 @@
use std::{
hash::{BuildHasher, Hash, Hasher},
sync::Arc,
};
use ahash::{AHashMap, AHashSet, AHasher, RandomState};
use imap_proto::{protocol::Sequence, StatusResponse};
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
roaring::RoaringBitmap,
write::{assert::HashedValue, now, BatchBuilder, ToBitmaps, F_VALUE},
Deserialize, Serialize,
};
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::core::ImapId;
use super::{MailboxId, MailboxState, SavedSearch, SelectedMailbox, SessionData};
struct UidMap {
uid_next: u32,
uid_validity: u32,
hash: u64,
items: Vec<Uid>,
}
struct Uid {
uid: u32,
id: u32,
received: u32,
}
struct UidMapBuilder {
message_ids: RoaringBitmap,
hasher: AHasher,
id_list: Vec<(u32, u32)>,
}
impl SessionData {
pub async fn fetch_messages(&self, mailbox: &MailboxId) -> crate::op::Result<MailboxState> {
let mut try_count = 0;
loop {
// Deserialize mailbox data
let uid_map = self
.jmap
.get_property::<HashedValue<UidMap>>(
mailbox.account_id,
Collection::Mailbox,
mailbox.mailbox_id.unwrap_or(u32::MAX),
Property::EmailIds,
)
.await?;
// Obtain current state
let last_state = self
.jmap
.store
.get_last_change_id(mailbox.account_id, Collection::Email)
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = mailbox.account_id,
collection = ?Collection::Email,
error = ?err,
"Failed to obtain state");
StatusResponse::database_failure()
})?;
// Obtain message ids
let message_ids = if let Some(mailbox_id) = mailbox.mailbox_id {
self.jmap
.get_tag(
mailbox.account_id,
Collection::Email,
Property::MailboxIds,
mailbox_id,
)
.await?
.unwrap_or_default()
} else {
self.jmap
.get_document_ids(mailbox.account_id, Collection::Email)
.await?
.unwrap_or_default()
};
// Obtain message data
let (id_list, id_list_hash) = if !message_ids.is_empty() {
let uid_builder = self
.jmap
.store
.index_values(
UidMapBuilder {
id_list: Vec::with_capacity(message_ids.len() as usize),
message_ids,
hasher: RandomState::with_seeds(
0xaf1f2242106c64b3,
0x60ca4cfb4b3ed0ce,
0xc7dbc0bb615e82b3,
0x520ad065378daf88,
)
.build_hasher(),
},
mailbox.account_id,
Collection::Email,
Property::ReceivedAt,
true,
|uid_builder, message_id, bytes| {
if uid_builder.message_ids.remove(message_id) {
let received = (u64::deserialize(bytes)? & u32::MAX as u64) as u32;
uid_builder.id_list.push((message_id, received));
message_id.hash(&mut uid_builder.hasher);
received.hash(&mut uid_builder.hasher);
Ok(!uid_builder.message_ids.is_empty())
} else {
Ok(true)
}
},
)
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = mailbox.account_id,
collection = ?Collection::Email,
error = ?err,
"Failed to obtain message data");
StatusResponse::database_failure()
})?;
(uid_builder.id_list, uid_builder.hasher.finish())
} else {
(Vec::new(), 0)
};
// Build mailboxdata
if let Some(mut uid_map) = uid_map {
if uid_map.inner.hash != id_list_hash {
let mut id_list_map = id_list.iter().cloned().collect::<AHashSet<_>>();
let mut items = Vec::with_capacity(uid_map.inner.items.len());
for item in uid_map.inner.items {
if id_list_map.remove(&(item.id, item.received)) {
items.push(item);
}
}
for (id, received) in id_list_map {
items.push(Uid {
uid: uid_map.inner.uid_next,
id,
received,
});
uid_map.inner.uid_next += 1;
}
uid_map.inner.items = items;
uid_map.inner.hash = id_list_hash;
// Save updated uid map
let mut batch = BatchBuilder::new();
batch
.with_account_id(mailbox.account_id)
.with_collection(Collection::Mailbox)
.update_document(mailbox.mailbox_id.unwrap_or(u32::MAX))
.assert_value(Property::EmailId, &uid_map)
.value(Property::EmailId, &uid_map.inner, F_VALUE);
match self.jmap.store.write(batch.build()).await {
Ok(_) => (),
Err(store::Error::AssertValueFailed) if try_count < 3 => {
try_count += 1;
continue;
}
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = mailbox.account_id,
collection = ?Collection::Mailbox,
error = ?err,
"Failed to store uid map");
return Err(StatusResponse::database_failure());
}
}
}
let uid_map = uid_map.inner;
let mut id_to_imap = AHashMap::with_capacity(uid_map.items.len());
let mut uid_to_id = AHashMap::with_capacity(uid_map.items.len());
let mut uids = Vec::with_capacity(uid_map.items.len());
for (seqnum, item) in uid_map.items.into_iter().enumerate() {
id_to_imap.insert(
item.id,
ImapId {
uid: item.uid,
seqnum: (seqnum + 1) as u32,
},
);
uid_to_id.insert(item.uid, item.id);
uids.push(item.uid);
}
return Ok(MailboxState {
uid_next: uid_map.uid_next,
uid_validity: uid_map.uid_validity,
total_messages: uids.len(),
id_to_imap,
uid_to_id,
uids,
last_state,
});
} else {
let uid_next = id_list.len() as u32;
let uid_validity = now() as u32 ^ mailbox.mailbox_id.unwrap_or(0);
let mut id_to_imap = AHashMap::with_capacity(uid_next as usize);
let mut uid_to_id = AHashMap::with_capacity(uid_next as usize);
let mut uids = Vec::with_capacity(uid_next as usize);
let mut uid_map = UidMap {
uid_next,
uid_validity,
hash: id_list_hash,
items: Vec::with_capacity(uid_next as usize),
};
for (uid, (id, received)) in id_list.into_iter().enumerate() {
id_to_imap.insert(
id,
ImapId {
uid: uid as u32,
seqnum: (uid + 1) as u32,
},
);
uid_to_id.insert(uid as u32, id);
uids.push(uid as u32);
uid_map.items.push(Uid {
uid: uid as u32,
id,
received,
});
}
// Store uid map
let mut batch = BatchBuilder::new();
batch
.with_account_id(mailbox.account_id)
.with_collection(Collection::Mailbox)
.update_document(mailbox.mailbox_id.unwrap_or(u32::MAX))
.value(Property::EmailId, &uid_map, F_VALUE);
self.jmap.store.write(batch.build()).await.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = mailbox.account_id,
collection = ?Collection::Mailbox,
error = ?err,
"Failed to store uid map");
StatusResponse::database_failure()
})?;
return Ok(MailboxState {
uid_next,
uid_validity,
total_messages: uids.len(),
id_to_imap,
uid_to_id,
uids,
last_state,
});
}
}
}
}
impl SelectedMailbox {
pub async fn sequence_to_ids(
&self,
sequence: &Sequence,
is_uid: bool,
) -> crate::op::Result<AHashMap<u32, ImapId>> {
if !sequence.is_saved_search() {
let mut ids = AHashMap::new();
let state = self.state.lock();
if state.uids.is_empty() {
return Ok(ids);
}
let max_uid = state.uids.last().copied().unwrap_or(0);
let max_seqnum = state.uids.len() as u32;
for (id, imap_id) in &state.id_to_imap {
let matched = if is_uid {
sequence.contains(imap_id.uid, max_uid)
} else {
sequence.contains(imap_id.seqnum, max_seqnum)
};
if matched {
ids.insert(id.clone(), *imap_id);
}
}
Ok(ids)
} else {
let saved_ids = self
.get_saved_search()
.await
.ok_or_else(|| StatusResponse::no("No saved search found."))?;
let mut ids = AHashMap::with_capacity(saved_ids.len());
let state = self.state.lock();
for imap_id in saved_ids.iter() {
if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
ids.insert(*id, *imap_id);
}
}
Ok(ids)
}
}
pub fn id_to_uid(&self, ids: &[u32]) -> Vec<ImapId> {
let mut imap_ids = Vec::with_capacity(ids.len());
let state = self.state.lock();
for id in ids {
if let Some(imap_id) = state.id_to_imap.get(id) {
imap_ids.push(*imap_id);
}
}
imap_ids
}
pub fn uid_to_id(&self, imap_ids: &[ImapId]) -> Vec<u32> {
let mut ids = Vec::with_capacity(imap_ids.len());
let state = self.state.lock();
for imap_id in imap_ids {
if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
ids.push(*id);
}
}
ids
}
pub fn is_in_sync(&self, ids: &[u32]) -> bool {
let state = self.state.lock();
for id in ids {
if !state.id_to_imap.contains_key(id) {
return false;
}
}
true
}
pub fn update_mailbox_state(
&self,
mailbox_state: MailboxState,
return_deleted: bool,
) -> (Option<usize>, Option<Vec<ImapId>>) {
let mut state = self.state.lock();
let mailbox_size = if mailbox_state.total_messages != state.total_messages {
mailbox_state.total_messages.into()
} else {
None
};
let deletions = if return_deleted {
let mut deletions = Vec::new();
for (id, imap_id) in &state.id_to_imap {
if !mailbox_state.id_to_imap.contains_key(id) {
deletions.push(*imap_id);
}
}
if !deletions.is_empty() {
Some(deletions)
} else {
None
}
} else {
None
};
*state = mailbox_state;
(mailbox_size, deletions)
}
pub async fn get_saved_search(&self) -> Option<Arc<Vec<ImapId>>> {
let mut rx = match &*self.saved_search.lock() {
SavedSearch::InFlight { rx } => rx.clone(),
SavedSearch::Results { items } => {
return Some(items.clone());
}
SavedSearch::None => {
return None;
}
};
rx.changed().await.ok();
let v = rx.borrow();
Some(v.clone())
}
}
impl Serialize for &UidMap {
fn serialize(self) -> Vec<u8> {
let mut buf = Vec::with_capacity((self.items.len() + 2) * std::mem::size_of::<u64>());
buf.push_leb128(self.items.len());
buf.push_leb128(self.uid_next);
buf.extend_from_slice(self.uid_validity.to_le_bytes().as_ref());
buf.extend_from_slice(self.hash.to_le_bytes().as_ref());
let mut last_uid = u32::MAX;
for item in &self.items {
if last_uid.wrapping_add(1) != item.uid {
buf.push(0);
buf.push_leb128(item.uid);
}
buf.push_leb128(item.id + 1);
buf.extend_from_slice(item.received.to_le_bytes().as_ref());
last_uid = item.uid;
}
buf
}
}
impl UidMap {
fn deserialize_(bytes: &[u8]) -> Option<Self> {
let mut buf_u32 = [0u8; std::mem::size_of::<u32>()];
let mut buf_u64 = [0u8; std::mem::size_of::<u64>()];
let mut bytes = bytes.iter();
let items_len: usize = bytes.next_leb128()?;
let uid_next: u32 = bytes.next_leb128()?;
buf_u32
.iter_mut()
.try_for_each(|b| bytes.next().map(|v| *b = *v))?;
buf_u64
.iter_mut()
.try_for_each(|b| bytes.next().map(|v| *b = *v))?;
let mut uid_map = UidMap {
uid_next,
uid_validity: u32::from_le_bytes(buf_u32),
hash: u64::from_le_bytes(buf_u64),
items: Vec::with_capacity(items_len),
};
let mut next_uid: u32 = 0;
for _ in 0..items_len {
let mut id: u32 = bytes.next_leb128()?;
if id == 0 {
next_uid = bytes.next_leb128()?;
id = bytes.next_leb128()?;
}
buf_u32
.iter_mut()
.try_for_each(|b| bytes.next().map(|v| *b = *v))?;
uid_map.items.push(Uid {
uid: next_uid,
id: id - 1,
received: u32::from_le_bytes(buf_u32),
});
next_uid += 1;
}
uid_map.into()
}
}
impl Deserialize for UidMap {
fn deserialize(bytes: &[u8]) -> store::Result<Self> {
Self::deserialize_(bytes).ok_or(store::Error::InternalError(
"Failed to deserialize uid map".to_string(),
))
}
}
impl ToBitmaps for &UidMap {
fn to_bitmaps(&self, _: &mut Vec<store::write::Operation>, _: u8, _: bool) {
unreachable!()
}
}

View file

@ -19,6 +19,7 @@ use utils::listener::{limiter::InFlight, ServerInstance};
pub mod client;
pub mod mailbox;
pub mod message;
pub mod session;
pub mod writer;
@ -91,13 +92,14 @@ pub struct Account {
pub account_id: u32,
pub prefix: Option<String>,
pub mailbox_names: BTreeMap<String, u32>,
pub mailbox_data: AHashMap<u32, Mailbox>,
pub state: Option<u64>,
pub mailbox_state: AHashMap<u32, Mailbox>,
pub state_email: Option<u64>,
pub state_mailbox: Option<u64>,
}
pub struct SelectedMailbox {
pub id: MailboxId,
pub state: parking_lot::Mutex<MailboxData>,
pub state: parking_lot::Mutex<MailboxState>,
pub saved_search: parking_lot::Mutex<SavedSearch>,
pub is_select: bool,
pub is_condstore: bool,
@ -110,13 +112,14 @@ pub struct MailboxId {
}
#[derive(Debug)]
pub struct MailboxData {
pub struct MailboxState {
pub uid_next: u32,
pub uid_validity: u32,
pub jmap_ids: Vec<u32>,
pub imap_uids: Vec<u32>,
pub id_to_imap: AHashMap<u32, ImapId>,
pub uid_to_id: AHashMap<u32, u32>,
pub uids: Vec<u32>,
pub total_messages: usize,
pub last_state: u32,
pub last_state: Option<u64>,
}
#[derive(Debug, Default)]

View file

@ -0,0 +1,307 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use imap_proto::{
protocol::create::Arguments, receiver::Request, Command, ResponseCode, StatusResponse,
};
use tokio::io::AsyncRead;
use crate::core::{Session, SessionData};
impl<T: AsyncRead> Session<T> {
pub async fn handle_create(&mut self, requests: Vec<Request<Command>>) -> Result<(), ()> {
let mut arguments = Vec::with_capacity(requests.len());
for request in requests {
match request.parse_create(self.version) {
Ok(argument) => {
arguments.push(argument);
}
Err(response) => self.write_bytes(response.into_bytes()).await?,
}
}
if !arguments.is_empty() {
let data = self.state.session_data();
tokio::spawn(async move {
for argument in arguments {
data.write_bytes(data.create_folder(argument).await.into_bytes())
.await;
}
});
}
Ok(())
}
}
impl SessionData {
pub async fn create_folder(&self, arguments: Arguments) -> StatusResponse {
// Refresh mailboxes
if let Err(err) = self.synchronize_mailboxes(false).await {
return err.with_tag(arguments.tag);
}
// Validate mailbox name
let mut params = match self.validate_mailbox_create(&arguments.mailbox_name) {
Ok(response) => response,
Err(message) => {
return StatusResponse::no(message).with_tag(arguments.tag);
}
};
debug_assert!(!params.path.is_empty());
// Build request
let mut request = self.client.build();
let mut create_ids: Vec<String> = Vec::with_capacity(params.path.len());
let set_request = request.set_mailbox().account_id(&params.account_id);
for (pos, path_item) in params.path.iter().enumerate() {
let create_item = set_request.create().name(*path_item);
if let Some(create_id) = create_ids.last() {
create_item.parent_id_ref(create_id);
} else {
create_item.parent_id(params.parent_mailbox_id.as_ref());
}
if arguments.mailbox_role != Role::None && pos == params.path.len() - 1 {
create_item.role(arguments.mailbox_role);
}
create_ids.push(create_item.create_id().unwrap());
}
match request.send_set_mailbox().await {
Ok(mut response) => {
match self.add_created_mailboxes(&mut params, create_ids, &mut response) {
Ok((_, mailbox_id)) => StatusResponse::ok("Mailbox created.")
.with_code(ResponseCode::MailboxId { mailbox_id }),
Err(message) => StatusResponse::no(message),
}
}
Err(err) => err.into_status_response(),
}
.with_tag(arguments.tag)
}
pub fn add_created_mailboxes(
&self,
params: &mut CreateParams<'_>,
create_ids: Vec<String>,
response: &mut SetResponse<jmap_client::mailbox::Mailbox>,
) -> Result<(parking_lot::MutexGuard<'_, Vec<Account>>, String), Cow<'static, str>> {
// Obtain created mailbox ids
let mut mailbox_ids = Vec::new();
for create_id in create_ids {
match response.created(&create_id) {
Ok(mut mailbox) => {
mailbox_ids.push(mailbox.take_id());
}
Err(err) => {
return Err(err.to_string().into());
}
}
}
// Lock mailboxes
let mut mailboxes = self.mailboxes.lock();
let account = if let Some(account) = mailboxes
.iter_mut()
.find(|account| account.account_id == params.account_id)
{
account
} else {
return Err(Cow::from("Account no longer available."));
};
// Update state
account.mailbox_state = response.take_new_state();
// Add mailboxes
if mailbox_ids.len() != params.path.len() {
return Err(Cow::from("Some mailboxes could not be created."));
}
let mut mailbox_name = if let Some(parent_mailbox_name) = params.parent_mailbox_name.take()
{
if let Some(parent_mailbox) = account
.mailbox_data
.get_mut(params.parent_mailbox_id.as_ref().unwrap())
{
parent_mailbox.has_children = true;
}
parent_mailbox_name
} else if let Some(account_prefix) = account.prefix.as_ref() {
account_prefix.to_string()
} else {
"".to_string()
};
let new_mailbox_id = format!("{}-{}", account.account_id, mailbox_ids.last().unwrap());
let has_updated = response.has_updated();
for (pos, (mailbox_id, path_item)) in
mailbox_ids.into_iter().zip(params.path.iter()).enumerate()
{
mailbox_name = if !mailbox_name.is_empty() {
format!("{}/{}", mailbox_name, path_item)
} else {
path_item.to_string()
};
account
.mailbox_names
.insert(mailbox_name.clone(), mailbox_id.clone());
account.mailbox_data.insert(
mailbox_id,
Mailbox {
has_children: pos < params.path.len() - 1 || has_updated,
is_subscribed: false,
role: Role::None,
total_messages: 0.into(),
total_unseen: 0.into(),
total_deleted: 0.into(),
uid_validity: None,
uid_next: None,
size: 0.into(),
},
);
}
Ok((mailboxes, new_mailbox_id))
}
pub fn validate_mailbox_create<'x>(
&self,
mailbox_name: &'x str,
) -> Result<CreateParams<'x>, Cow<'static, str>> {
// Remove leading and trailing separators
let mut name = mailbox_name.trim();
if let Some(suffix) = name.strip_prefix('/') {
name = suffix.trim();
};
if let Some(prefix) = name.strip_suffix('/') {
name = prefix.trim();
}
if name.is_empty() {
return Err(Cow::from(format!(
"Invalid folder name '{}'.",
mailbox_name
)));
}
// Build path
let mut path = Vec::new();
if name.contains('/') {
// Locate parent mailbox
for path_item in name.split('/') {
let path_item = path_item.trim();
if path_item.is_empty() {
return Err(Cow::from("Invalid empty path item."));
}
path.push(path_item);
}
if path.len() > MAX_MAILBOX_DEPTH {
return Err(Cow::from("Mailbox path is too deep."));
}
} else {
path.push(name);
}
// Validate special folders
let mut parent_mailbox_id = None;
let mut parent_mailbox_name = None;
let mailboxes = self.mailboxes.lock();
let first_path_item = path.first().unwrap();
let account = if first_path_item == &self.core.folder_all {
return Err(Cow::from(
"Mailboxes cannot be created under virtual folders.",
));
} else if first_path_item == &self.core.folder_shared {
// Shared Folders/<username>/<folder>
if path.len() < 3 {
return Err(Cow::from(
"Mailboxes under root shared folders are not allowed.",
));
}
let prefix = Some(format!("{}/{}", first_path_item, path[1]));
// Locate account
if let Some(account) = mailboxes
.iter()
.skip(1)
.find(|account| account.prefix == prefix)
{
account
} else {
return Err(Cow::from(format!(
"Shared account '{}' not found.",
prefix.unwrap_or_default()
)));
}
} else if let Some(account) = mailboxes.first() {
account
} else {
return Err(Cow::from("Internal error."));
};
// Locate parent mailbox
let full_path = path.join("/");
if account.mailbox_names.contains_key(&full_path) {
return Err(Cow::from(format!(
"Mailbox '{}' already exists.",
full_path
)));
}
let path = if path.len() > 1 {
let mut create_path = Vec::with_capacity(path.len());
while !path.is_empty() {
let mailbox_name = path.join("/");
if let Some(mailbox_id) = account.mailbox_names.get(&mailbox_name) {
parent_mailbox_id = mailbox_id.to_string().into();
parent_mailbox_name = mailbox_name.into();
break;
} else {
create_path.push(path.pop().unwrap());
}
}
create_path.reverse();
create_path
} else {
path
};
Ok(CreateParams {
account_id: account.account_id.to_string(),
path,
full_path,
parent_mailbox_id,
parent_mailbox_name,
})
}
}
#[derive(Debug)]
pub struct CreateParams<'x> {
pub account_id: String,
pub path: Vec<&'x str>,
pub full_path: String,
pub parent_mailbox_id: Option<String>,
pub parent_mailbox_name: Option<String>,
}

View file

@ -110,7 +110,8 @@ impl SessionData {
};
// Refresh mailboxes
if !self.try_synchronize_mailboxes(&tag).await {
if let Err(err) = self.synchronize_mailboxes(false).await {
self.write_bytes(err.with_tag(tag).into_bytes()).await;
return;
}
@ -215,13 +216,13 @@ impl SessionData {
for (mailbox_name, mailbox_id) in &account.mailbox_names {
if matches_pattern(&patterns, mailbox_name) {
let mailbox = account.mailbox_data.get(mailbox_id).unwrap();
let mailbox = account.mailbox_state.get(mailbox_id).unwrap();
let mut has_recursive_match = false;
if recursive_match {
let prefix = format!("{}/", mailbox_name);
for (mailbox_name, mailbox_id) in &account.mailbox_names {
if mailbox_name.starts_with(&prefix)
&& account.mailbox_data.get(mailbox_id).unwrap().is_subscribed
&& account.mailbox_state.get(mailbox_id).unwrap().is_subscribed
{
has_recursive_match = true;
break;

View file

@ -1,6 +1,7 @@
use imap_proto::StatusResponse;
pub mod authenticate;
//pub mod create;
pub mod list;
pub mod status;

View file

@ -30,9 +30,9 @@ use imap_proto::{
Command, ResponseCode, StatusResponse,
};
use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, property::Property};
use store::{roaring::RoaringBitmap};
use tokio::io::AsyncRead;
use store::roaring::RoaringBitmap;
use store::Deserialize;
use tokio::io::AsyncRead;
use crate::core::{Mailbox, Session, SessionData};
@ -44,7 +44,9 @@ impl<T: AsyncRead> Session<T> {
let data = self.state.session_data();
tokio::spawn(async move {
// Refresh mailboxes
if !data.try_synchronize_mailboxes(&arguments.tag).await {
if let Err(err) = data.synchronize_mailboxes(false).await {
data.write_bytes(err.with_tag(arguments.tag).into_bytes())
.await;
return;
}
@ -81,7 +83,7 @@ impl SessionData {
) -> super::Result<StatusItem> {
// Get mailbox id
let mailbox = if let Some(mailbox) = self.get_mailbox_by_name(&mailbox_name) {
Arc::new(mailbox)
mailbox
} else {
return Err(
StatusResponse::no("Mailbox does not exist.").with_code(ResponseCode::NonExistent)
@ -91,52 +93,55 @@ impl SessionData {
// Make sure all requested fields are up to date
let mut items_update = AHashSet::with_capacity(items.len());
let mut items_response = Vec::with_capacity(items.len());
let mut do_synchronize = false;
for account in self.mailboxes.lock().iter_mut() {
if account.account_id == mailbox.account_id {
let mailbox_data = account
.mailbox_data
let mailbox_state = account
.mailbox_state
.entry(mailbox.mailbox_id.as_ref().cloned().unwrap_or_default())
.or_insert_with(Mailbox::default);
for item in items {
match item {
Status::Messages => {
if let Some(value) = mailbox_data.total_messages {
if let Some(value) = mailbox_state.total_messages {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
}
}
Status::UidNext => {
if let Some(value) = mailbox_data.uid_next {
if let Some(value) = mailbox_state.uid_next {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
do_synchronize = true;
}
}
Status::UidValidity => {
if let Some(value) = mailbox_data.uid_validity {
if let Some(value) = mailbox_state.uid_validity {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
do_synchronize = true;
}
}
Status::Unseen => {
if let Some(value) = mailbox_data.total_unseen {
if let Some(value) = mailbox_state.total_unseen {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
}
}
Status::Deleted => {
if let Some(value) = mailbox_data.total_deleted {
if let Some(value) = mailbox_state.total_deleted {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
}
}
Status::Size => {
if let Some(value) = mailbox_data.size {
if let Some(value) = mailbox_state.size {
items_response.push((*item, StatusItemType::Number(value)));
} else {
items_update.insert(*item);
@ -145,7 +150,9 @@ impl SessionData {
Status::HighestModSeq => {
items_response.push((
*item,
StatusItemType::Number(account.state.unwrap_or_default() as u32),
StatusItemType::Number(
account.state_email.map(|id| id + 1).unwrap_or(0) as u32,
),
));
}
Status::MailboxId => {
@ -172,6 +179,11 @@ impl SessionData {
if !items_update.is_empty() {
// Retrieve latest values
let mut values_update = Vec::with_capacity(items_update.len());
let mailbox_state = if do_synchronize {
self.fetch_messages(&mailbox).await?.into()
} else {
None
};
if let Some(mailbox_id) = mailbox.mailbox_id {
let mailbox_message_ids = self
@ -194,8 +206,8 @@ impl SessionData {
Status::Messages => {
message_ids.as_ref().map(|v| v.len()).unwrap_or(0) as u32
}
Status::UidNext => todo!(),
Status::UidValidity => todo!(),
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity,
Status::Unseen => {
if let (Some(message_ids), Some(mailbox_message_ids), Some(mut seen)) = (
&message_ids,
@ -260,8 +272,8 @@ impl SessionData {
for item in items_update {
let result = match item {
Status::Messages => message_ids.len() as u32,
Status::UidNext => todo!(),
Status::UidValidity => todo!(),
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity,
Status::Unseen => self
.jmap
.get_tag(
@ -308,19 +320,19 @@ impl SessionData {
// Update cache
for account in self.mailboxes.lock().iter_mut() {
if account.account_id == mailbox.account_id {
let mailbox_data = account
.mailbox_data
let mailbox_state = account
.mailbox_state
.entry(mailbox.mailbox_id.as_ref().cloned().unwrap_or_default())
.or_insert_with(Mailbox::default);
for (item, value) in values_update {
match item {
Status::Messages => mailbox_data.total_messages = value.into(),
Status::UidNext => mailbox_data.uid_next = value.into(),
Status::UidValidity => mailbox_data.uid_validity = value.into(),
Status::Unseen => mailbox_data.total_unseen = value.into(),
Status::Deleted => mailbox_data.total_deleted = value.into(),
Status::Size => mailbox_data.size = value.into(),
Status::Messages => mailbox_state.total_messages = value.into(),
Status::UidNext => mailbox_state.uid_next = value.into(),
Status::UidValidity => mailbox_state.uid_validity = value.into(),
Status::Unseen => mailbox_state.total_unseen = value.into(),
Status::Deleted => mailbox_state.total_deleted = value.into(),
Status::Size => mailbox_state.size = value.into(),
Status::HighestModSeq | Status::MailboxId | Status::Recent => {
unreachable!()
}
@ -362,12 +374,13 @@ impl SessionData {
},
)
.await
.map(|(_, size)| size )
.map(|(_, size)| size)
.map_err(|err| {
tracing::warn!(parent: &self.span,
tracing::warn!(parent: &self.span,
event = "error",
reason = ?err,
reason = ?err,
"Failed to calculate mailbox size");
StatusResponse::database_failure()})
StatusResponse::database_failure()
})
}
}

View file

@ -85,4 +85,3 @@ impl Policy {
}
}
}