Multiple bugfixes found while running imap-test.

This commit is contained in:
mdecimus 2023-07-02 20:15:19 +02:00
parent 381cedb088
commit 048a65a019
27 changed files with 522 additions and 292 deletions

1
Cargo.lock generated
View file

@ -1742,6 +1742,7 @@ dependencies = [
"chrono",
"jmap_proto",
"mail-parser",
"tokio",
]
[[package]]

View file

@ -9,3 +9,6 @@ jmap_proto = { path = "../jmap-proto" }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "serde_support", "ludicrous_mode"] }
ahash = { version = "0.8" }
chrono = { version = "0.4"}
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }

View file

@ -164,6 +164,18 @@ mod tests {
}],
},
),
(
"A003 APPEND \"hi\" \"20-Nov-2022 23:59:59 +0300\" ~{1+}\r\na\r\n",
append::Arguments {
tag: "A003".to_string(),
mailbox_name: "hi".to_string(),
messages: vec![Message {
message: vec![b'a'],
flags: vec![],
received_at: Some(1668977999),
}],
},
),
] {
assert_eq!(
receiver

View file

@ -23,6 +23,8 @@
use std::borrow::Cow;
use mail_parser::DateTime;
use super::{
literal_string, quoted_rfc2822_or_nil, quoted_string, quoted_string_or_nil, quoted_timestamp,
Flag, ImapResponse, Sequence,
@ -176,7 +178,7 @@ pub enum BodyContents<'x> {
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Envelope<'x> {
pub date: Option<i64>,
pub date: Option<DateTime>,
pub subject: Option<Cow<'x, str>>,
pub from: Vec<Address<'x>>,
pub sender: Vec<Address<'x>>,
@ -648,7 +650,7 @@ impl Section {
impl<'x> Envelope<'x> {
pub fn serialize(&self, buf: &mut Vec<u8>) {
buf.push(b'(');
quoted_rfc2822_or_nil(buf, self.date);
quoted_rfc2822_or_nil(buf, &self.date);
buf.push(b' ');
quoted_string_or_nil(buf, self.subject.as_deref());
self.serialize_addresses(buf, &self.from);
@ -914,6 +916,8 @@ impl<'x> ImapResponse for Response<'x> {
#[cfg(test)]
mod tests {
use mail_parser::DateTime;
use crate::protocol::{Flag, ImapResponse};
use super::{
@ -927,7 +931,7 @@ mod tests {
(
super::DataItem::Envelope {
envelope: Envelope {
date: 837570205.into(),
date: DateTime::from_timestamp(837570205).into(),
subject: Some("IMAP4rev2 WG mtg summary and minutes".into()),
from: vec![Address::Single(EmailAddress {
name: Some("Terry Gray".into()),
@ -975,7 +979,7 @@ mod tests {
(
super::DataItem::Envelope {
envelope: Envelope {
date: 837570205.into(),
date: DateTime::from_timestamp(837570205).into(),
subject: Some("Group test".into()),
from: vec![Address::Single(EmailAddress {
name: Some("Bill Foobar".into()),
@ -1049,7 +1053,7 @@ mod tests {
body_size_octets: 9323,
},
envelope: Box::new(Envelope {
date: 837570205.into(),
date: DateTime::from_timestamp(837570205).into(),
subject: Some("Hello world!".into()),
from: vec![Address::Single(EmailAddress {
name: Some("Terry Gray".into()),

View file

@ -208,20 +208,13 @@ pub fn quoted_timestamp(buf: &mut Vec<u8>, timestamp: i64) {
buf.push(b'"');
}
pub fn quoted_rfc2822(buf: &mut Vec<u8>, timestamp: i64) {
pub fn quoted_rfc2822(buf: &mut Vec<u8>, timestamp: &mail_parser::DateTime) {
buf.push(b'"');
buf.extend_from_slice(
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp_opt(timestamp, 0).unwrap_or_default(),
Utc,
)
.to_rfc2822()
.as_bytes(),
);
buf.extend_from_slice(timestamp.to_rfc822().as_bytes());
buf.push(b'"');
}
pub fn quoted_rfc2822_or_nil(buf: &mut Vec<u8>, timestamp: Option<i64>) {
pub fn quoted_rfc2822_or_nil(buf: &mut Vec<u8>, timestamp: &Option<mail_parser::DateTime>) {
if let Some(timestamp) = timestamp {
quoted_rfc2822(buf, timestamp);
} else {

View file

@ -234,8 +234,14 @@ impl<T: CommandParser> Receiver<T> {
self.push_argument(false)?;
self.state = State::ArgumentQuoted { escaped: false };
}
b'{' if last_ch.is_ascii_whitespace() => {
self.push_argument(false)?;
b'{' if last_ch.is_ascii_whitespace()
|| (last_ch == b'~' && self.buf.len() == 1) =>
{
if last_ch != b'~' {
self.push_argument(false)?;
} else {
self.buf.clear();
}
self.state = State::Literal { non_sync: false };
}
b'(' => {

View file

@ -36,10 +36,9 @@ use super::{SelectedMailbox, Session, SessionData, State, IMAP};
impl<T: AsyncRead> Session<T> {
pub async fn ingest(&mut self, bytes: &[u8]) -> crate::Result<bool> {
/*let tmp = "dd";
for line in String::from_utf8_lossy(bytes).split("\r\n") {
println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]);
}*/
let c = println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]);
}
tracing::trace!(parent: &self.span,
event = "read",
@ -52,7 +51,7 @@ impl<T: AsyncRead> Session<T> {
loop {
match self.receiver.parse(&mut bytes) {
Ok(request) => match request.is_allowed(&self.state, self.is_tls) {
Ok(request) => match self.is_allowed(request) {
Ok(request) => {
requests.push(request);
}
@ -222,12 +221,9 @@ pub fn group_requests(
grouped_requests
}
trait IsAllowed: Sized {
fn is_allowed(self, state: &State, is_tls: bool) -> Result<Self, StatusResponse>;
}
impl IsAllowed for Request<Command> {
fn is_allowed(self, state: &State, is_tls: bool) -> Result<Self, StatusResponse> {
impl<T: AsyncRead> Session<T> {
fn is_allowed(&self, request: Request<Command>) -> Result<Request<Command>, StatusResponse> {
let state = &self.state;
// Rate limit request
if let State::Authenticated { data } | State::Selected { data, .. } = state {
if !data
@ -238,39 +234,39 @@ impl IsAllowed for Request<Command> {
.is_allowed()
{
return Err(StatusResponse::no("Too many requests")
.with_tag(self.tag)
.with_tag(request.tag)
.with_code(ResponseCode::Limit));
}
}
match &self.command {
Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(self),
match &request.command {
Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(request),
Command::StartTls => {
if !is_tls {
Ok(self)
if !self.is_tls {
Ok(request)
} else {
Err(StatusResponse::no("Already in TLS mode.").with_tag(self.tag))
Err(StatusResponse::no("Already in TLS mode.").with_tag(request.tag))
}
}
Command::Authenticate => {
if let State::NotAuthenticated { .. } = state {
Ok(self)
Ok(request)
} else {
Err(StatusResponse::no("Already authenticated.").with_tag(self.tag))
Err(StatusResponse::no("Already authenticated.").with_tag(request.tag))
}
}
Command::Login => {
if let State::NotAuthenticated { .. } = state {
if is_tls {
Ok(self)
if self.is_tls || self.imap.allow_plain_auth {
Ok(request)
} else {
Err(
StatusResponse::no("LOGIN is disabled on the clear-text port.")
.with_tag(self.tag),
.with_tag(request.tag),
)
}
} else {
Err(StatusResponse::no("Already authenticated.").with_tag(self.tag))
Err(StatusResponse::no("Already authenticated.").with_tag(request.tag))
}
}
Command::Enable
@ -294,9 +290,9 @@ impl IsAllowed for Request<Command> {
| Command::MyRights
| Command::Unauthenticate => {
if let State::Authenticated { .. } | State::Selected { .. } = state {
Ok(self)
Ok(request)
} else {
Err(StatusResponse::no("Not authenticated.").with_tag(self.tag))
Err(StatusResponse::no("Not authenticated.").with_tag(request.tag))
}
}
Command::Close
@ -313,21 +309,21 @@ impl IsAllowed for Request<Command> {
State::Selected { mailbox, .. } => {
if mailbox.is_select
|| !matches!(
self.command,
request.command,
Command::Store(_) | Command::Expunge(_) | Command::Move(_),
)
{
Ok(self)
Ok(request)
} else {
Err(StatusResponse::no("Not permitted in EXAMINE state.")
.with_tag(self.tag))
.with_tag(request.tag))
}
}
State::Authenticated { .. } => {
Err(StatusResponse::bad("No mailbox is selected.").with_tag(self.tag))
Err(StatusResponse::bad("No mailbox is selected.").with_tag(request.tag))
}
State::NotAuthenticated { .. } => {
Err(StatusResponse::no("Not authenticated.").with_tag(self.tag))
Err(StatusResponse::no("Not authenticated.").with_tag(request.tag))
}
},
}

View file

@ -15,7 +15,7 @@ use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::core::ImapId;
use super::{MailboxId, MailboxState, SelectedMailbox, SessionData};
use super::{MailboxId, MailboxState, NextMailboxState, SelectedMailbox, SessionData};
#[derive(Debug)]
struct UidMap {
@ -55,7 +55,7 @@ impl SessionData {
.await?;
// Obtain current state
let last_state = self
let modseq = self
.jmap
.store
.get_last_change_id(mailbox.account_id, Collection::Email)
@ -212,7 +212,8 @@ impl SessionData {
id_to_imap,
uid_to_id,
uid_max,
last_state,
modseq,
next_state: None,
});
} else {
let uid_next = (id_list.len() + 1) as u32;
@ -259,7 +260,8 @@ impl SessionData {
id_to_imap,
uid_to_id,
uid_max: uid_next.saturating_sub(1),
last_state,
modseq,
next_state: None,
});
}
}
@ -268,40 +270,85 @@ impl SessionData {
pub async fn synchronize_messages(
&self,
mailbox: &SelectedMailbox,
is_qresync: bool,
is_uid: bool,
) -> crate::op::Result<Option<u64>> {
// Obtain current modseq
let modseq = self.get_modseq(mailbox.id.account_id).await?;
if mailbox.state.lock().last_state == modseq {
return Ok(modseq);
}
if mailbox.state.lock().modseq != modseq {
// Synchronize messages
let new_state = self.fetch_messages(&mailbox.id).await?;
let mut current_state = mailbox.state.lock();
// Synchronize messages
let new_state = self.fetch_messages(&mailbox.id).await?;
// Add missing uids
let mut deletions = current_state
.next_state
.take()
.map(|state| state.deletions)
.unwrap_or_default();
let mut uid_to_id = std::mem::take(&mut current_state.uid_to_id);
for imap_id in current_state.id_to_imap.values_mut() {
if imap_id.uid != u32::MAX && !new_state.uid_to_id.contains_key(&imap_id.uid) {
// Add to deletions
deletions.push(*imap_id);
// Update UIDs
let mut buf = Vec::with_capacity(64);
let (new_message_count, deletions) = mailbox.update_mailbox_state(new_state, true);
if let Some(deletions) = deletions {
let mut ids = deletions
.into_iter()
.map(|id| {
if is_uid || is_qresync {
id.uid
} else {
id.seqnum
}
})
.collect::<Vec<u32>>();
ids.sort_unstable();
expunge::Response { is_qresync, ids }.serialize_to(&mut buf);
}
if let Some(new_message_count) = new_message_count {
Exists {
total_messages: new_message_count,
// Invalidate entries
uid_to_id.insert(imap_id.uid, u32::MAX);
imap_id.uid = u32::MAX;
}
}
current_state.uid_to_id = uid_to_id;
// Update state
current_state.modseq = new_state.modseq;
current_state.next_state = Some(Box::new(NextMailboxState {
next_state: new_state,
deletions,
}));
}
Ok(modseq)
}
pub async fn write_mailbox_changes(
&self,
mailbox: &SelectedMailbox,
is_qresync: bool,
is_uid: bool,
) -> crate::op::Result<Option<u64>> {
// Resync mailbox
let modseq = self.synchronize_messages(mailbox).await?;
let mut buf = Vec::new();
{
let mut current_state = mailbox.state.lock();
if let Some(next_state) = current_state.next_state.take() {
if !next_state.deletions.is_empty() {
let mut ids = next_state
.deletions
.into_iter()
.map(|id| {
if is_uid || is_qresync {
id.uid
} else {
id.seqnum
}
})
.collect::<Vec<u32>>();
ids.sort_unstable();
expunge::Response { is_qresync, ids }.serialize_to(&mut buf);
}
if !buf.is_empty()
|| next_state
.next_state
.uid_max
.saturating_sub(current_state.uid_max)
> 0
{
Exists {
total_messages: next_state.next_state.total_messages,
}
.serialize(&mut buf);
}
*current_state = next_state.next_state;
}
.serialize(&mut buf);
}
if !buf.is_empty() {
self.write_bytes(buf).await;
@ -344,17 +391,19 @@ impl SelectedMailbox {
return Ok(ids);
}
let max_uid = state.uid_max;
let max_seqnum = state.total_messages as u32;
for (id, imap_id) in &state.id_to_imap {
let matched = if is_uid {
sequence.contains(imap_id.uid, max_uid)
} else {
sequence.contains(imap_id.seqnum, max_seqnum)
};
if matched {
ids.insert(*id, *imap_id);
if is_uid {
for (id, imap_id) in &state.id_to_imap {
if imap_id.uid != u32::MAX && sequence.contains(imap_id.uid, state.uid_max) {
ids.insert(*id, *imap_id);
}
}
} else {
for (id, imap_id) in &state.id_to_imap {
if imap_id.uid != u32::MAX
&& sequence.contains(imap_id.seqnum, state.total_messages as u32)
{
ids.insert(*id, *imap_id);
}
}
}
@ -369,7 +418,9 @@ impl SelectedMailbox {
for imap_id in saved_ids.iter() {
if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
ids.insert(*id, *imap_id);
if *id != u32::MAX {
ids.insert(*id, *imap_id);
}
}
}
@ -383,7 +434,7 @@ impl SelectedMailbox {
let state = self.state.lock();
if is_uid {
for uid in sequence.expand(state.uid_max) {
if !state.uid_to_id.contains_key(&uid) {
if state.uid_to_id.get(&uid).map_or(true, |id| *id == u32::MAX) {
deleted_ids.push(uid);
}
}
@ -397,7 +448,11 @@ impl SelectedMailbox {
} else if let Some(saved_ids) = self.get_saved_search().await {
let state = self.state.lock();
for id in saved_ids.iter() {
if !state.uid_to_id.contains_key(&id.uid) {
if state
.uid_to_id
.get(&id.uid)
.map_or(true, |id| *id == u32::MAX)
{
deleted_ids.push(if is_uid { id.uid } else { id.seqnum });
}
}
@ -405,77 +460,6 @@ impl SelectedMailbox {
deleted_ids.sort_unstable();
deleted_ids
}
pub fn id_to_uid(&self, ids: &[u32]) -> Vec<ImapId> {
let mut imap_ids = Vec::with_capacity(ids.len());
let state = self.state.lock();
for id in ids {
if let Some(imap_id) = state.id_to_imap.get(id) {
imap_ids.push(*imap_id);
}
}
imap_ids
}
pub fn uid_to_id(&self, imap_ids: &[ImapId]) -> Vec<u32> {
let mut ids = Vec::with_capacity(imap_ids.len());
let state = self.state.lock();
for imap_id in imap_ids {
if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
ids.push(*id);
}
}
ids
}
pub fn is_in_sync(&self, ids: &[u32]) -> bool {
let state = self.state.lock();
for id in ids {
if !state.id_to_imap.contains_key(id) {
return false;
}
}
true
}
pub fn update_mailbox_state(
&self,
mailbox_state: MailboxState,
return_deleted: bool,
) -> (Option<usize>, Option<Vec<ImapId>>) {
let mut state = self.state.lock();
let mailbox_size = if mailbox_state.total_messages != state.total_messages {
mailbox_state.total_messages.into()
} else {
None
};
let deletions = if return_deleted {
let mut deletions = Vec::new();
for (id, imap_id) in &state.id_to_imap {
if !mailbox_state.id_to_imap.contains_key(id) {
deletions.push(*imap_id);
}
}
if !deletions.is_empty() {
Some(deletions)
} else {
None
}
} else {
None
};
*state = mailbox_state;
(mailbox_size, deletions)
}
}
impl Serialize for &UidMap {

View file

@ -51,6 +51,7 @@ pub struct IMAP {
pub max_auth_failures: u32,
pub name_shared: String,
pub name_all: String,
pub allow_plain_auth: bool,
pub timeout_auth: Duration,
pub timeout_unauth: Duration,
@ -137,7 +138,14 @@ pub struct MailboxState {
pub id_to_imap: AHashMap<u32, ImapId>,
pub uid_to_id: AHashMap<u32, u32>,
pub total_messages: usize,
pub last_state: Option<u64>,
pub modseq: Option<u64>,
pub next_state: Option<Box<NextMailboxState>>,
}
#[derive(Debug)]
pub struct NextMailboxState {
pub next_state: MailboxState,
pub deletions: Vec<ImapId>,
}
#[derive(Debug, Default)]

View file

@ -116,13 +116,14 @@ pub fn spawn_writer(mut stream: Event, span: tracing::Span) -> mpsc::Sender<Even
impl<T: AsyncRead> Session<T> {
pub async fn write_bytes(&self, bytes: impl Into<Cow<'static, [u8]>>) -> crate::OpResult {
/*let tmp = "dd";
println!(
let bytes = bytes.into();
let c = println!(
"-> {:?}",
String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)])
);*/
);
if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await {
if let Err(err) = self.writer.send(Event::Bytes(bytes)).await {
debug!("Failed to send bytes: {}", err);
Err(())
} else {
@ -133,13 +134,13 @@ impl<T: AsyncRead> Session<T> {
impl SessionData {
pub async fn write_bytes(&self, bytes: impl Into<Cow<'static, [u8]>>) -> bool {
/*let tmp = "dd";
println!(
let bytes = bytes.into();
let c = println!(
"-> {:?}",
String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)])
);*/
);
if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await {
if let Err(err) = self.writer.send(Event::Bytes(bytes)).await {
debug!("Failed to send bytes: {}", err);
false
} else {

View file

@ -51,6 +51,7 @@ impl IMAP {
),
rate_requests: config.property_or_static("imap.rate-limit.rate", "1000/1m")?,
rate_concurrent: config.property("imap.rate-limit.concurrent")?.unwrap_or(4),
allow_plain_auth: config.property_or_static("imap.auth.allow-plain-text", "false")?,
}))
}
}

View file

@ -182,7 +182,7 @@ impl SessionData {
if !created_ids.is_empty() {
let (uids, uid_validity) = match selected_mailbox {
Some(selected_mailbox) if selected_mailbox.id == mailbox => {
self.synchronize_messages(&selected_mailbox, is_qresync, true)
self.write_mailbox_changes(&selected_mailbox, is_qresync, is_qresync)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
let mailbox = selected_mailbox.state.lock();

View file

@ -428,7 +428,7 @@ impl SessionData {
// Resynchronize source mailbox on a successful move
if did_move {
self.synchronize_messages(&src_mailbox, is_qresync, is_uid)
self.write_mailbox_changes(&src_mailbox, is_qresync, is_uid)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
}

View file

@ -109,7 +109,7 @@ impl<T: AsyncRead> Session<T> {
// Synchronize messages
match data
.synchronize_messages(&mailbox, self.is_qresync, is_uid)
.write_mailbox_changes(&mailbox, self.is_qresync, is_uid)
.await
{
Ok(_) => {

View file

@ -113,14 +113,7 @@ impl SessionData {
// Resync messages if needed
let account_id = mailbox.id.account_id;
let mut modseq = match {
if is_uid {
self.synchronize_messages(&mailbox, is_qresync, true).await
} else {
// Don't synchronize if we're not using UIDs as seqnums might change.
self.get_modseq(mailbox.id.account_id).await
}
} {
let mut modseq = match self.synchronize_messages(&mailbox).await {
Ok(modseq) => modseq,
Err(response) => return response.with_tag(arguments.tag),
};
@ -1022,7 +1015,7 @@ impl<'x> AsImapDataItem<'x> for Message<'x> {
fn envelope(&self) -> Envelope {
Envelope {
date: self.date().map(|dt| dt.to_timestamp()),
date: self.date().map(|d| d.clone()),
subject: self.subject().map(|s| s.into()),
from: self
.header_values(RfcHeader::From)

View file

@ -212,13 +212,13 @@ impl SessionData {
// Synchronize emails
if let Some(mailbox) = mailbox {
// Obtain changes since last sync
let last_state = mailbox.state.lock().last_state;
let modseq = mailbox.state.lock().modseq;
match self
.synchronize_messages(mailbox, is_qresync, is_qresync)
.write_mailbox_changes(mailbox, is_qresync, is_qresync)
.await
{
Ok(new_state) => {
if new_state == last_state {
if new_state == modseq {
return;
}
}
@ -234,7 +234,7 @@ impl SessionData {
.changes_(
mailbox.id.account_id,
Collection::Email,
last_state.map(Query::Since).unwrap_or(Query::All),
modseq.map(Query::Since).unwrap_or(Query::All),
)
.await
{

View file

@ -42,7 +42,7 @@ use store::{
};
use tokio::{io::AsyncRead, sync::watch};
use crate::core::{ImapId, SavedSearch, SelectedMailbox, Session, SessionData};
use crate::core::{ImapId, MailboxState, SavedSearch, SelectedMailbox, Session, SessionData};
use super::{FromModSeq, ToModSeq};
@ -71,7 +71,6 @@ impl<T: AsyncRead> Session<T> {
} else {
(None, None)
};
let is_qresync = self.is_qresync;
tokio::spawn(async move {
let tag = std::mem::take(&mut arguments.tag);
@ -81,7 +80,6 @@ impl<T: AsyncRead> Session<T> {
mailbox.clone(),
results_tx,
prev_saved_search.clone(),
is_qresync,
is_uid,
)
.await
@ -122,7 +120,6 @@ impl SessionData {
mailbox: Arc<SelectedMailbox>,
results_tx: Option<watch::Sender<Arc<Vec<ImapId>>>>,
prev_saved_search: Option<Option<Arc<Vec<ImapId>>>>,
is_qresync: bool,
is_uid: bool,
) -> Result<search::Response, StatusResponse> {
// Run query
@ -131,22 +128,14 @@ impl SessionData {
.await?;
// Obtain modseq
let mut highest_modseq = None;
if is_uid {
let modseq = self
.synchronize_messages(&mailbox, is_qresync, true)
.await?;
if include_highest_modseq {
highest_modseq = modseq.to_modseq().into();
}
} else if include_highest_modseq {
// Don't synchronize if we're not using UIDs as seqnums might change.
highest_modseq = self
.get_modseq(mailbox.id.account_id)
let highest_modseq = if include_highest_modseq {
self.synchronize_messages(&mailbox)
.await?
.to_modseq()
.into();
}
.into()
} else {
None
};
// Sort and map ids
let mut min: Option<(u32, ImapId)> = None;
@ -294,9 +283,12 @@ impl SessionData {
(&sequence, &prev_saved_search)
{
if let Some(prev_saved_search) = prev_saved_search {
let state = mailbox.state.lock();
for imap_id in prev_saved_search.iter() {
if let Some(id) = mailbox.state.lock().uid_to_id.get(&imap_id.uid) {
set.insert(*id);
if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
if *id != u32::MAX {
set.insert(*id);
}
}
}
} else {
@ -690,31 +682,30 @@ impl SelectedMailbox {
) {
let state = self.state.lock();
let find_min_or_max = find_min || find_max;
for id in ids {
if let Some(imap_id) = state.id_to_imap.get(&(id as u32)) {
let id = if is_uid { imap_id.uid } else { imap_id.seqnum };
for document_id in ids {
if let Some((id, imap_id)) = state.map_result_id(document_id, is_uid) {
if find_min_or_max {
if find_min {
if let Some((prev_min, _)) = min {
if id < *prev_min {
*min = Some((id, *imap_id));
*min = Some((id, imap_id));
}
} else {
*min = Some((id, *imap_id));
*min = Some((id, imap_id));
}
}
if find_max {
if let Some((prev_max, _)) = max {
if id > *prev_max {
*max = Some((id, *imap_id));
*max = Some((id, imap_id));
}
} else {
*max = Some((id, *imap_id));
*max = Some((id, imap_id));
}
}
} else {
imap_ids.push(id);
saved_results.as_mut().map(|r| r.push(*imap_id));
saved_results.as_mut().map(|r| r.push(imap_id));
}
*total += 1;
}
@ -728,6 +719,27 @@ impl SelectedMailbox {
}
}
impl MailboxState {
pub fn map_result_id(&self, document_id: u32, is_uid: bool) -> Option<(u32, ImapId)> {
if let Some(imap_id) = self.id_to_imap.get(&document_id) {
if imap_id.uid != u32::MAX {
return Some((if is_uid { imap_id.uid } else { imap_id.seqnum }, *imap_id));
}
}
if is_uid {
self.next_state.as_ref().and_then(|s| {
s.next_state
.id_to_imap
.get(&document_id)
.map(|imap_id| (imap_id.uid, *imap_id))
})
} else {
None
}
}
}
impl SavedSearch {
pub async fn unwrap(&self) -> Option<Arc<Vec<ImapId>>> {
match self {

View file

@ -63,7 +63,7 @@ impl<T: AsyncRead> Session<T> {
let uid_next = state.uid_next;
let total_messages = state.total_messages;
let highest_modseq = if is_condstore {
state.last_state.to_modseq().into()
state.modseq.to_modseq().into()
} else {
None
};

View file

@ -61,13 +61,9 @@ impl<T: AsyncRead> Session<T> {
Ok(arguments) => {
let (data, mailbox) = self.state.select_data();
let is_condstore = self.is_condstore || mailbox.is_condstore;
let is_qresync = self.is_qresync;
tokio::spawn(async move {
let bytes = match data
.store(arguments, mailbox, is_uid, is_condstore, is_qresync)
.await
{
let bytes = match data.store(arguments, mailbox, is_uid, is_condstore).await {
Ok(response) => response,
Err(response) => response.into_bytes(),
};
@ -87,16 +83,12 @@ impl SessionData {
mailbox: Arc<SelectedMailbox>,
is_uid: bool,
is_condstore: bool,
is_qresync: bool,
) -> Result<Vec<u8>, StatusResponse> {
// Resync messages if needed
let account_id = mailbox.id.account_id;
if is_uid {
// Don't synchronize if we're not using UIDs as seqnums might change.
self.synchronize_messages(&mailbox, is_qresync, true)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
}
self.synchronize_messages(&mailbox)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
// Convert IMAP ids to JMAP ids.
let mut ids = match mailbox

View file

@ -79,6 +79,11 @@ impl SessionData {
.query(arguments.filter, &mailbox, &None, is_uid)
.await?;
// Synchronize mailbox
if !result_set.results.is_empty() {
self.synchronize_messages(&mailbox).await?;
}
// Obtain threadIds for matching messages
let thread_ids = self
.jmap
@ -111,13 +116,13 @@ impl SessionData {
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();
let state = mailbox.state.lock();
for (document_id, thread_id) in result_set.results.into_iter().zip(thread_ids) {
if let (Some(thread_id), Some(imap_id)) =
(thread_id, state.id_to_imap.get(&document_id))
if let (Some(thread_id), Some((imap_id, _))) =
(thread_id, state.map_result_id(document_id, is_uid))
{
threads
.entry(thread_id)
.or_insert_with(|| Vec::new())
.push(if is_uid { imap_id.uid } else { imap_id.seqnum });
.push(imap_id);
}
}

View file

@ -630,8 +630,8 @@ impl IntoIndex for &u64 {
impl IntoIndex for &u32 {
fn into_index(self, index_as: IndexAs) -> Vec<u8> {
match index_as {
IndexAs::Integer => self.serialize(),
_ => unreachable!(),
IndexAs::Integer | IndexAs::IntegerList => self.serialize(),
_ => unreachable!("index as {index_as:?} not supported for u32"),
}
}
}
@ -641,7 +641,7 @@ impl IntoIndex for &Id {
match index_as {
IndexAs::Integer => self.document_id().serialize(),
IndexAs::LongInteger => self.id().serialize(),
_ => unreachable!(),
_ => unreachable!("index as {index_as:?} not supported for Id"),
}
}
}

View file

@ -1208,7 +1208,7 @@ impl JMAP {
event = "error",
context = "email_delete",
error = ?err,
"Failed to deserialize term index.");
"Failed to deserialize token index while deleting email.");
MethodError::ServerPartialFail
})?
{

View file

@ -703,63 +703,71 @@ impl Deserialize for TokenIndex {
impl TokenIndex {
fn from_bytes(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128::<u32>()?;
let mut tokens = Vec::with_capacity(num_tokens as usize);
for _ in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?);
pos += nil_pos + 1;
}
let mut terms = Vec::new();
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize;
pos += LENGTH_SIZE;
let mut field_terms = Terms {
field_id: *bytes.get(pos)?,
exact_terms: AHashSet::default(),
stemmed_terms: AHashSet::default(),
};
pos += 1;
let bytes_read = bytes.get(pos..)?.skip_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::<usize>()?;
pos += bytes_read;
let mut term_pos = 0;
let mut byte_pos = pos;
while term_pos < terms_len {
let (bytes_read, chunk) = TermIndex::uncompress_chunk(
bytes.get(byte_pos..)?,
(terms_len - term_pos) * 2,
None,
)
.ok()?;
byte_pos += bytes_read;
for encoded_term in chunk.chunks_exact(2) {
let term_id = encoded_term[0];
let term_id_stemmed = encoded_term[1];
field_terms.exact_terms.insert(term_id);
if term_id != term_id_stemmed {
field_terms.stemmed_terms.insert(term_id_stemmed);
}
term_pos += 1;
}
if !bytes.is_empty() {
let (num_tokens, mut pos) = bytes.read_leb128::<u32>()?;
let mut tokens = Vec::with_capacity(num_tokens as usize);
for _ in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?);
pos += nil_pos + 1;
}
terms.push(field_terms);
pos += item_len;
}
let mut terms = Vec::new();
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?)
as usize;
pos += LENGTH_SIZE;
Some(TokenIndex { tokens, terms })
let mut field_terms = Terms {
field_id: *bytes.get(pos)?,
exact_terms: AHashSet::default(),
stemmed_terms: AHashSet::default(),
};
pos += 1;
let bytes_read = bytes.get(pos..)?.skip_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::<usize>()?;
pos += bytes_read;
let mut term_pos = 0;
let mut byte_pos = pos;
while term_pos < terms_len {
let (bytes_read, chunk) = TermIndex::uncompress_chunk(
bytes.get(byte_pos..)?,
(terms_len - term_pos) * 2,
None,
)
.ok()?;
byte_pos += bytes_read;
for encoded_term in chunk.chunks_exact(2) {
let term_id = encoded_term[0];
let term_id_stemmed = encoded_term[1];
field_terms.exact_terms.insert(term_id);
if term_id != term_id_stemmed {
field_terms.stemmed_terms.insert(term_id_stemmed);
}
term_pos += 1;
}
}
terms.push(field_terms);
pos += item_len;
}
Some(TokenIndex { tokens, terms })
} else {
Some(TokenIndex {
tokens: Vec::new(),
terms: Vec::new(),
})
}
}
}

View file

@ -114,7 +114,7 @@ pub fn enable_tracing(config: &Config) -> config::Result<Option<WorkerGuard>> {
"stdout" => {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
//.with_env_filter(env_filter)
.finish(),
)
.failed("Failed to set subscriber");

View file

@ -0,0 +1,198 @@
[server]
hostname = "test.example.org"
[server.listener.jmap]
bind = ["127.0.0.1:9990"]
url = "https://127.0.0.1:8899"
protocol = "jmap"
max-connections = 8192
[server.listener.imap]
bind = ["127.0.0.1:9991"]
protocol = "imap"
max-connections = 8192
[server.listener.imaptls]
bind = ["127.0.0.1:9992"]
protocol = "imap"
max-connections = 8192
tls.implicit = true
[server.listener.sieve]
bind = ["127.0.0.1:9993"]
protocol = "managesieve"
max-connections = 8192
tls.implicit = true
[server.listener.smtps]
bind = ['127.0.0.1:9994']
greeting = 'Test SMTP instance'
protocol = 'smtp'
tls.implicit = true
[server.listener.smtp]
bind = ['127.0.0.1:9995']
greeting = 'Test SMTP instance'
protocol = 'smtp'
tls.implicit = false
[server.socket]
reuse-addr = true
[server.tls]
enable = true
implicit = false
certificate = "default"
[global.tracing]
method = "stdout"
[session.ehlo]
reject-non-fqdn = false
[session.rcpt]
relay = [ { if = "authenticated-as", ne = "", then = true },
{ else = false } ]
directory = "local"
[session.rcpt.errors]
total = 5
wait = "1ms"
[queue]
path = "/tmp/stalwart-test"
hash = 64
[report]
path = "/tmp/stalwart-test"
hash = 64
[resolver]
type = "system"
[queue.outbound]
next-hop = [ { if = "rcpt-domain", in-list = "local/domains", then = "local" },
{ else = false } ]
[remote."mock-smtp"]
address = "localhost"
port = 9999
protocol = "smtp"
[remote."mock-smtp".tls]
implicit = false
allow-invalid-certs = true
[session.extensions]
future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"},
{ else = false } ]
[store]
db.path = "/tmp/stalwart-test/sqlite.db"
[store.blob]
type = "local"
[store.blob.local]
path = "/tmp/stalwart-test"
[certificate.default]
cert = "file://./tests/resources/tls_cert.pem"
private-key = "file://./tests/resources/tls_privatekey.pem"
[jmap]
directory = "local"
[jmap.protocol]
set.max-objects = 100000
[jmap.protocol.request]
max-concurrent = 8
[jmap.protocol.upload]
max-size = 5000000
max-concurrent = 4
ttl = "1m"
[jmap.protocol.upload.quota]
files = 3
size = 50000
[jmap.rate-limit]
account.rate = "1000/1m"
authentication.rate = "100/2s"
anonymous.rate = "100/1m"
[jmap.event-source]
throttle = "500ms"
[jmap.web-sockets]
throttle = "500ms"
[jmap.push]
throttle = "500ms"
attempts.interval = "500ms"
[directory."local"]
type = "memory"
[directory."local".options]
catch-all = true
subaddressing = true
[directory."local".lookup]
domains = ["example.org"]
[[directory."local".users]]
name = "admin"
description = "Superadmin"
secret = "donotuse"
[[directory."local".users]]
name = "john"
description = "John Doe"
secret = "12345"
email = ["john@example.org", "jdoe@example.org", "john.doe@example.org"]
email-list = ["info@example.org"]
member-of = ["sales"]
[[directory."local".users]]
name = "jane"
description = "Jane Doe"
secret = "abcde"
email = "jane@example.org"
email-list = ["info@example.org"]
member-of = ["sales", "support"]
[[directory."local".users]]
name = "bill"
description = "Bill Foobar"
secret = "$2y$05$bvIG6Nmid91Mu9RcmmWZfO5HJIMCT8riNW0hEp8f6/FuA2/mHZFpe"
quota = 500000
email = "bill@example.org"
email-list = ["info@example.org"]
[[directory."local".groups]]
name = "sales"
description = "Sales Team"
[[directory."local".groups]]
name = "support"
description = "Support Team"
[oauth]
key = "parerga_und_paralipomena"
max-auth-attempts = 1
[oauth.expiry]
user-code = "1s"
token = "1s"
refresh-token = "3s"
refresh-token-renew = "2s"
[imap.auth]
allow-plain-text = true
[imap.rate-limit]
rate = "10000/1s"
concurrent = 9000

View file

@ -49,7 +49,7 @@ pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
.assert_contains("EMAILID (")
.assert_contains("but then I thought, why not do both?")
.assert_contains(concat!(
"ENVELOPE (\"Sat, 20 Nov 2021 22:22:01 +0000\" ",
"ENVELOPE (\"Sat, 20 Nov 2021 14:22:01 -0800\" ",
"\"Why not both importing AND exporting? ☺\" ",
"((\"Art Vandelay (Vandelay Industries)\" NIL \"art\" \"vandelay.com\")) ",
"((\"Art Vandelay (Vandelay Industries)\" NIL \"art\" \"vandelay.com\")) ",

View file

@ -26,6 +26,13 @@ use imap_proto::ResponseType;
use super::{AssertResult, ImapConnection, Type};
pub async fn test(mut imap: &mut ImapConnection, mut imap_check: &mut ImapConnection) {
// Create third connection for testing
let mut other_conn = ImapConnection::connect(b"_z ").await;
other_conn
.send("AUTHENTICATE PLAIN {32+}\r\nAGpkb2VAZXhhbXBsZS5jb20Ac2VjcmV0")
.await;
other_conn.assert_read(Type::Tagged, ResponseType::Ok).await;
// List folders
imap.send("LIST \"\" \"*\"").await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
@ -48,6 +55,12 @@ pub async fn test(mut imap: &mut ImapConnection, mut imap_check: &mut ImapConnec
imap.assert_read(Type::Tagged, ResponseType::Ok).await;
imap.send("CREATE \"Fruit/Apple/Green\"").await;
imap.assert_read(Type::Tagged, ResponseType::Ok).await;
// Select folder from another connection
other_conn.send("SELECT \"Tofu\"").await;
other_conn.assert_read(Type::Tagged, ResponseType::Ok).await;
// Make sure folders are visible
for imap in [&mut imap, &mut imap_check] {
imap.send("LIST \"\" \"*\"").await;
imap.assert_read(Type::Tagged, ResponseType::Ok)