IMAP idle, noop, search and thread commands.

This commit is contained in:
Me 2023-06-28 17:55:40 +02:00
parent 34e21ef03f
commit f5048da232
12 changed files with 1274 additions and 26 deletions

View file

@ -60,7 +60,7 @@ pub struct Response {
pub min: Option<u32>,
pub max: Option<u32>,
pub count: Option<u32>,
pub highest_modseq: Option<u32>,
pub highest_modseq: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]

View file

@ -1,7 +1,4 @@
use std::{
hash::{BuildHasher, Hash, Hasher},
sync::Arc,
};
use std::hash::{BuildHasher, Hash, Hasher};
use ahash::{AHashMap, AHashSet, AHasher, RandomState};
use imap_proto::{
@ -18,7 +15,7 @@ use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::core::ImapId;
use super::{MailboxId, MailboxState, SavedSearch, SelectedMailbox, SessionData};
use super::{MailboxId, MailboxState, SelectedMailbox, SessionData};
struct UidMap {
uid_next: u32,
@ -482,21 +479,6 @@ impl SelectedMailbox {
(mailbox_size, deletions)
}
pub async fn get_saved_search(&self) -> Option<Arc<Vec<ImapId>>> {
let mut rx = match &*self.saved_search.lock() {
SavedSearch::InFlight { rx } => rx.clone(),
SavedSearch::Results { items } => {
return Some(items.clone());
}
SavedSearch::None => {
return None;
}
};
rx.changed().await.ok();
let v = rx.borrow();
Some(v.clone())
}
}
impl Serialize for &UidMap {

View file

@ -45,6 +45,7 @@ pub struct IMAP {
pub timeout_auth: Duration,
pub timeout_unauth: Duration,
pub timeout_idle: Duration,
pub greeting_plain: Vec<u8>,
pub greeting_tls: Vec<u8>,

View file

@ -28,6 +28,7 @@ impl IMAP {
.to_string(),
timeout_auth: config.property_or_static("imap.timeout.authenticated", "30m")?,
timeout_unauth: config.property_or_static("imap.timeout.anonymous", "1m")?,
timeout_idle: config.property_or_static("imap.timeout.idle", "30m")?,
greeting_plain: StatusResponse::ok(SERVER_GREETING)
.with_code(ResponseCode::Capability {
capabilities: Capability::all_capabilities(false, false),

285
crates/imap/src/op/idle.rs Normal file
View file

@ -0,0 +1,285 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use ahash::AHashSet;
use imap_proto::{
protocol::{
fetch,
list::{Attribute, ListItem},
status::Status,
Sequence,
},
receiver::Request,
Command, ResponseCode, StatusResponse,
};
use jmap_proto::types::{collection::Collection, type_state::TypeState};
use store::query::log::Query;
use tokio::io::{AsyncRead, AsyncReadExt};
use utils::map::bitmap::Bitmap;
use crate::core::{SelectedMailbox, Session, SessionData, State};
impl<T: AsyncRead> Session<T> {
pub async fn handle_idle(&mut self, request: Request<Command>) -> crate::OpResult {
let (data, mailbox, types) = match &self.state {
State::Authenticated { data } => {
(data.clone(), None, Bitmap::from_iter([TypeState::Mailbox]))
}
State::Selected { data, mailbox, .. } => (
data.clone(),
mailbox.clone().into(),
Bitmap::from_iter([
TypeState::Email,
TypeState::Mailbox,
TypeState::EmailDelivery,
]),
),
_ => unreachable!(),
};
let is_rev2 = self.version.is_rev2();
let is_qresync = self.is_qresync;
// Register with state manager
let mut change_rx = if let Some(change_rx) = self
.jmap
.subscribe_state_manager(data.account_id, data.account_id, types)
.await
{
change_rx
} else {
return self
.write_bytes(
StatusResponse::no("It was not possible to start IDLE.")
.with_tag(request.tag)
.with_code(ResponseCode::ContactAdmin)
.into_bytes(),
)
.await;
};
// Send continuation response
self.write_bytes(b"+ Idling, send 'DONE' to stop.\r\n".to_vec())
.await?;
tracing::debug!(parent: &self.span, event = "stat", context = "idle", "Starting IDLE.");
let mut buf = vec![0; 1024];
loop {
tokio::select! {
result = tokio::time::timeout(self.imap.timeout_idle, self.stream_rx.read(&mut buf)) => {
match result {
Ok(Ok(bytes_read)) => {
if bytes_read > 0 {
if (&buf[..bytes_read]).windows(4).any(|w| w == b"DONE") {
tracing::debug!(parent: &self.span, event = "stop", context = "idle", "Stopping IDLE.");
return self.write_bytes(StatusResponse::completed(Command::Idle)
.with_tag(request.tag)
.into_bytes()).await;
}
} else {
tracing::debug!(parent: &self.span, event = "close", "IMAP connection closed by client.");
return Err(());
}
},
Ok(Err(err)) => {
tracing::debug!(parent: &self.span, event = "error", reason = %err, "IMAP connection error.");
return Err(());
},
Err(_) => {
self.write_bytes(&b"* BYE IDLE timed out.\r\n"[..]).await.ok();
tracing::debug!(parent: &self.span, "IDLE timed out.");
return Err(());
}
}
}
state_change = change_rx.recv() => {
if let Some(state_change) = state_change {
let mut has_mailbox_changes = false;
let mut has_email_changes = false;
for (type_state, _) in state_change.types {
match type_state {
TypeState::Email | TypeState::EmailDelivery => {
has_email_changes = true;
}
TypeState::Mailbox => {
has_mailbox_changes = true;
}
_ => {}
}
}
if has_mailbox_changes || has_email_changes {
data.write_changes(&mailbox, has_mailbox_changes, has_email_changes, is_qresync, is_rev2).await;
}
} else {
self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok();
tracing::debug!(parent: &self.span, "IDLE channel closed.");
return Err(());
}
}
}
}
}
}
impl SessionData {
pub async fn write_changes(
&self,
mailbox: &Option<Arc<SelectedMailbox>>,
check_mailboxes: bool,
check_emails: bool,
is_qresync: bool,
is_rev2: bool,
) {
// Fetch all changed mailboxes
if check_mailboxes {
match self.synchronize_mailboxes(true).await {
Ok(Some(changes)) => {
let mut buf = Vec::with_capacity(64);
// List deleted mailboxes
for mailbox_name in changes.deleted {
ListItem {
mailbox_name,
attributes: vec![Attribute::NonExistent],
tags: vec![],
}
.serialize(&mut buf, is_rev2, false);
}
// List added mailboxes
for mailbox_name in changes.added {
ListItem {
mailbox_name: mailbox_name.to_string(),
attributes: vec![],
tags: vec![],
}
.serialize(&mut buf, is_rev2, false);
}
// Obtain status of changed mailboxes
for mailbox_name in changes.changed {
if let Ok(status) = self
.status(
mailbox_name,
&[
Status::Messages,
Status::Unseen,
Status::UidNext,
Status::UidValidity,
],
)
.await
{
status.serialize(&mut buf, is_rev2);
}
}
if !buf.is_empty() {
self.write_bytes(buf).await;
}
}
Err(_) => {
tracing::debug!(parent: &self.span, "Failed to refresh mailboxes.");
}
_ => unreachable!(),
}
}
// Fetch selected mailbox changes
if check_emails {
// Synchronize emails
if let Some(mailbox) = mailbox {
// Obtain changes since last sync
let last_state = mailbox.state.lock().last_state;
match self
.synchronize_messages(mailbox, is_qresync, is_qresync)
.await
{
Ok(new_state) => {
if new_state == last_state {
return;
}
}
Err(response) => {
self.write_bytes(response.into_bytes()).await;
return;
}
}
// Obtain changed messages
let changed_ids = match self
.jmap
.changes_(
mailbox.id.account_id,
Collection::Email,
last_state.map(Query::Since).unwrap_or(Query::All),
)
.await
{
Ok(changelog) => {
let state = mailbox.state.lock();
changelog
.changes
.into_iter()
.filter_map(|change| {
state
.id_to_imap
.get(&((change.unwrap_id() & u32::MAX as u64) as u32))
.map(|id| id.uid)
})
.collect::<AHashSet<_>>()
}
Err(_) => {
self.write_bytes(StatusResponse::database_failure().into_bytes())
.await;
return;
}
};
if !changed_ids.is_empty() {
self.fetch(
fetch::Arguments {
tag: String::new(),
sequence_set: Sequence::List {
items: changed_ids
.into_iter()
.map(|uid| Sequence::Number { value: uid })
.collect(),
},
attributes: vec![fetch::Attribute::Flags, fetch::Attribute::Uid],
changed_since: None,
include_vanished: false,
},
mailbox.clone(),
true,
is_qresync,
false,
)
.await;
}
}
}
}
}

View file

@ -10,20 +10,28 @@ pub mod delete;
pub mod enable;
pub mod expunge;
pub mod fetch;
pub mod idle;
pub mod list;
pub mod login;
pub mod logout;
pub mod namespace;
pub mod noop;
pub mod rename;
pub mod search;
pub mod select;
pub mod status;
pub mod store;
pub mod subscribe;
pub mod thread;
trait FromModSeq {
fn from_modseq(modseq: u64) -> Self;
}
trait ToModSeq {
fn to_modseq(&self) -> u64;
}
impl FromModSeq for Query {
fn from_modseq(modseq: u64) -> Self {
if modseq > 0 {
@ -34,4 +42,10 @@ impl FromModSeq for Query {
}
}
impl ToModSeq for Option<u64> {
fn to_modseq(&self) -> u64 {
self.map(|modseq| modseq + 1).unwrap_or(0)
}
}
pub type Result<T> = std::result::Result<T, StatusResponse>;

View file

@ -0,0 +1,65 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use imap_proto::{receiver::Request, Command, StatusResponse};
use tokio::io::AsyncRead;
use crate::core::{Session, State};
impl<T: AsyncRead> Session<T> {
pub async fn handle_noop(
&mut self,
request: Request<Command>,
is_check: bool,
) -> Result<(), ()> {
match &self.state {
State::Authenticated { data } => {
data.write_changes(&None, true, false, self.is_qresync, self.version.is_rev2())
.await;
}
State::Selected { data, mailbox, .. } => {
data.write_changes(
&Some(mailbox.clone()),
true,
true,
self.is_qresync,
self.version.is_rev2(),
)
.await;
}
_ => (),
}
self.write_bytes(
StatusResponse::completed(if !is_check {
Command::Noop
} else {
Command::Check
})
.with_tag(request.tag)
.into_bytes(),
)
.await
}
}

View file

@ -0,0 +1,744 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use imap_proto::{
protocol::{
search::{self, Arguments, Filter, Response, ResultOption},
Sequence,
},
receiver::Request,
Command, StatusResponse,
};
use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, property::Property};
use mail_parser::{HeaderName, RfcHeader};
use store::{
fts::{builder::MAX_TOKEN_LENGTH, Language},
query::{self, log::Query, sort::Pagination, ResultSet},
roaring::RoaringBitmap,
write::now,
};
use tokio::{io::AsyncRead, sync::watch};
use crate::core::{ImapId, SavedSearch, SelectedMailbox, Session, SessionData};
use super::{FromModSeq, ToModSeq};
impl<T: AsyncRead> Session<T> {
pub async fn handle_search(
&mut self,
request: Request<Command>,
is_sort: bool,
is_uid: bool,
) -> Result<(), ()> {
match if !is_sort {
request.parse_search(self.version)
} else {
request.parse_sort()
} {
Ok(mut arguments) => {
let (data, mailbox) = self.state.mailbox_state();
// Create channel for results
let (results_tx, prev_saved_search) =
if arguments.result_options.contains(&ResultOption::Save) {
let prev_saved_search = Some(mailbox.get_saved_search().await);
let (tx, rx) = watch::channel(Arc::new(Vec::new()));
*mailbox.saved_search.lock() = SavedSearch::InFlight { rx };
(tx.into(), prev_saved_search)
} else {
(None, None)
};
let is_qresync = self.is_qresync;
tokio::spawn(async move {
let tag = std::mem::take(&mut arguments.tag);
let bytes = match data
.search(
arguments,
mailbox.clone(),
results_tx,
prev_saved_search.clone(),
is_qresync,
is_uid,
)
.await
{
Ok(response) => {
let response = response.serialize(&tag);
StatusResponse::completed(if !is_sort {
Command::Search(is_uid)
} else {
Command::Sort(is_uid)
})
.with_tag(tag)
.serialize(response)
}
Err(response) => {
if let Some(prev_saved_search) = prev_saved_search {
*mailbox.saved_search.lock() = prev_saved_search
.map_or(SavedSearch::None, |s| SavedSearch::Results {
items: s,
});
}
response.with_tag(tag).into_bytes()
}
};
data.write_bytes(bytes).await;
});
Ok(())
}
Err(response) => self.write_bytes(response.into_bytes()).await,
}
}
}
impl SessionData {
pub async fn search(
&self,
arguments: Arguments,
mailbox: Arc<SelectedMailbox>,
results_tx: Option<watch::Sender<Arc<Vec<ImapId>>>>,
prev_saved_search: Option<Option<Arc<Vec<ImapId>>>>,
is_qresync: bool,
is_uid: bool,
) -> Result<search::Response, StatusResponse> {
// Run query
let (result_set, include_highest_modseq) = self
.query(arguments.filter, &mailbox, &prev_saved_search, is_uid)
.await?;
// Obtain modseq
let mut highest_modseq = None;
if is_uid {
let modseq = self
.synchronize_messages(&mailbox, is_qresync, true)
.await?;
if include_highest_modseq {
highest_modseq = modseq.to_modseq().into();
}
} else if include_highest_modseq {
// Don't synchronize if we're not using UIDs as seqnums might change.
highest_modseq = self
.get_modseq(mailbox.id.account_id)
.await?
.to_modseq()
.into();
}
// Sort and map ids
let mut min: Option<(u32, ImapId)> = None;
let mut max: Option<(u32, ImapId)> = None;
let mut total = 0;
let results_len = result_set.results.len() as usize;
let mut saved_results = if results_tx.is_some() {
Some(Vec::with_capacity(results_len))
} else {
None
};
let mut imap_ids = Vec::with_capacity(results_len);
let is_sort = if let Some(sort) = arguments.sort {
mailbox.map_search_results(
self.jmap
.store
.sort(
result_set,
sort.into_iter()
.map(|item| match item.sort {
search::Sort::Arrival => {
query::Comparator::field(Property::ReceivedAt, item.ascending)
}
search::Sort::Cc => {
query::Comparator::field(Property::Cc, item.ascending)
}
search::Sort::Date => {
query::Comparator::field(Property::SentAt, item.ascending)
}
search::Sort::From | search::Sort::DisplayFrom => {
query::Comparator::field(Property::From, item.ascending)
}
search::Sort::Size => {
query::Comparator::field(Property::Size, item.ascending)
}
search::Sort::Subject => {
query::Comparator::field(Property::Subject, item.ascending)
}
search::Sort::To | search::Sort::DisplayTo => {
query::Comparator::field(Property::To, item.ascending)
}
})
.collect::<Vec<_>>(),
Pagination::new(results_len, 0, None, 0),
)
.await
.map_err(|_| StatusResponse::database_failure())?
.ids
.into_iter()
.map(|id| id as u32),
is_uid,
arguments.result_options.contains(&ResultOption::Min),
arguments.result_options.contains(&ResultOption::Max),
&mut min,
&mut max,
&mut total,
&mut imap_ids,
&mut saved_results,
);
true
} else {
mailbox.map_search_results(
result_set.results.into_iter(),
is_uid,
arguments.result_options.contains(&ResultOption::Min),
arguments.result_options.contains(&ResultOption::Max),
&mut min,
&mut max,
&mut total,
&mut imap_ids,
&mut saved_results,
);
imap_ids.sort_unstable();
false
};
// Save results
if let (Some(results_tx), Some(saved_results)) = (results_tx, saved_results) {
let saved_results = Arc::new(saved_results.clone());
*mailbox.saved_search.lock() = SavedSearch::Results {
items: saved_results.clone(),
};
results_tx.send(saved_results).ok();
}
// Build response
Ok(Response {
is_uid,
min: min.map(|(id, _)| id),
max: max.map(|(id, _)| id),
count: if arguments.result_options.contains(&ResultOption::Count) {
Some(total)
} else {
None
},
ids: if arguments.result_options.is_empty()
|| arguments.result_options.contains(&ResultOption::All)
{
imap_ids
} else {
vec![]
},
is_sort,
is_esearch: arguments.is_esearch,
highest_modseq,
})
}
pub async fn query(
&self,
imap_filter: Vec<Filter>,
mailbox: &SelectedMailbox,
prev_saved_search: &Option<Option<Arc<Vec<ImapId>>>>,
is_uid: bool,
) -> Result<(ResultSet, bool), StatusResponse> {
// Obtain message ids
let mut filters = Vec::with_capacity(imap_filter.len() + 1);
let message_ids = if let Some(mailbox_id) = mailbox.id.mailbox_id {
let ids = self
.jmap
.get_tag(
mailbox.id.account_id,
Collection::Email,
Property::MailboxIds,
mailbox_id,
)
.await?
.unwrap_or_default();
filters.push(query::Filter::is_in_set(ids.clone()));
ids
} else {
self.jmap
.get_document_ids(mailbox.id.account_id, Collection::Email)
.await?
.unwrap_or_default()
};
// Convert query
let mut include_highest_modseq = false;
for filter in imap_filter {
match filter {
search::Filter::Sequence(sequence, uid_filter) => {
let mut set = RoaringBitmap::new();
if let (Sequence::SavedSearch, Some(prev_saved_search)) =
(&sequence, &prev_saved_search)
{
if let Some(prev_saved_search) = prev_saved_search {
for imap_id in prev_saved_search.iter() {
if let Some(id) = mailbox.state.lock().uid_to_id.get(&imap_id.uid) {
set.insert(*id);
}
}
} else {
return Err(StatusResponse::no("No saved search found."));
}
} else {
for id in mailbox
.sequence_to_ids(&sequence, is_uid || uid_filter)
.await?
.keys()
{
set.insert(*id);
}
}
filters.push(query::Filter::is_in_set(set));
}
search::Filter::All => {
filters.push(query::Filter::is_in_set(message_ids.clone()));
}
search::Filter::Answered => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Answered,
));
}
search::Filter::Bcc(text) => {
filters.push(query::Filter::has_text(Property::Bcc, text, Language::None));
}
search::Filter::Before(date) => {
filters.push(query::Filter::lt(Property::ReceivedAt, date as u64));
}
search::Filter::Body(text) => {
filters.push(query::Filter::has_text_detect(
Property::TextBody,
text,
self.jmap.config.default_language,
));
}
search::Filter::Cc(text) => {
filters.push(query::Filter::has_text(Property::Cc, text, Language::None));
}
search::Filter::Deleted => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Deleted,
));
}
search::Filter::Draft => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Draft,
));
}
search::Filter::Flagged => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Flagged,
));
}
search::Filter::From(text) => {
filters.push(query::Filter::has_text(
Property::From,
text,
Language::None,
));
}
search::Filter::Header(header, value) => {
if let Some(HeaderName::Rfc(header_name)) = HeaderName::parse(&header) {
let is_id = matches!(
header_name,
RfcHeader::MessageId
| RfcHeader::InReplyTo
| RfcHeader::References
| RfcHeader::ResentMessageId
);
let tokens = if !value.is_empty() {
let header_num = u8::from(header_name).to_string();
value
.split_ascii_whitespace()
.filter_map(|token| {
if token.len() < MAX_TOKEN_LENGTH {
if is_id {
format!("{header_num}{token}")
} else {
format!("{header_num}{}", token.to_lowercase())
}
.into()
} else {
None
}
})
.collect::<Vec<_>>()
} else {
vec![]
};
match tokens.len() {
0 => {
filters.push(query::Filter::has_raw_text(
Property::Headers,
u8::from(header_name).to_string(),
));
}
1 => {
filters.push(query::Filter::has_raw_text(
Property::Headers,
tokens.into_iter().next().unwrap(),
));
}
_ => {
filters.push(query::Filter::And);
for token in tokens {
filters.push(query::Filter::has_raw_text(
Property::Headers,
token,
));
}
filters.push(query::Filter::End);
}
}
} else {
return Err(StatusResponse::no(format!(
"Querying non-RFC header '{header}' is not allowed.",
)));
};
}
search::Filter::Keyword(keyword) => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::from(keyword),
));
}
search::Filter::Larger(size) => {
filters.push(query::Filter::gt(Property::Size, size));
}
search::Filter::On(date) => {
filters.push(query::Filter::And);
filters.push(query::Filter::gt(Property::ReceivedAt, date as u64));
filters.push(query::Filter::lt(
Property::ReceivedAt,
(date + 86400) as u64,
));
filters.push(query::Filter::End);
}
search::Filter::Seen => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Seen,
));
}
search::Filter::SentBefore(date) => {
filters.push(query::Filter::lt(Property::SentAt, date as u64));
}
search::Filter::SentOn(date) => {
filters.push(query::Filter::And);
filters.push(query::Filter::gt(Property::SentAt, date as u64));
filters.push(query::Filter::lt(Property::SentAt, (date + 86400) as u64));
filters.push(query::Filter::End);
}
search::Filter::SentSince(date) => {
filters.push(query::Filter::gt(Property::SentAt, date as u64));
}
search::Filter::Since(date) => {
filters.push(query::Filter::gt(Property::ReceivedAt, date as u64));
}
search::Filter::Smaller(size) => {
filters.push(query::Filter::lt(Property::Size, size));
}
search::Filter::Subject(text) => {
filters.push(query::Filter::has_text_detect(
Property::Subject,
text,
self.jmap.config.default_language,
));
}
search::Filter::Text(text) => {
filters.push(query::Filter::Or);
filters.push(query::Filter::has_text(
Property::From,
&text,
Language::None,
));
filters.push(query::Filter::has_text(Property::To, &text, Language::None));
filters.push(query::Filter::has_text(Property::Cc, &text, Language::None));
filters.push(query::Filter::has_text(
Property::Bcc,
&text,
Language::None,
));
filters.push(query::Filter::has_text_detect(
Property::Subject,
&text,
self.jmap.config.default_language,
));
filters.push(query::Filter::has_text_detect(
Property::TextBody,
&text,
self.jmap.config.default_language,
));
filters.push(query::Filter::has_text_detect(
Property::Attachments,
text,
self.jmap.config.default_language,
));
filters.push(query::Filter::End);
}
search::Filter::To(text) => {
filters.push(query::Filter::has_text(Property::To, text, Language::None));
}
search::Filter::Unanswered => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Answered,
));
filters.push(query::Filter::End);
}
search::Filter::Undeleted => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Deleted,
));
filters.push(query::Filter::End);
}
search::Filter::Undraft => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Draft,
));
filters.push(query::Filter::End);
}
search::Filter::Unflagged => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Flagged,
));
filters.push(query::Filter::End);
}
search::Filter::Unkeyword(keyword) => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::from(keyword),
));
filters.push(query::Filter::End);
}
search::Filter::Unseen => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Seen,
));
filters.push(query::Filter::End);
}
search::Filter::And => {
filters.push(query::Filter::And);
}
search::Filter::Or => {
filters.push(query::Filter::Or);
}
search::Filter::Not => {
filters.push(query::Filter::Not);
}
search::Filter::End => {
filters.push(query::Filter::End);
}
search::Filter::Recent => {
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Recent,
));
}
search::Filter::New => {
filters.push(query::Filter::And);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Recent,
));
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Seen,
));
filters.push(query::Filter::End);
filters.push(query::Filter::End);
}
search::Filter::Old => {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(
Property::Keywords,
Keyword::Seen,
));
filters.push(query::Filter::End);
}
search::Filter::Older(secs) => {
filters.push(query::Filter::le(
Property::ReceivedAt,
now().saturating_sub(secs as u64),
));
}
search::Filter::Younger(secs) => {
filters.push(query::Filter::ge(
Property::ReceivedAt,
now().saturating_sub(secs as u64),
));
}
search::Filter::ModSeq((modseq, _)) => {
let mut set = RoaringBitmap::new();
for change in self
.jmap
.changes_(
mailbox.id.account_id,
Collection::Email,
Query::from_modseq(modseq),
)
.await?
.changes
{
let id = (change.unwrap_id() & u32::MAX as u64) as u32;
if message_ids.contains(id) {
set.insert(id);
}
}
filters.push(query::Filter::is_in_set(set));
include_highest_modseq = true;
}
search::Filter::EmailId(id) => {
if let Some(id) = Id::from_bytes(id.as_bytes()) {
filters.push(query::Filter::is_in_set(
RoaringBitmap::from_sorted_iter([id.document_id()]).unwrap(),
));
} else {
return Err(StatusResponse::no(format!(
"Failed to parse email id '{id}'.",
)));
}
}
search::Filter::ThreadId(id) => {
if let Some(id) = Id::from_bytes(id.as_bytes()) {
filters.push(query::Filter::is_in_bitmap(
Property::ThreadId,
id.document_id(),
));
} else {
return Err(StatusResponse::no(format!(
"Failed to parse thread id '{id}'.",
)));
}
}
}
}
// Run query
self.jmap
.filter(mailbox.id.account_id, Collection::Email, filters)
.await
.map(|res| (res, include_highest_modseq))
.map_err(|err| err.into())
}
}
impl SelectedMailbox {
pub async fn get_saved_search(&self) -> Option<Arc<Vec<ImapId>>> {
let mut rx = match &*self.saved_search.lock() {
SavedSearch::InFlight { rx } => rx.clone(),
SavedSearch::Results { items } => {
return Some(items.clone());
}
SavedSearch::None => {
return None;
}
};
rx.changed().await.ok();
let v = rx.borrow();
Some(v.clone())
}
pub fn map_search_results(
&self,
ids: impl Iterator<Item = u32>,
is_uid: bool,
find_min: bool,
find_max: bool,
min: &mut Option<(u32, ImapId)>,
max: &mut Option<(u32, ImapId)>,
total: &mut u32,
imap_ids: &mut Vec<u32>,
saved_results: &mut Option<Vec<ImapId>>,
) {
let state = self.state.lock();
let find_min_or_max = find_min || find_max;
for id in ids {
if let Some(imap_id) = state.id_to_imap.get(&(id as u32)) {
let id = if is_uid { imap_id.uid } else { imap_id.seqnum };
if find_min_or_max {
if find_min {
if let Some((prev_min, _)) = min {
if id < *prev_min {
*min = Some((id, *imap_id));
}
} else {
*min = Some((id, *imap_id));
}
}
if find_max {
if let Some((prev_max, _)) = max {
if id > *prev_max {
*max = Some((id, *imap_id));
}
} else {
*max = Some((id, *imap_id));
}
}
} else {
imap_ids.push(id);
saved_results.as_mut().map(|r| r.push(*imap_id));
}
*total += 1;
}
}
if find_min || find_max {
for (id, imap_id) in [min, max].into_iter().flatten() {
imap_ids.push(*id);
saved_results.as_mut().map(|r| r.push(*imap_id));
}
}
}
}
impl SavedSearch {
pub async fn unwrap(&self) -> Option<Arc<Vec<ImapId>>> {
match self {
SavedSearch::InFlight { rx } => {
let mut rx = rx.clone();
rx.changed().await.ok();
let v = rx.borrow();
Some(v.clone())
}
SavedSearch::Results { items } => Some(items.clone()),
SavedSearch::None => None,
}
}
}

