Improved topological sorting

mdecimus 2025-03-04 15:50:53 +01:00
parent 1c460c7f3b
commit 3554dea4cb
43 changed files with 1230 additions and 313 deletions

Cargo.lock generated
View file

@ -1252,6 +1252,7 @@ dependencies = [
"idna",
"imagesize",
"imap_proto",
"indexmap 2.7.1",
"infer 0.16.0",
"jmap_proto",
"libc",
@ -1672,10 +1673,12 @@ version = "0.11.5"
dependencies = [
"common",
"dav-proto",
"directory",
"groupware",
"hashify",
"http_proto",
"hyper 1.6.0",
"trc",
]
[[package]]
@ -2733,6 +2736,7 @@ dependencies = [
"directory",
"hashify",
"jmap_proto",
"rkyv 0.8.10",
"tokio",
"utils",
]
@ -3440,6 +3444,7 @@ dependencies = [
"directory",
"email",
"imap_proto",
"indexmap 2.7.1",
"jmap_proto",
"mail-parser",
"mail-send",

View file

@ -68,6 +68,7 @@ p384 = { version = "0.13", features = ["ecdh"] }
num_cpus = "1.13.1"
hashify = "0.2"
rkyv = { version = "0.8.10", features = ["little_endian"] }
indexmap = "2.7.1"
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"

View file

@ -0,0 +1,22 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use utils::config::Config;
#[derive(Debug, Clone, Default)]
pub struct DavConfig {
pub max_request_size: usize,
}
impl DavConfig {
pub fn parse(config: &mut Config) -> Self {
DavConfig {
max_request_size: config
.property("dav.limits.request-size")
.unwrap_or(25 * 1024 * 1024),
}
}
}

View file

@ -7,21 +7,22 @@
use std::{str::FromStr, sync::Arc};
use arc_swap::ArcSwap;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use dav::DavConfig;
use directory::{Directories, Directory};
use hyper::{
header::{HeaderName, HeaderValue, AUTHORIZATION},
HeaderMap,
header::{AUTHORIZATION, HeaderName, HeaderValue},
};
use ring::signature::{EcdsaKeyPair, RsaKeyPair};
use spamfilter::SpamFilterConfig;
use store::{BlobBackend, BlobStore, FtsStore, InMemoryStore, Store, Stores};
use telemetry::Metrics;
use utils::config::{utils::AsKey, Config};
use utils::config::{Config, utils::AsKey};
use crate::{
auth::oauth::config::OAuthConfig, expr::*, listener::tls::AcmeProviders,
manager::config::ConfigManager, Core, Network, Security,
Core, Network, Security, auth::oauth::config::OAuthConfig, expr::*,
listener::tls::AcmeProviders, manager::config::ConfigManager,
};
use self::{
@ -29,6 +30,7 @@ use self::{
storage::Storage,
};
pub mod dav;
pub mod imap;
pub mod inner;
pub mod jmap;
@ -186,6 +188,7 @@ impl Core {
acme: AcmeProviders::parse(config),
metrics: Metrics::parse(config),
spam: SpamFilterConfig::parse(config).await,
dav: DavConfig::parse(config),
storage: Storage {
data,
blob,

View file

@ -5,7 +5,6 @@
*/
use std::{
collections::BTreeMap,
hash::{BuildHasher, Hasher},
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::{
@ -19,6 +18,7 @@ use ahash::{AHashMap, AHashSet};
use arc_swap::ArcSwap;
use auth::{AccessToken, oauth::config::OAuthConfig, roles::RolePermissions};
use config::{
dav::DavConfig,
imap::ImapConfig,
jmap::settings::JmapConfig,
network::Network,
@ -33,6 +33,7 @@ use config::{
};
use imap_proto::protocol::list::Attribute;
use indexmap::IndexMap;
use ipc::{HousekeeperEvent, QueueEvent, ReportingEvent, StateEvent};
use listener::{asn::AsnGeoLookupData, blocked::Security, tls::AcmeProviders};
@ -191,7 +192,7 @@ pub struct MailboxId {
pub struct Account {
pub account_id: u32,
pub prefix: Option<String>,
pub mailbox_names: BTreeMap<String, u32>,
pub mailbox_names: IndexMap<String, u32>,
pub mailbox_state: AHashMap<u32, Mailbox>,
pub state_email: Option<u64>,
pub state_mailbox: Option<u64>,
@ -252,6 +253,7 @@ pub struct Core {
pub oauth: OAuthConfig,
pub smtp: SmtpConfig,
pub jmap: JmapConfig,
pub dav: DavConfig,
pub spam: SpamFilterConfig,
pub imap: ImapConfig,
pub metrics: Metrics,

View file

@ -0,0 +1,175 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashMap;
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
Deserialize, IndexKey, IterateParams, SerializeInfallible, U32_LEN, ValueKey,
write::{Archive, ValueClass, key::DeserializeBigEndian},
};
use trc::AddContext;
use utils::topological::{TopologicalSort, TopologicalSortIterator};
use crate::Server;
pub struct ExpandedFolders {
names: AHashMap<u32, (String, u32)>,
iter: TopologicalSortIterator<u32>,
}
pub trait FolderHierarchy: Sync + Send {
fn name(&self) -> String;
fn parent_id(&self) -> u32;
}
pub trait TopologyBuilder: Sync + Send {
fn insert(&mut self, folder_id: u32, parent_id: u32);
}
impl Server {
pub async fn fetch_folders<T>(
&self,
account_id: u32,
collection: Collection,
) -> trc::Result<ExpandedFolders>
where
T: rkyv::Archive,
T::Archived: FolderHierarchy
+ for<'a> rkyv::bytecheck::CheckBytes<
rkyv::api::high::HighValidator<'a, rkyv::rancor::Error>,
> + rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<rkyv::rancor::Error>>,
{
let collection_: u8 = collection.into();
let mut names = AHashMap::with_capacity(10);
let mut topological_sort = TopologicalSort::with_capacity(10);
self.core
.storage
.data
.iterate(
IterateParams::new(
ValueKey {
account_id,
collection: collection_,
document_id: 0,
class: ValueClass::Property(Property::Value.into()),
},
ValueKey {
account_id,
collection: collection_,
document_id: u32::MAX,
class: ValueClass::Property(Property::Value.into()),
},
),
|key, value| {
// Shift document ids by one so that 0 can act as the virtual root folder.
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)? + 1;
let archive = <Archive as Deserialize>::deserialize(value)?;
let folder = archive.unarchive::<T>()?;
let parent_id = folder.parent_id();
// Add a parent -> child edge; the sort then emits parents before their children.
topological_sort.insert(parent_id, document_id);
names.insert(document_id, (folder.name(), parent_id));
Ok(true)
},
)
.await
.add_context(|err| {
err.caused_by(trc::location!())
.account_id(account_id)
.collection(collection)
})?;
Ok(ExpandedFolders {
names,
iter: topological_sort.into_iterator(),
})
}
pub async fn fetch_folder_topology<T>(
&self,
account_id: u32,
collection: Collection,
topology: &mut impl TopologyBuilder,
) -> trc::Result<()>
where
T: TopologyBuilder,
{
self.store()
.iterate(
IterateParams::new(
IndexKey {
account_id,
collection: collection.into(),
document_id: 0,
field: Property::ParentId.into(),
key: 0u32.serialize(),
},
IndexKey {
account_id,
collection: collection.into(),
document_id: u32::MAX,
field: Property::ParentId.into(),
key: u32::MAX.serialize(),
},
)
.no_values()
.ascending(),
|key, _| {
let document_id = key
.get(key.len() - U32_LEN..)
.ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))
.and_then(u32::deserialize)?;
let parent_id = key
.get(key.len() - (U32_LEN * 2)..key.len() - U32_LEN)
.ok_or_else(|| trc::Error::corrupted_key(key, None, trc::location!()))
.and_then(u32::deserialize)?;
topology.insert(document_id, parent_id);
Ok(true)
},
)
.await
.caused_by(trc::location!())?;
Ok(())
}
}
impl ExpandedFolders {
pub fn format<T>(mut self, formatter: T) -> Self
where
T: Fn(u32, &str) -> Option<String>,
{
for (document_id, (name, _)) in &mut self.names {
// Ids were shifted by one on insert, so hand the formatter the real document id.
if let Some(new_name) = formatter(*document_id - 1, name) {
*name = new_name;
}
}
self
}
pub fn into_iterator(mut self) -> impl Iterator<Item = (u32, String)> + Sync + Send {
// Walk folders in topological order (parents before children) so that each
// parent's full path is already expanded when its children are visited.
for folder_id in self.iter.by_ref() {
// 0 is the virtual root and has no name of its own.
if folder_id != 0 {
if let Some((name, parent_name, parent_id)) =
self.names.get(&folder_id).and_then(|(name, parent_id)| {
self.names
.get(parent_id)
.map(|(parent_name, _)| (name, parent_name, *parent_id))
})
{
// Prefix the folder with its parent's already-expanded path.
let name = format!("{parent_name}/{name}");
self.names.insert(folder_id, (name, parent_id));
}
}
}
// Undo the +1 shift before returning the ids to the caller.
self.names.into_iter().map(|(id, (name, _))| (id - 1, name))
}
}
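
For orientation, a usage sketch of the new API (not part of the diff): it mirrors how `mailbox_create_path` in `email/src/mailbox/manage.rs` consumes `fetch_folders` later in this commit. The helper name `mailbox_paths` is illustrative; `Server`, `Mailbox`, `INBOX_ID` and `Collection` come from the surrounding Stalwart crates.

use common::Server;
use email::mailbox::{INBOX_ID, Mailbox};
use jmap_proto::types::collection::Collection;
use store::ahash::AHashMap;

// Illustrative helper: resolve every mailbox of an account to its full path.
async fn mailbox_paths(server: &Server, account_id: u32) -> trc::Result<AHashMap<String, u32>> {
    Ok(server
        .fetch_folders::<Mailbox>(account_id, Collection::Mailbox)
        .await?
        // Give the inbox its canonical name, lower-case everything else.
        .format(|mailbox_id, name| {
            if mailbox_id == INBOX_ID {
                Some("inbox".to_string())
            } else {
                Some(name.to_lowercase())
            }
        })
        // Yields (document_id, "parent/child") pairs with parents already expanded.
        .into_iterator()
        .map(|(document_id, name)| (name, document_id))
        .collect())
}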

View file

@ -5,6 +5,7 @@
*/
pub mod blob;
pub mod folder;
pub mod index;
pub mod state;
pub mod tag;

View file

@ -8,7 +8,9 @@ resolver = "2"
dav-proto = { path = "/Users/me/code/dav-proto" }
common = { path = "../common" }
groupware = { path = "../groupware" }
directory = { path = "../directory" }
http_proto = { path = "../http-proto" }
trc = { path = "../trc" }
hashify = { version = "0.2" }
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }

View file

@ -0,0 +1,5 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

View file

@ -0,0 +1,5 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::Acl};
use http_proto::HttpResponse;
pub(crate) trait FileAclRequestHandler: Sync + Send {
fn handle_file_acl_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Acl,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileAclRequestHandler for Server {
async fn handle_file_acl_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Acl,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::SyncCollection};
use http_proto::HttpResponse;
pub(crate) trait FileChangesRequestHandler: Sync + Send {
fn handle_file_changes_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: SyncCollection,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileChangesRequestHandler for Server {
async fn handle_file_changes_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: SyncCollection,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::RequestHeaders;
use http_proto::HttpResponse;
pub(crate) trait FileCopyMoveRequestHandler: Sync + Send {
fn handle_file_copy_move_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
is_move: bool,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileCopyMoveRequestHandler for Server {
async fn handle_file_copy_move_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
is_move: bool,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,27 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::RequestHeaders;
use http_proto::HttpResponse;
pub(crate) trait FileDeleteRequestHandler: Sync + Send {
fn handle_file_delete_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileDeleteRequestHandler for Server {
async fn handle_file_delete_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::RequestHeaders;
use http_proto::HttpResponse;
pub(crate) trait FileGetRequestHandler: Sync + Send {
fn handle_file_get_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
is_head: bool,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileGetRequestHandler for Server {
async fn handle_file_get_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
is_head: bool,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::LockInfo};
use http_proto::HttpResponse;
pub(crate) trait FileLockRequestHandler: Sync + Send {
fn handle_file_lock_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Option<LockInfo>,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileLockRequestHandler for Server {
async fn handle_file_lock_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Option<LockInfo>,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::MkCol};
use http_proto::HttpResponse;
pub(crate) trait FileMkColRequestHandler: Sync + Send {
fn handle_file_mkcol_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Option<MkCol>,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileMkColRequestHandler for Server {
async fn handle_file_mkcol_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Option<MkCol>,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,22 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod acl;
pub mod changes;
pub mod copy_move;
pub mod delete;
pub mod get;
pub mod lock;
pub mod mkcol;
pub mod propfind;
pub mod proppatch;
pub mod update;
pub(crate) enum UpdateType {
Post(Vec<u8>),
Put(Vec<u8>),
Patch(Vec<u8>),
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::PropFind};
use http_proto::HttpResponse;
pub(crate) trait FilePropFindRequestHandler: Sync + Send {
fn handle_file_propfind_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: PropFind,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FilePropFindRequestHandler for Server {
async fn handle_file_propfind_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: PropFind,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::PropertyUpdate};
use http_proto::HttpResponse;
pub(crate) trait FilePropPatchRequestHandler: Sync + Send {
fn handle_file_proppatch_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: PropertyUpdate,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FilePropPatchRequestHandler for Server {
async fn handle_file_proppatch_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: PropertyUpdate,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -0,0 +1,31 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::SyncCollection};
use http_proto::HttpResponse;
use super::UpdateType;
pub(crate) trait FileUpdateRequestHandler: Sync + Send {
fn handle_file_update_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: UpdateType,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl FileUpdateRequestHandler for Server {
async fn handle_file_update_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: UpdateType,
) -> crate::Result<HttpResponse> {
todo!()
}
}

View file

@ -4,16 +4,24 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod calendar;
pub mod card;
pub mod file;
pub mod principal;
pub mod request;
use dav_proto::schema::request::Report;
use http_proto::HttpResponse;
use hyper::{Method, StatusCode};
pub(crate) type Result<T> = std::result::Result<T, DavError>;
#[derive(Debug, Clone, Copy)]
pub enum DavResource {
Card,
Cal,
File,
Principal,
}
#[derive(Debug, Clone, Copy)]
@ -22,6 +30,7 @@ pub enum DavMethod {
PUT,
POST,
DELETE,
HEAD,
PATCH,
PROPFIND,
PROPPATCH,
@ -32,6 +41,13 @@ pub enum DavMethod {
LOCK,
UNLOCK,
OPTIONS,
ACL,
}
pub(crate) enum DavError {
Parse(dav_proto::parser::Error),
Internal(trc::Error),
UnsupportedReport(Report),
}
impl DavResource {
@ -39,7 +55,8 @@ impl DavResource {
hashify::tiny_map!(service.as_bytes(),
"card" => DavResource::Card,
"cal" => DavResource::Cal,
"file" => DavResource::File
"file" => DavResource::File,
"pri" => DavResource::Principal,
)
}
@ -59,6 +76,7 @@ impl DavMethod {
Method::OPTIONS => Some(DavMethod::OPTIONS),
Method::POST => Some(DavMethod::POST),
Method::PATCH => Some(DavMethod::PATCH),
Method::HEAD => Some(DavMethod::HEAD),
_ => {
hashify::tiny_map!(method.as_str().as_bytes(),
"PROPFIND" => DavMethod::PROPFIND,
@ -68,7 +86,8 @@ impl DavMethod {
"COPY" => DavMethod::COPY,
"MOVE" => DavMethod::MOVE,
"LOCK" => DavMethod::LOCK,
"UNLOCK" => DavMethod::UNLOCK
"UNLOCK" => DavMethod::UNLOCK,
"ACL" => DavMethod::ACL
)
}
}
@ -84,9 +103,8 @@ impl DavMethod {
| DavMethod::PROPPATCH
| DavMethod::PROPFIND
| DavMethod::REPORT
| DavMethod::MKCOL
| DavMethod::COPY
| DavMethod::MOVE
| DavMethod::LOCK
| DavMethod::ACL
)
}
}

View file

@ -0,0 +1,5 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

View file

@ -7,9 +7,25 @@
use std::sync::Arc;
use common::{Server, auth::AccessToken};
use http_proto::{HttpRequest, HttpResponse, HttpSessionData};
use dav_proto::{
RequestHeaders,
parser::{DavParser, tokenizer::Tokenizer},
schema::request::{Acl, LockInfo, MkCol, PropFind, PropertyUpdate, Report},
};
use directory::Permission;
use http_proto::{HttpRequest, HttpResponse, HttpSessionData, request::fetch_body};
use hyper::{StatusCode, header};
use crate::{DavMethod, DavResource};
use crate::{
DavError, DavMethod, DavResource,
file::{
UpdateType, acl::FileAclRequestHandler, changes::FileChangesRequestHandler,
copy_move::FileCopyMoveRequestHandler, delete::FileDeleteRequestHandler,
get::FileGetRequestHandler, lock::FileLockRequestHandler, mkcol::FileMkColRequestHandler,
propfind::FilePropFindRequestHandler, proppatch::FilePropPatchRequestHandler,
update::FileUpdateRequestHandler,
},
};
pub trait DavRequestHandler: Sync + Send {
fn handle_dav_request(
@ -19,20 +35,206 @@ pub trait DavRequestHandler: Sync + Send {
session: &HttpSessionData,
resource: DavResource,
method: DavMethod,
body: Vec<u8>,
) -> impl Future<Output = HttpResponse> + Send;
}
pub(crate) trait DavRequestDispatcher: Sync + Send {
fn dispatch_dav_request(
&self,
request: &HttpRequest,
access_token: Arc<AccessToken>,
resource: DavResource,
method: DavMethod,
body: Vec<u8>,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
impl DavRequestDispatcher for Server {
async fn dispatch_dav_request(
&self,
request: &HttpRequest,
access_token: Arc<AccessToken>,
resource: DavResource,
method: DavMethod,
body: Vec<u8>,
) -> crate::Result<HttpResponse> {
// Parse headers
let mut headers = RequestHeaders::new(request.uri().path());
for (key, value) in request.headers() {
headers.parse(key.as_str(), value.to_str().unwrap_or_default());
}
// Dispatch
match resource {
DavResource::Card => {
todo!()
}
DavResource::Cal => {
todo!()
}
DavResource::Principal => {
todo!()
}
DavResource::File => match method {
DavMethod::PROPFIND => {
self.handle_file_propfind_request(
&access_token,
headers,
PropFind::parse(&mut Tokenizer::new(&body))?,
)
.await
}
DavMethod::PROPPATCH => {
self.handle_file_proppatch_request(
&access_token,
headers,
PropertyUpdate::parse(&mut Tokenizer::new(&body))?,
)
.await
}
DavMethod::MKCOL => {
self.handle_file_mkcol_request(
&access_token,
headers,
if !body.is_empty() {
Some(MkCol::parse(&mut Tokenizer::new(&body))?)
} else {
None
},
)
.await
}
DavMethod::GET => {
self.handle_file_get_request(&access_token, headers, false)
.await
}
DavMethod::HEAD => {
self.handle_file_get_request(&access_token, headers, true)
.await
}
DavMethod::DELETE => {
self.handle_file_delete_request(&access_token, headers)
.await
}
DavMethod::PUT => {
self.handle_file_update_request(&access_token, headers, UpdateType::Put(body))
.await
}
DavMethod::POST => {
self.handle_file_update_request(&access_token, headers, UpdateType::Post(body))
.await
}
DavMethod::PATCH => {
self.handle_file_update_request(&access_token, headers, UpdateType::Patch(body))
.await
}
DavMethod::COPY => {
self.handle_file_copy_move_request(&access_token, headers, false)
.await
}
DavMethod::MOVE => {
self.handle_file_copy_move_request(&access_token, headers, true)
.await
}
DavMethod::LOCK => {
self.handle_file_lock_request(
&access_token,
headers,
LockInfo::parse(&mut Tokenizer::new(&body))?.into(),
)
.await
}
DavMethod::UNLOCK => {
self.handle_file_lock_request(&access_token, headers, None)
.await
}
DavMethod::ACL => {
self.handle_file_acl_request(
&access_token,
headers,
Acl::parse(&mut Tokenizer::new(&body))?,
)
.await
}
DavMethod::REPORT => match Report::parse(&mut Tokenizer::new(&body))? {
Report::SyncCollection(sync_collection) => {
self.handle_file_changes_request(&access_token, headers, sync_collection)
.await
}
report => Err(DavError::UnsupportedReport(report)),
},
DavMethod::OPTIONS => unreachable!(),
},
}
}
}
impl DavRequestHandler for Server {
async fn handle_dav_request(
&self,
request: HttpRequest,
mut request: HttpRequest,
access_token: Arc<AccessToken>,
session: &HttpSessionData,
resource: DavResource,
method: DavMethod,
body: Vec<u8>,
) -> HttpResponse {
todo!()
let body = if method.has_body()
|| request
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.is_some_and(|len| len > 0)
{
if let Some(body) = fetch_body(
&mut request,
if !access_token.has_permission(Permission::UnlimitedUploads) {
self.core.dav.max_request_size
} else {
0
},
session.session_id,
)
.await
{
body
} else {
trc::event!(
Limit(trc::LimitEvent::SizeRequest),
SpanId = session.session_id,
Contents = "Request body too large",
);
return HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE);
}
} else {
Vec::new()
};
match self
.dispatch_dav_request(&request, access_token, resource, method, body)
.await
{
Ok(response) => response,
Err(DavError::Internal(err)) => {
trc::error!(err.span_id(session.session_id));
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR)
}
Err(DavError::UnsupportedReport(report)) => HttpResponse::new(StatusCode::BAD_REQUEST),
Err(DavError::Parse(err)) => HttpResponse::new(StatusCode::BAD_REQUEST),
}
}
}
impl From<dav_proto::parser::Error> for DavError {
fn from(err: dav_proto::parser::Error) -> Self {
DavError::Parse(err)
}
}
impl From<trc::Error> for DavError {
fn from(err: trc::Error) -> Self {
DavError::Internal(err)
}
}

View file

@ -6,12 +6,15 @@
use common::{
config::jmap::settings::SpecialUse,
storage::index::{IndexValue, IndexableObject},
storage::{
folder::FolderHierarchy,
index::{IndexValue, IndexableObject},
},
};
use jmap_proto::types::property::Property;
use store::write::{MaybeDynamicId, TagValue};
use super::{ArchivedUidMailbox, Mailbox, UidMailbox};
use super::{ArchivedMailbox, ArchivedUidMailbox, Mailbox, UidMailbox};
impl IndexableObject for Mailbox {
fn index_values(&self) -> impl Iterator<Item = IndexValue<'_>> {
@ -46,6 +49,16 @@ impl IndexableObject for Mailbox {
}
}
impl FolderHierarchy for ArchivedMailbox {
fn name(&self) -> String {
self.name.to_string()
}
fn parent_id(&self) -> u32 {
u32::from(self.parent_id)
}
}
impl From<&UidMailbox> for TagValue<MaybeDynamicId> {
fn from(value: &UidMailbox) -> Self {
TagValue::Id(MaybeDynamicId::Static(value.mailbox_id))

View file

@ -10,10 +10,10 @@ use common::{Server, config::jmap::settings::SpecialUse, storage::index::ObjectI
use jmap_proto::types::{collection::Collection, keyword::Keyword, property::Property};
use store::{
SerializeInfallible,
ahash::AHashSet,
ahash::{AHashMap, AHashSet},
query::Filter,
roaring::RoaringBitmap,
write::{Archive, BatchBuilder},
write::BatchBuilder,
};
use trc::AddContext;
@ -46,13 +46,6 @@ pub trait MailboxFnc: Sync + Send {
message_ids: &Option<RoaringBitmap>,
) -> impl Future<Output = trc::Result<Option<RoaringBitmap>>> + Send;
fn mailbox_expand_path<'x>(
&self,
account_id: u32,
path: &'x str,
exact_match: bool,
) -> impl Future<Output = trc::Result<Option<ExpandPath<'x>>>> + Send;
fn mailbox_get_by_name(
&self,
account_id: u32,
@ -128,35 +121,53 @@ impl MailboxFnc for Server {
account_id: u32,
path: &str,
) -> trc::Result<Option<(u32, Option<u64>)>> {
let expanded_path =
if let Some(expand_path) = self.mailbox_expand_path(account_id, path, false).await? {
expand_path
} else {
return Ok(None);
};
let folders = self
.fetch_folders::<Mailbox>(account_id, Collection::Mailbox)
.await
.caused_by(trc::location!())?
.format(|mailbox_id, name| {
if mailbox_id == INBOX_ID {
Some("inbox".to_string())
} else {
Some(name.to_lowercase())
}
})
.into_iterator()
.map(|(document_id, name)| (name, document_id))
.collect::<AHashMap<String, u32>>();
let mut next_parent_id = 0;
let mut path = expanded_path.path.into_iter().enumerate().peekable();
'outer: while let Some((pos, name)) = path.peek() {
let is_inbox = *pos == 0 && name.eq_ignore_ascii_case("inbox");
let mut create_paths = Vec::with_capacity(2);
for (part, parent_id, document_id) in &expanded_path.found_names {
if (part.eq(name) || (is_inbox && part.eq_ignore_ascii_case("inbox")))
&& *parent_id == next_parent_id
{
next_parent_id = *document_id;
path.next();
continue 'outer;
let mut path = path.split('/').map(|v| v.trim());
let mut found_path = String::with_capacity(16);
{
while let Some(name) = path.next() {
if !found_path.is_empty() {
found_path.push('/');
}
for ch in name.chars() {
for ch in ch.to_lowercase() {
found_path.push(ch);
}
}
if let Some(document_id) = folders.get(&found_path) {
next_parent_id = *document_id + 1;
} else {
create_paths.push(name.to_string());
create_paths.extend(path.map(|v| v.to_string()));
break;
}
}
break;
}
// Create missing folders
if path.peek().is_some() {
if !create_paths.is_empty() {
let mut changes = self.begin_changes(account_id)?;
for (_, name) in path {
for name in create_paths {
if name.len() > self.core.jmap.mailbox_name_max_len {
return Ok(None);
}
@ -257,97 +268,16 @@ impl MailboxFnc for Server {
}
}
async fn mailbox_expand_path<'x>(
&self,
account_id: u32,
path: &'x str,
exact_match: bool,
) -> trc::Result<Option<ExpandPath<'x>>> {
let path = path
.split('/')
.filter_map(|p| {
let p = p.trim();
if !p.is_empty() { p.into() } else { None }
})
.collect::<Vec<_>>();
if path.is_empty() || path.len() > self.core.jmap.mailbox_max_depth {
return Ok(None);
}
let mut filter = Vec::with_capacity(path.len() + 2);
let mut has_inbox = false;
filter.push(Filter::Or);
for (pos, item) in path.iter().enumerate() {
if pos == 0 && item.eq_ignore_ascii_case("inbox") {
has_inbox = true;
} else {
filter.push(Filter::eq(Property::Name, item.to_lowercase().into_bytes()));
}
}
filter.push(Filter::End);
let mut document_ids = if filter.len() > 2 {
self.store()
.filter(account_id, Collection::Mailbox, filter)
.await
.caused_by(trc::location!())?
.results
} else {
RoaringBitmap::new()
};
if has_inbox {
document_ids.insert(INBOX_ID);
}
if exact_match && (document_ids.len() as usize) < path.len() {
return Ok(None);
}
let mut found_names = Vec::new();
for document_id in document_ids {
if let Some(obj) = self
.get_property::<Archive>(
account_id,
Collection::Mailbox,
document_id,
Property::Value,
)
.await?
{
let obj = obj.unarchive::<Mailbox>()?;
found_names.push((
obj.name.to_string(),
u32::from(obj.parent_id),
document_id + 1,
));
} else {
return Ok(None);
}
}
Ok(Some(ExpandPath { path, found_names }))
}
async fn mailbox_get_by_name(&self, account_id: u32, path: &str) -> trc::Result<Option<u32>> {
Ok(self
.mailbox_expand_path(account_id, path, true)
.await?
.and_then(|ep| {
let mut next_parent_id = 0;
'outer: for (pos, name) in ep.path.iter().enumerate() {
let is_inbox = pos == 0 && name.eq_ignore_ascii_case("inbox");
for (part, parent_id, document_id) in &ep.found_names {
if (part.eq(name) || (is_inbox && part.eq_ignore_ascii_case("inbox")))
&& *parent_id == next_parent_id
{
next_parent_id = *document_id;
continue 'outer;
}
}
return None;
}
Some(next_parent_id - 1)
}))
self.fetch_folders::<Mailbox>(account_id, Collection::Mailbox)
.await
.map(|folders| {
folders
.format(|mailbox_id, _| (mailbox_id == INBOX_ID).then(|| "INBOX".to_string()))
.into_iterator()
.find(|(_, folder_name)| folder_name.eq_ignore_ascii_case(path))
.map(|(document_id, _)| document_id)
})
}
async fn mailbox_get_by_role(

View file

@ -11,6 +11,7 @@ jmap_proto = { path = "../jmap-proto" }
directory = { path = "../directory" }
calcard = { path = "/Users/me/code/calcard" }
hashify = "0.2"
rkyv = { version = "0.8.10", features = ["little_endian"] }
[features]
test_mode = []

View file

@ -4,11 +4,12 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use calcard::vcard::VCard;
use jmap_proto::types::{acl::Acl, value::AclGrant};
pub struct AddressBook {
pub name: String,
pub display_name: Option<String>,
pub description: Option<String>,
pub sort_order: u32,
pub is_default: bool,
@ -25,6 +26,7 @@ pub enum AddressBookRight {
pub struct ContactCard {
pub name: Option<String>,
pub display_name: Option<String>,
pub addressbook_ids: Vec<u32>,
pub card: VCard,
pub created: u64,

View file

@ -0,0 +1,52 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::storage::{
folder::FolderHierarchy,
index::{IndexValue, IndexableObject},
};
use jmap_proto::types::property::Property;
use super::{ArchivedFileNode, FileNode};
impl IndexableObject for FileNode {
fn index_values(&self) -> impl Iterator<Item = IndexValue<'_>> {
let mut filters = Vec::with_capacity(5);
filters.extend([
IndexValue::Text {
field: Property::Name.into(),
value: self.name.to_lowercase().into(),
},
IndexValue::U32 {
field: Property::ParentId.into(),
value: self.parent_id.into(),
},
IndexValue::Acl { value: &self.acls },
]);
if let Some(file) = &self.file {
filters.extend([
IndexValue::U32 {
field: Property::Size.into(),
value: file.size.into(),
},
IndexValue::Quota { used: file.size },
]);
}
filters.into_iter()
}
}
impl FolderHierarchy for ArchivedFileNode {
fn name(&self) -> String {
self.name.to_string()
}
fn parent_id(&self) -> u32 {
u32::from(self.parent_id)
}
}

View file

@ -4,16 +4,30 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use jmap_proto::types::{blob::BlobId, value::AclGrant};
pub mod index;
use jmap_proto::types::value::AclGrant;
use utils::BlobHash;
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct FileNode {
pub parent_id: Option<u32>,
pub blob_id: Option<BlobId>,
pub size: Option<u64>,
pub parent_id: u32,
pub name: String,
pub media_type: Option<String>,
pub executable: bool,
pub display_name: Option<String>,
pub file: Option<FileProperties>,
pub created: u64,
pub modified: u64,
pub acls: Vec<AclGrant>,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct FileProperties {
pub blob_hash: BlobHash,
pub size: u32,
pub media_type: Option<String>,
pub executable: bool,
}

View file

@ -48,13 +48,17 @@ impl HttpResponse {
}
pub fn with_text_body(mut self, body: impl Into<String>) -> Self {
self.body = HttpResponseBody::Text(body.into());
self
let body = body.into();
let body_len = body.len();
self.body = HttpResponseBody::Text(body);
self.with_header(header::CONTENT_LENGTH, body_len)
}
pub fn with_binary_body(mut self, body: impl Into<Vec<u8>>) -> Self {
self.body = HttpResponseBody::Binary(body.into());
self
let body = body.into();
let body_len = body.len();
self.body = HttpResponseBody::Binary(body);
self.with_header(header::CONTENT_LENGTH, body_len)
}
pub fn with_stream_body(

View file

@ -214,23 +214,8 @@ impl ParseHttp for Server {
// Authenticate request
let (_in_flight, access_token) =
self.authenticate_headers(&req, &session, false).await?;
let body = if method.has_body() {
fetch_body(
&mut req,
if !access_token.has_permission(Permission::UnlimitedUploads) {
self.core.jmap.upload_max_size
} else {
0
},
session.session_id,
)
.await
.ok_or_else(|| trc::LimitEvent::SizeRequest.into_err())?
} else {
Vec::new()
};
self.handle_dav_request(req, access_token, &session, resource, method, body)
self.handle_dav_request(req, access_token, &session, resource, method)
.await
}
(_, None) => HttpResponse::new(StatusCode::METHOD_NOT_ALLOWED),

View file

@ -24,6 +24,7 @@ parking_lot = "0.12"
ahash = { version = "0.8" }
md5 = "0.7.0"
rand = "0.9.0"
indexmap = "2.7.1"
[features]

View file

@ -4,11 +4,6 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
collections::BTreeMap,
sync::{Arc, atomic::Ordering},
};
use ahash::AHashMap;
use common::{
AccountId, Mailbox,
@ -20,13 +15,16 @@ use common::{
use directory::{QueryBy, backend::internal::PrincipalField};
use email::mailbox::{INBOX_ID, manage::MailboxFnc};
use imap_proto::protocol::list::Attribute;
use indexmap::IndexMap;
use jmap_proto::types::{acl::Acl, collection::Collection, id::Id, property::Property};
use parking_lot::Mutex;
use std::sync::{Arc, atomic::Ordering};
use store::{
query::log::{Change, Query},
write::Archive,
};
use trc::AddContext;
use utils::topological::TopologicalSort;
use super::{Account, MailboxId, MailboxSync, Session, SessionData};
@ -157,8 +155,10 @@ impl<T: SessionStream> SessionData<T> {
is_subscribed: bool,
}
let mut mailboxes = Vec::with_capacity(10);
let mut mailboxes = AHashMap::with_capacity(10);
let mut special_uses = AHashMap::new();
let mut mailbox_topology = TopologicalSort::with_capacity(10);
for (mailbox_id, mailbox_) in self
.server
.get_properties::<Archive, _, _>(
@ -179,35 +179,30 @@ impl<T: SessionStream> SessionData<T> {
special_uses.insert(role, mailbox_id);
}
// Add mailbox id
mailboxes.push(MailboxData {
// Build mailbox data
let mailbox = MailboxData {
mailbox_id,
parent_id: u32::from(mailbox.parent_id),
role,
name: mailbox.name.to_string(),
is_subscribed: mailbox.is_subscribed(access_token.primary_id()),
});
};
// Parent ids are already stored shifted by one (0 = top level), so shift the
// mailbox id the same way before adding the parent -> child edge.
mailbox_topology.insert(mailbox.parent_id, mailbox.mailbox_id + 1);
// Add mailbox id
mailboxes.insert(mailbox.mailbox_id, mailbox);
}
// Build tree
let mut iter = mailboxes.iter();
let mut parent_id = 0;
let mut path = Vec::new();
let mut iter_stack = Vec::new();
// Build account
let message_ids = self
.server
.get_document_ids(account_id, Collection::Email)
.await
.caused_by(trc::location!())?;
if let Some(mailbox_prefix) = &mailbox_prefix {
path.push(mailbox_prefix.to_string());
};
let mut account = Account {
account_id,
prefix: mailbox_prefix,
mailbox_names: BTreeMap::new(),
mailbox_names: IndexMap::with_capacity(mailboxes.len()),
mailbox_state: AHashMap::with_capacity(mailboxes.len()),
state_mailbox,
state_email,
@ -224,99 +219,122 @@ impl<T: SessionStream> SessionData<T> {
* (std::mem::size_of::<email::mailbox::Mailbox>() + std::mem::size_of::<u32>())))
as u64;
loop {
while let Some(mailbox) = iter.next() {
if mailbox.parent_id == parent_id {
let mut mailbox_path = path.clone();
if mailbox.mailbox_id != INBOX_ID || account.prefix.is_some() {
mailbox_path.push(mailbox.name.clone());
} else {
mailbox_path.push("INBOX".to_string());
}
let has_children = mailboxes
.iter()
.any(|child| child.parent_id == mailbox.mailbox_id + 1);
account.mailbox_state.insert(
mailbox.mailbox_id,
Mailbox {
has_children,
is_subscribed: mailbox.is_subscribed,
special_use: match mailbox.role {
SpecialUse::Trash => Some(Attribute::Trash),
SpecialUse::Junk => Some(Attribute::Junk),
SpecialUse::Drafts => Some(Attribute::Drafts),
SpecialUse::Archive => Some(Attribute::Archive),
SpecialUse::Sent => Some(Attribute::Sent),
SpecialUse::Important => Some(Attribute::Important),
_ => None,
},
total_messages: self
.server
.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
mailbox.mailbox_id,
)
.await
.caused_by(trc::location!())?
.map(|v| v.len())
.unwrap_or(0)
.into(),
total_unseen: self
.server
.mailbox_unread_tags(account_id, mailbox.mailbox_id, &message_ids)
.await
.caused_by(trc::location!())?
.map(|v| v.len())
.unwrap_or(0)
.into(),
..Default::default()
},
);
let mut mailbox_name = mailbox_path.join("/");
if mailbox_name.eq_ignore_ascii_case("inbox") && mailbox.mailbox_id != INBOX_ID
{
// If there is another mailbox called Inbox, rename it to avoid conflicts
mailbox_name = format!("{mailbox_name} 2");
}
// Map special use folder aliases to their internal ids
let effective_mailbox_id = self
// Build mailbox state
for mailbox in mailboxes.values() {
account.mailbox_state.insert(
mailbox.mailbox_id,
Mailbox {
has_children: mailboxes
.values()
.any(|child| child.parent_id == mailbox.mailbox_id + 1),
is_subscribed: mailbox.is_subscribed,
special_use: match mailbox.role {
SpecialUse::Trash => Some(Attribute::Trash),
SpecialUse::Junk => Some(Attribute::Junk),
SpecialUse::Drafts => Some(Attribute::Drafts),
SpecialUse::Archive => Some(Attribute::Archive),
SpecialUse::Sent => Some(Attribute::Sent),
SpecialUse::Important => Some(Attribute::Important),
_ => None,
},
total_messages: self
.server
.core
.jmap
.default_folders
.iter()
.find(|f| {
f.name == mailbox_name || f.aliases.iter().any(|a| a == &mailbox_name)
})
.and_then(|f| special_uses.get(&f.special_use))
.copied()
.unwrap_or(mailbox.mailbox_id);
.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
mailbox.mailbox_id,
)
.await
.caused_by(trc::location!())?
.map(|v| v.len())
.unwrap_or(0)
.into(),
total_unseen: self
.server
.mailbox_unread_tags(account_id, mailbox.mailbox_id, &message_ids)
.await
.caused_by(trc::location!())?
.map(|v| v.len())
.unwrap_or(0)
.into(),
..Default::default()
},
);
}
account
.mailbox_names
.insert(mailbox_name, effective_mailbox_id);
// Build mailbox tree
for mailbox_id in mailbox_topology.into_iterator() {
// 0 is the virtual root inserted above; real mailbox ids were shifted by one.
if mailbox_id == 0 {
continue;
}
let mailbox_id = mailbox_id - 1;
let (mailbox_name, parent_id) = mailboxes
.get(&mailbox_id)
.map(|m| {
(
m.name.as_str(),
if m.parent_id == 0 {
None
} else {
Some(m.parent_id - 1)
},
)
})
.unwrap();
if has_children && iter_stack.len() < 100 {
iter_stack.push((iter, parent_id, path));
parent_id = mailbox.mailbox_id + 1;
path = mailbox_path;
iter = mailboxes.iter();
}
// Obtain folder name
let (mailbox_name, did_rename) = if mailbox_id != INBOX_ID || account.prefix.is_some() {
// If there is another mailbox called Inbox, rename it to avoid conflicts
if parent_id.is_none() || !mailbox_name.eq_ignore_ascii_case("inbox") {
(mailbox_name, false)
} else {
("INBOX 2", true)
}
}
if let Some((prev_iter, prev_parent_id, prev_path)) = iter_stack.pop() {
iter = prev_iter;
parent_id = prev_parent_id;
path = prev_path;
} else {
break;
}
("INBOX", true)
};
// Map special use folder aliases to their internal ids
let effective_mailbox_id = self
.server
.core
.jmap
.default_folders
.iter()
.find(|f| f.name == mailbox_name || f.aliases.iter().any(|a| a == mailbox_name))
.and_then(|f| special_uses.get(&f.special_use))
.copied()
.unwrap_or(mailbox_id);
// Update mailbox name
let full_name = if let Some(parent_id) = parent_id {
let full_name = format!(
"{}/{}",
mailboxes
.get(&parent_id)
.map(|m| m.name.as_str())
.unwrap_or_default(),
mailbox_name
);
mailboxes.get_mut(&mailbox_id).unwrap().name = full_name.clone();
full_name
} else if let Some(prefix) = &account.prefix {
let full_name = format!("{prefix}/{mailbox_name}");
mailboxes.get_mut(&mailbox_id).unwrap().name = full_name.clone();
full_name
} else if did_rename {
let full_name = mailbox_name.to_string();
mailboxes.get_mut(&mailbox_id).unwrap().name = full_name.clone();
full_name
} else {
mailbox_name.to_string()
};
// Insert mailbox
account
.mailbox_names
.insert(full_name, effective_mailbox_id);
}
// Update cache

View file

@ -114,7 +114,7 @@ impl<T: SessionStream> SessionData<T> {
// Update mailbox cache
for account in self.mailboxes.lock().iter_mut() {
if account.account_id == account_id {
account.mailbox_names.remove(&arguments.mailbox_name);
account.mailbox_names.shift_remove(&arguments.mailbox_name);
account.mailbox_state.remove(&mailbox_id);
break;
}

View file

@ -241,7 +241,7 @@ impl<T: SessionStream> SessionData<T> {
}
}
new_mailbox_names.insert(params.full_path, mailbox_id);
account.mailbox_names = new_mailbox_names;
account.mailbox_names = new_mailbox_names.into_iter().collect();
break;
}
}

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use common::{Server, auth::AccessToken, storage::folder::TopologyBuilder};
use email::mailbox::manage::MailboxFnc;
use jmap_proto::{
method::query::{Comparator, Filter, QueryRequest, QueryResponse, SortProperty},
@ -16,7 +16,6 @@ use store::{
ahash::{AHashMap, AHashSet},
query::{self, sort::Pagination},
roaring::RoaringBitmap,
write::Archive,
};
use crate::{JmapMethods, UpdateResults};
@ -115,29 +114,18 @@ impl MailboxQuery for Server {
let (mut response, mut paginate) = self.build_query_response(&result_set, &request).await?;
// Build mailbox tree
let mut hierarchy = AHashMap::default();
let mut tree = AHashMap::default();
let mut topology;
if (filter_as_tree || sort_as_tree)
&& (paginate.is_some()
|| (response.total.is_some_and(|total| total > 0) && filter_as_tree))
{
for (document_id, value) in self
.get_properties::<Archive, _, _>(
account_id,
Collection::Mailbox,
&mailbox_ids,
Property::Value,
)
.await?
{
let todo = "use index";
let mailbox = value.unarchive::<email::mailbox::Mailbox>()?;
let parent_id = u32::from(mailbox.parent_id);
hierarchy.insert(document_id + 1, parent_id);
tree.entry(parent_id)
.or_insert_with(AHashSet::default)
.insert(document_id + 1);
}
topology = FolderTopology::with_capacity(mailbox_ids.len() as usize);
self.fetch_folder_topology::<FolderTopology>(
account_id,
Collection::Mailbox,
&mut topology,
)
.await?;
if filter_as_tree {
let mut filtered_ids = RoaringBitmap::new();
@ -147,7 +135,7 @@ impl MailboxQuery for Server {
let mut jmap_id = document_id + 1;
for _ in 0..self.core.jmap.mailbox_max_depth {
if let Some(&parent_id) = hierarchy.get(&jmap_id) {
if let Some(&parent_id) = topology.hierarchy.get(&jmap_id) {
if parent_id == 0 {
keep = true;
break;
@ -178,6 +166,8 @@ impl MailboxQuery for Server {
result_set.results = filtered_ids;
}
}
} else {
topology = FolderTopology::with_capacity(0);
}
if let Some(mut paginate) = paginate {
@ -218,13 +208,14 @@ impl MailboxQuery for Server {
let mut jmap_id = 0;
'outer: for _ in 0..(response.ids.len() * 10 * self.core.jmap.mailbox_max_depth) {
let (mut children, mut it) = if let Some(children) = tree.remove(&jmap_id) {
(children, response.ids.iter())
} else if let Some(prev) = stack.pop() {
prev
} else {
break;
};
let (mut children, mut it) =
if let Some(children) = topology.tree.remove(&jmap_id) {
(children, response.ids.iter())
} else if let Some(prev) = stack.pop() {
prev
} else {
break;
};
while let Some(&id) = it.next() {
let next_id = id.document_id() + 1;
@ -256,3 +247,27 @@ impl MailboxQuery for Server {
Ok(response)
}
}
struct FolderTopology {
hierarchy: AHashMap<u32, u32>,
tree: AHashMap<u32, AHashSet<u32>>,
}
impl FolderTopology {
pub fn with_capacity(capacity: usize) -> Self {
Self {
hierarchy: AHashMap::with_capacity(capacity),
tree: AHashMap::with_capacity(capacity),
}
}
}
impl TopologyBuilder for FolderTopology {
fn insert(&mut self, document_id: u32, parent_id: u32) {
// Shift ids by one so that 0 can represent the root of the mailbox tree.
self.hierarchy.insert(document_id + 1, parent_id);
self.tree
.entry(parent_id)
.or_default()
.insert(document_id + 1);
}
}

View file

@ -77,7 +77,10 @@ impl EmailSubmissionSet for Server {
let mut changes = ChangeLogBuilder::new();
let mut success_email_ids = HashMap::new();
for (id, object) in request.unwrap_create() {
match Box::pin(self.send_message(account_id, &response, instance, object)).await? {
match self
.send_message(account_id, &response, instance, object)
.await?
{
Ok(submission) => {
// Add id mapping
success_email_ids.insert(
@ -597,7 +600,7 @@ impl EmailSubmissionSet for Server {
Session::<NullIo>::local(self.clone(), instance.clone(), SessionData::default());
// MAIL FROM
let _ = session.handle_mail_from(mail_from).await;
let _ = Box::pin(session.handle_mail_from(mail_from)).await;
if let Some(error) = session.has_failed() {
return Ok(Err(SetError::new(SetErrorType::ForbiddenMailFrom)
.with_description(format!(
@ -611,7 +614,7 @@ impl EmailSubmissionSet for Server {
let mut has_success = false;
for rcpt in rcpt_to {
let addr = rcpt.address.clone();
let _ = session.handle_rcpt_to(rcpt).await;
let _ = Box::pin(session.handle_rcpt_to(rcpt)).await;
let response = session.has_failed();
if response.is_none() {
has_success = true;
@ -622,7 +625,7 @@ impl EmailSubmissionSet for Server {
// DATA
if has_success {
session.data.message = message;
let response = session.queue_message().await;
let response = Box::pin(session.queue_message()).await;
if let State::Accepted(queue_id) = session.state {
submission.queue_id = Some(queue_id);
} else {

View file

@ -15,7 +15,7 @@ serde = { version = "1.0", features = ["derive"]}
mail-auth = { version = "0.6" }
smtp-proto = { version = "0.1" }
mail-send = { version = "0.5", default-features = false, features = ["cram-md5", "ring", "tls12"] }
ahash = { version = "0.8" }
ahash = { version = "0.8", features = ["serde"] }
chrono = "0.4"
rand = "0.9.0"
webpki-roots = { version = "0.26"}

View file

@ -13,6 +13,7 @@ pub mod glob;
pub mod json;
pub mod map;
pub mod snowflake;
pub mod topological;
pub mod url_params;
use futures::StreamExt;

View file

@ -0,0 +1,120 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashMap;
use std::{collections::VecDeque, hash::Hash};
#[derive(Debug)]
pub struct TopologicalSort<T: Copy + Eq + Hash> {
// Outgoing edges per node (adjacency list).
edges: AHashMap<T, Vec<T>>,
// Number of incoming edges (in-degree) per node.
count: AHashMap<T, usize>,
}
impl<T: Copy + Eq + Hash + std::fmt::Debug> TopologicalSort<T> {
pub fn with_capacity(capacity: usize) -> Self {
Self {
edges: AHashMap::with_capacity(capacity),
count: AHashMap::with_capacity(capacity),
}
}
pub fn insert(&mut self, from: T, to: T) {
// Register `from` even when nothing points to it yet, record the edge
// `from -> to`, and bump the in-degree of `to`.
self.count.entry(from).or_insert(0);
self.edges.entry(from).or_default().push(to);
*self.count.entry(to).or_insert(0) += 1;
}
pub fn into_iterator(mut self) -> TopologicalSortIterator<T> {
// Kahn's algorithm: seed the queue with every node that has no incoming edges.
let mut no_edges = VecDeque::with_capacity(self.count.len());
self.count.retain(|node, count| {
if *count == 0 {
no_edges.push_back(*node);
false
} else {
true
}
});
TopologicalSortIterator {
edges: self.edges,
count: self.count,
no_edges,
}
}
}
#[derive(Debug)]
pub struct TopologicalSortIterator<T: Copy + Eq + Hash> {
edges: AHashMap<T, Vec<T>>,
count: AHashMap<T, usize>,
no_edges: VecDeque<T>,
}
impl<T: Copy + Eq + Hash> Iterator for TopologicalSortIterator<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
// Emit a node with no remaining incoming edges, then decrement its
// neighbors' in-degrees, queueing any neighbor that reaches zero.
let no_edge = self.no_edges.pop_back()?;
if let Some(edges) = self.edges.get(&no_edge) {
for neighbor in edges {
if let Some(count) = self.count.get_mut(neighbor) {
*count -= 1;
if *count == 0 {
self.count.remove(neighbor);
self.no_edges.push_front(*neighbor);
}
}
}
}
Some(no_edge)
}
}
impl<T: Copy + Eq + Hash> TopologicalSortIterator<T> {
pub fn is_valid(&self) -> bool {
// Once iteration is exhausted, any node still holding an in-degree is part
// of a cycle, so the sort (and the folder hierarchy) is invalid.
self.count.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_topological_sort() {
let mut sort = TopologicalSort::with_capacity(6);
sort.insert(1, 2);
sort.insert(1, 3);
sort.insert(2, 4);
sort.insert(3, 4);
sort.insert(4, 5);
sort.insert(5, 6);
let mut iter = sort.into_iterator();
assert_eq!(iter.next(), Some(1));
assert_eq!(iter.next(), Some(2));
assert_eq!(iter.next(), Some(3));
assert_eq!(iter.next(), Some(4));
assert_eq!(iter.next(), Some(5));
assert_eq!(iter.next(), Some(6));
assert_eq!(iter.next(), None);
assert!(iter.is_valid(), "{:?}", iter);
}
#[test]
fn test_topological_sort_cycle() {
let mut sort = TopologicalSort::with_capacity(6);
sort.insert(1, 2);
sort.insert(2, 3);
sort.insert(3, 1);
let mut iter = sort.into_iterator();
assert_eq!(iter.next(), None);
assert!(!iter.is_valid());
}
}
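
A self-contained sketch (again not part of the diff, assuming only the `utils` crate is available) of the convention the rest of this commit builds on top of this iterator: folder ids are shifted by one so that 0 can serve as a virtual root, parents are inserted as edge sources, and iteration then yields every parent before its children while `is_valid` flags cycles.

use utils::topological::TopologicalSort;

fn main() {
    // (document_id, parent_id) pairs as stored on disk; parent 0 means "top level".
    let folders = [(0u32, 0u32), (1, 1), (2, 1), (3, 0)];

    let mut sort = TopologicalSort::with_capacity(folders.len());
    for (document_id, parent_id) in folders {
        // Shift the child id by one so the virtual root keeps id 0.
        sort.insert(parent_id, document_id + 1);
    }

    let mut iter = sort.into_iterator();
    for folder_id in iter.by_ref() {
        // 0 is the virtual root; skip it and undo the shift for real folders.
        if folder_id != 0 {
            println!("folder {}", folder_id - 1);
        }
    }
    // Every in-degree was consumed, so the hierarchy contains no cycle.
    assert!(iter.is_valid());
}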

View file

@ -2,12 +2,12 @@ require ["fileinto", "mailbox", "mailboxid", "special-use", "ihave", "imap4flags
# SpecialUse extension tests
if not specialuse_exists ["inbox", "trash"] {
error "Special-use mailboxes INBOX or TRASH do not exist.";
error "Special-use mailboxes INBOX or TRASH do not exist (lowercase).";
}
if not anyof(specialuse_exists "Inbox" "inbox",
specialuse_exists "Deleted Items" "trash") {
error "Special-use mailboxes INBOX or TRASH do not exist.";
error "Special-use mailboxes INBOX or TRASH do not exist (mixed-case).";
}
if specialuse_exists "dingleberry" {

View file

@ -374,7 +374,7 @@ pub async fn jmap_tests() {
.await;
webhooks::test(&mut params).await;
/*email_query::test(&mut params, delete).await;
email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
email_parse::test(&mut params).await;
@ -387,7 +387,7 @@ pub async fn jmap_tests() {
mailbox::test(&mut params).await;
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;
auth_limits::test(&mut params).await;*/
auth_limits::test(&mut params).await;
auth_oauth::test(&mut params).await;
event_source::test(&mut params).await;
push_subscription::test(&mut params).await;