From 048a65a019723f8603eb84b7a108fd0789efc44b Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sun, 2 Jul 2023 20:15:19 +0200 Subject: [PATCH] Multiple bugfixes found while running imap-test. --- Cargo.lock | 1 + crates/imap-proto/Cargo.toml | 3 + crates/imap-proto/src/parser/append.rs | 12 ++ crates/imap-proto/src/protocol/fetch.rs | 14 +- crates/imap-proto/src/protocol/mod.rs | 13 +- crates/imap-proto/src/receiver.rs | 10 +- crates/imap/src/core/client.rs | 54 +++--- crates/imap/src/core/message.rs | 218 +++++++++++------------- crates/imap/src/core/mod.rs | 10 +- crates/imap/src/core/writer.rs | 17 +- crates/imap/src/lib.rs | 1 + crates/imap/src/op/append.rs | 2 +- crates/imap/src/op/copy_move.rs | 2 +- crates/imap/src/op/expunge.rs | 2 +- crates/imap/src/op/fetch.rs | 11 +- crates/imap/src/op/idle.rs | 8 +- crates/imap/src/op/search.rs | 68 +++++--- crates/imap/src/op/select.rs | 2 +- crates/imap/src/op/store.rs | 16 +- crates/imap/src/op/thread.rs | 11 +- crates/jmap-proto/src/object/index.rs | 6 +- crates/jmap/src/email/set.rs | 2 +- crates/store/src/fts/term_index.rs | 116 +++++++------ crates/utils/src/lib.rs | 2 +- tests/resources/test_config.toml | 198 +++++++++++++++++++++ tests/src/imap/fetch.rs | 2 +- tests/src/imap/mailbox.rs | 13 ++ 27 files changed, 522 insertions(+), 292 deletions(-) create mode 100644 tests/resources/test_config.toml diff --git a/Cargo.lock b/Cargo.lock index 1de2b100..65e5ef08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1742,6 +1742,7 @@ dependencies = [ "chrono", "jmap_proto", "mail-parser", + "tokio", ] [[package]] diff --git a/crates/imap-proto/Cargo.toml b/crates/imap-proto/Cargo.toml index faa16d59..d466bd06 100644 --- a/crates/imap-proto/Cargo.toml +++ b/crates/imap-proto/Cargo.toml @@ -9,3 +9,6 @@ jmap_proto = { path = "../jmap-proto" } mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "serde_support", "ludicrous_mode"] } ahash = { version = "0.8" } chrono = { version = "0.4"} + +[dev-dependencies] +tokio = { version = "1.23", features = ["full"] } diff --git a/crates/imap-proto/src/parser/append.rs b/crates/imap-proto/src/parser/append.rs index 3c32b68d..824ab123 100644 --- a/crates/imap-proto/src/parser/append.rs +++ b/crates/imap-proto/src/parser/append.rs @@ -164,6 +164,18 @@ mod tests { }], }, ), + ( + "A003 APPEND \"hi\" \"20-Nov-2022 23:59:59 +0300\" ~{1+}\r\na\r\n", + append::Arguments { + tag: "A003".to_string(), + mailbox_name: "hi".to_string(), + messages: vec![Message { + message: vec![b'a'], + flags: vec![], + received_at: Some(1668977999), + }], + }, + ), ] { assert_eq!( receiver diff --git a/crates/imap-proto/src/protocol/fetch.rs b/crates/imap-proto/src/protocol/fetch.rs index 5f59446d..8a558fa9 100644 --- a/crates/imap-proto/src/protocol/fetch.rs +++ b/crates/imap-proto/src/protocol/fetch.rs @@ -23,6 +23,8 @@ use std::borrow::Cow; +use mail_parser::DateTime; + use super::{ literal_string, quoted_rfc2822_or_nil, quoted_string, quoted_string_or_nil, quoted_timestamp, Flag, ImapResponse, Sequence, @@ -176,7 +178,7 @@ pub enum BodyContents<'x> { #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct Envelope<'x> { - pub date: Option, + pub date: Option, pub subject: Option>, pub from: Vec>, pub sender: Vec>, @@ -648,7 +650,7 @@ impl Section { impl<'x> Envelope<'x> { pub fn serialize(&self, buf: &mut Vec) { buf.push(b'('); - quoted_rfc2822_or_nil(buf, self.date); + quoted_rfc2822_or_nil(buf, &self.date); buf.push(b' '); quoted_string_or_nil(buf, self.subject.as_deref()); self.serialize_addresses(buf, &self.from); @@ -914,6 +916,8 @@ impl<'x> ImapResponse for Response<'x> { #[cfg(test)] mod tests { + use mail_parser::DateTime; + use crate::protocol::{Flag, ImapResponse}; use super::{ @@ -927,7 +931,7 @@ mod tests { ( super::DataItem::Envelope { envelope: Envelope { - date: 837570205.into(), + date: DateTime::from_timestamp(837570205).into(), subject: Some("IMAP4rev2 WG mtg summary and minutes".into()), from: vec![Address::Single(EmailAddress { name: Some("Terry Gray".into()), @@ -975,7 +979,7 @@ mod tests { ( super::DataItem::Envelope { envelope: Envelope { - date: 837570205.into(), + date: DateTime::from_timestamp(837570205).into(), subject: Some("Group test".into()), from: vec![Address::Single(EmailAddress { name: Some("Bill Foobar".into()), @@ -1049,7 +1053,7 @@ mod tests { body_size_octets: 9323, }, envelope: Box::new(Envelope { - date: 837570205.into(), + date: DateTime::from_timestamp(837570205).into(), subject: Some("Hello world!".into()), from: vec![Address::Single(EmailAddress { name: Some("Terry Gray".into()), diff --git a/crates/imap-proto/src/protocol/mod.rs b/crates/imap-proto/src/protocol/mod.rs index fa09c0dd..2c0e2388 100644 --- a/crates/imap-proto/src/protocol/mod.rs +++ b/crates/imap-proto/src/protocol/mod.rs @@ -208,20 +208,13 @@ pub fn quoted_timestamp(buf: &mut Vec, timestamp: i64) { buf.push(b'"'); } -pub fn quoted_rfc2822(buf: &mut Vec, timestamp: i64) { +pub fn quoted_rfc2822(buf: &mut Vec, timestamp: &mail_parser::DateTime) { buf.push(b'"'); - buf.extend_from_slice( - DateTime::::from_utc( - NaiveDateTime::from_timestamp_opt(timestamp, 0).unwrap_or_default(), - Utc, - ) - .to_rfc2822() - .as_bytes(), - ); + buf.extend_from_slice(timestamp.to_rfc822().as_bytes()); buf.push(b'"'); } -pub fn quoted_rfc2822_or_nil(buf: &mut Vec, timestamp: Option) { +pub fn quoted_rfc2822_or_nil(buf: &mut Vec, timestamp: &Option) { if let Some(timestamp) = timestamp { quoted_rfc2822(buf, timestamp); } else { diff --git a/crates/imap-proto/src/receiver.rs b/crates/imap-proto/src/receiver.rs index 1ce2eb40..56f56d07 100644 --- a/crates/imap-proto/src/receiver.rs +++ b/crates/imap-proto/src/receiver.rs @@ -234,8 +234,14 @@ impl Receiver { self.push_argument(false)?; self.state = State::ArgumentQuoted { escaped: false }; } - b'{' if last_ch.is_ascii_whitespace() => { - self.push_argument(false)?; + b'{' if last_ch.is_ascii_whitespace() + || (last_ch == b'~' && self.buf.len() == 1) => + { + if last_ch != b'~' { + self.push_argument(false)?; + } else { + self.buf.clear(); + } self.state = State::Literal { non_sync: false }; } b'(' => { diff --git a/crates/imap/src/core/client.rs b/crates/imap/src/core/client.rs index 8bce0f06..cd501f9e 100644 --- a/crates/imap/src/core/client.rs +++ b/crates/imap/src/core/client.rs @@ -36,10 +36,9 @@ use super::{SelectedMailbox, Session, SessionData, State, IMAP}; impl Session { pub async fn ingest(&mut self, bytes: &[u8]) -> crate::Result { - /*let tmp = "dd"; for line in String::from_utf8_lossy(bytes).split("\r\n") { - println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]); - }*/ + let c = println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]); + } tracing::trace!(parent: &self.span, event = "read", @@ -52,7 +51,7 @@ impl Session { loop { match self.receiver.parse(&mut bytes) { - Ok(request) => match request.is_allowed(&self.state, self.is_tls) { + Ok(request) => match self.is_allowed(request) { Ok(request) => { requests.push(request); } @@ -222,12 +221,9 @@ pub fn group_requests( grouped_requests } -trait IsAllowed: Sized { - fn is_allowed(self, state: &State, is_tls: bool) -> Result; -} - -impl IsAllowed for Request { - fn is_allowed(self, state: &State, is_tls: bool) -> Result { +impl Session { + fn is_allowed(&self, request: Request) -> Result, StatusResponse> { + let state = &self.state; // Rate limit request if let State::Authenticated { data } | State::Selected { data, .. } = state { if !data @@ -238,39 +234,39 @@ impl IsAllowed for Request { .is_allowed() { return Err(StatusResponse::no("Too many requests") - .with_tag(self.tag) + .with_tag(request.tag) .with_code(ResponseCode::Limit)); } } - match &self.command { - Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(self), + match &request.command { + Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(request), Command::StartTls => { - if !is_tls { - Ok(self) + if !self.is_tls { + Ok(request) } else { - Err(StatusResponse::no("Already in TLS mode.").with_tag(self.tag)) + Err(StatusResponse::no("Already in TLS mode.").with_tag(request.tag)) } } Command::Authenticate => { if let State::NotAuthenticated { .. } = state { - Ok(self) + Ok(request) } else { - Err(StatusResponse::no("Already authenticated.").with_tag(self.tag)) + Err(StatusResponse::no("Already authenticated.").with_tag(request.tag)) } } Command::Login => { if let State::NotAuthenticated { .. } = state { - if is_tls { - Ok(self) + if self.is_tls || self.imap.allow_plain_auth { + Ok(request) } else { Err( StatusResponse::no("LOGIN is disabled on the clear-text port.") - .with_tag(self.tag), + .with_tag(request.tag), ) } } else { - Err(StatusResponse::no("Already authenticated.").with_tag(self.tag)) + Err(StatusResponse::no("Already authenticated.").with_tag(request.tag)) } } Command::Enable @@ -294,9 +290,9 @@ impl IsAllowed for Request { | Command::MyRights | Command::Unauthenticate => { if let State::Authenticated { .. } | State::Selected { .. } = state { - Ok(self) + Ok(request) } else { - Err(StatusResponse::no("Not authenticated.").with_tag(self.tag)) + Err(StatusResponse::no("Not authenticated.").with_tag(request.tag)) } } Command::Close @@ -313,21 +309,21 @@ impl IsAllowed for Request { State::Selected { mailbox, .. } => { if mailbox.is_select || !matches!( - self.command, + request.command, Command::Store(_) | Command::Expunge(_) | Command::Move(_), ) { - Ok(self) + Ok(request) } else { Err(StatusResponse::no("Not permitted in EXAMINE state.") - .with_tag(self.tag)) + .with_tag(request.tag)) } } State::Authenticated { .. } => { - Err(StatusResponse::bad("No mailbox is selected.").with_tag(self.tag)) + Err(StatusResponse::bad("No mailbox is selected.").with_tag(request.tag)) } State::NotAuthenticated { .. } => { - Err(StatusResponse::no("Not authenticated.").with_tag(self.tag)) + Err(StatusResponse::no("Not authenticated.").with_tag(request.tag)) } }, } diff --git a/crates/imap/src/core/message.rs b/crates/imap/src/core/message.rs index a2f24bd0..a23505b8 100644 --- a/crates/imap/src/core/message.rs +++ b/crates/imap/src/core/message.rs @@ -15,7 +15,7 @@ use utils::codec::leb128::{Leb128Iterator, Leb128Vec}; use crate::core::ImapId; -use super::{MailboxId, MailboxState, SelectedMailbox, SessionData}; +use super::{MailboxId, MailboxState, NextMailboxState, SelectedMailbox, SessionData}; #[derive(Debug)] struct UidMap { @@ -55,7 +55,7 @@ impl SessionData { .await?; // Obtain current state - let last_state = self + let modseq = self .jmap .store .get_last_change_id(mailbox.account_id, Collection::Email) @@ -212,7 +212,8 @@ impl SessionData { id_to_imap, uid_to_id, uid_max, - last_state, + modseq, + next_state: None, }); } else { let uid_next = (id_list.len() + 1) as u32; @@ -259,7 +260,8 @@ impl SessionData { id_to_imap, uid_to_id, uid_max: uid_next.saturating_sub(1), - last_state, + modseq, + next_state: None, }); } } @@ -268,40 +270,85 @@ impl SessionData { pub async fn synchronize_messages( &self, mailbox: &SelectedMailbox, - is_qresync: bool, - is_uid: bool, ) -> crate::op::Result> { // Obtain current modseq let modseq = self.get_modseq(mailbox.id.account_id).await?; - if mailbox.state.lock().last_state == modseq { - return Ok(modseq); - } + if mailbox.state.lock().modseq != modseq { + // Synchronize messages + let new_state = self.fetch_messages(&mailbox.id).await?; + let mut current_state = mailbox.state.lock(); - // Synchronize messages - let new_state = self.fetch_messages(&mailbox.id).await?; + // Add missing uids + let mut deletions = current_state + .next_state + .take() + .map(|state| state.deletions) + .unwrap_or_default(); + let mut uid_to_id = std::mem::take(&mut current_state.uid_to_id); + for imap_id in current_state.id_to_imap.values_mut() { + if imap_id.uid != u32::MAX && !new_state.uid_to_id.contains_key(&imap_id.uid) { + // Add to deletions + deletions.push(*imap_id); - // Update UIDs - let mut buf = Vec::with_capacity(64); - let (new_message_count, deletions) = mailbox.update_mailbox_state(new_state, true); - if let Some(deletions) = deletions { - let mut ids = deletions - .into_iter() - .map(|id| { - if is_uid || is_qresync { - id.uid - } else { - id.seqnum - } - }) - .collect::>(); - ids.sort_unstable(); - expunge::Response { is_qresync, ids }.serialize_to(&mut buf); - } - if let Some(new_message_count) = new_message_count { - Exists { - total_messages: new_message_count, + // Invalidate entries + uid_to_id.insert(imap_id.uid, u32::MAX); + imap_id.uid = u32::MAX; + } + } + current_state.uid_to_id = uid_to_id; + + // Update state + current_state.modseq = new_state.modseq; + current_state.next_state = Some(Box::new(NextMailboxState { + next_state: new_state, + deletions, + })); + } + + Ok(modseq) + } + + pub async fn write_mailbox_changes( + &self, + mailbox: &SelectedMailbox, + is_qresync: bool, + is_uid: bool, + ) -> crate::op::Result> { + // Resync mailbox + let modseq = self.synchronize_messages(mailbox).await?; + let mut buf = Vec::new(); + { + let mut current_state = mailbox.state.lock(); + if let Some(next_state) = current_state.next_state.take() { + if !next_state.deletions.is_empty() { + let mut ids = next_state + .deletions + .into_iter() + .map(|id| { + if is_uid || is_qresync { + id.uid + } else { + id.seqnum + } + }) + .collect::>(); + ids.sort_unstable(); + expunge::Response { is_qresync, ids }.serialize_to(&mut buf); + } + if !buf.is_empty() + || next_state + .next_state + .uid_max + .saturating_sub(current_state.uid_max) + > 0 + { + Exists { + total_messages: next_state.next_state.total_messages, + } + .serialize(&mut buf); + } + *current_state = next_state.next_state; } - .serialize(&mut buf); } if !buf.is_empty() { self.write_bytes(buf).await; @@ -344,17 +391,19 @@ impl SelectedMailbox { return Ok(ids); } - let max_uid = state.uid_max; - let max_seqnum = state.total_messages as u32; - - for (id, imap_id) in &state.id_to_imap { - let matched = if is_uid { - sequence.contains(imap_id.uid, max_uid) - } else { - sequence.contains(imap_id.seqnum, max_seqnum) - }; - if matched { - ids.insert(*id, *imap_id); + if is_uid { + for (id, imap_id) in &state.id_to_imap { + if imap_id.uid != u32::MAX && sequence.contains(imap_id.uid, state.uid_max) { + ids.insert(*id, *imap_id); + } + } + } else { + for (id, imap_id) in &state.id_to_imap { + if imap_id.uid != u32::MAX + && sequence.contains(imap_id.seqnum, state.total_messages as u32) + { + ids.insert(*id, *imap_id); + } } } @@ -369,7 +418,9 @@ impl SelectedMailbox { for imap_id in saved_ids.iter() { if let Some(id) = state.uid_to_id.get(&imap_id.uid) { - ids.insert(*id, *imap_id); + if *id != u32::MAX { + ids.insert(*id, *imap_id); + } } } @@ -383,7 +434,7 @@ impl SelectedMailbox { let state = self.state.lock(); if is_uid { for uid in sequence.expand(state.uid_max) { - if !state.uid_to_id.contains_key(&uid) { + if state.uid_to_id.get(&uid).map_or(true, |id| *id == u32::MAX) { deleted_ids.push(uid); } } @@ -397,7 +448,11 @@ impl SelectedMailbox { } else if let Some(saved_ids) = self.get_saved_search().await { let state = self.state.lock(); for id in saved_ids.iter() { - if !state.uid_to_id.contains_key(&id.uid) { + if state + .uid_to_id + .get(&id.uid) + .map_or(true, |id| *id == u32::MAX) + { deleted_ids.push(if is_uid { id.uid } else { id.seqnum }); } } @@ -405,77 +460,6 @@ impl SelectedMailbox { deleted_ids.sort_unstable(); deleted_ids } - - pub fn id_to_uid(&self, ids: &[u32]) -> Vec { - let mut imap_ids = Vec::with_capacity(ids.len()); - let state = self.state.lock(); - - for id in ids { - if let Some(imap_id) = state.id_to_imap.get(id) { - imap_ids.push(*imap_id); - } - } - - imap_ids - } - - pub fn uid_to_id(&self, imap_ids: &[ImapId]) -> Vec { - let mut ids = Vec::with_capacity(imap_ids.len()); - let state = self.state.lock(); - - for imap_id in imap_ids { - if let Some(id) = state.uid_to_id.get(&imap_id.uid) { - ids.push(*id); - } - } - - ids - } - - pub fn is_in_sync(&self, ids: &[u32]) -> bool { - let state = self.state.lock(); - - for id in ids { - if !state.id_to_imap.contains_key(id) { - return false; - } - } - true - } - - pub fn update_mailbox_state( - &self, - mailbox_state: MailboxState, - return_deleted: bool, - ) -> (Option, Option>) { - let mut state = self.state.lock(); - let mailbox_size = if mailbox_state.total_messages != state.total_messages { - mailbox_state.total_messages.into() - } else { - None - }; - let deletions = if return_deleted { - let mut deletions = Vec::new(); - - for (id, imap_id) in &state.id_to_imap { - if !mailbox_state.id_to_imap.contains_key(id) { - deletions.push(*imap_id); - } - } - - if !deletions.is_empty() { - Some(deletions) - } else { - None - } - } else { - None - }; - - *state = mailbox_state; - - (mailbox_size, deletions) - } } impl Serialize for &UidMap { diff --git a/crates/imap/src/core/mod.rs b/crates/imap/src/core/mod.rs index c9171ade..7fe08daf 100644 --- a/crates/imap/src/core/mod.rs +++ b/crates/imap/src/core/mod.rs @@ -51,6 +51,7 @@ pub struct IMAP { pub max_auth_failures: u32, pub name_shared: String, pub name_all: String, + pub allow_plain_auth: bool, pub timeout_auth: Duration, pub timeout_unauth: Duration, @@ -137,7 +138,14 @@ pub struct MailboxState { pub id_to_imap: AHashMap, pub uid_to_id: AHashMap, pub total_messages: usize, - pub last_state: Option, + pub modseq: Option, + pub next_state: Option>, +} + +#[derive(Debug)] +pub struct NextMailboxState { + pub next_state: MailboxState, + pub deletions: Vec, } #[derive(Debug, Default)] diff --git a/crates/imap/src/core/writer.rs b/crates/imap/src/core/writer.rs index 14cb7298..79f88de7 100644 --- a/crates/imap/src/core/writer.rs +++ b/crates/imap/src/core/writer.rs @@ -116,13 +116,14 @@ pub fn spawn_writer(mut stream: Event, span: tracing::Span) -> mpsc::Sender Session { pub async fn write_bytes(&self, bytes: impl Into>) -> crate::OpResult { - /*let tmp = "dd"; - println!( + let bytes = bytes.into(); + + let c = println!( "-> {:?}", String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)]) - );*/ + ); - if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await { + if let Err(err) = self.writer.send(Event::Bytes(bytes)).await { debug!("Failed to send bytes: {}", err); Err(()) } else { @@ -133,13 +134,13 @@ impl Session { impl SessionData { pub async fn write_bytes(&self, bytes: impl Into>) -> bool { - /*let tmp = "dd"; - println!( + let bytes = bytes.into(); + let c = println!( "-> {:?}", String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)]) - );*/ + ); - if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await { + if let Err(err) = self.writer.send(Event::Bytes(bytes)).await { debug!("Failed to send bytes: {}", err); false } else { diff --git a/crates/imap/src/lib.rs b/crates/imap/src/lib.rs index 232cac29..d64f087f 100644 --- a/crates/imap/src/lib.rs +++ b/crates/imap/src/lib.rs @@ -51,6 +51,7 @@ impl IMAP { ), rate_requests: config.property_or_static("imap.rate-limit.rate", "1000/1m")?, rate_concurrent: config.property("imap.rate-limit.concurrent")?.unwrap_or(4), + allow_plain_auth: config.property_or_static("imap.auth.allow-plain-text", "false")?, })) } } diff --git a/crates/imap/src/op/append.rs b/crates/imap/src/op/append.rs index 0c031411..50295f90 100644 --- a/crates/imap/src/op/append.rs +++ b/crates/imap/src/op/append.rs @@ -182,7 +182,7 @@ impl SessionData { if !created_ids.is_empty() { let (uids, uid_validity) = match selected_mailbox { Some(selected_mailbox) if selected_mailbox.id == mailbox => { - self.synchronize_messages(&selected_mailbox, is_qresync, true) + self.write_mailbox_changes(&selected_mailbox, is_qresync, is_qresync) .await .map_err(|r| r.with_tag(&arguments.tag))?; let mailbox = selected_mailbox.state.lock(); diff --git a/crates/imap/src/op/copy_move.rs b/crates/imap/src/op/copy_move.rs index 3cfd4d8f..a5ec8a54 100644 --- a/crates/imap/src/op/copy_move.rs +++ b/crates/imap/src/op/copy_move.rs @@ -428,7 +428,7 @@ impl SessionData { // Resynchronize source mailbox on a successful move if did_move { - self.synchronize_messages(&src_mailbox, is_qresync, is_uid) + self.write_mailbox_changes(&src_mailbox, is_qresync, is_uid) .await .map_err(|r| r.with_tag(&arguments.tag))?; } diff --git a/crates/imap/src/op/expunge.rs b/crates/imap/src/op/expunge.rs index 3cd8fe36..db41f1dd 100644 --- a/crates/imap/src/op/expunge.rs +++ b/crates/imap/src/op/expunge.rs @@ -109,7 +109,7 @@ impl Session { // Synchronize messages match data - .synchronize_messages(&mailbox, self.is_qresync, is_uid) + .write_mailbox_changes(&mailbox, self.is_qresync, is_uid) .await { Ok(_) => { diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs index 4deabeac..2d93b16d 100644 --- a/crates/imap/src/op/fetch.rs +++ b/crates/imap/src/op/fetch.rs @@ -113,14 +113,7 @@ impl SessionData { // Resync messages if needed let account_id = mailbox.id.account_id; - let mut modseq = match { - if is_uid { - self.synchronize_messages(&mailbox, is_qresync, true).await - } else { - // Don't synchronize if we're not using UIDs as seqnums might change. - self.get_modseq(mailbox.id.account_id).await - } - } { + let mut modseq = match self.synchronize_messages(&mailbox).await { Ok(modseq) => modseq, Err(response) => return response.with_tag(arguments.tag), }; @@ -1022,7 +1015,7 @@ impl<'x> AsImapDataItem<'x> for Message<'x> { fn envelope(&self) -> Envelope { Envelope { - date: self.date().map(|dt| dt.to_timestamp()), + date: self.date().map(|d| d.clone()), subject: self.subject().map(|s| s.into()), from: self .header_values(RfcHeader::From) diff --git a/crates/imap/src/op/idle.rs b/crates/imap/src/op/idle.rs index 487262fe..833d5bfb 100644 --- a/crates/imap/src/op/idle.rs +++ b/crates/imap/src/op/idle.rs @@ -212,13 +212,13 @@ impl SessionData { // Synchronize emails if let Some(mailbox) = mailbox { // Obtain changes since last sync - let last_state = mailbox.state.lock().last_state; + let modseq = mailbox.state.lock().modseq; match self - .synchronize_messages(mailbox, is_qresync, is_qresync) + .write_mailbox_changes(mailbox, is_qresync, is_qresync) .await { Ok(new_state) => { - if new_state == last_state { + if new_state == modseq { return; } } @@ -234,7 +234,7 @@ impl SessionData { .changes_( mailbox.id.account_id, Collection::Email, - last_state.map(Query::Since).unwrap_or(Query::All), + modseq.map(Query::Since).unwrap_or(Query::All), ) .await { diff --git a/crates/imap/src/op/search.rs b/crates/imap/src/op/search.rs index 7285ba7a..653f9947 100644 --- a/crates/imap/src/op/search.rs +++ b/crates/imap/src/op/search.rs @@ -42,7 +42,7 @@ use store::{ }; use tokio::{io::AsyncRead, sync::watch}; -use crate::core::{ImapId, SavedSearch, SelectedMailbox, Session, SessionData}; +use crate::core::{ImapId, MailboxState, SavedSearch, SelectedMailbox, Session, SessionData}; use super::{FromModSeq, ToModSeq}; @@ -71,7 +71,6 @@ impl Session { } else { (None, None) }; - let is_qresync = self.is_qresync; tokio::spawn(async move { let tag = std::mem::take(&mut arguments.tag); @@ -81,7 +80,6 @@ impl Session { mailbox.clone(), results_tx, prev_saved_search.clone(), - is_qresync, is_uid, ) .await @@ -122,7 +120,6 @@ impl SessionData { mailbox: Arc, results_tx: Option>>>, prev_saved_search: Option>>>, - is_qresync: bool, is_uid: bool, ) -> Result { // Run query @@ -131,22 +128,14 @@ impl SessionData { .await?; // Obtain modseq - let mut highest_modseq = None; - if is_uid { - let modseq = self - .synchronize_messages(&mailbox, is_qresync, true) - .await?; - if include_highest_modseq { - highest_modseq = modseq.to_modseq().into(); - } - } else if include_highest_modseq { - // Don't synchronize if we're not using UIDs as seqnums might change. - highest_modseq = self - .get_modseq(mailbox.id.account_id) + let highest_modseq = if include_highest_modseq { + self.synchronize_messages(&mailbox) .await? .to_modseq() - .into(); - } + .into() + } else { + None + }; // Sort and map ids let mut min: Option<(u32, ImapId)> = None; @@ -294,9 +283,12 @@ impl SessionData { (&sequence, &prev_saved_search) { if let Some(prev_saved_search) = prev_saved_search { + let state = mailbox.state.lock(); for imap_id in prev_saved_search.iter() { - if let Some(id) = mailbox.state.lock().uid_to_id.get(&imap_id.uid) { - set.insert(*id); + if let Some(id) = state.uid_to_id.get(&imap_id.uid) { + if *id != u32::MAX { + set.insert(*id); + } } } } else { @@ -690,31 +682,30 @@ impl SelectedMailbox { ) { let state = self.state.lock(); let find_min_or_max = find_min || find_max; - for id in ids { - if let Some(imap_id) = state.id_to_imap.get(&(id as u32)) { - let id = if is_uid { imap_id.uid } else { imap_id.seqnum }; + for document_id in ids { + if let Some((id, imap_id)) = state.map_result_id(document_id, is_uid) { if find_min_or_max { if find_min { if let Some((prev_min, _)) = min { if id < *prev_min { - *min = Some((id, *imap_id)); + *min = Some((id, imap_id)); } } else { - *min = Some((id, *imap_id)); + *min = Some((id, imap_id)); } } if find_max { if let Some((prev_max, _)) = max { if id > *prev_max { - *max = Some((id, *imap_id)); + *max = Some((id, imap_id)); } } else { - *max = Some((id, *imap_id)); + *max = Some((id, imap_id)); } } } else { imap_ids.push(id); - saved_results.as_mut().map(|r| r.push(*imap_id)); + saved_results.as_mut().map(|r| r.push(imap_id)); } *total += 1; } @@ -728,6 +719,27 @@ impl SelectedMailbox { } } +impl MailboxState { + pub fn map_result_id(&self, document_id: u32, is_uid: bool) -> Option<(u32, ImapId)> { + if let Some(imap_id) = self.id_to_imap.get(&document_id) { + if imap_id.uid != u32::MAX { + return Some((if is_uid { imap_id.uid } else { imap_id.seqnum }, *imap_id)); + } + } + + if is_uid { + self.next_state.as_ref().and_then(|s| { + s.next_state + .id_to_imap + .get(&document_id) + .map(|imap_id| (imap_id.uid, *imap_id)) + }) + } else { + None + } + } +} + impl SavedSearch { pub async fn unwrap(&self) -> Option>> { match self { diff --git a/crates/imap/src/op/select.rs b/crates/imap/src/op/select.rs index 83c93285..73e915e2 100644 --- a/crates/imap/src/op/select.rs +++ b/crates/imap/src/op/select.rs @@ -63,7 +63,7 @@ impl Session { let uid_next = state.uid_next; let total_messages = state.total_messages; let highest_modseq = if is_condstore { - state.last_state.to_modseq().into() + state.modseq.to_modseq().into() } else { None }; diff --git a/crates/imap/src/op/store.rs b/crates/imap/src/op/store.rs index 7b830c7c..02b6e3bc 100644 --- a/crates/imap/src/op/store.rs +++ b/crates/imap/src/op/store.rs @@ -61,13 +61,9 @@ impl Session { Ok(arguments) => { let (data, mailbox) = self.state.select_data(); let is_condstore = self.is_condstore || mailbox.is_condstore; - let is_qresync = self.is_qresync; tokio::spawn(async move { - let bytes = match data - .store(arguments, mailbox, is_uid, is_condstore, is_qresync) - .await - { + let bytes = match data.store(arguments, mailbox, is_uid, is_condstore).await { Ok(response) => response, Err(response) => response.into_bytes(), }; @@ -87,16 +83,12 @@ impl SessionData { mailbox: Arc, is_uid: bool, is_condstore: bool, - is_qresync: bool, ) -> Result, StatusResponse> { // Resync messages if needed let account_id = mailbox.id.account_id; - if is_uid { - // Don't synchronize if we're not using UIDs as seqnums might change. - self.synchronize_messages(&mailbox, is_qresync, true) - .await - .map_err(|r| r.with_tag(&arguments.tag))?; - } + self.synchronize_messages(&mailbox) + .await + .map_err(|r| r.with_tag(&arguments.tag))?; // Convert IMAP ids to JMAP ids. let mut ids = match mailbox diff --git a/crates/imap/src/op/thread.rs b/crates/imap/src/op/thread.rs index 9b27e9b5..ebcc74e1 100644 --- a/crates/imap/src/op/thread.rs +++ b/crates/imap/src/op/thread.rs @@ -79,6 +79,11 @@ impl SessionData { .query(arguments.filter, &mailbox, &None, is_uid) .await?; + // Synchronize mailbox + if !result_set.results.is_empty() { + self.synchronize_messages(&mailbox).await?; + } + // Obtain threadIds for matching messages let thread_ids = self .jmap @@ -111,13 +116,13 @@ impl SessionData { let mut threads: AHashMap> = AHashMap::new(); let state = mailbox.state.lock(); for (document_id, thread_id) in result_set.results.into_iter().zip(thread_ids) { - if let (Some(thread_id), Some(imap_id)) = - (thread_id, state.id_to_imap.get(&document_id)) + if let (Some(thread_id), Some((imap_id, _))) = + (thread_id, state.map_result_id(document_id, is_uid)) { threads .entry(thread_id) .or_insert_with(|| Vec::new()) - .push(if is_uid { imap_id.uid } else { imap_id.seqnum }); + .push(imap_id); } } diff --git a/crates/jmap-proto/src/object/index.rs b/crates/jmap-proto/src/object/index.rs index 304c5c79..35c0660b 100644 --- a/crates/jmap-proto/src/object/index.rs +++ b/crates/jmap-proto/src/object/index.rs @@ -630,8 +630,8 @@ impl IntoIndex for &u64 { impl IntoIndex for &u32 { fn into_index(self, index_as: IndexAs) -> Vec { match index_as { - IndexAs::Integer => self.serialize(), - _ => unreachable!(), + IndexAs::Integer | IndexAs::IntegerList => self.serialize(), + _ => unreachable!("index as {index_as:?} not supported for u32"), } } } @@ -641,7 +641,7 @@ impl IntoIndex for &Id { match index_as { IndexAs::Integer => self.document_id().serialize(), IndexAs::LongInteger => self.id().serialize(), - _ => unreachable!(), + _ => unreachable!("index as {index_as:?} not supported for Id"), } } } diff --git a/crates/jmap/src/email/set.rs b/crates/jmap/src/email/set.rs index 082c4b9b..f9d6b6fb 100644 --- a/crates/jmap/src/email/set.rs +++ b/crates/jmap/src/email/set.rs @@ -1208,7 +1208,7 @@ impl JMAP { event = "error", context = "email_delete", error = ?err, - "Failed to deserialize term index."); + "Failed to deserialize token index while deleting email."); MethodError::ServerPartialFail })? { diff --git a/crates/store/src/fts/term_index.rs b/crates/store/src/fts/term_index.rs index 6bcf4a37..36f441ff 100644 --- a/crates/store/src/fts/term_index.rs +++ b/crates/store/src/fts/term_index.rs @@ -703,63 +703,71 @@ impl Deserialize for TokenIndex { impl TokenIndex { fn from_bytes(bytes: &[u8]) -> Option { - let (num_tokens, mut pos) = bytes.read_leb128::()?; - let mut tokens = Vec::with_capacity(num_tokens as usize); - for _ in 0..num_tokens { - let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?; - tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?); - pos += nil_pos + 1; - } - - let mut terms = Vec::new(); - while pos < bytes.len() { - let item_len = - u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize; - pos += LENGTH_SIZE; - - let mut field_terms = Terms { - field_id: *bytes.get(pos)?, - exact_terms: AHashSet::default(), - stemmed_terms: AHashSet::default(), - }; - pos += 1; - - let bytes_read = bytes.get(pos..)?.skip_leb128()?; - pos += bytes_read; - - let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::()?; - pos += bytes_read; - - let mut term_pos = 0; - let mut byte_pos = pos; - - while term_pos < terms_len { - let (bytes_read, chunk) = TermIndex::uncompress_chunk( - bytes.get(byte_pos..)?, - (terms_len - term_pos) * 2, - None, - ) - .ok()?; - - byte_pos += bytes_read; - - for encoded_term in chunk.chunks_exact(2) { - let term_id = encoded_term[0]; - let term_id_stemmed = encoded_term[1]; - - field_terms.exact_terms.insert(term_id); - if term_id != term_id_stemmed { - field_terms.stemmed_terms.insert(term_id_stemmed); - } - term_pos += 1; - } + if !bytes.is_empty() { + let (num_tokens, mut pos) = bytes.read_leb128::()?; + let mut tokens = Vec::with_capacity(num_tokens as usize); + for _ in 0..num_tokens { + let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?; + tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?); + pos += nil_pos + 1; } - terms.push(field_terms); - pos += item_len; - } + let mut terms = Vec::new(); + while pos < bytes.len() { + let item_len = + u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) + as usize; + pos += LENGTH_SIZE; - Some(TokenIndex { tokens, terms }) + let mut field_terms = Terms { + field_id: *bytes.get(pos)?, + exact_terms: AHashSet::default(), + stemmed_terms: AHashSet::default(), + }; + pos += 1; + + let bytes_read = bytes.get(pos..)?.skip_leb128()?; + pos += bytes_read; + + let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::()?; + pos += bytes_read; + + let mut term_pos = 0; + let mut byte_pos = pos; + + while term_pos < terms_len { + let (bytes_read, chunk) = TermIndex::uncompress_chunk( + bytes.get(byte_pos..)?, + (terms_len - term_pos) * 2, + None, + ) + .ok()?; + + byte_pos += bytes_read; + + for encoded_term in chunk.chunks_exact(2) { + let term_id = encoded_term[0]; + let term_id_stemmed = encoded_term[1]; + + field_terms.exact_terms.insert(term_id); + if term_id != term_id_stemmed { + field_terms.stemmed_terms.insert(term_id_stemmed); + } + term_pos += 1; + } + } + + terms.push(field_terms); + pos += item_len; + } + + Some(TokenIndex { tokens, terms }) + } else { + Some(TokenIndex { + tokens: Vec::new(), + terms: Vec::new(), + }) + } } } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 84896458..e642d06e 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -114,7 +114,7 @@ pub fn enable_tracing(config: &Config) -> config::Result> { "stdout" => { tracing::subscriber::set_global_default( tracing_subscriber::FmtSubscriber::builder() - .with_env_filter(env_filter) + //.with_env_filter(env_filter) .finish(), ) .failed("Failed to set subscriber"); diff --git a/tests/resources/test_config.toml b/tests/resources/test_config.toml new file mode 100644 index 00000000..66544b9c --- /dev/null +++ b/tests/resources/test_config.toml @@ -0,0 +1,198 @@ +[server] +hostname = "test.example.org" + +[server.listener.jmap] +bind = ["127.0.0.1:9990"] +url = "https://127.0.0.1:8899" +protocol = "jmap" +max-connections = 8192 + +[server.listener.imap] +bind = ["127.0.0.1:9991"] +protocol = "imap" +max-connections = 8192 + +[server.listener.imaptls] +bind = ["127.0.0.1:9992"] +protocol = "imap" +max-connections = 8192 +tls.implicit = true + +[server.listener.sieve] +bind = ["127.0.0.1:9993"] +protocol = "managesieve" +max-connections = 8192 +tls.implicit = true + +[server.listener.smtps] +bind = ['127.0.0.1:9994'] +greeting = 'Test SMTP instance' +protocol = 'smtp' +tls.implicit = true + +[server.listener.smtp] +bind = ['127.0.0.1:9995'] +greeting = 'Test SMTP instance' +protocol = 'smtp' +tls.implicit = false + +[server.socket] +reuse-addr = true + +[server.tls] +enable = true +implicit = false +certificate = "default" + +[global.tracing] +method = "stdout" + +[session.ehlo] +reject-non-fqdn = false + +[session.rcpt] +relay = [ { if = "authenticated-as", ne = "", then = true }, + { else = false } ] +directory = "local" + +[session.rcpt.errors] +total = 5 +wait = "1ms" + +[queue] +path = "/tmp/stalwart-test" +hash = 64 + +[report] +path = "/tmp/stalwart-test" +hash = 64 + +[resolver] +type = "system" + +[queue.outbound] +next-hop = [ { if = "rcpt-domain", in-list = "local/domains", then = "local" }, + { else = false } ] + +[remote."mock-smtp"] +address = "localhost" +port = 9999 +protocol = "smtp" + +[remote."mock-smtp".tls] +implicit = false +allow-invalid-certs = true + +[session.extensions] +future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"}, + { else = false } ] + +[store] +db.path = "/tmp/stalwart-test/sqlite.db" + +[store.blob] +type = "local" + +[store.blob.local] +path = "/tmp/stalwart-test" + +[certificate.default] +cert = "file://./tests/resources/tls_cert.pem" +private-key = "file://./tests/resources/tls_privatekey.pem" + +[jmap] +directory = "local" + +[jmap.protocol] +set.max-objects = 100000 + +[jmap.protocol.request] +max-concurrent = 8 + +[jmap.protocol.upload] +max-size = 5000000 +max-concurrent = 4 +ttl = "1m" + +[jmap.protocol.upload.quota] +files = 3 +size = 50000 + +[jmap.rate-limit] +account.rate = "1000/1m" +authentication.rate = "100/2s" +anonymous.rate = "100/1m" + +[jmap.event-source] +throttle = "500ms" + +[jmap.web-sockets] +throttle = "500ms" + +[jmap.push] +throttle = "500ms" +attempts.interval = "500ms" + +[directory."local"] +type = "memory" + +[directory."local".options] +catch-all = true +subaddressing = true + +[directory."local".lookup] +domains = ["example.org"] + +[[directory."local".users]] +name = "admin" +description = "Superadmin" +secret = "donotuse" + +[[directory."local".users]] +name = "john" +description = "John Doe" +secret = "12345" +email = ["john@example.org", "jdoe@example.org", "john.doe@example.org"] +email-list = ["info@example.org"] +member-of = ["sales"] + +[[directory."local".users]] +name = "jane" +description = "Jane Doe" +secret = "abcde" +email = "jane@example.org" +email-list = ["info@example.org"] +member-of = ["sales", "support"] + +[[directory."local".users]] +name = "bill" +description = "Bill Foobar" +secret = "$2y$05$bvIG6Nmid91Mu9RcmmWZfO5HJIMCT8riNW0hEp8f6/FuA2/mHZFpe" +quota = 500000 +email = "bill@example.org" +email-list = ["info@example.org"] + +[[directory."local".groups]] +name = "sales" +description = "Sales Team" + +[[directory."local".groups]] +name = "support" +description = "Support Team" + +[oauth] +key = "parerga_und_paralipomena" +max-auth-attempts = 1 + +[oauth.expiry] +user-code = "1s" +token = "1s" +refresh-token = "3s" +refresh-token-renew = "2s" + +[imap.auth] +allow-plain-text = true + +[imap.rate-limit] +rate = "10000/1s" +concurrent = 9000 diff --git a/tests/src/imap/fetch.rs b/tests/src/imap/fetch.rs index de07e67b..21aa74ab 100644 --- a/tests/src/imap/fetch.rs +++ b/tests/src/imap/fetch.rs @@ -49,7 +49,7 @@ pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) { .assert_contains("EMAILID (") .assert_contains("but then I thought, why not do both?") .assert_contains(concat!( - "ENVELOPE (\"Sat, 20 Nov 2021 22:22:01 +0000\" ", + "ENVELOPE (\"Sat, 20 Nov 2021 14:22:01 -0800\" ", "\"Why not both importing AND exporting? ☺\" ", "((\"Art Vandelay (Vandelay Industries)\" NIL \"art\" \"vandelay.com\")) ", "((\"Art Vandelay (Vandelay Industries)\" NIL \"art\" \"vandelay.com\")) ", diff --git a/tests/src/imap/mailbox.rs b/tests/src/imap/mailbox.rs index e17d8c7b..f2ddcb0e 100644 --- a/tests/src/imap/mailbox.rs +++ b/tests/src/imap/mailbox.rs @@ -26,6 +26,13 @@ use imap_proto::ResponseType; use super::{AssertResult, ImapConnection, Type}; pub async fn test(mut imap: &mut ImapConnection, mut imap_check: &mut ImapConnection) { + // Create third connection for testing + let mut other_conn = ImapConnection::connect(b"_z ").await; + other_conn + .send("AUTHENTICATE PLAIN {32+}\r\nAGpkb2VAZXhhbXBsZS5jb20Ac2VjcmV0") + .await; + other_conn.assert_read(Type::Tagged, ResponseType::Ok).await; + // List folders imap.send("LIST \"\" \"*\"").await; imap.assert_read(Type::Tagged, ResponseType::Ok) @@ -48,6 +55,12 @@ pub async fn test(mut imap: &mut ImapConnection, mut imap_check: &mut ImapConnec imap.assert_read(Type::Tagged, ResponseType::Ok).await; imap.send("CREATE \"Fruit/Apple/Green\"").await; imap.assert_read(Type::Tagged, ResponseType::Ok).await; + + // Select folder from another connection + other_conn.send("SELECT \"Tofu\"").await; + other_conn.assert_read(Type::Tagged, ResponseType::Ok).await; + + // Make sure folders are visible for imap in [&mut imap, &mut imap_check] { imap.send("LIST \"\" \"*\"").await; imap.assert_read(Type::Tagged, ResponseType::Ok)