View file

@ -34,6 +34,8 @@ use tokio::io::AsyncRead;
use crate::core::{SavedSearch, SelectedMailbox, Session, State};
use super::ToModSeq;
impl<T: AsyncRead> Session<T> {
pub async fn handle_select(&mut self, request: Request<Command>) -> crate::OpResult {
let is_select = request.command == Command::Select;
@ -60,7 +62,11 @@ impl<T: AsyncRead> Session<T> {
let uid_validity = state.uid_validity;
let uid_next = state.uid_next;
let total_messages = state.total_messages;
let highest_modseq = state.last_state;
let highest_modseq = if is_condstore {
state.last_state.to_modseq().into()
} else {
None
};
let mailbox = Arc::new(SelectedMailbox {
id: mailbox,
state: parking_lot::Mutex::new(state),
@ -111,7 +117,7 @@ impl<T: AsyncRead> Session<T> {
uid_next,
closed_previous,
is_rev2: self.version.is_rev2(),
highest_modseq: highest_modseq.map(|id| id + 1),
highest_modseq,
mailbox_id: Id::from_parts(
mailbox.id.account_id,
mailbox.id.mailbox_id.unwrap_or(u32::MAX),

View file

@ -36,6 +36,8 @@ use tokio::io::AsyncRead;
use crate::core::{Mailbox, Session, SessionData};
use super::ToModSeq;
impl<T: AsyncRead> Session<T> {
pub async fn handle_status(&mut self, request: Request<Command>) -> crate::OpResult {
match request.parse_status(self.version) {
@ -150,9 +152,7 @@ impl SessionData {
Status::HighestModSeq => {
items_response.push((
*item,
StatusItemType::Number(
account.state_email.map(|id| id + 1).unwrap_or(0),
),
StatusItemType::Number(account.state_email.to_modseq()),
));
}
Status::MailboxId => {

View file

@ -0,0 +1,130 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart IMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use ahash::AHashMap;
use imap_proto::{
protocol::{
thread::{Arguments, Response},
ImapResponse,
},
receiver::Request,
Command, StatusResponse,
};
use jmap_proto::types::{collection::Collection, property::Property};
use store::ValueKey;
use tokio::io::AsyncRead;
use crate::core::{SelectedMailbox, Session, SessionData};
impl<T: AsyncRead> Session<T> {
pub async fn handle_thread(
&mut self,
mut request: Request<Command>,
is_uid: bool,
) -> Result<(), ()> {
let command = request.command;
let tag = std::mem::take(&mut request.tag);
match request.parse_thread() {
Ok(arguments) => {
let (data, mailbox) = self.state.mailbox_state();
tokio::spawn(async move {
let bytes = match data.thread(arguments, mailbox, is_uid).await {
Ok(response) => StatusResponse::completed(command)
.with_tag(tag)
.serialize(response.serialize()),
Err(response) => response.with_tag(tag).into_bytes(),
};
data.write_bytes(bytes).await;
});
Ok(())
}
Err(response) => self.write_bytes(response.into_bytes()).await,
}
}
}
impl SessionData {
pub async fn thread(
&self,
arguments: Arguments,
mailbox: Arc<SelectedMailbox>,
is_uid: bool,
) -> Result<Response, StatusResponse> {
// Run query
let (result_set, _) = self
.query(arguments.filter, &mailbox, &None, is_uid)
.await?;
// Obtain threadIds for matching messages
let thread_ids = self
.jmap
.store
.get_values::<u32>(
result_set
.results
.iter()
.map(|document_id| {
ValueKey::new(
mailbox.id.account_id,
Collection::Email,
document_id,
Property::ThreadId,
)
})
.collect(),
)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "thread_query",
error = ?err,
"Failed to obtain threadIds.");
StatusResponse::database_failure()
})?;
// Group messages by thread
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();
let state = mailbox.state.lock();
for (document_id, thread_id) in result_set.results.into_iter().zip(thread_ids) {
if let (Some(thread_id), Some(imap_id)) =
(thread_id, state.id_to_imap.get(&document_id))
{
threads
.entry(thread_id)
.or_insert_with(|| Vec::new())
.push(if is_uid { imap_id.uid } else { imap_id.seqnum });
}
}
// Build response
Ok(Response {
is_uid,
threads: threads.into_iter().map(|(_, messages)| messages).collect(),
})
}
}

View file

@ -200,3 +200,23 @@ impl Changes {
Some(())
}
}
impl Change {
pub fn id(&self) -> u64 {
match self {
Change::Insert(id) => *id,
Change::Update(id) => *id,
Change::ChildUpdate(id) => *id,
Change::Delete(id) => *id,
}
}
pub fn unwrap_id(self) -> u64 {
match self {
Change::Insert(id) => id,
Change::Update(id) => id,
Change::ChildUpdate(id) => id,
Change::Delete(id) => id,
}
}
}