Group pipelined IMAP FETCH and STATUS operations

This commit is contained in:
mdecimus 2025-01-30 12:39:46 +01:00
parent 4c7052d227
commit 54733de0ff
3 changed files with 93 additions and 62 deletions

View file

@ -111,7 +111,7 @@ impl<T: SessionStream> Session<T> {
.await
.map(|_| SessionResult::Continue),
Command::Status => self
.handle_status(request)
.handle_status(group_requests(&mut requests, vec![request]))
.await
.map(|_| SessionResult::Continue),
Command::Append => self
@ -134,8 +134,8 @@ impl<T: SessionStream> Session<T> {
.handle_search(request, false, is_uid)
.await
.map(|_| SessionResult::Continue),
Command::Fetch(is_uid) => self
.handle_fetch(request, is_uid)
Command::Fetch(_) => self
.handle_fetch(group_requests(&mut requests, vec![request]))
.await
.map(|_| SessionResult::Continue),
Command::Store(is_uid) => self

View file

@ -42,43 +42,61 @@ use trc::AddContext;
use super::{FromModSeq, ImapContext};
impl<T: SessionStream> Session<T> {
pub async fn handle_fetch(
&mut self,
request: Request<Command>,
is_uid: bool,
) -> trc::Result<()> {
pub async fn handle_fetch(&mut self, requests: Vec<Request<Command>>) -> trc::Result<()> {
// Validate access
self.assert_has_permission(Permission::ImapFetch)?;
let op_start = Instant::now();
let arguments = request.parse_fetch()?;
let (data, mailbox) = self.state.select_data();
let is_qresync = self.is_qresync;
let is_rev2 = self.version.is_rev2();
let enabled_condstore = if !self.is_condstore && arguments.changed_since.is_some()
|| arguments.attributes.contains(&Attribute::ModSeq)
{
self.is_condstore = true;
true
} else {
false
};
let mut ops = Vec::with_capacity(requests.len());
for request in requests {
let is_uid = matches!(request.command, Command::Fetch(true));
match request.parse_fetch() {
Ok(arguments) => {
let enabled_condstore = if !self.is_condstore
&& arguments.changed_since.is_some()
|| arguments.attributes.contains(&Attribute::ModSeq)
{
self.is_condstore = true;
true
} else {
false
};
ops.push(Ok((is_uid, enabled_condstore, arguments)));
}
Err(err) => {
ops.push(Err(err));
}
}
}
spawn_op!(data, {
let response = data
.fetch(
arguments,
mailbox,
is_uid,
is_qresync,
is_rev2,
enabled_condstore,
op_start,
)
.await?;
data.write_bytes(response.into_bytes()).await
for op in ops {
match op {
Ok((is_uid, enabled_condstore, arguments)) => {
let response = data
.fetch(
arguments,
mailbox.clone(),
is_uid,
is_qresync,
is_rev2,
enabled_condstore,
Instant::now(),
)
.await?;
data.write_bytes(response.into_bytes()).await?;
}
Err(err) => data.write_error(err).await?,
}
}
Ok(())
})
}
}

View file

@ -34,47 +34,60 @@ use trc::AddContext;
use super::ToModSeq;
impl<T: SessionStream> Session<T> {
pub async fn handle_status(&mut self, request: Request<Command>) -> trc::Result<()> {
pub async fn handle_status(&mut self, requests: Vec<Request<Command>>) -> trc::Result<()> {
// Validate access
self.assert_has_permission(Permission::ImapStatus)?;
let op_start = Instant::now();
let arguments = request.parse_status(self.version)?;
let version = self.version;
let data = self.state.session_data();
spawn_op!(data, {
// Refresh mailboxes
data.synchronize_mailboxes(false)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
let mut did_sync = false;
// Fetch status
let status = data
.status(arguments.mailbox_name, &arguments.items)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
for request in requests.into_iter() {
match request.parse_status(version) {
Ok(arguments) => {
let op_start = Instant::now();
if !did_sync {
// Refresh mailboxes
data.synchronize_mailboxes(false)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
did_sync = true;
}
trc::event!(
Imap(trc::ImapEvent::Status),
SpanId = data.session_id,
MailboxName = status.mailbox_name.clone(),
Details = arguments
.items
.iter()
.map(|c| trc::Value::from(format!("{c:?}")))
.collect::<Vec<_>>(),
Elapsed = op_start.elapsed()
);
// Fetch status
let status = data
.status(arguments.mailbox_name, &arguments.items)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
let mut buf = Vec::with_capacity(32);
status.serialize(&mut buf, version.is_rev2());
data.write_bytes(
StatusResponse::completed(Command::Status)
.with_tag(arguments.tag)
.serialize(buf),
)
.await
trc::event!(
Imap(trc::ImapEvent::Status),
SpanId = data.session_id,
MailboxName = status.mailbox_name.clone(),
Details = arguments
.items
.iter()
.map(|c| trc::Value::from(format!("{c:?}")))
.collect::<Vec<_>>(),
Elapsed = op_start.elapsed()
);
let mut buf = Vec::with_capacity(32);
status.serialize(&mut buf, version.is_rev2());
data.write_bytes(
StatusResponse::completed(Command::Status)
.with_tag(arguments.tag)
.serialize(buf),
)
.await?;
}
Err(err) => data.write_error(err).await?,
}
}
Ok(())
})
}
}