diff --git a/crates/imap-proto/src/protocol/search.rs b/crates/imap-proto/src/protocol/search.rs index df98c541..022ba224 100644 --- a/crates/imap-proto/src/protocol/search.rs +++ b/crates/imap-proto/src/protocol/search.rs @@ -60,7 +60,7 @@ pub struct Response { pub min: Option, pub max: Option, pub count: Option, - pub highest_modseq: Option, + pub highest_modseq: Option, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/imap/src/core/message.rs b/crates/imap/src/core/message.rs index c4a0b60c..87b3f50f 100644 --- a/crates/imap/src/core/message.rs +++ b/crates/imap/src/core/message.rs @@ -1,7 +1,4 @@ -use std::{ - hash::{BuildHasher, Hash, Hasher}, - sync::Arc, -}; +use std::hash::{BuildHasher, Hash, Hasher}; use ahash::{AHashMap, AHashSet, AHasher, RandomState}; use imap_proto::{ @@ -18,7 +15,7 @@ use utils::codec::leb128::{Leb128Iterator, Leb128Vec}; use crate::core::ImapId; -use super::{MailboxId, MailboxState, SavedSearch, SelectedMailbox, SessionData}; +use super::{MailboxId, MailboxState, SelectedMailbox, SessionData}; struct UidMap { uid_next: u32, @@ -482,21 +479,6 @@ impl SelectedMailbox { (mailbox_size, deletions) } - - pub async fn get_saved_search(&self) -> Option>> { - let mut rx = match &*self.saved_search.lock() { - SavedSearch::InFlight { rx } => rx.clone(), - SavedSearch::Results { items } => { - return Some(items.clone()); - } - SavedSearch::None => { - return None; - } - }; - rx.changed().await.ok(); - let v = rx.borrow(); - Some(v.clone()) - } } impl Serialize for &UidMap { diff --git a/crates/imap/src/core/mod.rs b/crates/imap/src/core/mod.rs index 066dc50c..e8e45ec2 100644 --- a/crates/imap/src/core/mod.rs +++ b/crates/imap/src/core/mod.rs @@ -45,6 +45,7 @@ pub struct IMAP { pub timeout_auth: Duration, pub timeout_unauth: Duration, + pub timeout_idle: Duration, pub greeting_plain: Vec, pub greeting_tls: Vec, diff --git a/crates/imap/src/lib.rs b/crates/imap/src/lib.rs index a67d7438..920d40d2 100644 --- a/crates/imap/src/lib.rs +++ b/crates/imap/src/lib.rs @@ -28,6 +28,7 @@ impl IMAP { .to_string(), timeout_auth: config.property_or_static("imap.timeout.authenticated", "30m")?, timeout_unauth: config.property_or_static("imap.timeout.anonymous", "1m")?, + timeout_idle: config.property_or_static("imap.timeout.idle", "30m")?, greeting_plain: StatusResponse::ok(SERVER_GREETING) .with_code(ResponseCode::Capability { capabilities: Capability::all_capabilities(false, false), diff --git a/crates/imap/src/op/idle.rs b/crates/imap/src/op/idle.rs new file mode 100644 index 00000000..a2b0d351 --- /dev/null +++ b/crates/imap/src/op/idle.rs @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::sync::Arc; + +use ahash::AHashSet; +use imap_proto::{ + protocol::{ + fetch, + list::{Attribute, ListItem}, + status::Status, + Sequence, + }, + receiver::Request, + Command, ResponseCode, StatusResponse, +}; + +use jmap_proto::types::{collection::Collection, type_state::TypeState}; +use store::query::log::Query; +use tokio::io::{AsyncRead, AsyncReadExt}; +use utils::map::bitmap::Bitmap; + +use crate::core::{SelectedMailbox, Session, SessionData, State}; + +impl Session { + pub async fn handle_idle(&mut self, request: Request) -> crate::OpResult { + let (data, mailbox, types) = match &self.state { + State::Authenticated { data } => { + (data.clone(), None, Bitmap::from_iter([TypeState::Mailbox])) + } + State::Selected { data, mailbox, .. } => ( + data.clone(), + mailbox.clone().into(), + Bitmap::from_iter([ + TypeState::Email, + TypeState::Mailbox, + TypeState::EmailDelivery, + ]), + ), + _ => unreachable!(), + }; + let is_rev2 = self.version.is_rev2(); + let is_qresync = self.is_qresync; + + // Register with state manager + let mut change_rx = if let Some(change_rx) = self + .jmap + .subscribe_state_manager(data.account_id, data.account_id, types) + .await + { + change_rx + } else { + return self + .write_bytes( + StatusResponse::no("It was not possible to start IDLE.") + .with_tag(request.tag) + .with_code(ResponseCode::ContactAdmin) + .into_bytes(), + ) + .await; + }; + + // Send continuation response + self.write_bytes(b"+ Idling, send 'DONE' to stop.\r\n".to_vec()) + .await?; + tracing::debug!(parent: &self.span, event = "stat", context = "idle", "Starting IDLE."); + let mut buf = vec![0; 1024]; + loop { + tokio::select! { + result = tokio::time::timeout(self.imap.timeout_idle, self.stream_rx.read(&mut buf)) => { + match result { + Ok(Ok(bytes_read)) => { + if bytes_read > 0 { + if (&buf[..bytes_read]).windows(4).any(|w| w == b"DONE") { + tracing::debug!(parent: &self.span, event = "stop", context = "idle", "Stopping IDLE."); + return self.write_bytes(StatusResponse::completed(Command::Idle) + .with_tag(request.tag) + .into_bytes()).await; + } + } else { + tracing::debug!(parent: &self.span, event = "close", "IMAP connection closed by client."); + return Err(()); + } + }, + Ok(Err(err)) => { + tracing::debug!(parent: &self.span, event = "error", reason = %err, "IMAP connection error."); + return Err(()); + }, + Err(_) => { + self.write_bytes(&b"* BYE IDLE timed out.\r\n"[..]).await.ok(); + tracing::debug!(parent: &self.span, "IDLE timed out."); + return Err(()); + } + } + } + state_change = change_rx.recv() => { + if let Some(state_change) = state_change { + let mut has_mailbox_changes = false; + let mut has_email_changes = false; + + for (type_state, _) in state_change.types { + match type_state { + TypeState::Email | TypeState::EmailDelivery => { + has_email_changes = true; + } + TypeState::Mailbox => { + has_mailbox_changes = true; + } + _ => {} + } + } + + if has_mailbox_changes || has_email_changes { + data.write_changes(&mailbox, has_mailbox_changes, has_email_changes, is_qresync, is_rev2).await; + } + } else { + self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok(); + tracing::debug!(parent: &self.span, "IDLE channel closed."); + return Err(()); + } + } + } + } + } +} + +impl SessionData { + pub async fn write_changes( + &self, + mailbox: &Option>, + check_mailboxes: bool, + check_emails: bool, + is_qresync: bool, + is_rev2: bool, + ) { + // Fetch all changed mailboxes + if check_mailboxes { + match self.synchronize_mailboxes(true).await { + Ok(Some(changes)) => { + let mut buf = Vec::with_capacity(64); + + // List deleted mailboxes + for mailbox_name in changes.deleted { + ListItem { + mailbox_name, + attributes: vec![Attribute::NonExistent], + tags: vec![], + } + .serialize(&mut buf, is_rev2, false); + } + + // List added mailboxes + for mailbox_name in changes.added { + ListItem { + mailbox_name: mailbox_name.to_string(), + attributes: vec![], + tags: vec![], + } + .serialize(&mut buf, is_rev2, false); + } + // Obtain status of changed mailboxes + for mailbox_name in changes.changed { + if let Ok(status) = self + .status( + mailbox_name, + &[ + Status::Messages, + Status::Unseen, + Status::UidNext, + Status::UidValidity, + ], + ) + .await + { + status.serialize(&mut buf, is_rev2); + } + } + + if !buf.is_empty() { + self.write_bytes(buf).await; + } + } + Err(_) => { + tracing::debug!(parent: &self.span, "Failed to refresh mailboxes."); + } + _ => unreachable!(), + } + } + + // Fetch selected mailbox changes + if check_emails { + // Synchronize emails + if let Some(mailbox) = mailbox { + // Obtain changes since last sync + let last_state = mailbox.state.lock().last_state; + match self + .synchronize_messages(mailbox, is_qresync, is_qresync) + .await + { + Ok(new_state) => { + if new_state == last_state { + return; + } + } + Err(response) => { + self.write_bytes(response.into_bytes()).await; + return; + } + } + + // Obtain changed messages + let changed_ids = match self + .jmap + .changes_( + mailbox.id.account_id, + Collection::Email, + last_state.map(Query::Since).unwrap_or(Query::All), + ) + .await + { + Ok(changelog) => { + let state = mailbox.state.lock(); + changelog + .changes + .into_iter() + .filter_map(|change| { + state + .id_to_imap + .get(&((change.unwrap_id() & u32::MAX as u64) as u32)) + .map(|id| id.uid) + }) + .collect::>() + } + Err(_) => { + self.write_bytes(StatusResponse::database_failure().into_bytes()) + .await; + return; + } + }; + + if !changed_ids.is_empty() { + self.fetch( + fetch::Arguments { + tag: String::new(), + sequence_set: Sequence::List { + items: changed_ids + .into_iter() + .map(|uid| Sequence::Number { value: uid }) + .collect(), + }, + attributes: vec![fetch::Attribute::Flags, fetch::Attribute::Uid], + changed_since: None, + include_vanished: false, + }, + mailbox.clone(), + true, + is_qresync, + false, + ) + .await; + } + } + } + } +} diff --git a/crates/imap/src/op/mod.rs b/crates/imap/src/op/mod.rs index 74ba4ed9..519b22c7 100644 --- a/crates/imap/src/op/mod.rs +++ b/crates/imap/src/op/mod.rs @@ -10,20 +10,28 @@ pub mod delete; pub mod enable; pub mod expunge; pub mod fetch; +pub mod idle; pub mod list; pub mod login; pub mod logout; pub mod namespace; +pub mod noop; pub mod rename; +pub mod search; pub mod select; pub mod status; pub mod store; pub mod subscribe; +pub mod thread; trait FromModSeq { fn from_modseq(modseq: u64) -> Self; } +trait ToModSeq { + fn to_modseq(&self) -> u64; +} + impl FromModSeq for Query { fn from_modseq(modseq: u64) -> Self { if modseq > 0 { @@ -34,4 +42,10 @@ impl FromModSeq for Query { } } +impl ToModSeq for Option { + fn to_modseq(&self) -> u64 { + self.map(|modseq| modseq + 1).unwrap_or(0) + } +} + pub type Result = std::result::Result; diff --git a/crates/imap/src/op/noop.rs b/crates/imap/src/op/noop.rs new file mode 100644 index 00000000..67e83219 --- /dev/null +++ b/crates/imap/src/op/noop.rs @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use imap_proto::{receiver::Request, Command, StatusResponse}; + +use tokio::io::AsyncRead; + +use crate::core::{Session, State}; + +impl Session { + pub async fn handle_noop( + &mut self, + request: Request, + is_check: bool, + ) -> Result<(), ()> { + match &self.state { + State::Authenticated { data } => { + data.write_changes(&None, true, false, self.is_qresync, self.version.is_rev2()) + .await; + } + State::Selected { data, mailbox, .. } => { + data.write_changes( + &Some(mailbox.clone()), + true, + true, + self.is_qresync, + self.version.is_rev2(), + ) + .await; + } + _ => (), + } + + self.write_bytes( + StatusResponse::completed(if !is_check { + Command::Noop + } else { + Command::Check + }) + .with_tag(request.tag) + .into_bytes(), + ) + .await + } +} diff --git a/crates/imap/src/op/search.rs b/crates/imap/src/op/search.rs new file mode 100644 index 00000000..262a6279 --- /dev/null +++ b/crates/imap/src/op/search.rs @@ -0,0 +1,744 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::sync::Arc; + +use imap_proto::{ + protocol::{ + search::{self, Arguments, Filter, Response, ResultOption}, + Sequence, + }, + receiver::Request, + Command, StatusResponse, +}; + +use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, property::Property}; +use mail_parser::{HeaderName, RfcHeader}; +use store::{ + fts::{builder::MAX_TOKEN_LENGTH, Language}, + query::{self, log::Query, sort::Pagination, ResultSet}, + roaring::RoaringBitmap, + write::now, +}; +use tokio::{io::AsyncRead, sync::watch}; + +use crate::core::{ImapId, SavedSearch, SelectedMailbox, Session, SessionData}; + +use super::{FromModSeq, ToModSeq}; + +impl Session { + pub async fn handle_search( + &mut self, + request: Request, + is_sort: bool, + is_uid: bool, + ) -> Result<(), ()> { + match if !is_sort { + request.parse_search(self.version) + } else { + request.parse_sort() + } { + Ok(mut arguments) => { + let (data, mailbox) = self.state.mailbox_state(); + + // Create channel for results + let (results_tx, prev_saved_search) = + if arguments.result_options.contains(&ResultOption::Save) { + let prev_saved_search = Some(mailbox.get_saved_search().await); + let (tx, rx) = watch::channel(Arc::new(Vec::new())); + *mailbox.saved_search.lock() = SavedSearch::InFlight { rx }; + (tx.into(), prev_saved_search) + } else { + (None, None) + }; + let is_qresync = self.is_qresync; + + tokio::spawn(async move { + let tag = std::mem::take(&mut arguments.tag); + let bytes = match data + .search( + arguments, + mailbox.clone(), + results_tx, + prev_saved_search.clone(), + is_qresync, + is_uid, + ) + .await + { + Ok(response) => { + let response = response.serialize(&tag); + StatusResponse::completed(if !is_sort { + Command::Search(is_uid) + } else { + Command::Sort(is_uid) + }) + .with_tag(tag) + .serialize(response) + } + Err(response) => { + if let Some(prev_saved_search) = prev_saved_search { + *mailbox.saved_search.lock() = prev_saved_search + .map_or(SavedSearch::None, |s| SavedSearch::Results { + items: s, + }); + } + response.with_tag(tag).into_bytes() + } + }; + data.write_bytes(bytes).await; + }); + Ok(()) + } + Err(response) => self.write_bytes(response.into_bytes()).await, + } + } +} + +impl SessionData { + pub async fn search( + &self, + arguments: Arguments, + mailbox: Arc, + results_tx: Option>>>, + prev_saved_search: Option>>>, + is_qresync: bool, + is_uid: bool, + ) -> Result { + // Run query + let (result_set, include_highest_modseq) = self + .query(arguments.filter, &mailbox, &prev_saved_search, is_uid) + .await?; + + // Obtain modseq + let mut highest_modseq = None; + if is_uid { + let modseq = self + .synchronize_messages(&mailbox, is_qresync, true) + .await?; + if include_highest_modseq { + highest_modseq = modseq.to_modseq().into(); + } + } else if include_highest_modseq { + // Don't synchronize if we're not using UIDs as seqnums might change. + highest_modseq = self + .get_modseq(mailbox.id.account_id) + .await? + .to_modseq() + .into(); + } + + // Sort and map ids + let mut min: Option<(u32, ImapId)> = None; + let mut max: Option<(u32, ImapId)> = None; + let mut total = 0; + let results_len = result_set.results.len() as usize; + let mut saved_results = if results_tx.is_some() { + Some(Vec::with_capacity(results_len)) + } else { + None + }; + let mut imap_ids = Vec::with_capacity(results_len); + let is_sort = if let Some(sort) = arguments.sort { + mailbox.map_search_results( + self.jmap + .store + .sort( + result_set, + sort.into_iter() + .map(|item| match item.sort { + search::Sort::Arrival => { + query::Comparator::field(Property::ReceivedAt, item.ascending) + } + search::Sort::Cc => { + query::Comparator::field(Property::Cc, item.ascending) + } + search::Sort::Date => { + query::Comparator::field(Property::SentAt, item.ascending) + } + search::Sort::From | search::Sort::DisplayFrom => { + query::Comparator::field(Property::From, item.ascending) + } + search::Sort::Size => { + query::Comparator::field(Property::Size, item.ascending) + } + search::Sort::Subject => { + query::Comparator::field(Property::Subject, item.ascending) + } + search::Sort::To | search::Sort::DisplayTo => { + query::Comparator::field(Property::To, item.ascending) + } + }) + .collect::>(), + Pagination::new(results_len, 0, None, 0), + ) + .await + .map_err(|_| StatusResponse::database_failure())? + .ids + .into_iter() + .map(|id| id as u32), + is_uid, + arguments.result_options.contains(&ResultOption::Min), + arguments.result_options.contains(&ResultOption::Max), + &mut min, + &mut max, + &mut total, + &mut imap_ids, + &mut saved_results, + ); + true + } else { + mailbox.map_search_results( + result_set.results.into_iter(), + is_uid, + arguments.result_options.contains(&ResultOption::Min), + arguments.result_options.contains(&ResultOption::Max), + &mut min, + &mut max, + &mut total, + &mut imap_ids, + &mut saved_results, + ); + imap_ids.sort_unstable(); + false + }; + + // Save results + if let (Some(results_tx), Some(saved_results)) = (results_tx, saved_results) { + let saved_results = Arc::new(saved_results.clone()); + *mailbox.saved_search.lock() = SavedSearch::Results { + items: saved_results.clone(), + }; + results_tx.send(saved_results).ok(); + } + + // Build response + Ok(Response { + is_uid, + min: min.map(|(id, _)| id), + max: max.map(|(id, _)| id), + count: if arguments.result_options.contains(&ResultOption::Count) { + Some(total) + } else { + None + }, + ids: if arguments.result_options.is_empty() + || arguments.result_options.contains(&ResultOption::All) + { + imap_ids + } else { + vec![] + }, + is_sort, + is_esearch: arguments.is_esearch, + highest_modseq, + }) + } + + pub async fn query( + &self, + imap_filter: Vec, + mailbox: &SelectedMailbox, + prev_saved_search: &Option>>>, + is_uid: bool, + ) -> Result<(ResultSet, bool), StatusResponse> { + // Obtain message ids + let mut filters = Vec::with_capacity(imap_filter.len() + 1); + let message_ids = if let Some(mailbox_id) = mailbox.id.mailbox_id { + let ids = self + .jmap + .get_tag( + mailbox.id.account_id, + Collection::Email, + Property::MailboxIds, + mailbox_id, + ) + .await? + .unwrap_or_default(); + filters.push(query::Filter::is_in_set(ids.clone())); + ids + } else { + self.jmap + .get_document_ids(mailbox.id.account_id, Collection::Email) + .await? + .unwrap_or_default() + }; + + // Convert query + let mut include_highest_modseq = false; + for filter in imap_filter { + match filter { + search::Filter::Sequence(sequence, uid_filter) => { + let mut set = RoaringBitmap::new(); + if let (Sequence::SavedSearch, Some(prev_saved_search)) = + (&sequence, &prev_saved_search) + { + if let Some(prev_saved_search) = prev_saved_search { + for imap_id in prev_saved_search.iter() { + if let Some(id) = mailbox.state.lock().uid_to_id.get(&imap_id.uid) { + set.insert(*id); + } + } + } else { + return Err(StatusResponse::no("No saved search found.")); + } + } else { + for id in mailbox + .sequence_to_ids(&sequence, is_uid || uid_filter) + .await? + .keys() + { + set.insert(*id); + } + } + filters.push(query::Filter::is_in_set(set)); + } + search::Filter::All => { + filters.push(query::Filter::is_in_set(message_ids.clone())); + } + search::Filter::Answered => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Answered, + )); + } + search::Filter::Bcc(text) => { + filters.push(query::Filter::has_text(Property::Bcc, text, Language::None)); + } + search::Filter::Before(date) => { + filters.push(query::Filter::lt(Property::ReceivedAt, date as u64)); + } + search::Filter::Body(text) => { + filters.push(query::Filter::has_text_detect( + Property::TextBody, + text, + self.jmap.config.default_language, + )); + } + search::Filter::Cc(text) => { + filters.push(query::Filter::has_text(Property::Cc, text, Language::None)); + } + search::Filter::Deleted => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Deleted, + )); + } + search::Filter::Draft => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Draft, + )); + } + search::Filter::Flagged => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Flagged, + )); + } + search::Filter::From(text) => { + filters.push(query::Filter::has_text( + Property::From, + text, + Language::None, + )); + } + search::Filter::Header(header, value) => { + if let Some(HeaderName::Rfc(header_name)) = HeaderName::parse(&header) { + let is_id = matches!( + header_name, + RfcHeader::MessageId + | RfcHeader::InReplyTo + | RfcHeader::References + | RfcHeader::ResentMessageId + ); + let tokens = if !value.is_empty() { + let header_num = u8::from(header_name).to_string(); + value + .split_ascii_whitespace() + .filter_map(|token| { + if token.len() < MAX_TOKEN_LENGTH { + if is_id { + format!("{header_num}{token}") + } else { + format!("{header_num}{}", token.to_lowercase()) + } + .into() + } else { + None + } + }) + .collect::>() + } else { + vec![] + }; + match tokens.len() { + 0 => { + filters.push(query::Filter::has_raw_text( + Property::Headers, + u8::from(header_name).to_string(), + )); + } + 1 => { + filters.push(query::Filter::has_raw_text( + Property::Headers, + tokens.into_iter().next().unwrap(), + )); + } + _ => { + filters.push(query::Filter::And); + for token in tokens { + filters.push(query::Filter::has_raw_text( + Property::Headers, + token, + )); + } + filters.push(query::Filter::End); + } + } + } else { + return Err(StatusResponse::no(format!( + "Querying non-RFC header '{header}' is not allowed.", + ))); + }; + } + search::Filter::Keyword(keyword) => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::from(keyword), + )); + } + search::Filter::Larger(size) => { + filters.push(query::Filter::gt(Property::Size, size)); + } + search::Filter::On(date) => { + filters.push(query::Filter::And); + filters.push(query::Filter::gt(Property::ReceivedAt, date as u64)); + filters.push(query::Filter::lt( + Property::ReceivedAt, + (date + 86400) as u64, + )); + filters.push(query::Filter::End); + } + search::Filter::Seen => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Seen, + )); + } + search::Filter::SentBefore(date) => { + filters.push(query::Filter::lt(Property::SentAt, date as u64)); + } + search::Filter::SentOn(date) => { + filters.push(query::Filter::And); + filters.push(query::Filter::gt(Property::SentAt, date as u64)); + filters.push(query::Filter::lt(Property::SentAt, (date + 86400) as u64)); + filters.push(query::Filter::End); + } + search::Filter::SentSince(date) => { + filters.push(query::Filter::gt(Property::SentAt, date as u64)); + } + search::Filter::Since(date) => { + filters.push(query::Filter::gt(Property::ReceivedAt, date as u64)); + } + search::Filter::Smaller(size) => { + filters.push(query::Filter::lt(Property::Size, size)); + } + search::Filter::Subject(text) => { + filters.push(query::Filter::has_text_detect( + Property::Subject, + text, + self.jmap.config.default_language, + )); + } + search::Filter::Text(text) => { + filters.push(query::Filter::Or); + filters.push(query::Filter::has_text( + Property::From, + &text, + Language::None, + )); + filters.push(query::Filter::has_text(Property::To, &text, Language::None)); + filters.push(query::Filter::has_text(Property::Cc, &text, Language::None)); + filters.push(query::Filter::has_text( + Property::Bcc, + &text, + Language::None, + )); + filters.push(query::Filter::has_text_detect( + Property::Subject, + &text, + self.jmap.config.default_language, + )); + filters.push(query::Filter::has_text_detect( + Property::TextBody, + &text, + self.jmap.config.default_language, + )); + filters.push(query::Filter::has_text_detect( + Property::Attachments, + text, + self.jmap.config.default_language, + )); + filters.push(query::Filter::End); + } + search::Filter::To(text) => { + filters.push(query::Filter::has_text(Property::To, text, Language::None)); + } + search::Filter::Unanswered => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Answered, + )); + filters.push(query::Filter::End); + } + search::Filter::Undeleted => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Deleted, + )); + filters.push(query::Filter::End); + } + search::Filter::Undraft => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Draft, + )); + filters.push(query::Filter::End); + } + search::Filter::Unflagged => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Flagged, + )); + filters.push(query::Filter::End); + } + search::Filter::Unkeyword(keyword) => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::from(keyword), + )); + filters.push(query::Filter::End); + } + search::Filter::Unseen => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Seen, + )); + filters.push(query::Filter::End); + } + search::Filter::And => { + filters.push(query::Filter::And); + } + search::Filter::Or => { + filters.push(query::Filter::Or); + } + search::Filter::Not => { + filters.push(query::Filter::Not); + } + search::Filter::End => { + filters.push(query::Filter::End); + } + search::Filter::Recent => { + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Recent, + )); + } + search::Filter::New => { + filters.push(query::Filter::And); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Recent, + )); + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Seen, + )); + filters.push(query::Filter::End); + filters.push(query::Filter::End); + } + search::Filter::Old => { + filters.push(query::Filter::Not); + filters.push(query::Filter::is_in_bitmap( + Property::Keywords, + Keyword::Seen, + )); + filters.push(query::Filter::End); + } + search::Filter::Older(secs) => { + filters.push(query::Filter::le( + Property::ReceivedAt, + now().saturating_sub(secs as u64), + )); + } + search::Filter::Younger(secs) => { + filters.push(query::Filter::ge( + Property::ReceivedAt, + now().saturating_sub(secs as u64), + )); + } + search::Filter::ModSeq((modseq, _)) => { + let mut set = RoaringBitmap::new(); + for change in self + .jmap + .changes_( + mailbox.id.account_id, + Collection::Email, + Query::from_modseq(modseq), + ) + .await? + .changes + { + let id = (change.unwrap_id() & u32::MAX as u64) as u32; + if message_ids.contains(id) { + set.insert(id); + } + } + filters.push(query::Filter::is_in_set(set)); + include_highest_modseq = true; + } + search::Filter::EmailId(id) => { + if let Some(id) = Id::from_bytes(id.as_bytes()) { + filters.push(query::Filter::is_in_set( + RoaringBitmap::from_sorted_iter([id.document_id()]).unwrap(), + )); + } else { + return Err(StatusResponse::no(format!( + "Failed to parse email id '{id}'.", + ))); + } + } + search::Filter::ThreadId(id) => { + if let Some(id) = Id::from_bytes(id.as_bytes()) { + filters.push(query::Filter::is_in_bitmap( + Property::ThreadId, + id.document_id(), + )); + } else { + return Err(StatusResponse::no(format!( + "Failed to parse thread id '{id}'.", + ))); + } + } + } + } + + // Run query + self.jmap + .filter(mailbox.id.account_id, Collection::Email, filters) + .await + .map(|res| (res, include_highest_modseq)) + .map_err(|err| err.into()) + } +} + +impl SelectedMailbox { + pub async fn get_saved_search(&self) -> Option>> { + let mut rx = match &*self.saved_search.lock() { + SavedSearch::InFlight { rx } => rx.clone(), + SavedSearch::Results { items } => { + return Some(items.clone()); + } + SavedSearch::None => { + return None; + } + }; + rx.changed().await.ok(); + let v = rx.borrow(); + Some(v.clone()) + } + + pub fn map_search_results( + &self, + ids: impl Iterator, + is_uid: bool, + find_min: bool, + find_max: bool, + min: &mut Option<(u32, ImapId)>, + max: &mut Option<(u32, ImapId)>, + total: &mut u32, + imap_ids: &mut Vec, + saved_results: &mut Option>, + ) { + let state = self.state.lock(); + let find_min_or_max = find_min || find_max; + for id in ids { + if let Some(imap_id) = state.id_to_imap.get(&(id as u32)) { + let id = if is_uid { imap_id.uid } else { imap_id.seqnum }; + if find_min_or_max { + if find_min { + if let Some((prev_min, _)) = min { + if id < *prev_min { + *min = Some((id, *imap_id)); + } + } else { + *min = Some((id, *imap_id)); + } + } + if find_max { + if let Some((prev_max, _)) = max { + if id > *prev_max { + *max = Some((id, *imap_id)); + } + } else { + *max = Some((id, *imap_id)); + } + } + } else { + imap_ids.push(id); + saved_results.as_mut().map(|r| r.push(*imap_id)); + } + *total += 1; + } + } + if find_min || find_max { + for (id, imap_id) in [min, max].into_iter().flatten() { + imap_ids.push(*id); + saved_results.as_mut().map(|r| r.push(*imap_id)); + } + } + } +} + +impl SavedSearch { + pub async fn unwrap(&self) -> Option>> { + match self { + SavedSearch::InFlight { rx } => { + let mut rx = rx.clone(); + rx.changed().await.ok(); + let v = rx.borrow(); + Some(v.clone()) + } + SavedSearch::Results { items } => Some(items.clone()), + SavedSearch::None => None, + } + } +} diff --git a/crates/imap/src/op/select.rs b/crates/imap/src/op/select.rs index e8ed1915..aca4691d 100644 --- a/crates/imap/src/op/select.rs +++ b/crates/imap/src/op/select.rs @@ -34,6 +34,8 @@ use tokio::io::AsyncRead; use crate::core::{SavedSearch, SelectedMailbox, Session, State}; +use super::ToModSeq; + impl Session { pub async fn handle_select(&mut self, request: Request) -> crate::OpResult { let is_select = request.command == Command::Select; @@ -60,7 +62,11 @@ impl Session { let uid_validity = state.uid_validity; let uid_next = state.uid_next; let total_messages = state.total_messages; - let highest_modseq = state.last_state; + let highest_modseq = if is_condstore { + state.last_state.to_modseq().into() + } else { + None + }; let mailbox = Arc::new(SelectedMailbox { id: mailbox, state: parking_lot::Mutex::new(state), @@ -111,7 +117,7 @@ impl Session { uid_next, closed_previous, is_rev2: self.version.is_rev2(), - highest_modseq: highest_modseq.map(|id| id + 1), + highest_modseq, mailbox_id: Id::from_parts( mailbox.id.account_id, mailbox.id.mailbox_id.unwrap_or(u32::MAX), diff --git a/crates/imap/src/op/status.rs b/crates/imap/src/op/status.rs index 005951a6..c4f0c27a 100644 --- a/crates/imap/src/op/status.rs +++ b/crates/imap/src/op/status.rs @@ -36,6 +36,8 @@ use tokio::io::AsyncRead; use crate::core::{Mailbox, Session, SessionData}; +use super::ToModSeq; + impl Session { pub async fn handle_status(&mut self, request: Request) -> crate::OpResult { match request.parse_status(self.version) { @@ -150,9 +152,7 @@ impl SessionData { Status::HighestModSeq => { items_response.push(( *item, - StatusItemType::Number( - account.state_email.map(|id| id + 1).unwrap_or(0), - ), + StatusItemType::Number(account.state_email.to_modseq()), )); } Status::MailboxId => { diff --git a/crates/imap/src/op/thread.rs b/crates/imap/src/op/thread.rs new file mode 100644 index 00000000..13842695 --- /dev/null +++ b/crates/imap/src/op/thread.rs @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::sync::Arc; + +use ahash::AHashMap; +use imap_proto::{ + protocol::{ + thread::{Arguments, Response}, + ImapResponse, + }, + receiver::Request, + Command, StatusResponse, +}; + +use jmap_proto::types::{collection::Collection, property::Property}; +use store::ValueKey; +use tokio::io::AsyncRead; + +use crate::core::{SelectedMailbox, Session, SessionData}; + +impl Session { + pub async fn handle_thread( + &mut self, + mut request: Request, + is_uid: bool, + ) -> Result<(), ()> { + let command = request.command; + let tag = std::mem::take(&mut request.tag); + match request.parse_thread() { + Ok(arguments) => { + let (data, mailbox) = self.state.mailbox_state(); + + tokio::spawn(async move { + let bytes = match data.thread(arguments, mailbox, is_uid).await { + Ok(response) => StatusResponse::completed(command) + .with_tag(tag) + .serialize(response.serialize()), + Err(response) => response.with_tag(tag).into_bytes(), + }; + data.write_bytes(bytes).await; + }); + Ok(()) + } + Err(response) => self.write_bytes(response.into_bytes()).await, + } + } +} + +impl SessionData { + pub async fn thread( + &self, + arguments: Arguments, + mailbox: Arc, + is_uid: bool, + ) -> Result { + // Run query + let (result_set, _) = self + .query(arguments.filter, &mailbox, &None, is_uid) + .await?; + + // Obtain threadIds for matching messages + let thread_ids = self + .jmap + .store + .get_values::( + result_set + .results + .iter() + .map(|document_id| { + ValueKey::new( + mailbox.id.account_id, + Collection::Email, + document_id, + Property::ThreadId, + ) + }) + .collect(), + ) + .await + .map_err(|err| { + tracing::error!( + event = "error", + context = "thread_query", + error = ?err, + "Failed to obtain threadIds."); + StatusResponse::database_failure() + })?; + + // Group messages by thread + let mut threads: AHashMap> = AHashMap::new(); + let state = mailbox.state.lock(); + for (document_id, thread_id) in result_set.results.into_iter().zip(thread_ids) { + if let (Some(thread_id), Some(imap_id)) = + (thread_id, state.id_to_imap.get(&document_id)) + { + threads + .entry(thread_id) + .or_insert_with(|| Vec::new()) + .push(if is_uid { imap_id.uid } else { imap_id.seqnum }); + } + } + + // Build response + Ok(Response { + is_uid, + threads: threads.into_iter().map(|(_, messages)| messages).collect(), + }) + } +} diff --git a/crates/store/src/query/log.rs b/crates/store/src/query/log.rs index 12405dd1..c810a74c 100644 --- a/crates/store/src/query/log.rs +++ b/crates/store/src/query/log.rs @@ -200,3 +200,23 @@ impl Changes { Some(()) } } + +impl Change { + pub fn id(&self) -> u64 { + match self { + Change::Insert(id) => *id, + Change::Update(id) => *id, + Change::ChildUpdate(id) => *id, + Change::Delete(id) => *id, + } + } + + pub fn unwrap_id(self) -> u64 { + match self { + Change::Insert(id) => id, + Change::Update(id) => id, + Change::ChildUpdate(id) => id, + Change::Delete(id) => id, + } + } +}