IMAP fetch and store commands.

This commit is contained in:
Mauro D 2023-06-25 18:00:45 +00:00
parent ae8bedde0e
commit b5f2af7d6a
19 changed files with 1976 additions and 78 deletions

3
Cargo.lock generated
View file

@ -1715,6 +1715,7 @@ dependencies = [
"jmap_proto",
"mail-parser",
"mail-send",
"md5",
"parking_lot",
"rustls 0.21.1",
"rustls-pemfile",
@ -4123,6 +4124,8 @@ dependencies = [
"futures",
"http-body-util",
"hyper 1.0.0-rc.3",
"imap",
"imap_proto",
"jmap",
"jmap-client",
"jmap_proto",

View file

@ -134,7 +134,7 @@ pub enum ResponseCode {
ids: Vec<u32>,
},
HighestModseq {
modseq: u32,
modseq: u64,
},
// ObjectID
@ -160,6 +160,14 @@ pub enum ResponseType {
Bye,
}
impl ResponseCode {
pub fn highest_modseq(modseq: Option<u64>) -> Self {
ResponseCode::HighestModseq {
modseq: modseq.map(|id| id + 1).unwrap_or(0),
}
}
}
impl StatusResponse {
pub fn bad(message: impl Into<Cow<'static, str>>) -> Self {
StatusResponse {

View file

@ -140,7 +140,7 @@ pub enum DataItem<'x> {
contents: Option<Cow<'x, str>>,
},
ModSeq {
modseq: u32,
modseq: u64,
},
EmailId {
email_id: String,

View file

@ -25,6 +25,7 @@ use std::{cmp::Ordering, fmt::Display};
use ahash::AHashSet;
use chrono::{DateTime, NaiveDateTime, Utc};
use jmap_proto::types::keyword::Keyword;
use crate::{Command, ResponseCode, ResponseType, StatusResponse};
@ -263,22 +264,44 @@ impl Flag {
Flag::Keyword(keyword) => keyword.as_bytes(),
});
}
}
pub fn to_jmap(&self) -> &str {
match self {
Flag::Seen => "$seen",
Flag::Draft => "$draft",
Flag::Flagged => "$flagged",
Flag::Answered => "$answered",
Flag::Recent => "$recent",
Flag::Important => "$important",
Flag::Phishing => "$phishing",
Flag::Junk => "$junk",
Flag::NotJunk => "$notjunk",
Flag::Deleted => "$deleted",
Flag::Forwarded => "$forwarded",
Flag::MDNSent => "$mdnsent",
Flag::Keyword(keyword) => keyword,
impl From<Keyword> for Flag {
fn from(value: Keyword) -> Self {
match value {
Keyword::Seen => Flag::Seen,
Keyword::Draft => Flag::Draft,
Keyword::Flagged => Flag::Flagged,
Keyword::Answered => Flag::Answered,
Keyword::Recent => Flag::Recent,
Keyword::Important => Flag::Important,
Keyword::Phishing => Flag::Phishing,
Keyword::Junk => Flag::Junk,
Keyword::NotJunk => Flag::NotJunk,
Keyword::Deleted => Flag::Deleted,
Keyword::Forwarded => Flag::Forwarded,
Keyword::MdnSent => Flag::MDNSent,
Keyword::Other(value) => Flag::Keyword(value),
}
}
}
impl From<Flag> for Keyword {
fn from(value: Flag) -> Self {
match value {
Flag::Seen => Keyword::Seen,
Flag::Draft => Keyword::Draft,
Flag::Flagged => Keyword::Flagged,
Flag::Answered => Keyword::Answered,
Flag::Recent => Keyword::Recent,
Flag::Important => Keyword::Important,
Flag::Phishing => Keyword::Phishing,
Flag::Junk => Keyword::Junk,
Flag::NotJunk => Keyword::NotJunk,
Flag::Deleted => Keyword::Deleted,
Flag::Forwarded => Keyword::Forwarded,
Flag::MDNSent => Keyword::MdnSent,
Flag::Keyword(value) => Keyword::Other(value),
}
}
}

View file

@ -53,7 +53,7 @@ pub struct StatusItem {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatusItemType {
Number(u32),
Number(u64),
String(String),
}

View file

@ -20,3 +20,7 @@ tokio-rustls = { version = "0.24.0"}
parking_lot = "0.12"
tracing = "0.1"
ahash = { version = "0.8" }
md5 = "0.7.0"
[features]
test_mode = []

View file

@ -4,7 +4,10 @@ use std::{
};
use ahash::{AHashMap, AHashSet, AHasher, RandomState};
use imap_proto::{protocol::Sequence, StatusResponse};
use imap_proto::{
protocol::{expunge, select::Exists, Sequence},
StatusResponse,
};
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
roaring::RoaringBitmap,
@ -189,7 +192,7 @@ impl SessionData {
let uid_map = uid_map.inner;
let mut id_to_imap = AHashMap::with_capacity(uid_map.items.len());
let mut uid_to_id = AHashMap::with_capacity(uid_map.items.len());
let mut uids = Vec::with_capacity(uid_map.items.len());
let mut uid_max = 0;
for (seqnum, item) in uid_map.items.into_iter().enumerate() {
id_to_imap.insert(
@ -200,16 +203,16 @@ impl SessionData {
},
);
uid_to_id.insert(item.uid, item.id);
uids.push(item.uid);
uid_max = item.uid;
}
return Ok(MailboxState {
uid_next: uid_map.uid_next,
uid_validity: uid_map.uid_validity,
total_messages: uids.len(),
total_messages: id_to_imap.len(),
id_to_imap,
uid_to_id,
uids,
uid_max,
last_state,
});
} else {
@ -265,12 +268,72 @@ impl SessionData {
total_messages: uids.len(),
id_to_imap,
uid_to_id,
uids,
uid_max: uid_next.saturating_sub(1),
last_state,
});
}
}
}
pub async fn synchronize_messages(
&self,
mailbox: &SelectedMailbox,
is_qresync: bool,
) -> crate::op::Result<Option<u64>> {
// Obtain current modseq
let modseq = self.get_modseq(mailbox.id.account_id).await?;
if mailbox.state.lock().last_state == modseq {
return Ok(modseq);
}
// Synchronize messages
let new_state = self.fetch_messages(&mailbox.id).await?;
// Update UIDs
let mut buf = Vec::with_capacity(64);
let (new_message_count, deletions) = mailbox.update_mailbox_state(new_state, true);
if let Some(deletions) = deletions {
expunge::Response {
is_qresync,
ids: deletions
.into_iter()
.map(|id| if !is_qresync { id.seqnum } else { id.uid })
.collect(),
}
.serialize_to(&mut buf);
}
if let Some(new_message_count) = new_message_count {
Exists {
total_messages: new_message_count,
}
.serialize(&mut buf);
}
if !buf.is_empty() {
self.write_bytes(buf).await;
}
Ok(modseq)
}
pub async fn get_modseq(&self, account_id: u32) -> crate::op::Result<Option<u64>> {
// Obtain current modseq
if let Ok(modseq) = self
.jmap
.store
.get_last_change_id(account_id, Collection::Email)
.await
{
Ok(modseq)
} else {
tracing::error!(parent: &self.span,
event = "error",
context = "store",
account_id = account_id,
collection = ?Collection::Email,
"Failed to obtain modseq");
Err(StatusResponse::database_failure())
}
}
}
impl SelectedMailbox {
@ -282,12 +345,12 @@ impl SelectedMailbox {
if !sequence.is_saved_search() {
let mut ids = AHashMap::new();
let state = self.state.lock();
if state.uids.is_empty() {
if state.id_to_imap.is_empty() {
return Ok(ids);
}
let max_uid = state.uids.last().copied().unwrap_or(0);
let max_seqnum = state.uids.len() as u32;
let max_uid = state.uid_max;
let max_seqnum = state.total_messages as u32;
for (id, imap_id) in &state.id_to_imap {
let matched = if is_uid {
@ -296,7 +359,7 @@ impl SelectedMailbox {
sequence.contains(imap_id.seqnum, max_seqnum)
};
if matched {
ids.insert(id.clone(), *imap_id);
ids.insert(*id, *imap_id);
}
}
@ -319,6 +382,35 @@ impl SelectedMailbox {
}
}
pub async fn sequence_expand_missing(&self, sequence: &Sequence, is_uid: bool) -> Vec<u32> {
let mut deleted_ids = Vec::new();
if !sequence.is_saved_search() {
let state = self.state.lock();
if is_uid {
for uid in sequence.expand(state.uid_max) {
if !state.uid_to_id.contains_key(&uid) {
deleted_ids.push(uid);
}
}
} else {
for seqnum in sequence.expand(state.total_messages as u32) {
if seqnum > state.total_messages as u32 {
deleted_ids.push(seqnum);
}
}
}
} else if let Some(saved_ids) = self.get_saved_search().await {
let state = self.state.lock();
for id in saved_ids.iter() {
if !state.uid_to_id.contains_key(&id.uid) {
deleted_ids.push(if is_uid { id.uid } else { id.seqnum });
}
}
}
deleted_ids.sort_unstable();
deleted_ids
}
pub fn id_to_uid(&self, ids: &[u32]) -> Vec<ImapId> {
let mut imap_ids = Vec::with_capacity(ids.len());
let state = self.state.lock();

View file

@ -118,9 +118,9 @@ pub struct MailboxId {
pub struct MailboxState {
pub uid_next: u32,
pub uid_validity: u32,
pub uid_max: u32,
pub id_to_imap: AHashMap<u32, ImapId>,
pub uid_to_id: AHashMap<u32, u32>,
pub uids: Vec<u32>,
pub total_messages: usize,
pub last_state: Option<u64>,
}

1156
crates/imap/src/op/fetch.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,28 @@
use ::store::query::log::Query;
use imap_proto::StatusResponse;
pub mod authenticate;
pub mod create;
pub mod delete;
pub mod fetch;
pub mod list;
pub mod rename;
pub mod status;
pub mod store;
pub mod subscribe;
trait FromModSeq {
fn from_modseq(modseq: u64) -> Self;
}
impl FromModSeq for Query {
fn from_modseq(modseq: u64) -> Self {
if modseq > 0 {
Query::Since(modseq - 1)
} else {
Query::All
}
}
}
pub type Result<T> = std::result::Result<T, StatusResponse>;

View file

@ -105,14 +105,14 @@ impl SessionData {
match item {
Status::Messages => {
if let Some(value) = mailbox_state.total_messages {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
}
}
Status::UidNext => {
if let Some(value) = mailbox_state.uid_next {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
do_synchronize = true;
@ -120,7 +120,7 @@ impl SessionData {
}
Status::UidValidity => {
if let Some(value) = mailbox_state.uid_validity {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
do_synchronize = true;
@ -128,21 +128,21 @@ impl SessionData {
}
Status::Unseen => {
if let Some(value) = mailbox_state.total_unseen {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
}
}
Status::Deleted => {
if let Some(value) = mailbox_state.total_deleted {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
}
}
Status::Size => {
if let Some(value) = mailbox_state.size {
items_response.push((*item, StatusItemType::Number(value)));
items_response.push((*item, StatusItemType::Number(value as u64)));
} else {
items_update.insert(*item);
}
@ -151,7 +151,7 @@ impl SessionData {
items_response.push((
*item,
StatusItemType::Number(
account.state_email.map(|id| id + 1).unwrap_or(0) as u32,
account.state_email.map(|id| id + 1).unwrap_or(0),
),
));
}
@ -203,11 +203,9 @@ impl SessionData {
for item in items_update {
let result = match item {
Status::Messages => {
message_ids.as_ref().map(|v| v.len()).unwrap_or(0) as u32
}
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity,
Status::Messages => message_ids.as_ref().map(|v| v.len()).unwrap_or(0),
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next as u64,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity as u64,
Status::Unseen => {
if let (Some(message_ids), Some(mailbox_message_ids), Some(mut seen)) = (
&message_ids,
@ -223,7 +221,7 @@ impl SessionData {
) {
seen ^= message_ids;
seen &= mailbox_message_ids.as_ref();
seen.len() as u32
seen.len()
} else {
0
}
@ -241,7 +239,7 @@ impl SessionData {
.await?,
) {
deleted &= mailbox_message_ids.as_ref();
deleted.len() as u32
deleted.len()
} else {
0
}
@ -249,7 +247,7 @@ impl SessionData {
Status::Size => {
if let Some(mailbox_message_ids) = &mailbox_message_ids {
self.calculate_mailbox_size(mailbox.account_id, mailbox_message_ids)
.await?
.await? as u64
} else {
0
}
@ -260,7 +258,7 @@ impl SessionData {
};
items_response.push((item, StatusItemType::Number(result)));
values_update.push((item, result));
values_update.push((item, result as u32));
}
} else {
let message_ids = Arc::new(
@ -271,9 +269,9 @@ impl SessionData {
);
for item in items_update {
let result = match item {
Status::Messages => message_ids.len() as u32,
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity,
Status::Messages => message_ids.len(),
Status::UidNext => mailbox_state.as_ref().unwrap().uid_next as u64,
Status::UidValidity => mailbox_state.as_ref().unwrap().uid_validity as u64,
Status::Unseen => self
.jmap
.get_tag(
@ -287,7 +285,7 @@ impl SessionData {
seen ^= message_ids.as_ref();
seen.len()
})
.unwrap_or(0) as u32,
.unwrap_or(0),
Status::Deleted => self
.jmap
.get_tag(
@ -298,11 +296,11 @@ impl SessionData {
)
.await?
.map(|v| v.len())
.unwrap_or(0) as u32,
.unwrap_or(0),
Status::Size => {
if !message_ids.is_empty() {
self.calculate_mailbox_size(mailbox.account_id, &message_ids)
.await?
.await? as u64
} else {
0
}
@ -313,7 +311,7 @@ impl SessionData {
};
items_response.push((item, StatusItemType::Number(result)));
values_update.push((item, result));
values_update.push((item, result as u32));
}
}

395
crates/imap/src/op/store.rs Normal file
View file

@ -0,0 +1,395 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use ahash::AHashSet;
use imap_proto::{
protocol::{
fetch::{DataItem, FetchItem},
store::{Arguments, Operation, Response},
Flag, ImapResponse,
},
receiver::Request,
Command, ResponseCode, ResponseType, StatusResponse,
};
use jmap::email::set::TagManager;
use jmap_proto::{
error::method::MethodError,
types::{
acl::Acl, collection::Collection, id::Id, keyword::Keyword, property::Property,
state::StateChange, type_state::TypeState,
},
};
use store::{
query::log::{Change, Query},
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE},
};
use tokio::io::AsyncRead;
use crate::core::{SelectedMailbox, Session, SessionData};
use super::FromModSeq;
impl<T: AsyncRead> Session<T> {
pub async fn handle_store(
&mut self,
request: Request<Command>,
is_uid: bool,
) -> Result<(), ()> {
match request.parse_store() {
Ok(arguments) => {
let (data, mailbox) = self.state.select_data();
let is_condstore = self.is_condstore || mailbox.is_condstore;
let is_qresync = self.is_qresync;
tokio::spawn(async move {
let bytes = match data
.store(arguments, mailbox, is_uid, is_condstore, is_qresync)
.await
{
Ok(response) => response,
Err(response) => response.into_bytes(),
};
data.write_bytes(bytes).await;
});
Ok(())
}
Err(response) => self.write_bytes(response.into_bytes()).await,
}
}
}
impl SessionData {
pub async fn store(
&self,
arguments: Arguments,
mailbox: Arc<SelectedMailbox>,
is_uid: bool,
is_condstore: bool,
is_qresync: bool,
) -> Result<Vec<u8>, StatusResponse> {
// Resync messages if needed
let account_id = mailbox.id.account_id;
if is_uid {
// Don't synchronize if we're not using UIDs as seqnums might change.
self.synchronize_messages(&mailbox, is_qresync)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
}
// Convert IMAP ids to JMAP ids.
let mut ids = match mailbox
.sequence_to_ids(&arguments.sequence_set, is_uid)
.await
{
Ok(ids) => {
if ids.is_empty() {
return Err(
StatusResponse::completed(Command::Store(is_uid)).with_tag(arguments.tag)
);
}
ids
}
Err(response) => {
return Err(response.with_tag(arguments.tag));
}
};
// Obtain shared messages
let access_token = match self.get_access_token().await {
Ok(access_token) => access_token,
Err(response) => return Err(response.with_tag(arguments.tag)),
};
let can_modify_ids = if access_token.is_shared(account_id) {
match self
.jmap
.shared_messages(&access_token, account_id, Acl::ModifyItems)
.await
{
Ok(document_ids) => document_ids.into(),
Err(_) => return Err(StatusResponse::database_failure().with_tag(arguments.tag)),
}
} else {
None
};
// Filter out unchanged since ids
let mut response_code = None;
let mut unchanged_failed = false;
if let Some(unchanged_since) = arguments.unchanged_since {
// Obtain changes since the modseq.
let changelog = match self
.jmap
.changes_(
account_id,
Collection::Email,
Query::from_modseq(unchanged_since),
)
.await
{
Ok(changelog) => changelog,
Err(_) => return Err(StatusResponse::database_failure().with_tag(arguments.tag)),
};
let mut modified = mailbox
.sequence_expand_missing(&arguments.sequence_set, is_uid)
.await;
// Add all IDs that changed in this mailbox
for change in changelog.changes {
let (Change::Insert(id)
| Change::Update(id)
| Change::ChildUpdate(id)
| Change::Delete(id)) = change;
let id = (id & u32::MAX as u64) as u32;
if let Some(imap_id) = ids.remove(&id) {
if is_uid {
modified.push(imap_id.uid);
} else {
modified.push(imap_id.seqnum);
if matches!(change, Change::Delete(_)) {
unchanged_failed = true;
}
}
}
}
if !modified.is_empty() {
modified.sort_unstable();
response_code = ResponseCode::Modified { ids: modified }.into();
}
}
// Build response
let mut response = if !unchanged_failed {
StatusResponse::completed(Command::Store(is_uid))
} else {
StatusResponse::no("Some of the messages no longer exist.")
}
.with_tag(arguments.tag);
if let Some(response_code) = response_code {
response = response.with_code(response_code)
}
if ids.is_empty() {
return Err(response);
}
let mut items = Response {
items: Vec::with_capacity(ids.len()),
};
// Process each change
let set_keywords = arguments
.keywords
.into_iter()
.map(Keyword::from)
.collect::<Vec<_>>();
let mut changelog = ChangeLogBuilder::new();
let mut changed_mailboxes = AHashSet::new();
for (id, imap_id) in ids {
// Check ACLs
if can_modify_ids
.as_ref()
.map_or(false, |can_modify_ids| !can_modify_ids.contains(id))
{
response.rtype = ResponseType::No;
response.message = "Not enough permissions to modify one or more messages.".into();
continue;
}
// Obtain current keywords
let (mut keywords, thread_id) = if let (Some(keywords), Some(thread_id)) = (
self.jmap
.get_property::<HashedValue<Vec<Keyword>>>(
account_id,
Collection::Email,
id,
Property::Keywords,
)
.await
.map_err(|_| {
StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap())
})?,
self.jmap
.get_property::<u32>(account_id, Collection::Email, id, Property::ThreadId)
.await
.map_err(|_| {
StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap())
})?,
) {
(TagManager::new(keywords), thread_id)
} else {
continue;
};
// Apply changes
match arguments.operation {
Operation::Set => {
keywords.set(set_keywords.clone());
}
Operation::Add => {
for keyword in &set_keywords {
keywords.update(keyword.clone(), true);
}
}
Operation::Clear => {
for keyword in &set_keywords {
keywords.update(keyword.clone(), true);
}
}
}
if keywords.has_changes() {
// Convert keywords to flags
let seen_changed = keywords
.changed_tags()
.any(|keyword| keyword == &Keyword::Seen);
let flags = if !arguments.is_silent {
keywords
.current()
.iter()
.cloned()
.map(Flag::from)
.collect::<Vec<_>>()
} else {
vec![]
};
// Write changes
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(id);
keywords.update_batch(&mut batch, Property::Keywords);
if changelog.change_id == u64::MAX {
changelog.change_id =
self.jmap.assign_change_id(account_id).await.map_err(|_| {
StatusResponse::database_failure()
.with_tag(response.tag.as_ref().unwrap())
})?
}
batch.value(Property::Cid, changelog.change_id, F_VALUE);
match self.jmap.write_batch(batch).await {
Ok(_) => {
// Set all current mailboxes as changed if the Seen tag changed
if seen_changed {
if let Some(mailboxes) = self
.jmap
.get_property::<Vec<u32>>(
account_id,
Collection::Email,
id,
Property::MailboxIds,
)
.await
.map_err(|_| {
StatusResponse::database_failure()
.with_tag(response.tag.as_ref().unwrap())
})?
{
for mailbox_id in mailboxes {
changed_mailboxes.insert(mailbox_id);
}
}
}
changelog.log_update(Collection::Email, Id::from_parts(thread_id, id));
// Add item to response
if !arguments.is_silent {
let mut data_items = vec![DataItem::Flags { flags }];
if is_uid {
data_items.push(DataItem::Uid { uid: imap_id.uid });
}
if is_condstore {
data_items.push(DataItem::ModSeq {
modseq: changelog.change_id,
});
}
items.items.push(FetchItem {
id: imap_id.seqnum,
items: data_items,
});
} else if is_condstore {
items.items.push(FetchItem {
id: imap_id.seqnum,
items: if is_uid {
vec![
DataItem::ModSeq {
modseq: changelog.change_id,
},
DataItem::Uid { uid: imap_id.uid },
]
} else {
vec![DataItem::ModSeq {
modseq: changelog.change_id,
}]
},
});
}
}
Err(MethodError::ServerUnavailable) => {
response.rtype = ResponseType::No;
response.message = "Some messaged could not be updated.".into();
}
Err(_) => {
return Err(StatusResponse::database_failure()
.with_tag(response.tag.as_ref().unwrap()));
}
}
}
}
// Log mailbox changes
for mailbox_id in &changed_mailboxes {
changelog.log_child_update(Collection::Mailbox, *mailbox_id);
}
// Write changes
if !changelog.is_empty() {
let change_id = changelog.change_id;
if self
.jmap
.commit_changes(account_id, changelog)
.await
.is_err()
{
return Err(
StatusResponse::database_failure().with_tag(response.tag.as_ref().unwrap())
);
}
self.jmap
.broadcast_state_change(if !changed_mailboxes.is_empty() {
StateChange::new(account_id)
.with_change(TypeState::Email, change_id)
.with_change(TypeState::Mailbox, change_id)
} else {
StateChange::new(account_id).with_change(TypeState::Email, change_id)
})
.await;
}
// Send response
Ok(response.serialize(items.serialize()))
}
}

View file

@ -28,38 +28,32 @@ use crate::JMAP;
impl JMAP {
pub async fn begin_changes(&self, account_id: u32) -> Result<ChangeLogBuilder, MethodError> {
Ok(ChangeLogBuilder::with_change_id(
self.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})?,
))
self.assign_change_id(account_id)
.await
.map(ChangeLogBuilder::with_change_id)
}
pub async fn assign_change_id(&self, account_id: u32) -> Result<u64, MethodError> {
self.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})
}
pub async fn commit_changes(
&self,
account_id: u32,
mut changes: ChangeLogBuilder,
) -> Result<State, MethodError> {
if changes.change_id == u64::MAX {
changes.change_id = self
.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})?;
changes.change_id = self.assign_change_id(account_id).await?;
}
let state = State::from(changes.change_id);

View file

@ -264,6 +264,7 @@ impl JMAP {
"Failed to index message.");
IngestError::Temporary
})?
.value(Property::Cid, change_id, F_VALUE)
.value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP)
.custom(changes);
self.store.write(batch.build()).await.map_err(|err| {

View file

@ -876,6 +876,12 @@ impl JMAP {
// Update keywords property
keywords.update_batch(&mut batch, Property::Keywords);
// Update last change id
if changes.change_id == u64::MAX {
changes.change_id = self.assign_change_id(account_id).await?;
}
batch.value(Property::Cid, changes.change_id, F_VALUE);
}
// Process mailboxes
@ -1051,6 +1057,9 @@ impl JMAP {
.with_collection(Collection::Email)
.delete_document(document_id);
// Remove last changeId
batch.value(Property::Cid, (), F_VALUE | F_CLEAR);
// Remove mailboxes
let mailboxes = if let Some(mailboxes) = self
.get_property::<HashedValue<Vec<u32>>>(
@ -1260,7 +1269,7 @@ impl JMAP {
}
}
struct TagManager<
pub struct TagManager<
T: PartialEq + Clone + ToBitmaps + SerializeInto + Serialize + DeserializeFrom + Sync + Send,
> {
current: HashedValue<Vec<T>>,

View file

@ -14,6 +14,8 @@ store = { path = "../crates/store", features = ["test_mode"] }
directory = { path = "../crates/directory" }
jmap = { path = "../crates/jmap", features = ["test_mode"] }
jmap_proto = { path = "../crates/jmap-proto" }
imap = { path = "../crates/imap", features = ["test_mode"] }
imap_proto = { path = "../crates/imap-proto" }
smtp = { path = "../crates/smtp", features = ["test_mode", "local_delivery"] }
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send" }

View file

@ -0,0 +1,193 @@
use std::{fs, path::PathBuf};
use imap_proto::protocol::fetch::Section;
use mail_parser::Message;
#[test]
fn body_structure() {
let mut test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
test_dir.push("resources");
test_dir.push("imap");
test_dir.push("messages");
for file_name in fs::read_dir(&test_dir).unwrap() {
let mut file_name = file_name.as_ref().unwrap().path();
if file_name.extension().map_or(true, |e| e != "txt") {
continue;
}
let raw_message = fs::read(&file_name).unwrap();
let message = Message::parse(&raw_message).unwrap();
let mut buf = Vec::new();
// Serialize body and bodystructure
for is_extended in [false, true] {
let mut buf_ = Vec::new();
message
.body_structure(is_extended)
.serialize(&mut buf_, is_extended);
if is_extended {
buf.extend_from_slice(b"BODYSTRUCTURE ");
} else {
buf.extend_from_slice(b"BODY ");
}
// Poor man's indentation
let mut indent_count = 0;
let mut in_quote = false;
for ch in buf_ {
if ch == b'(' && !in_quote {
buf.extend_from_slice(b"(\n");
indent_count += 1;
for _ in 0..indent_count {
buf.extend_from_slice(b" ");
}
} else if ch == b')' && !in_quote {
buf.push(b'\n');
indent_count -= 1;
for _ in 0..indent_count {
buf.extend_from_slice(b" ");
}
buf.push(b')');
} else {
if ch == b'"' {
in_quote = !in_quote;
}
buf.push(ch);
}
}
buf.extend_from_slice(b"\n\n");
}
// Serialize body parts
let mut iter = 1..9;
let mut stack = Vec::new();
let mut sections = Vec::new();
loop {
'inner: while let Some(part_id) = iter.next() {
if part_id == 1 {
for section in [
None,
Some(Section::Header),
Some(Section::Text),
Some(Section::Mime),
] {
let mut body_sections = sections
.iter()
.map(|id| Section::Part { num: *id })
.collect::<Vec<_>>();
let is_first = if let Some(section) = section {
body_sections.push(section);
false
} else {
true
};
if let Some(contents) = message.body_section(&body_sections, None) {
DataItem::BodySection {
sections: body_sections,
origin_octet: None,
contents,
}
.serialize(&mut buf);
if is_first {
match message.binary(&sections, None) {
Ok(Some(contents)) => {
buf.push(b'\n');
DataItem::Binary {
sections: sections.clone(),
offset: None,
contents: match contents {
BodyContents::Bytes(_) => {
BodyContents::Text("[binary content]".into())
}
text => text,
},
}
.serialize(&mut buf);
}
Ok(None) => (),
Err(_) => {
buf.push(b'\n');
buf.extend_from_slice(
&StatusResponse::no(format!(
"Failed to decode part {} of message {}.",
sections
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join("."),
0
))
.with_code(ResponseCode::UnknownCte)
.serialize(Vec::new()),
);
}
}
if let Some(size) = message.binary_size(&sections) {
buf.push(b'\n');
DataItem::BinarySize {
sections: sections.clone(),
size,
}
.serialize(&mut buf);
}
}
buf.extend_from_slice(b"\n----------------------------------\n");
} else {
break 'inner;
}
}
}
sections.push(part_id);
stack.push(iter);
iter = 1..9;
}
if let Some(prev_iter) = stack.pop() {
sections.pop();
iter = prev_iter;
} else {
break;
}
}
// Check header fields and partial sections
for sections in [
vec![Section::HeaderFields {
not: false,
fields: vec!["From".to_string(), "To".to_string()],
}],
vec![Section::HeaderFields {
not: true,
fields: vec!["Subject".to_string(), "Cc".to_string()],
}],
] {
DataItem::BodySection {
contents: message.body_section(&sections, None).unwrap(),
sections: sections.clone(),
origin_octet: None,
}
.serialize(&mut buf);
buf.extend_from_slice(b"\n----------------------------------\n");
DataItem::BodySection {
contents: message.body_section(&sections, (10, 25).into()).unwrap(),
sections,
origin_octet: 10.into(),
}
.serialize(&mut buf);
buf.extend_from_slice(b"\n----------------------------------\n");
}
file_name.set_extension("imap");
let expected_result = fs::read(&file_name).unwrap();
if buf != expected_result {
file_name.set_extension("imap_failed");
fs::write(&file_name, buf).unwrap();
panic!("Failed test, written output to {}", file_name.display());
}
}
}

1
tests/src/imap/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod body_structure;

View file

@ -26,6 +26,8 @@ use std::path::PathBuf;
#[cfg(test)]
pub mod directory;
#[cfg(test)]
pub mod imap;
#[cfg(test)]
pub mod jmap;
#[cfg(test)]
pub mod smtp;