From b5f2af7d6a5398031e7261344e9ab7aa0b8b8ab1 Mon Sep 17 00:00:00 2001 From: Mauro D Date: Sun, 25 Jun 2023 18:00:45 +0000 Subject: [PATCH] IMAP fetch and store commands. --- Cargo.lock | 3 + crates/imap-proto/src/lib.rs | 10 +- crates/imap-proto/src/protocol/fetch.rs | 2 +- crates/imap-proto/src/protocol/mod.rs | 53 +- crates/imap-proto/src/protocol/status.rs | 2 +- crates/imap/Cargo.toml | 4 + crates/imap/src/core/message.rs | 112 ++- crates/imap/src/core/mod.rs | 2 +- crates/imap/src/op/fetch.rs | 1156 ++++++++++++++++++++++ crates/imap/src/op/mod.rs | 17 + crates/imap/src/op/status.rs | 44 +- crates/imap/src/op/store.rs | 395 ++++++++ crates/jmap/src/changes/write.rs | 44 +- crates/jmap/src/email/ingest.rs | 1 + crates/jmap/src/email/set.rs | 11 +- tests/Cargo.toml | 2 + tests/src/imap/body_structure.rs | 193 ++++ tests/src/imap/mod.rs | 1 + tests/src/lib.rs | 2 + 19 files changed, 1976 insertions(+), 78 deletions(-) create mode 100644 crates/imap/src/op/fetch.rs create mode 100644 crates/imap/src/op/store.rs create mode 100644 tests/src/imap/body_structure.rs create mode 100644 tests/src/imap/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e0790527..9f31c836 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1715,6 +1715,7 @@ dependencies = [ "jmap_proto", "mail-parser", "mail-send", + "md5", "parking_lot", "rustls 0.21.1", "rustls-pemfile", @@ -4123,6 +4124,8 @@ dependencies = [ "futures", "http-body-util", "hyper 1.0.0-rc.3", + "imap", + "imap_proto", "jmap", "jmap-client", "jmap_proto", diff --git a/crates/imap-proto/src/lib.rs b/crates/imap-proto/src/lib.rs index ac326090..9e9ac71f 100644 --- a/crates/imap-proto/src/lib.rs +++ b/crates/imap-proto/src/lib.rs @@ -134,7 +134,7 @@ pub enum ResponseCode { ids: Vec, }, HighestModseq { - modseq: u32, + modseq: u64, }, // ObjectID @@ -160,6 +160,14 @@ pub enum ResponseType { Bye, } +impl ResponseCode { + pub fn highest_modseq(modseq: Option) -> Self { + ResponseCode::HighestModseq { + modseq: modseq.map(|id| id + 1).unwrap_or(0), + } + } +} + impl StatusResponse { pub fn bad(message: impl Into>) -> Self { StatusResponse { diff --git a/crates/imap-proto/src/protocol/fetch.rs b/crates/imap-proto/src/protocol/fetch.rs index 275d2f35..5f59446d 100644 --- a/crates/imap-proto/src/protocol/fetch.rs +++ b/crates/imap-proto/src/protocol/fetch.rs @@ -140,7 +140,7 @@ pub enum DataItem<'x> { contents: Option>, }, ModSeq { - modseq: u32, + modseq: u64, }, EmailId { email_id: String, diff --git a/crates/imap-proto/src/protocol/mod.rs b/crates/imap-proto/src/protocol/mod.rs index ae9c4f71..47476557 100644 --- a/crates/imap-proto/src/protocol/mod.rs +++ b/crates/imap-proto/src/protocol/mod.rs @@ -25,6 +25,7 @@ use std::{cmp::Ordering, fmt::Display}; use ahash::AHashSet; use chrono::{DateTime, NaiveDateTime, Utc}; +use jmap_proto::types::keyword::Keyword; use crate::{Command, ResponseCode, ResponseType, StatusResponse}; @@ -263,22 +264,44 @@ impl Flag { Flag::Keyword(keyword) => keyword.as_bytes(), }); } +} - pub fn to_jmap(&self) -> &str { - match self { - Flag::Seen => "$seen", - Flag::Draft => "$draft", - Flag::Flagged => "$flagged", - Flag::Answered => "$answered", - Flag::Recent => "$recent", - Flag::Important => "$important", - Flag::Phishing => "$phishing", - Flag::Junk => "$junk", - Flag::NotJunk => "$notjunk", - Flag::Deleted => "$deleted", - Flag::Forwarded => "$forwarded", - Flag::MDNSent => "$mdnsent", - Flag::Keyword(keyword) => keyword, +impl From for Flag { + fn from(value: Keyword) -> Self { + match value { + Keyword::Seen => Flag::Seen, + Keyword::Draft => Flag::Draft, + Keyword::Flagged => Flag::Flagged, + Keyword::Answered => Flag::Answered, + Keyword::Recent => Flag::Recent, + Keyword::Important => Flag::Important, + Keyword::Phishing => Flag::Phishing, + Keyword::Junk => Flag::Junk, + Keyword::NotJunk => Flag::NotJunk, + Keyword::Deleted => Flag::Deleted, + Keyword::Forwarded => Flag::Forwarded, + Keyword::MdnSent => Flag::MDNSent, + Keyword::Other(value) => Flag::Keyword(value), + } + } +} + +impl From for Keyword { + fn from(value: Flag) -> Self { + match value { + Flag::Seen => Keyword::Seen, + Flag::Draft => Keyword::Draft, + Flag::Flagged => Keyword::Flagged, + Flag::Answered => Keyword::Answered, + Flag::Recent => Keyword::Recent, + Flag::Important => Keyword::Important, + Flag::Phishing => Keyword::Phishing, + Flag::Junk => Keyword::Junk, + Flag::NotJunk => Keyword::NotJunk, + Flag::Deleted => Keyword::Deleted, + Flag::Forwarded => Keyword::Forwarded, + Flag::MDNSent => Keyword::MdnSent, + Flag::Keyword(value) => Keyword::Other(value), } } } diff --git a/crates/imap-proto/src/protocol/status.rs b/crates/imap-proto/src/protocol/status.rs index 2bee93f8..988af3ea 100644 --- a/crates/imap-proto/src/protocol/status.rs +++ b/crates/imap-proto/src/protocol/status.rs @@ -53,7 +53,7 @@ pub struct StatusItem { #[derive(Debug, Clone, PartialEq, Eq)] pub enum StatusItemType { - Number(u32), + Number(u64), String(String), } diff --git a/crates/imap/Cargo.toml b/crates/imap/Cargo.toml index 00e32295..e647391a 100644 --- a/crates/imap/Cargo.toml +++ b/crates/imap/Cargo.toml @@ -20,3 +20,7 @@ tokio-rustls = { version = "0.24.0"} parking_lot = "0.12" tracing = "0.1" ahash = { version = "0.8" } +md5 = "0.7.0" + +[features] +test_mode = [] diff --git a/crates/imap/src/core/message.rs b/crates/imap/src/core/message.rs index 6a32eabc..01716995 100644 --- a/crates/imap/src/core/message.rs +++ b/crates/imap/src/core/message.rs @@ -4,7 +4,10 @@ use std::{ }; use ahash::{AHashMap, AHashSet, AHasher, RandomState}; -use imap_proto::{protocol::Sequence, StatusResponse}; +use imap_proto::{ + protocol::{expunge, select::Exists, Sequence}, + StatusResponse, +}; use jmap_proto::types::{collection::Collection, property::Property}; use store::{ roaring::RoaringBitmap, @@ -189,7 +192,7 @@ impl SessionData { let uid_map = uid_map.inner; let mut id_to_imap = AHashMap::with_capacity(uid_map.items.len()); let mut uid_to_id = AHashMap::with_capacity(uid_map.items.len()); - let mut uids = Vec::with_capacity(uid_map.items.len()); + let mut uid_max = 0; for (seqnum, item) in uid_map.items.into_iter().enumerate() { id_to_imap.insert( @@ -200,16 +203,16 @@ impl SessionData { }, ); uid_to_id.insert(item.uid, item.id); - uids.push(item.uid); + uid_max = item.uid; } return Ok(MailboxState { uid_next: uid_map.uid_next, uid_validity: uid_map.uid_validity, - total_messages: uids.len(), + total_messages: id_to_imap.len(), id_to_imap, uid_to_id, - uids, + uid_max, last_state, }); } else { @@ -265,12 +268,72 @@ impl SessionData { total_messages: uids.len(), id_to_imap, uid_to_id, - uids, + uid_max: uid_next.saturating_sub(1), last_state, }); } } } + + pub async fn synchronize_messages( + &self, + mailbox: &SelectedMailbox, + is_qresync: bool, + ) -> crate::op::Result> { + // Obtain current modseq + let modseq = self.get_modseq(mailbox.id.account_id).await?; + if mailbox.state.lock().last_state == modseq { + return Ok(modseq); + } + + // Synchronize messages + let new_state = self.fetch_messages(&mailbox.id).await?; + + // Update UIDs + let mut buf = Vec::with_capacity(64); + let (new_message_count, deletions) = mailbox.update_mailbox_state(new_state, true); + if let Some(deletions) = deletions { + expunge::Response { + is_qresync, + ids: deletions + .into_iter() + .map(|id| if !is_qresync { id.seqnum } else { id.uid }) + .collect(), + } + .serialize_to(&mut buf); + } + if let Some(new_message_count) = new_message_count { + Exists { + total_messages: new_message_count, + } + .serialize(&mut buf); + } + if !buf.is_empty() { + self.write_bytes(buf).await; + } + + Ok(modseq) + } + + pub async fn get_modseq(&self, account_id: u32) -> crate::op::Result> { + // Obtain current modseq + if let Ok(modseq) = self + .jmap + .store + .get_last_change_id(account_id, Collection::Email) + .await + { + Ok(modseq) + } else { + tracing::error!(parent: &self.span, + event = "error", + context = "store", + account_id = account_id, + collection = ?Collection::Email, + "Failed to obtain modseq"); + Err(StatusResponse::database_failure()) + } + } } impl SelectedMailbox { @@ -282,12 +345,12 @@ impl SelectedMailbox { if !sequence.is_saved_search() { let mut ids = AHashMap::new(); let state = self.state.lock(); - if state.uids.is_empty() { + if state.id_to_imap.is_empty() { return Ok(ids); } - let max_uid = state.uids.last().copied().unwrap_or(0); - let max_seqnum = state.uids.len() as u32; + let max_uid = state.uid_max; + let max_seqnum = state.total_messages as u32; for (id, imap_id) in &state.id_to_imap { let matched = if is_uid { @@ -296,7 +359,7 @@ impl SelectedMailbox { sequence.contains(imap_id.seqnum, max_seqnum) }; if matched { - ids.insert(id.clone(), *imap_id); + ids.insert(*id, *imap_id); } } @@ -319,6 +382,35 @@ impl SelectedMailbox { } } + pub async fn sequence_expand_missing(&self, sequence: &Sequence, is_uid: bool) -> Vec { + let mut deleted_ids = Vec::new(); + if !sequence.is_saved_search() { + let state = self.state.lock(); + if is_uid { + for uid in sequence.expand(state.uid_max) { + if !state.uid_to_id.contains_key(&uid) { + deleted_ids.push(uid); + } + } + } else { + for seqnum in sequence.expand(state.total_messages as u32) { + if seqnum > state.total_messages as u32 { + deleted_ids.push(seqnum); + } + } + } + } else if let Some(saved_ids) = self.get_saved_search().await { + let state = self.state.lock(); + for id in saved_ids.iter() { + if !state.uid_to_id.contains_key(&id.uid) { + deleted_ids.push(if is_uid { id.uid } else { id.seqnum }); + } + } + } + deleted_ids.sort_unstable(); + deleted_ids + } + pub fn id_to_uid(&self, ids: &[u32]) -> Vec { let mut imap_ids = Vec::with_capacity(ids.len()); let state = self.state.lock(); diff --git a/crates/imap/src/core/mod.rs b/crates/imap/src/core/mod.rs index e19ec19d..066dc50c 100644 --- a/crates/imap/src/core/mod.rs +++ b/crates/imap/src/core/mod.rs @@ -118,9 +118,9 @@ pub struct MailboxId { pub struct MailboxState { pub uid_next: u32, pub uid_validity: u32, + pub uid_max: u32, pub id_to_imap: AHashMap, pub uid_to_id: AHashMap, - pub uids: Vec, pub total_messages: usize, pub last_state: Option, } diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs new file mode 100644 index 00000000..fa3c16ed --- /dev/null +++ b/crates/imap/src/op/fetch.rs @@ -0,0 +1,1156 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{borrow::Cow, sync::Arc}; + +use ahash::AHashMap; +use imap_proto::{ + parser::PushUnique, + protocol::{ + expunge::Vanished, + fetch::{ + self, Arguments, Attribute, BodyContents, BodyPart, BodyPartExtension, BodyPartFields, + DataItem, Envelope, FetchItem, Section, + }, + Flag, + }, + receiver::Request, + Command, ResponseCode, StatusResponse, +}; +use jmap_proto::{ + error::method::MethodError, + object::Object, + types::{ + acl::Acl, blob::BlobId, collection::Collection, id::Id, keyword::Keyword, + property::Property, state::StateChange, type_state::TypeState, value::Value, + }, +}; +use mail_parser::{GetHeader, Message, PartType, RfcHeader}; +use store::{ + query::log::{Change, Query}, + write::{assert::HashedValue, BatchBuilder, F_BITMAP, F_VALUE}, +}; +use tokio::io::AsyncRead; + +use crate::core::{SelectedMailbox, Session, SessionData}; + +use super::FromModSeq; + +impl Session { + pub async fn handle_fetch( + &mut self, + request: Request, + is_uid: bool, + ) -> Result<(), ()> { + match request.parse_fetch() { + Ok(arguments) => { + let (data, mailbox) = self.state.select_data(); + let is_qresync = self.is_qresync; + + let enabled_condstore = if !self.is_condstore && arguments.changed_since.is_some() + || arguments.attributes.contains(&Attribute::ModSeq) + { + self.is_condstore = true; + true + } else { + false + }; + + tokio::spawn(async move { + data.write_bytes( + data.fetch(arguments, mailbox, is_uid, is_qresync, enabled_condstore) + .await + .into_bytes(), + ) + .await; + }); + Ok(()) + } + Err(response) => self.write_bytes(response.into_bytes()).await, + } + } +} + +impl SessionData { + pub async fn fetch( + &self, + mut arguments: Arguments, + mailbox: Arc, + is_uid: bool, + is_qresync: bool, + enabled_condstore: bool, + ) -> StatusResponse { + // Validate VANISHED parameter + if arguments.include_vanished { + if !is_qresync { + return StatusResponse::bad("Enable QRESYNC first to use the VANISHED parameter.") + .with_tag(arguments.tag); + } else if !is_uid { + return StatusResponse::bad("VANISHED parameter is only available for UID FETCH.") + .with_tag(arguments.tag); + } + } + + // Resync messages if needed + let account_id = mailbox.id.account_id; + let mut modseq = match { + if is_uid { + self.synchronize_messages(&mailbox, is_qresync).await + } else { + // Don't synchronize if we're not using UIDs as seqnums might change. + self.get_modseq(mailbox.id.account_id).await + } + } { + Ok(modseq) => modseq, + Err(response) => return response.with_tag(arguments.tag), + }; + + // Convert IMAP ids to JMAP ids. + let mut ids = match mailbox + .sequence_to_ids(&arguments.sequence_set, is_uid) + .await + { + Ok(ids) => ids, + Err(response) => { + return response.with_tag(arguments.tag); + } + }; + + // Convert state to modseq + if let Some(changed_since) = arguments.changed_since { + // Obtain changes since the modseq. + let changelog = match self + .jmap + .changes_( + account_id, + Collection::Email, + Query::from_modseq(changed_since), + ) + .await + { + Ok(changelog) => changelog, + Err(_) => return StatusResponse::database_failure().with_tag(arguments.tag), + }; + + // Process changes + let mut changed_ids = AHashMap::new(); + let mut has_vanished = false; + + for change in changelog.changes { + match change { + Change::Insert(id) | Change::Update(id) | Change::ChildUpdate(id) => { + let id = (id & u32::MAX as u64) as u32; + if let Some(uid) = ids.get(&id) { + changed_ids.insert(id, *uid); + } + if !has_vanished { + has_vanished = matches!(change, Change::Update(_)); + } + } + Change::Delete(_) => { + has_vanished = true; + } + } + } + + // Send vanished UIDs + if arguments.include_vanished && has_vanished { + // Add to vanished all known destroyed Ids + let vanished = mailbox + .sequence_expand_missing(&arguments.sequence_set, true) + .await; + + if !vanished.is_empty() { + let mut buf = Vec::with_capacity(vanished.len() * 3); + Vanished { + earlier: true, + ids: vanished, + } + .serialize(&mut buf); + self.write_bytes(buf).await; + } + } + + // Filter out ids without changes + if changed_ids.is_empty() { + // Condstore was just enabled, return highest modseq. + if enabled_condstore { + self.write_bytes( + StatusResponse::ok("Highest Modseq") + .with_code(ResponseCode::highest_modseq(modseq)) + .into_bytes(), + ) + .await; + } + return StatusResponse::completed(Command::Fetch(is_uid)).with_tag(arguments.tag); + } + ids = changed_ids; + arguments.attributes.push_unique(Attribute::ModSeq); + } + + // Obtain shared messages + let access_token = match self.get_access_token().await { + Ok(access_token) => access_token, + Err(response) => return response.with_tag(arguments.tag), + }; + let can_modify_ids = if access_token.is_shared(account_id) { + match self + .jmap + .shared_messages(&access_token, account_id, Acl::ModifyItems) + .await + { + Ok(document_ids) => document_ids.into(), + Err(_) => return StatusResponse::database_failure().with_tag(arguments.tag), + } + } else { + None + }; + + // Build properties list + let mut set_seen_flags = false; + let mut needs_thread_id = false; + let mut needs_blobs = false; + + for attribute in &arguments.attributes { + match attribute { + Attribute::Envelope + | Attribute::Rfc822Header + | Attribute::Body + | Attribute::BodyStructure + | Attribute::BinarySize { .. } => { + /* + Note that this did not result in \Seen being set, because + RFC822.HEADER response data occurs as a result of a FETCH + of RFC822.HEADER. BODY[HEADER] response data occurs as a + result of a FETCH of BODY[HEADER] (which sets \Seen) or + BODY.PEEK[HEADER] (which does not set \Seen). + */ + needs_blobs = true; + } + Attribute::BodySection { peek, .. } | Attribute::Binary { peek, .. } => { + if mailbox.is_select && !*peek { + set_seen_flags = true; + } + needs_blobs = true; + } + Attribute::Rfc822Text | Attribute::Rfc822 => { + if mailbox.is_select { + set_seen_flags = true; + } + needs_blobs = true; + } + Attribute::ThreadId => { + needs_thread_id = true; + } + _ => (), + } + } + if is_uid { + arguments.attributes.push_unique(Attribute::Uid); + } + + let mut set_seen_ids = Vec::new(); + + // Process each message + for (id, imap_id) in ids { + let uid = imap_id.uid; + let seqnum = imap_id.seqnum; + + // Obtain attributes and keywords + let (email, keywords) = if let (Ok(Some(email)), Ok(Some(keywords))) = ( + self.jmap + .get_property::>( + account_id, + Collection::Email, + id, + &Property::BodyStructure, + ) + .await, + self.jmap + .get_property::>>( + account_id, + Collection::Email, + id, + &Property::Keywords, + ) + .await, + ) { + (email, keywords) + } else { + tracing::debug!( + event = "not-found", + account_id = account_id, + collection = ?Collection::Email, + document_id = id, + "Message metadata not found"); + continue; + }; + + // Fetch and parse blob + let raw_message = if needs_blobs { + // Retrieve raw message if needed + let blob_id = BlobId::maildir(account_id, id); + match self.jmap.get_blob(&blob_id.kind, 0..u32::MAX).await { + Ok(Some(raw_message)) => raw_message.into(), + Ok(None) => { + tracing::warn!(event = "not-found", + account_id = account_id, + collection = ?Collection::Email, + document_id = id, + blob_id = ?blob_id, + "Blob not found"); + continue; + } + Err(_) => { + return StatusResponse::database_failure().with_tag(arguments.tag); + } + } + } else { + None + }; + + let message = if let Some(raw_message) = &raw_message { + if let Some(message) = Message::parse(raw_message) { + message.into() + } else { + tracing::warn!( + event = "parse-error", + account_id = account_id, + collection = ?Collection::Email, + document_id = id, + blob_id = ?BlobId::maildir(account_id, id), + "Failed to parse stored message"); + continue; + } + } else { + None + }; + + // Build response + let mut items = Vec::with_capacity(arguments.attributes.len()); + let set_seen_flag = set_seen_flags + && !keywords.inner.iter().any(|k| k == &Keyword::Seen) + && can_modify_ids.as_ref().map_or(true, |ids| ids.contains(id)); + let thread_id = if needs_thread_id || set_seen_flag { + if let Ok(Some(thread_id)) = self + .jmap + .get_property::(account_id, Collection::Email, id, Property::ThreadId) + .await + { + thread_id + } else { + continue; + } + } else { + 0 + }; + for attribute in &arguments.attributes { + match attribute { + Attribute::Envelope => { + items.push(DataItem::Envelope { + envelope: message.as_ref().unwrap().envelope(), + }); + } + Attribute::Flags => { + let mut flags = keywords + .inner + .iter() + .map(|k| Flag::from(k.clone())) + .collect::>(); + if set_seen_flag { + flags.push(Flag::Seen); + } + items.push(DataItem::Flags { flags }); + } + Attribute::InternalDate => { + if let Some(date) = email.get(&Property::ReceivedAt).as_date() { + items.push(DataItem::InternalDate { + date: date.timestamp(), + }); + } + } + Attribute::Preview { .. } => { + items.push(DataItem::Preview { + contents: email.get(&Property::Preview).as_string().map(|p| p.into()), + }); + } + Attribute::Rfc822Size => { + items.push(DataItem::Rfc822Size { + size: email.get(&Property::Size).as_uint().unwrap_or(0) as usize, + }); + } + Attribute::Uid => { + items.push(DataItem::Uid { uid }); + } + Attribute::Rfc822 => { + items.push(DataItem::Rfc822 { + contents: String::from_utf8_lossy(raw_message.as_ref().unwrap()), + }); + } + Attribute::Rfc822Header => { + let message = message.as_ref().unwrap().root_part(); + if let Some(header) = raw_message + .as_ref() + .unwrap() + .get(message.offset_header..message.offset_body) + { + items.push(DataItem::Rfc822Header { + contents: String::from_utf8_lossy(header), + }); + } + } + Attribute::Rfc822Text => { + let message = message.as_ref().unwrap().root_part(); + if let Some(text) = raw_message + .as_ref() + .unwrap() + .get(message.offset_body..message.offset_end) + { + items.push(DataItem::Rfc822Text { + contents: String::from_utf8_lossy(text), + }); + } + } + Attribute::Body => { + items.push(DataItem::Body { + part: message.as_ref().unwrap().body_structure(false), + }); + } + Attribute::BodyStructure => { + items.push(DataItem::BodyStructure { + part: message.as_ref().unwrap().body_structure(true), + }); + } + Attribute::BodySection { + sections, partial, .. + } => { + if let Some(contents) = + message.as_ref().unwrap().body_section(sections, *partial) + { + items.push(DataItem::BodySection { + sections: sections.to_vec(), + origin_octet: partial.map(|(start, _)| start), + contents, + }); + } + } + + Attribute::Binary { + sections, partial, .. + } => match message.as_ref().unwrap().binary(sections, *partial) { + Ok(Some(contents)) => { + items.push(DataItem::Binary { + sections: sections.to_vec(), + offset: partial.map(|(start, _)| start), + contents, + }); + } + Err(_) => { + self.write_bytes( + StatusResponse::no(format!( + "Failed to decode part {} of message {}.", + sections + .iter() + .map(|s| s.to_string()) + .collect::>() + .join("."), + if is_uid { uid } else { seqnum } + )) + .with_code(ResponseCode::UnknownCte) + .into_bytes(), + ) + .await; + continue; + } + _ => (), + }, + Attribute::BinarySize { sections } => { + if let Some(size) = message.as_ref().unwrap().binary_size(sections) { + items.push(DataItem::BinarySize { + sections: sections.to_vec(), + size, + }); + } + } + Attribute::ModSeq => { + if let Ok(Some(modseq)) = self + .jmap + .get_property::(account_id, Collection::Email, id, Property::Cid) + .await + { + items.push(DataItem::ModSeq { modseq }); + } + } + Attribute::EmailId => { + items.push(DataItem::EmailId { + email_id: Id::from_parts(account_id, id).to_string(), + }); + } + Attribute::ThreadId => { + items.push(DataItem::ThreadId { + thread_id: Id::from_parts(account_id, thread_id).to_string(), + }); + } + } + } + + // Add flags to the response if the message was unseen + if set_seen_flag && !arguments.attributes.contains(&Attribute::Flags) { + let mut flags = keywords + .inner + .iter() + .map(|k| Flag::from(k.clone())) + .collect::>(); + flags.push(Flag::Seen); + items.push(DataItem::Flags { flags }); + } + + // Serialize fetch item + let mut buf = Vec::with_capacity(128); + FetchItem { id: seqnum, items }.serialize(&mut buf); + if !self.write_bytes(buf).await { + return StatusResponse::completed(Command::Fetch(is_uid)).with_tag(arguments.tag); + } + + // Add to set flags + if set_seen_flag { + set_seen_ids.push((Id::from_parts(thread_id, id), keywords)); + } + } + + // Set Seen ids + if !set_seen_ids.is_empty() { + let mut changelog = match self.jmap.begin_changes(account_id).await { + Ok(changelog) => changelog, + Err(_) => { + return StatusResponse::database_failure().with_tag(arguments.tag); + } + }; + for (id, mut keywords) in set_seen_ids { + keywords.inner.push(Keyword::Seen); + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email) + .update_document(id.document_id()) + .assert_value(Property::Keywords, &keywords) + .value(Property::Keywords, keywords.inner, F_VALUE) + .value(Property::Keywords, Keyword::Seen, F_BITMAP) + .value(Property::Cid, changelog.change_id, F_VALUE); + match self.jmap.write_batch(batch).await { + Ok(_) => { + changelog.log_update(Collection::Email, id); + } + Err(MethodError::ServerUnavailable) => {} + Err(_) => { + return StatusResponse::database_failure().with_tag(arguments.tag); + } + } + } + if !changelog.is_empty() { + let change_id = changelog.change_id; + if self + .jmap + .commit_changes(account_id, changelog) + .await + .is_err() + { + return StatusResponse::database_failure().with_tag(arguments.tag); + } + modseq = change_id.into(); + self.jmap + .broadcast_state_change( + StateChange::new(account_id).with_change(TypeState::Email, change_id), + ) + .await; + } + } + + // Condstore was enabled with this command + if enabled_condstore { + self.write_bytes( + StatusResponse::ok("Highest Modseq") + .with_code(ResponseCode::highest_modseq(modseq)) + .into_bytes(), + ) + .await; + } + + StatusResponse::completed(Command::Fetch(is_uid)).with_tag(arguments.tag) + } +} + +trait AsImapDataItem<'x> { + fn body_structure(&self, is_extended: bool) -> BodyPart; + fn body_section<'z: 'x>( + &'z self, + sections: &[Section], + partial: Option<(u32, u32)>, + ) -> Option>; + fn binary( + &self, + sections: &[u32], + partial: Option<(u32, u32)>, + ) -> Result, ()>; + fn binary_size(&self, sections: &[u32]) -> Option; + fn as_body_part(&self, part_id: usize, is_extended: bool) -> BodyPart; + fn envelope(&self) -> Envelope; +} + +impl<'x> AsImapDataItem<'x> for Message<'x> { + fn body_structure(&self, is_extended: bool) -> BodyPart { + let mut stack = Vec::new(); + let mut parts = [0].iter(); + let mut message = self; + let mut root_part = None; + + loop { + while let Some(part_id) = parts.next() { + let mut part = message.as_body_part(*part_id, is_extended); + + match &message.parts[*part_id].body { + PartType::Message(nested_message) => { + part.set_envelope(nested_message.envelope()); + if let Some(root_part) = root_part { + stack.push((root_part, parts, message.into())); + } + root_part = part.into(); + parts = [0].iter(); + message = nested_message; + continue; + } + PartType::Multipart(subparts) => { + if let Some(root_part) = root_part { + stack.push((root_part, parts, None)); + } + root_part = part.into(); + parts = subparts.iter(); + continue; + } + _ => (), + } + if let Some(root_part) = &mut root_part { + root_part.add_part(part); + } else { + return part; + } + } + if let Some((mut prev_root_part, prev_parts, prev_message)) = stack.pop() { + if let Some(prev_message) = prev_message { + message = prev_message; + } + + prev_root_part.add_part(root_part.unwrap()); + parts = prev_parts; + root_part = prev_root_part.into(); + } else { + break; + } + } + + root_part.unwrap() + } + + fn as_body_part(&self, part_id: usize, is_extended: bool) -> BodyPart { + let part = &self.parts[part_id]; + let body = self.raw_message.get(part.offset_body..part.offset_end); + let (is_multipart, is_text) = match &part.body { + PartType::Text(_) | PartType::Html(_) => (false, true), + PartType::Multipart(_) => (true, false), + _ => (false, false), + }; + let content_type = part + .headers + .rfc(&RfcHeader::ContentType) + .and_then(|ct| ct.as_content_type_ref()); + + let mut body_md5 = None; + let mut extension = BodyPartExtension::default(); + let mut fields = BodyPartFields::default(); + + if !is_multipart || is_extended { + fields.body_parameters = content_type.as_ref().and_then(|ct| { + ct.attributes.as_ref().map(|at| { + at.iter() + .map(|(h, v)| (h.as_ref().into(), v.as_ref().into())) + .collect::>() + }) + }) + } + + if !is_multipart { + fields.body_subtype = content_type + .as_ref() + .and_then(|ct| ct.c_subtype.as_ref().map(|cs| cs.as_ref().into())); + + fields.body_id = part + .headers + .rfc(&RfcHeader::ContentId) + .and_then(|id| id.as_text_ref().map(|id| format!("<{}>", id).into())); + + fields.body_description = part + .headers + .rfc(&RfcHeader::ContentDescription) + .and_then(|ct| ct.as_text_ref().map(|ct| ct.into())); + + fields.body_encoding = part + .headers + .rfc(&RfcHeader::ContentTransferEncoding) + .and_then(|ct| ct.as_text_ref().map(|ct| ct.into())); + + fields.body_size_octets = body.as_ref().map(|b| b.len()).unwrap_or(0); + + if is_text { + if fields.body_subtype.is_none() { + fields.body_subtype = Some("plain".into()); + } + if fields.body_encoding.is_none() { + fields.body_encoding = Some("7bit".into()); + } + if fields.body_parameters.is_none() { + fields.body_parameters = Some(vec![("charset".into(), "us-ascii".into())]); + } + } + } + + if is_extended { + if !is_multipart { + body_md5 = body + .as_ref() + .map(|b| format!("{:x}", md5::compute(b)).into()); + } + + extension.body_disposition = + part.headers.rfc(&RfcHeader::ContentDisposition).map(|cd| { + let cd = cd.content_type(); + + ( + cd.c_type.as_ref().into(), + cd.attributes + .as_ref() + .map(|at| { + at.iter() + .map(|(h, v)| (h.as_ref().into(), v.as_ref().into())) + .collect::>() + }) + .unwrap_or_default(), + ) + }); + + extension.body_language = + part.headers + .rfc(&RfcHeader::ContentLanguage) + .and_then(|hv| { + hv.as_text_list() + .map(|list| list.into_iter().map(|item| item.into()).collect()) + }); + + extension.body_location = part + .headers + .rfc(&RfcHeader::ContentLocation) + .and_then(|ct| ct.as_text_ref().map(|ct| ct.into())); + } + + match &part.body { + PartType::Multipart(parts) => BodyPart::Multipart { + body_parts: Vec::with_capacity(parts.len()), + body_subtype: content_type + .as_ref() + .and_then(|ct| ct.c_subtype.as_ref().map(|cs| cs.as_ref().into())) + .unwrap_or_else(|| "".into()), + body_parameters: fields.body_parameters, + extension, + }, + PartType::Message(_) => BodyPart::Message { + fields, + envelope: None, + body: None, + body_size_lines: 0, + body_md5, + extension, + }, + _ => { + if is_text { + BodyPart::Text { + fields, + body_size_lines: body + .as_ref() + .map(|b| b.iter().filter(|&&ch| ch == b'\n').count()) + .unwrap_or(0), + body_md5, + extension, + } + } else { + BodyPart::Basic { + body_type: content_type + .as_ref() + .map(|ct| Cow::from(ct.c_type.as_ref())), + fields, + body_md5, + extension, + } + } + } + } + } + + fn body_section<'z: 'x>( + &'z self, + sections: &[Section], + partial: Option<(u32, u32)>, + ) -> Option> { + let mut part = self.root_part(); + if sections.is_empty() { + return String::from_utf8_lossy(get_partial_bytes( + self.raw_message.get(part.offset_header..part.offset_end)?, + partial, + )) + .into(); + } + + let mut message = self; + let sections_single = sections.len() == 1; + let mut sections_iter = sections.iter().peekable(); + + while let Some(section) = sections_iter.next() { + match section { + Section::Part { num } => { + part = if let Some(sub_part_ids) = part.sub_parts() { + sub_part_ids + .get((*num).saturating_sub(1) as usize) + .and_then(|pos| message.parts.get(*pos)) + } else if *num == 1 && (sections_single || part.is_message()) { + Some(part) + } else { + None + }?; + + if let (PartType::Message(nested_message), Some(_)) = + (&part.body, sections_iter.peek()) + { + message = nested_message; + part = message.root_part(); + } + } + Section::Header => { + return String::from_utf8_lossy(get_partial_bytes( + message + .raw_message + .get(part.offset_header..part.offset_body)?, + partial, + )) + .into(); + } + Section::HeaderFields { not, fields } => { + let mut headers = + Vec::with_capacity(part.offset_body.saturating_sub(part.offset_header)); + for header in &part.headers { + let header_name = header.name.as_str(); + if fields.iter().any(|f| header_name.eq_ignore_ascii_case(f)) != *not { + headers.extend_from_slice(header_name.as_bytes()); + headers.push(b':'); + headers.extend_from_slice( + message + .raw_message + .get(header.offset_start..header.offset_end) + .unwrap_or(b""), + ); + } + } + + headers.extend_from_slice(b"\r\n"); + + return Some(if partial.is_none() { + String::from_utf8(headers).map_or_else( + |err| String::from_utf8_lossy(err.as_bytes()).into_owned().into(), + |s| s.into(), + ) + } else { + String::from_utf8_lossy(get_partial_bytes(&headers, partial)) + .into_owned() + .into() + }); + } + Section::Text => { + return String::from_utf8_lossy(get_partial_bytes( + message.raw_message.get(part.offset_body..part.offset_end)?, + partial, + )) + .into(); + } + Section::Mime => { + let mut headers = + Vec::with_capacity(part.offset_body.saturating_sub(part.offset_header)); + for header in &part.headers { + if header.name.is_mime_header() { + headers.extend_from_slice(header.name.as_str().as_bytes()); + headers.extend_from_slice(b":"); + headers.extend_from_slice( + message + .raw_message + .get(header.offset_start..header.offset_end) + .unwrap_or(b""), + ); + } + } + headers.extend_from_slice(b"\r\n"); + return Some(if partial.is_none() { + String::from_utf8(headers).map_or_else( + |err| String::from_utf8_lossy(err.as_bytes()).into_owned().into(), + |s| s.into(), + ) + } else { + String::from_utf8_lossy(get_partial_bytes(&headers, partial)) + .into_owned() + .into() + }); + } + } + } + + // BODY[x] should return both headers and body, but most clients + // expect BODY[x] to return only the body, just like BOXY[x.TEXT] does. + + String::from_utf8_lossy(get_partial_bytes( + message.raw_message.get(part.offset_body..part.offset_end)?, + partial, + )) + .into() + + /*String::from_utf8_lossy(get_partial_bytes( + raw_message.get(part.offset_header..part.offset_end)?, + partial, + )) + .into()*/ + } + + fn binary( + &self, + sections: &[u32], + partial: Option<(u32, u32)>, + ) -> Result, ()> { + let mut message = self; + let mut part = self.root_part(); + let mut sections_iter = sections.iter().peekable(); + + while let Some(section) = sections_iter.next() { + part = if let Some(part) = part + .sub_parts() + .and_then(|p| p.get((*section).saturating_sub(1) as usize)) + .and_then(|p| message.parts.get(*p)) + { + part + } else { + return Ok(None); + }; + if let (PartType::Message(nested_message), Some(_)) = (&part.body, sections_iter.peek()) + { + message = nested_message; + part = message.root_part(); + } + } + + if !part.is_encoding_problem { + Ok(match &part.body { + PartType::Text(text) | PartType::Html(text) => BodyContents::Text( + String::from_utf8_lossy(get_partial_bytes(text.as_bytes(), partial)), + ) + .into(), + PartType::Binary(bytes) | PartType::InlineBinary(bytes) => { + BodyContents::Bytes(get_partial_bytes(bytes.as_ref(), partial).into()).into() + } + PartType::Message(message) => BodyContents::Bytes( + get_partial_bytes( + message + .raw_message + .get( + message.root_part().raw_header_offset() + ..message.root_part().raw_end_offset(), + ) + .unwrap_or(&b""[..]), + partial, + ) + .into(), + ) + .into(), + PartType::Multipart(_) => None, + }) + } else { + Err(()) + } + } + + fn binary_size(&self, sections: &[u32]) -> Option { + let mut message = self; + let mut part = self.root_part(); + let mut sections_iter = sections.iter().peekable(); + + while let Some(section) = sections_iter.next() { + part = message.parts.get( + *part + .sub_parts()? + .get((*section).saturating_sub(1) as usize)?, + )?; + if let (PartType::Message(nested_message), Some(_)) = (&part.body, sections_iter.peek()) + { + message = nested_message; + part = message.root_part(); + } + } + + match &part.body { + PartType::Text(text) | PartType::Html(text) => text.len(), + PartType::Binary(bytes) | PartType::InlineBinary(bytes) => bytes.len(), + PartType::Message(message) => message.root_part().raw_len(), + PartType::Multipart(_) => 0, + } + .into() + } + + fn envelope(&self) -> Envelope { + Envelope { + date: self.date().map(|dt| dt.to_timestamp()), + subject: self.subject().map(|s| s.into()), + from: self + .header_values(RfcHeader::From) + .flat_map(|a| a.as_imap_address()) + .collect(), + sender: self + .header_values(RfcHeader::Sender) + .flat_map(|a| a.as_imap_address()) + .collect(), + reply_to: self + .header_values(RfcHeader::ReplyTo) + .flat_map(|a| a.as_imap_address()) + .collect(), + to: self + .header_values(RfcHeader::To) + .flat_map(|a| a.as_imap_address()) + .collect(), + cc: self + .header_values(RfcHeader::Cc) + .flat_map(|a| a.as_imap_address()) + .collect(), + bcc: self + .header_values(RfcHeader::Bcc) + .flat_map(|a| a.as_imap_address()) + .collect(), + in_reply_to: self.in_reply_to().as_text_list().map(|list| { + let mut irt = String::with_capacity(list.len() * 10); + for (pos, l) in list.iter().enumerate() { + if pos > 0 { + irt.push(' '); + } + irt.push('<'); + irt.push_str(l.as_ref()); + irt.push('>'); + } + irt.into() + }), + message_id: self.message_id().map(|id| format!("<{}>", id).into()), + } + } +} + +#[inline(always)] +fn get_partial_bytes(bytes: &[u8], partial: Option<(u32, u32)>) -> &[u8] { + if let Some((start, end)) = partial { + if let Some(bytes) = + bytes.get(start as usize..std::cmp::min((start + end) as usize, bytes.len())) + { + bytes + } else { + &[] + } + } else { + bytes + } +} + +trait AsImapAddress { + fn as_imap_address(&self) -> Vec; +} + +impl AsImapAddress for mail_parser::HeaderValue<'_> { + fn as_imap_address(&self) -> Vec { + let mut addresses = Vec::new(); + + match self { + mail_parser::HeaderValue::Address(addr) => { + if let Some(email) = &addr.address { + addresses.push(fetch::Address::Single(fetch::EmailAddress { + name: addr.name.as_ref().map(|n| n.as_ref().into()), + address: email.as_ref().into(), + })); + } + } + mail_parser::HeaderValue::AddressList(list) => { + for addr in list { + if let Some(email) = &addr.address { + addresses.push(fetch::Address::Single(fetch::EmailAddress { + name: addr.name.as_ref().map(|n| n.as_ref().into()), + address: email.as_ref().into(), + })); + } + } + } + mail_parser::HeaderValue::Group(group) => { + addresses.push(fetch::Address::Group(fetch::AddressGroup { + name: group.name.as_ref().map(|n| n.as_ref().into()), + addresses: group + .addresses + .iter() + .filter_map(|addr| { + fetch::EmailAddress { + name: addr.name.as_ref().map(|n| n.as_ref().into()), + address: addr.address.as_ref()?.as_ref().into(), + } + .into() + }) + .collect(), + })); + } + mail_parser::HeaderValue::GroupList(list) => { + for group in list { + addresses.push(fetch::Address::Group(fetch::AddressGroup { + name: group.name.as_ref().map(|n| n.as_ref().into()), + addresses: group + .addresses + .iter() + .filter_map(|addr| { + fetch::EmailAddress { + name: addr.name.as_ref().map(|n| n.as_ref().into()), + address: addr.address.as_ref()?.as_ref().into(), + } + .into() + }) + .collect(), + })); + } + } + _ => (), + } + + addresses + } +} diff --git a/crates/imap/src/op/mod.rs b/crates/imap/src/op/mod.rs index 99848c10..2925c5d0 100644 --- a/crates/imap/src/op/mod.rs +++ b/crates/imap/src/op/mod.rs @@ -1,11 +1,28 @@ +use ::store::query::log::Query; use imap_proto::StatusResponse; pub mod authenticate; pub mod create; pub mod delete; +pub mod fetch; pub mod list; pub mod rename; pub mod status; +pub mod store; pub mod subscribe; +trait FromModSeq { + fn from_modseq(modseq: u64) -> Self; +} + +impl FromModSeq for Query { + fn from_modseq(modseq: u64) -> Self { + if modseq > 0 { + Query::Since(modseq - 1) + } else { + Query::All + } + } +} + pub type Result = std::result::Result; diff --git a/crates/imap/src/op/status.rs b/crates/imap/src/op/status.rs index 75954575..005951a6 100644 --- a/crates/imap/src/op/status.rs +++ b/crates/imap/src/op/status.rs @@ -105,14 +105,14 @@ impl SessionData { match item { Status::Messages => { if let Some(value) = mailbox_state.total_messages { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); } } Status::UidNext => { if let Some(value) = mailbox_state.uid_next { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); do_synchronize = true; @@ -120,7 +120,7 @@ impl SessionData { } Status::UidValidity => { if let Some(value) = mailbox_state.uid_validity { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); do_synchronize = true; @@ -128,21 +128,21 @@ impl SessionData { } Status::Unseen => { if let Some(value) = mailbox_state.total_unseen { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); } } Status::Deleted => { if let Some(value) = mailbox_state.total_deleted { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); } } Status::Size => { if let Some(value) = mailbox_state.size { - items_response.push((*item, StatusItemType::Number(value))); + items_response.push((*item, StatusItemType::Number(value as u64))); } else { items_update.insert(*item); } @@ -151,7 +151,7 @@ impl SessionData { items_response.push(( *item, StatusItemType::Number( - account.state_email.map(|id| id + 1).unwrap_or(0) as u32, + account.state_email.map(|id| id + 1).unwrap_or(0), ), )); } @@ -203,11 +203,9 @@ impl SessionData { for item in items_update { let result = match item { - Status::Messages => { - message_ids.as_ref().map(|v| v.len()).unwrap_or(0) as u32 - } - Status::UidNext => mailbox_state.as_ref().unwrap().uid_next, - Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity, + Status::Messages => message_ids.as_ref().map(|v| v.len()).unwrap_or(0), + Status::UidNext => mailbox_state.as_ref().unwrap().uid_next as u64, + Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity as u64, Status::Unseen => { if let (Some(message_ids), Some(mailbox_message_ids), Some(mut seen)) = ( &message_ids, @@ -223,7 +221,7 @@ impl SessionData { ) { seen ^= message_ids; seen &= mailbox_message_ids.as_ref(); - seen.len() as u32 + seen.len() } else { 0 } @@ -241,7 +239,7 @@ impl SessionData { .await?, ) { deleted &= mailbox_message_ids.as_ref(); - deleted.len() as u32 + deleted.len() } else { 0 } @@ -249,7 +247,7 @@ impl SessionData { Status::Size => { if let Some(mailbox_message_ids) = &mailbox_message_ids { self.calculate_mailbox_size(mailbox.account_id, mailbox_message_ids) - .await? + .await? as u64 } else { 0 } @@ -260,7 +258,7 @@ impl SessionData { }; items_response.push((item, StatusItemType::Number(result))); - values_update.push((item, result)); + values_update.push((item, result as u32)); } } else { let message_ids = Arc::new( @@ -271,9 +269,9 @@ impl SessionData { ); for item in items_update { let result = match item { - Status::Messages => message_ids.len() as u32, - Status::UidNext => mailbox_state.as_ref().unwrap().uid_next, - Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity, + Status::Messages => message_ids.len(), + Status::UidNext => mailbox_state.as_ref().unwrap().uid_next as u64, + Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity as u64, Status::Unseen => self .jmap .get_tag( @@ -287,7 +285,7 @@ impl SessionData { seen ^= message_ids.as_ref(); seen.len() }) - .unwrap_or(0) as u32, + .unwrap_or(0), Status::Deleted => self .jmap .get_tag( @@ -298,11 +296,11 @@ impl SessionData { ) .await? .map(|v| v.len()) - .unwrap_or(0) as u32, + .unwrap_or(0), Status::Size => { if !message_ids.is_empty() { self.calculate_mailbox_size(mailbox.account_id, &message_ids) - .await? + .await? as u64 } else { 0 } @@ -313,7 +311,7 @@ impl SessionData { }; items_response.push((item, StatusItemType::Number(result))); - values_update.push((item, result)); + values_update.push((item, result as u32)); } } diff --git a/crates/imap/src/op/store.rs b/crates/imap/src/op/store.rs new file mode 100644 index 00000000..b6294cc6 --- /dev/null +++ b/crates/imap/src/op/store.rs @@ -0,0 +1,395 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::sync::Arc; + +use ahash::AHashSet; +use imap_proto::{ + protocol::{ + fetch::{DataItem, FetchItem}, + store::{Arguments, Operation, Response}, + Flag, ImapResponse, + }, + receiver::Request, + Command, ResponseCode, ResponseType, StatusResponse, +}; +use jmap::email::set::TagManager; +use jmap_proto::{ + error::method::MethodError, + types::{ + acl::Acl, collection::Collection, id::Id, keyword::Keyword, property::Property, + state::StateChange, type_state::TypeState, + }, +}; +use store::{ + query::log::{Change, Query}, + write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE}, +}; +use tokio::io::AsyncRead; + +use crate::core::{SelectedMailbox, Session, SessionData}; + +use super::FromModSeq; + +impl Session { + pub async fn handle_store( + &mut self, + request: Request, + is_uid: bool, + ) -> Result<(), ()> { + match request.parse_store() { + Ok(arguments) => { + let (data, mailbox) = self.state.select_data(); + let is_condstore = self.is_condstore || mailbox.is_condstore; + let is_qresync = self.is_qresync; + + tokio::spawn(async move { + let bytes = match data + .store(arguments, mailbox, is_uid, is_condstore, is_qresync) + .await + { + Ok(response) => response, + Err(response) => response.into_bytes(), + }; + data.write_bytes(bytes).await; + }); + Ok(()) + } + Err(response) => self.write_bytes(response.into_bytes()).await, + } + } +} + +impl SessionData { + pub async fn store( + &self, + arguments: Arguments, + mailbox: Arc, + is_uid: bool, + is_condstore: bool, + is_qresync: bool, + ) -> Result, StatusResponse> { + // Resync messages if needed + let account_id = mailbox.id.account_id; + if is_uid { + // Don't synchronize if we're not using UIDs as seqnums might change. + self.synchronize_messages(&mailbox, is_qresync) + .await + .map_err(|r| r.with_tag(&arguments.tag))?; + } + + // Convert IMAP ids to JMAP ids. + let mut ids = match mailbox + .sequence_to_ids(&arguments.sequence_set, is_uid) + .await + { + Ok(ids) => { + if ids.is_empty() { + return Err( + StatusResponse::completed(Command::Store(is_uid)).with_tag(arguments.tag) + ); + } + ids + } + Err(response) => { + return Err(response.with_tag(arguments.tag)); + } + }; + + // Obtain shared messages + let access_token = match self.get_access_token().await { + Ok(access_token) => access_token, + Err(response) => return Err(response.with_tag(arguments.tag)), + }; + let can_modify_ids = if access_token.is_shared(account_id) { + match self + .jmap + .shared_messages(&access_token, account_id, Acl::ModifyItems) + .await + { + Ok(document_ids) => document_ids.into(), + Err(_) => return Err(StatusResponse::database_failure().with_tag(arguments.tag)), + } + } else { + None + }; + + // Filter out unchanged since ids + let mut response_code = None; + let mut unchanged_failed = false; + if let Some(unchanged_since) = arguments.unchanged_since { + // Obtain changes since the modseq. + let changelog = match self + .jmap + .changes_( + account_id, + Collection::Email, + Query::from_modseq(unchanged_since), + ) + .await + { + Ok(changelog) => changelog, + Err(_) => return Err(StatusResponse::database_failure().with_tag(arguments.tag)), + }; + + let mut modified = mailbox + .sequence_expand_missing(&arguments.sequence_set, is_uid) + .await; + + // Add all IDs that changed in this mailbox + for change in changelog.changes { + let (Change::Insert(id) + | Change::Update(id) + | Change::ChildUpdate(id) + | Change::Delete(id)) = change; + let id = (id & u32::MAX as u64) as u32; + if let Some(imap_id) = ids.remove(&id) { + if is_uid { + modified.push(imap_id.uid); + } else { + modified.push(imap_id.seqnum); + if matches!(change, Change::Delete(_)) { + unchanged_failed = true; + } + } + } + } + + if !modified.is_empty() { + modified.sort_unstable(); + response_code = ResponseCode::Modified { ids: modified }.into(); + } + } + + // Build response + let mut response = if !unchanged_failed { + StatusResponse::completed(Command::Store(is_uid)) + } else { + StatusResponse::no("Some of the messages no longer exist.") + } + .with_tag(arguments.tag); + if let Some(response_code) = response_code { + response = response.with_code(response_code) + } + if ids.is_empty() { + return Err(response); + } + let mut items = Response { + items: Vec::with_capacity(ids.len()), + }; + + // Process each change + let set_keywords = arguments + .keywords + .into_iter() + .map(Keyword::from) + .collect::>(); + let mut changelog = ChangeLogBuilder::new(); + let mut changed_mailboxes = AHashSet::new(); + for (id, imap_id) in ids { + // Check ACLs + if can_modify_ids + .as_ref() + .map_or(false, |can_modify_ids| !can_modify_ids.contains(id)) + { + response.rtype = ResponseType::No; + response.message = "Not enough permissions to modify one or more messages.".into(); + continue; + } + + // Obtain current keywords + let (mut keywords, thread_id) = if let (Some(keywords), Some(thread_id)) = ( + self.jmap + .get_property::>>( + account_id, + Collection::Email, + id, + Property::Keywords, + ) + .await + .map_err(|_| { + StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap()) + })?, + self.jmap + .get_property::(account_id, Collection::Email, id, Property::ThreadId) + .await + .map_err(|_| { + StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap()) + })?, + ) { + (TagManager::new(keywords), thread_id) + } else { + continue; + }; + + // Apply changes + match arguments.operation { + Operation::Set => { + keywords.set(set_keywords.clone()); + } + Operation::Add => { + for keyword in &set_keywords { + keywords.update(keyword.clone(), true); + } + } + Operation::Clear => { + for keyword in &set_keywords { + keywords.update(keyword.clone(), true); + } + } + } + + if keywords.has_changes() { + // Convert keywords to flags + let seen_changed = keywords + .changed_tags() + .any(|keyword| keyword == &Keyword::Seen); + let flags = if !arguments.is_silent { + keywords + .current() + .iter() + .cloned() + .map(Flag::from) + .collect::>() + } else { + vec![] + }; + + // Write changes + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email) + .update_document(id); + keywords.update_batch(&mut batch, Property::Keywords); + if changelog.change_id == u64::MAX { + changelog.change_id = + self.jmap.assign_change_id(account_id).await.map_err(|_| { + StatusResponse::database_failure() + .with_tag(response.tag.as_ref().unwrap()) + })? + } + batch.value(Property::Cid, changelog.change_id, F_VALUE); + match self.jmap.write_batch(batch).await { + Ok(_) => { + // Set all current mailboxes as changed if the Seen tag changed + if seen_changed { + if let Some(mailboxes) = self + .jmap + .get_property::>( + account_id, + Collection::Email, + id, + Property::MailboxIds, + ) + .await + .map_err(|_| { + StatusResponse::database_failure() + .with_tag(response.tag.as_ref().unwrap()) + })? + { + for mailbox_id in mailboxes { + changed_mailboxes.insert(mailbox_id); + } + } + } + changelog.log_update(Collection::Email, Id::from_parts(thread_id, id)); + + // Add item to response + if !arguments.is_silent { + let mut data_items = vec![DataItem::Flags { flags }]; + if is_uid { + data_items.push(DataItem::Uid { uid: imap_id.uid }); + } + if is_condstore { + data_items.push(DataItem::ModSeq { + modseq: changelog.change_id, + }); + } + items.items.push(FetchItem { + id: imap_id.seqnum, + items: data_items, + }); + } else if is_condstore { + items.items.push(FetchItem { + id: imap_id.seqnum, + items: if is_uid { + vec![ + DataItem::ModSeq { + modseq: changelog.change_id, + }, + DataItem::Uid { uid: imap_id.uid }, + ] + } else { + vec![DataItem::ModSeq { + modseq: changelog.change_id, + }] + }, + }); + } + } + Err(MethodError::ServerUnavailable) => { + response.rtype = ResponseType::No; + response.message = "Some messaged could not be updated.".into(); + } + Err(_) => { + return Err(StatusResponse::database_failure() + .with_tag(response.tag.as_ref().unwrap())); + } + } + } + } + + // Log mailbox changes + for mailbox_id in &changed_mailboxes { + changelog.log_child_update(Collection::Mailbox, *mailbox_id); + } + + // Write changes + if !changelog.is_empty() { + let change_id = changelog.change_id; + if self + .jmap + .commit_changes(account_id, changelog) + .await + .is_err() + { + return Err( + StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap()) + ); + } + self.jmap + .broadcast_state_change(if !changed_mailboxes.is_empty() { + StateChange::new(account_id) + .with_change(TypeState::Email, change_id) + .with_change(TypeState::Mailbox, change_id) + } else { + StateChange::new(account_id).with_change(TypeState::Email, change_id) + }) + .await; + } + + // Send response + Ok(response.serialize(items.serialize())) + } +} diff --git a/crates/jmap/src/changes/write.rs b/crates/jmap/src/changes/write.rs index 81de3dc4..8a5134e5 100644 --- a/crates/jmap/src/changes/write.rs +++ b/crates/jmap/src/changes/write.rs @@ -28,38 +28,32 @@ use crate::JMAP; impl JMAP { pub async fn begin_changes(&self, account_id: u32) -> Result { - Ok(ChangeLogBuilder::with_change_id( - self.store - .assign_change_id(account_id) - .await - .map_err(|err| { - tracing::error!( - event = "error", - context = "change_log", - error = ?err, - "Failed to assign changeId."); - MethodError::ServerPartialFail - })?, - )) + self.assign_change_id(account_id) + .await + .map(ChangeLogBuilder::with_change_id) } + + pub async fn assign_change_id(&self, account_id: u32) -> Result { + self.store + .assign_change_id(account_id) + .await + .map_err(|err| { + tracing::error!( + event = "error", + context = "change_log", + error = ?err, + "Failed to assign changeId."); + MethodError::ServerPartialFail + }) + } + pub async fn commit_changes( &self, account_id: u32, mut changes: ChangeLogBuilder, ) -> Result { if changes.change_id == u64::MAX { - changes.change_id = self - .store - .assign_change_id(account_id) - .await - .map_err(|err| { - tracing::error!( - event = "error", - context = "change_log", - error = ?err, - "Failed to assign changeId."); - MethodError::ServerPartialFail - })?; + changes.change_id = self.assign_change_id(account_id).await?; } let state = State::from(changes.change_id); diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 4543371d..fad81d7b 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -264,6 +264,7 @@ impl JMAP { "Failed to index message."); IngestError::Temporary })? + .value(Property::Cid, change_id, F_VALUE) .value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP) .custom(changes); self.store.write(batch.build()).await.map_err(|err| { diff --git a/crates/jmap/src/email/set.rs b/crates/jmap/src/email/set.rs index 71fb5425..5b7af1a5 100644 --- a/crates/jmap/src/email/set.rs +++ b/crates/jmap/src/email/set.rs @@ -876,6 +876,12 @@ impl JMAP { // Update keywords property keywords.update_batch(&mut batch, Property::Keywords); + + // Update last change id + if changes.change_id == u64::MAX { + changes.change_id = self.assign_change_id(account_id).await?; + } + batch.value(Property::Cid, changes.change_id, F_VALUE); } // Process mailboxes @@ -1051,6 +1057,9 @@ impl JMAP { .with_collection(Collection::Email) .delete_document(document_id); + // Remove last changeId + batch.value(Property::Cid, (), F_VALUE | F_CLEAR); + // Remove mailboxes let mailboxes = if let Some(mailboxes) = self .get_property::>>( @@ -1260,7 +1269,7 @@ impl JMAP { } } -struct TagManager< +pub struct TagManager< T: PartialEq + Clone + ToBitmaps + SerializeInto + Serialize + DeserializeFrom + Sync + Send, > { current: HashedValue>, diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b3ec8e64..1f9c086a 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -14,6 +14,8 @@ store = { path = "../crates/store", features = ["test_mode"] } directory = { path = "../crates/directory" } jmap = { path = "../crates/jmap", features = ["test_mode"] } jmap_proto = { path = "../crates/jmap-proto" } +imap = { path = "../crates/imap", features = ["test_mode"] } +imap_proto = { path = "../crates/imap-proto" } smtp = { path = "../crates/smtp", features = ["test_mode", "local_delivery"] } smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" } mail-send = { git = "https://github.com/stalwartlabs/mail-send" } diff --git a/tests/src/imap/body_structure.rs b/tests/src/imap/body_structure.rs new file mode 100644 index 00000000..11938b3c --- /dev/null +++ b/tests/src/imap/body_structure.rs @@ -0,0 +1,193 @@ +use std::{fs, path::PathBuf}; + +use imap_proto::protocol::fetch::Section; +use mail_parser::Message; + +#[test] +fn body_structure() { + let mut test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + test_dir.push("resources"); + test_dir.push("imap"); + test_dir.push("messages"); + for file_name in fs::read_dir(&test_dir).unwrap() { + let mut file_name = file_name.as_ref().unwrap().path(); + if file_name.extension().map_or(true, |e| e != "txt") { + continue; + } + + let raw_message = fs::read(&file_name).unwrap(); + let message = Message::parse(&raw_message).unwrap(); + let mut buf = Vec::new(); + + // Serialize body and bodystructure + for is_extended in [false, true] { + let mut buf_ = Vec::new(); + message + .body_structure(is_extended) + .serialize(&mut buf_, is_extended); + if is_extended { + buf.extend_from_slice(b"BODYSTRUCTURE "); + } else { + buf.extend_from_slice(b"BODY "); + } + + // Poor man's indentation + let mut indent_count = 0; + let mut in_quote = false; + for ch in buf_ { + if ch == b'(' && !in_quote { + buf.extend_from_slice(b"(\n"); + indent_count += 1; + for _ in 0..indent_count { + buf.extend_from_slice(b" "); + } + } else if ch == b')' && !in_quote { + buf.push(b'\n'); + indent_count -= 1; + for _ in 0..indent_count { + buf.extend_from_slice(b" "); + } + buf.push(b')'); + } else { + if ch == b'"' { + in_quote = !in_quote; + } + buf.push(ch); + } + } + buf.extend_from_slice(b"\n\n"); + } + + // Serialize body parts + let mut iter = 1..9; + let mut stack = Vec::new(); + let mut sections = Vec::new(); + loop { + 'inner: while let Some(part_id) = iter.next() { + if part_id == 1 { + for section in [ + None, + Some(Section::Header), + Some(Section::Text), + Some(Section::Mime), + ] { + let mut body_sections = sections + .iter() + .map(|id| Section::Part { num: *id }) + .collect::>(); + let is_first = if let Some(section) = section { + body_sections.push(section); + false + } else { + true + }; + + if let Some(contents) = message.body_section(&body_sections, None) { + DataItem::BodySection { + sections: body_sections, + origin_octet: None, + contents, + } + .serialize(&mut buf); + + if is_first { + match message.binary(§ions, None) { + Ok(Some(contents)) => { + buf.push(b'\n'); + DataItem::Binary { + sections: sections.clone(), + offset: None, + contents: match contents { + BodyContents::Bytes(_) => { + BodyContents::Text("[binary content]".into()) + } + text => text, + }, + } + .serialize(&mut buf); + } + Ok(None) => (), + Err(_) => { + buf.push(b'\n'); + buf.extend_from_slice( + &StatusResponse::no(format!( + "Failed to decode part {} of message {}.", + sections + .iter() + .map(|s| s.to_string()) + .collect::>() + .join("."), + 0 + )) + .with_code(ResponseCode::UnknownCte) + .serialize(Vec::new()), + ); + } + } + + if let Some(size) = message.binary_size(§ions) { + buf.push(b'\n'); + DataItem::BinarySize { + sections: sections.clone(), + size, + } + .serialize(&mut buf); + } + } + + buf.extend_from_slice(b"\n----------------------------------\n"); + } else { + break 'inner; + } + } + } + sections.push(part_id); + stack.push(iter); + iter = 1..9; + } + if let Some(prev_iter) = stack.pop() { + sections.pop(); + iter = prev_iter; + } else { + break; + } + } + + // Check header fields and partial sections + for sections in [ + vec![Section::HeaderFields { + not: false, + fields: vec!["From".to_string(), "To".to_string()], + }], + vec![Section::HeaderFields { + not: true, + fields: vec!["Subject".to_string(), "Cc".to_string()], + }], + ] { + DataItem::BodySection { + contents: message.body_section(§ions, None).unwrap(), + sections: sections.clone(), + origin_octet: None, + } + .serialize(&mut buf); + buf.extend_from_slice(b"\n----------------------------------\n"); + DataItem::BodySection { + contents: message.body_section(§ions, (10, 25).into()).unwrap(), + sections, + origin_octet: 10.into(), + } + .serialize(&mut buf); + buf.extend_from_slice(b"\n----------------------------------\n"); + } + + file_name.set_extension("imap"); + + let expected_result = fs::read(&file_name).unwrap(); + + if buf != expected_result { + file_name.set_extension("imap_failed"); + fs::write(&file_name, buf).unwrap(); + panic!("Failed test, written output to {}", file_name.display()); + } + } +} diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs new file mode 100644 index 00000000..665a37dc --- /dev/null +++ b/tests/src/imap/mod.rs @@ -0,0 +1 @@ +pub mod body_structure; diff --git a/tests/src/lib.rs b/tests/src/lib.rs index f095c462..0d265fbe 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -26,6 +26,8 @@ use std::path::PathBuf; #[cfg(test)] pub mod directory; #[cfg(test)] +pub mod imap; +#[cfg(test)] pub mod jmap; #[cfg(test)] pub mod smtp;