DAV storage methods

This commit is contained in:
mdecimus 2025-03-06 18:35:46 +01:00
parent 3554dea4cb
commit eadd36f4cb
35 changed files with 1289 additions and 206 deletions

9
Cargo.lock generated
View file

@ -1678,7 +1678,11 @@ dependencies = [
"hashify",
"http_proto",
"hyper 1.6.0",
"jmap_proto",
"percent-encoding",
"store",
"trc",
"utils",
]
[[package]]
@ -1690,6 +1694,7 @@ dependencies = [
"hyper 1.6.0",
"mail-parser",
"quick-xml 0.37.2",
"rkyv 0.8.10",
]
[[package]]
@ -2733,11 +2738,14 @@ version = "0.11.5"
dependencies = [
"calcard",
"common",
"dav-proto",
"directory",
"hashify",
"jmap_proto",
"percent-encoding",
"rkyv 0.8.10",
"tokio",
"trc",
"utils",
]
@ -3120,6 +3128,7 @@ dependencies = [
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"percent-encoding",
"serde",
"serde_json",
"trc",

View file

@ -254,3 +254,25 @@ impl CredentialsUsername for Credentials<String> {
}
}
}
/// Abstraction over values that can yield a tenant identifier.
///
/// Storage helpers (e.g. `ObjectIndexBuilder::with_tenant_id`) accept any
/// `impl AsTenantId`, so callers may pass a raw `Option<u32>`, an
/// `AccessToken`, or a `ResourceToken` interchangeably.
pub trait AsTenantId {
    /// Returns the tenant id, or `None` when no tenant applies.
    fn tenant_id(&self) -> Option<u32>;
}

impl AsTenantId for Option<u32> {
    fn tenant_id(&self) -> Option<u32> {
        // The value already is the (optional) tenant id.
        self.as_ref().copied()
    }
}

impl AsTenantId for AccessToken {
    fn tenant_id(&self) -> Option<u32> {
        match self.tenant {
            Some(tenant) => Some(tenant.id),
            None => None,
        }
    }
}

impl AsTenantId for ResourceToken {
    fn tenant_id(&self) -> Option<u32> {
        match self.tenant {
            Some(tenant) => Some(tenant.id),
            None => None,
        }
    }
}

View file

@ -9,14 +9,22 @@ use utils::config::Config;
#[derive(Debug, Clone, Default)]
pub struct DavConfig {
pub max_request_size: usize,
pub dead_property_size: Option<usize>,
pub live_property_size: usize,
}
impl DavConfig {
pub fn parse(config: &mut Config) -> Self {
DavConfig {
max_request_size: config
.property("dav.limits.request-size")
.property("dav.limits.size.request")
.unwrap_or(25 * 1024 * 1024),
dead_property_size: config
.property_or_default::<Option<usize>>("dav.limits.size.dead-property", "1024")
.unwrap_or(Some(1024)),
live_property_size: config
.property("dav.limits.size.live-property")
.unwrap_or(250),
}
}
}

View file

@ -125,6 +125,12 @@ impl Caches {
MB_10,
(std::mem::size_of::<Threads>() + (500 * std::mem::size_of::<u64>())) as u64,
),
files: Cache::from_config(
config,
"file",
MB_10,
(std::mem::size_of::<Threads>() + (500 * std::mem::size_of::<u64>())) as u64,
),
bayes: CacheWithTtl::from_config(
config,
"bayes",

View file

@ -45,6 +45,7 @@ use rustls::sign::CertifiedKey;
use tokio::sync::{Notify, Semaphore, mpsc};
use tokio_rustls::TlsConnector;
use utils::{
bimap::IdBimap,
cache::{Cache, CacheItemWeight, CacheWithTtl},
snowflake::SnowflakeIdGenerator,
};
@ -143,6 +144,7 @@ pub struct Caches {
pub account: Cache<AccountId, Arc<Account>>,
pub mailbox: Cache<MailboxId, Arc<MailboxState>>,
pub threads: Cache<u32, Arc<Threads>>,
pub files: Cache<u32, Arc<Files>>,
pub bayes: CacheWithTtl<TokenHash, Weights>,
@ -244,6 +246,15 @@ pub struct Threads {
pub modseq: Option<u64>,
}
pub struct NameWrapper(pub String);
/// Cached snapshot of an account's file (DAV) hierarchy.
#[derive(Debug, Default)]
pub struct Files {
    // Bidirectional map between file document ids and their path names.
    pub files: IdBimap,
    // Approximate memory footprint of this entry; used as the cache weight.
    pub size: u64,
    // Modification sequence this snapshot was built from, if known.
    pub modseq: Option<u64>,
}
#[derive(Clone, Default)]
pub struct Core {
pub storage: Storage,
@ -297,6 +308,12 @@ impl CacheItemWeight for HttpAuthCache {
}
}
impl CacheItemWeight for Files {
    /// The cache weight of a `Files` entry is its precomputed byte size.
    fn weight(&self) -> u64 {
        self.size
    }
}
impl MailboxState {
pub fn calculate_weight(&self) -> u64 {
std::mem::size_of::<MailboxState>() as u64
@ -412,6 +429,7 @@ impl Default for Caches {
account: Cache::new(1024, 10 * 1024 * 1024),
mailbox: Cache::new(1024, 10 * 1024 * 1024),
threads: Cache::new(1024, 10 * 1024 * 1024),
files: Cache::new(1024, 10 * 1024 * 1024),
bayes: CacheWithTtl::new(1024, 10 * 1024 * 1024),
dns_rbl: CacheWithTtl::new(1024, 10 * 1024 * 1024),
dns_txt: CacheWithTtl::new(1024, 10 * 1024 * 1024),

View file

@ -142,6 +142,14 @@ impl Server {
}
impl ExpandedFolders {
/// Returns the number of folder names in this expansion.
pub fn len(&self) -> usize {
    self.names.len()
}
/// Returns `true` when the expansion contains no folder names
/// (clippy's conventional companion to `len`).
pub fn is_empty(&self) -> bool {
    self.names.is_empty()
}
pub fn format<T>(mut self, formatter: T) -> Self
where
T: Fn(u32, &str) -> Option<String>,

View file

@ -9,10 +9,13 @@ use std::{borrow::Cow, collections::HashSet, fmt::Debug};
use store::{
Serialize, SerializeInfallible,
write::{
Archiver, BatchBuilder, BitmapClass, DirectoryClass, IntoOperations, Operation, ValueOp,
assert::HashedValue,
Archiver, BatchBuilder, BitmapClass, BlobOp, DirectoryClass, IntoOperations, Operation,
ValueOp, assert::HashedValue,
},
};
use utils::BlobHash;
use crate::auth::AsTenantId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexValue<'x> {
@ -21,6 +24,7 @@ pub enum IndexValue<'x> {
U64 { field: u8, value: Option<u64> },
U32List { field: u8, value: &'x [u32] },
Tag { field: u8, is_set: bool },
Blob { value: BlobHash },
Quota { used: u32 },
Acl { value: &'x [AclGrant] },
}
@ -91,14 +95,10 @@ impl<T: IndexableObject> ObjectIndexBuilder<T> {
self.current.as_ref()
}
pub fn with_tenant_id(mut self, tenant_id: Option<u32>) -> Self {
self.tenant_id = tenant_id;
pub fn with_tenant_id(mut self, tenant: &impl AsTenantId) -> Self {
self.tenant_id = tenant.tenant_id();
self
}
pub fn set_tenant_id(&mut self, tenant_id: u32) {
self.tenant_id = tenant_id.into();
}
}
impl<T: IndexableObject> IntoOperations for ObjectIndexBuilder<T> {
@ -182,6 +182,13 @@ fn build_batch<T: IndexableObject>(
});
}
}
IndexValue::Blob { value } => {
if set {
batch.set(BlobOp::Link { hash: value }, vec![]);
} else {
batch.clear(BlobOp::Link { hash: value });
}
}
IndexValue::Acl { value } => {
for item in value {
batch.ops.push(Operation::acl(
@ -354,6 +361,10 @@ fn merge_batch<T: IndexableObject>(
});
}
}
(IndexValue::Blob { value: old_hash }, IndexValue::Blob { value: new_hash }) => {
batch.clear(BlobOp::Link { hash: old_hash });
batch.set(BlobOp::Link { hash: new_hash }, vec![]);
}
(IndexValue::Acl { value: old_acl }, IndexValue::Acl { value: new_acl }) => {
match (!old_acl.is_empty(), !new_acl.is_empty()) {
(true, true) => {

View file

@ -7,12 +7,16 @@ resolver = "2"
[dependencies]
dav-proto = { path = "../dav-proto" } # FIXME(review): was an absolute machine-local path (/Users/me/code/dav-proto); use a workspace-relative path like the sibling crates
common = { path = "../common" }
store = { path = "../store" }
utils = { path = "../utils" }
groupware = { path = "../groupware" }
directory = { path = "../directory" }
http_proto = { path = "../http-proto" }
jmap_proto = { path = "../jmap-proto" }
trc = { path = "../trc" }
hashify = { version = "0.2" }
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
percent-encoding = "2.3.1"
[dev-dependencies]

View file

@ -0,0 +1,56 @@
use common::{Server, auth::AccessToken};
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection};
use trc::AddContext;
use crate::DavError;
/// ACL validation helpers shared by the DAV handlers.
pub(crate) trait DavAclHandler: Sync + Send {
    /// Validates that `access_token` may act on the given (optional) parent
    /// and maps it to the stored parent-id encoding: returns `parent_id + 1`
    /// when a parent is given, or `0` for the hierarchy root.
    ///
    /// `check_acls` is the ACL required on the parent when the token's
    /// principal is not a member of `account_id`. Fails with
    /// `DavError::Code(FORBIDDEN)` when access is denied.
    fn validate_and_map_parent_acl(
        &self,
        access_token: &AccessToken,
        account_id: u32,
        collection: Collection,
        parent_id: Option<u32>,
        check_acls: Acl,
    ) -> impl Future<Output = crate::Result<u32>> + Send;
}
impl DavAclHandler for Server {
    /// Checks the caller's rights on `parent_id` and returns the stored
    /// parent-id encoding (`parent_id + 1`, or `0` for the root).
    async fn validate_and_map_parent_acl(
        &self,
        access_token: &AccessToken,
        account_id: u32,
        collection: Collection,
        parent_id: Option<u32>,
        check_acls: Acl,
    ) -> crate::Result<u32> {
        // Members of the account are always allowed; checked first so the
        // ACL lookup below is skipped for them.
        let is_member = access_token.is_member(account_id);

        let Some(parent_id) = parent_id else {
            // No parent: creating at the hierarchy root, members only.
            return if is_member {
                Ok(0)
            } else {
                Err(DavError::Code(StatusCode::FORBIDDEN))
            };
        };

        // Non-members must hold `check_acls` on the parent document.
        let allowed = is_member
            || self
                .has_access_to_document(
                    access_token,
                    account_id,
                    collection,
                    parent_id,
                    check_acls,
                )
                .await
                .caused_by(trc::location!())?;

        if allowed {
            // Stored parent ids are offset by one so that 0 can mean "root".
            Ok(parent_id + 1)
        } else {
            Err(DavError::Code(StatusCode::FORBIDDEN))
        }
    }
}

View file

@ -0,0 +1,2 @@
//! Helpers shared by all DAV resource handlers: ACL validation and
//! request-URI parsing.
pub mod acl;
pub mod uri;

View file

@ -0,0 +1,84 @@
use common::{Server, auth::AccessToken};
use directory::backend::internal::manage::ManageDirectory;
use http_proto::request::decode_path_element;
use hyper::StatusCode;
use jmap_proto::types::collection::Collection;
use trc::AddContext;
use crate::{DavError, DavResource};
/// A parsed `/dav/{collection}/{account}/{resource}` request URI.
pub(crate) struct UriResource<T> {
    // Collection addressed by the first path segment.
    pub collection: Collection,
    // Account id resolved from the second path segment, when present.
    pub account_id: Option<u32>,
    // Remaining resource path; the type depends on the mapping stage
    // (raw `Option<&str>` after parsing, document id after hierarchy lookup).
    pub resource: T,
}
/// Parses and authorizes DAV request URIs.
pub(crate) trait DavUriResource: Sync + Send {
    /// Splits `uri` at `/dav/`, resolves the collection and account
    /// segments, and verifies the caller's access to that account.
    /// Returns the still-unresolved remainder of the path (if any).
    fn validate_uri<'x>(
        &self,
        access_token: &AccessToken,
        uri: &'x str,
    ) -> impl Future<Output = crate::Result<UriResource<Option<&'x str>>>> + Send;
}
impl DavUriResource for Server {
    /// Parses a DAV URI into collection / account / resource parts.
    ///
    /// Errors: `NOT_FOUND` when the URI lacks `/dav/`, names an unknown
    /// collection or account, or has an unparsable `_<id>` segment;
    /// `FORBIDDEN` when the token may not access the account's collection.
    async fn validate_uri<'x>(
        &self,
        access_token: &AccessToken,
        uri: &'x str,
    ) -> crate::Result<UriResource<Option<&'x str>>> {
        // Everything before "/dav/" (scheme/host/prefix) is ignored.
        let (_, uri_parts) = uri
            .split_once("/dav/")
            .ok_or(DavError::Code(StatusCode::NOT_FOUND))?;
        // At most three segments: collection / account / resource-path.
        let mut uri_parts = uri_parts.splitn(3, '/').filter(|x| !x.is_empty());
        let mut resource = UriResource {
            collection: uri_parts
                .next()
                .and_then(DavResource::parse)
                .ok_or(DavError::Code(StatusCode::NOT_FOUND))?
                .into(),
            account_id: None,
            resource: None,
        };
        if let Some(account) = uri_parts.next() {
            // Parse account id: a leading '_' denotes a numeric principal id;
            // otherwise the segment is a (percent-encoded) account name.
            let account_id = if let Some(account_id) = account.strip_prefix('_') {
                account_id
                    .parse::<u32>()
                    .map_err(|_| DavError::Code(StatusCode::NOT_FOUND))?
            } else {
                let account = decode_path_element(account);
                if access_token.name == account {
                    // Fast path: the caller's own account.
                    access_token.primary_id
                } else {
                    self.store()
                        .get_principal_id(&account)
                        .await
                        .caused_by(trc::location!())?
                        .ok_or(DavError::Code(StatusCode::NOT_FOUND))?
                }
            };
            // Validate access (the principal collection itself is exempt).
            if resource.collection != Collection::Principal
                && !access_token.has_access(account_id, resource.collection)
            {
                return Err(DavError::Code(StatusCode::FORBIDDEN));
            }
            // Obtain remaining path, normalizing away a trailing slash.
            resource.account_id = Some(account_id);
            resource.resource = uri_parts.next().map(|uri| uri.trim_end_matches('/'));
        }
        Ok(resource)
    }
}
impl<T> UriResource<T> {
    /// Returns the resolved account id, or `NOT_FOUND` when the URI did
    /// not include an account segment.
    pub fn account_id(&self) -> crate::Result<u32> {
        match self.account_id {
            Some(id) => Ok(id),
            None => Err(DavError::Code(StatusCode::NOT_FOUND)),
        }
    }
}

View file

@ -4,17 +4,32 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::Acl};
use common::{Server, auth::AccessToken, sharing::EffectiveAcl};
use dav_proto::RequestHeaders;
use groupware::file::ArchivedFileNode;
use http_proto::HttpResponse;
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection};
use trc::AddContext;
use crate::DavError;
pub(crate) trait FileAclRequestHandler: Sync + Send {
fn handle_file_acl_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Acl,
request: dav_proto::schema::request::Acl,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
fn validate_file_acl(
&self,
access_token: &AccessToken,
account_id: u32,
node: &ArchivedFileNode,
acl_child: Acl,
acl_parent: Acl,
) -> impl Future<Output = crate::Result<()>> + Send;
}
impl FileAclRequestHandler for Server {
@ -22,8 +37,36 @@ impl FileAclRequestHandler for Server {
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: Acl,
request: dav_proto::schema::request::Acl,
) -> crate::Result<HttpResponse> {
todo!()
}
async fn validate_file_acl(
&self,
access_token: &AccessToken,
account_id: u32,
node: &ArchivedFileNode,
acl_child: Acl,
acl_parent: Acl,
) -> crate::Result<()> {
if access_token.is_member(account_id)
|| node.acls.effective_acl(access_token).contains(acl_child)
|| (u32::from(node.parent_id) > 0
&& self
.has_access_to_document(
access_token,
account_id,
Collection::FileNode,
u32::from(node.parent_id) - 1,
acl_parent,
)
.await
.caused_by(trc::location!())?)
{
Ok(())
} else {
Err(DavError::Code(StatusCode::FORBIDDEN))
}
}
}

View file

@ -4,9 +4,20 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::RequestHeaders;
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::property::Rfc1123DateTime};
use groupware::file::{FileNode, hierarchy::FileHierarchy};
use http_proto::HttpResponse;
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection, property::Property};
use store::write::Archive;
use trc::AddContext;
use crate::{
DavError,
common::uri::DavUriResource,
file::{DavFileResource, acl::FileAclRequestHandler},
};
pub(crate) trait FileGetRequestHandler: Sync + Send {
fn handle_file_get_request(
@ -24,6 +35,57 @@ impl FileGetRequestHandler for Server {
headers: RequestHeaders<'_>,
is_head: bool,
) -> crate::Result<HttpResponse> {
todo!()
// Validate URI
let resource = self.validate_uri(access_token, headers.uri).await?;
let account_id = resource.account_id()?;
let files = self
.fetch_file_hierarchy(account_id)
.await
.caused_by(trc::location!())?;
let resource = files.map_resource(resource)?;
// Fetch node
let node_ = self
.get_property::<Archive>(
account_id,
Collection::FileNode,
resource.resource,
Property::Value,
)
.await
.caused_by(trc::location!())?
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?;
let node = node_.unarchive::<FileNode>().caused_by(trc::location!())?;
// Validate ACL
self.validate_file_acl(access_token, account_id, node, Acl::Read, Acl::ReadItems)
.await?;
let (hash, size, content_type) = if let Some(file) = node.file.as_ref() {
(
file.blob_hash.0.as_ref(),
u32::from(file.size) as usize,
file.media_type.as_ref().map(|s| s.as_str()),
)
} else {
return Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED));
};
let response = HttpResponse::new(StatusCode::OK)
.with_content_type(content_type.unwrap_or("application/octet-stream"))
.with_etag(u64::from(node.change_id))
.with_last_modified(Rfc1123DateTime::new(i64::from(node.modified)).to_string());
if !is_head {
Ok(response.with_binary_body(
self.blob_store()
.get_blob(hash, 0..usize::MAX)
.await
.caused_by(trc::location!())?
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?,
))
} else {
Ok(response.with_content_length(size))
}
}
}

View file

@ -4,9 +4,24 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::MkCol};
use common::{Server, auth::AccessToken, storage::index::ObjectIndexBuilder};
use dav_proto::{
RequestHeaders,
schema::{Namespace, request::MkCol, response::MkColResponse},
};
use groupware::file::{FileNode, hierarchy::FileHierarchy};
use http_proto::HttpResponse;
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection};
use store::write::{BatchBuilder, log::LogInsert, now};
use trc::AddContext;
use crate::{
common::{acl::DavAclHandler, uri::DavUriResource},
file::DavFileResource,
};
use super::proppatch::FilePropPatchRequestHandler;
pub(crate) trait FileMkColRequestHandler: Sync + Send {
fn handle_file_mkcol_request(
@ -24,6 +39,66 @@ impl FileMkColRequestHandler for Server {
headers: RequestHeaders<'_>,
request: Option<MkCol>,
) -> crate::Result<HttpResponse> {
todo!()
// Validate URI
let resource = self.validate_uri(access_token, headers.uri).await?;
let account_id = resource.account_id()?;
let files = self
.fetch_file_hierarchy(account_id)
.await
.caused_by(trc::location!())?;
let resource = files.map_parent_resource(resource)?;
// Build collection
let parent_id = self
.validate_and_map_parent_acl(
access_token,
account_id,
Collection::FileNode,
resource.resource.0,
Acl::CreateChild,
)
.await?;
let change_id = self.generate_snowflake_id().caused_by(trc::location!())?;
let now = now();
let mut node = FileNode {
parent_id,
name: resource.resource.1.into_owned(),
display_name: None,
file: None,
created: now as i64,
modified: now as i64,
change_id,
dead_properties: Default::default(),
acls: Default::default(),
};
// Apply MKCOL properties
if let Some(mkcol) = request {
let mut prop_stat = Vec::new();
if !self.apply_file_properties(&mut node, false, mkcol.props, &mut prop_stat) {
return Ok(HttpResponse::new(StatusCode::FORBIDDEN).with_xml_body(
MkColResponse::new(prop_stat)
.with_namespace(Namespace::Dav)
.to_string(),
));
}
}
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_change_id(change_id)
.with_account_id(account_id)
.with_collection(Collection::FileNode)
.create_document()
.log(LogInsert())
.custom(ObjectIndexBuilder::new().with_changes(node))
.caused_by(trc::location!())?;
self.store()
.write(batch)
.await
.caused_by(trc::location!())?;
Ok(HttpResponse::new(StatusCode::CREATED))
}
}

View file

@ -4,7 +4,14 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod acl;
use std::borrow::Cow;
use common::Files;
use hyper::StatusCode;
use crate::{DavError, common::uri::UriResource};
pub mod acl;
pub mod changes;
pub mod copy_move;
pub mod delete;
@ -15,8 +22,92 @@ pub mod propfind;
pub mod proppatch;
pub mod update;
pub(crate) enum UpdateType {
Post(Vec<u8>),
Put(Vec<u8>),
Patch(Vec<u8>),
/// Maps parsed URI paths onto document ids in a cached file hierarchy.
pub(crate) trait DavFileResource {
    /// Resolves the resource path to an existing document id;
    /// fails when the path is missing or unknown.
    fn map_resource(&self, resource: UriResource<Option<&str>>) -> crate::Result<UriResource<u32>>;

    /// Like `map_resource`, but an absent path resolves to `None`
    /// (the collection root) instead of failing.
    fn map_resource_or_root(
        &self,
        resource: UriResource<Option<&str>>,
    ) -> crate::Result<UriResource<Option<u32>>>;

    /// Splits `resource` into (parent document id, percent-decoded child
    /// name); a path without '/' has no parent.
    fn map_parent<'x>(&self, resource: &'x str) -> crate::Result<(Option<u32>, Cow<'x, str>)>;

    /// Maps a path that must NOT already exist (e.g. MKCOL/PUT targets)
    /// onto its parent id and new child name.
    fn map_parent_resource<'x>(
        &self,
        resource: UriResource<Option<&'x str>>,
    ) -> crate::Result<UriResource<(Option<u32>, Cow<'x, str>)>>;
}
impl DavFileResource for Files {
    /// Looks up the path in the name↔id bimap; `NOT_FOUND` when the URI
    /// had no path or the path is unknown.
    fn map_resource(&self, resource: UriResource<Option<&str>>) -> crate::Result<UriResource<u32>> {
        resource
            .resource
            .and_then(|r| self.files.by_name(r))
            .map(|r| UriResource {
                collection: resource.collection,
                account_id: resource.account_id,
                resource: r,
            })
            .ok_or(DavError::Code(StatusCode::NOT_FOUND))
    }

    /// Same lookup, but a missing path means "the root" (`None`) rather
    /// than an error; only an unknown non-empty path yields `NOT_FOUND`.
    fn map_resource_or_root(
        &self,
        resource: UriResource<Option<&str>>,
    ) -> crate::Result<UriResource<Option<u32>>> {
        Ok(UriResource {
            collection: resource.collection,
            account_id: resource.account_id,
            resource: if let Some(resource) = resource.resource {
                Some(
                    self.files
                        .by_name(resource)
                        .ok_or(DavError::Code(StatusCode::NOT_FOUND))?,
                )
            } else {
                None
            },
        })
    }

    /// Splits off the last path segment; the prefix (if any) must name an
    /// existing node. The child name is percent-decoded, falling back to
    /// the raw bytes when they are not valid UTF-8.
    fn map_parent<'x>(&self, resource: &'x str) -> crate::Result<(Option<u32>, Cow<'x, str>)> {
        let (parent, child) = if let Some((parent, child)) = resource.rsplit_once('/') {
            (
                Some(
                    self.files
                        .by_name(parent)
                        .ok_or(DavError::Code(StatusCode::NOT_FOUND))?,
                ),
                child,
            )
        } else {
            // No '/': the new node hangs off the hierarchy root.
            (None, resource)
        };
        Ok((
            parent,
            percent_encoding::percent_decode_str(child)
                .decode_utf8()
                .unwrap_or_else(|_| child.into()),
        ))
    }

    /// For creation targets: the full path must not exist yet
    /// (`METHOD_NOT_ALLOWED` if it does, or if no path was supplied);
    /// otherwise delegates to `map_parent`.
    fn map_parent_resource<'x>(
        &self,
        resource: UriResource<Option<&'x str>>,
    ) -> crate::Result<UriResource<(Option<u32>, Cow<'x, str>)>> {
        if let Some(r) = resource.resource {
            if self.files.by_name(r).is_none() {
                self.map_parent(r).map(|r| UriResource {
                    collection: resource.collection,
                    account_id: resource.account_id,
                    resource: r,
                })
            } else {
                Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED))
            }
        } else {
            Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED))
        }
    }
}

View file

@ -4,9 +4,27 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::PropertyUpdate};
use common::{Server, auth::AccessToken, storage::index::ObjectIndexBuilder};
use dav_proto::{
RequestHeaders,
schema::{
property::{DavProperty, DavValue, ResourceType, WebDavProperty},
request::{DavPropertyValue, PropertyUpdate},
response::{BaseCondition, MultiStatus, PropStat, Response},
},
};
use groupware::file::{FileNode, hierarchy::FileHierarchy};
use http_proto::HttpResponse;
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection, property::Property};
use store::write::{Archive, BatchBuilder, assert::HashedValue, log::Changes, now};
use trc::AddContext;
use crate::{
DavError,
common::uri::DavUriResource,
file::{DavFileResource, acl::FileAclRequestHandler},
};
pub(crate) trait FilePropPatchRequestHandler: Sync + Send {
fn handle_file_proppatch_request(
@ -15,6 +33,14 @@ pub(crate) trait FilePropPatchRequestHandler: Sync + Send {
headers: RequestHeaders<'_>,
request: PropertyUpdate,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
fn apply_file_properties(
&self,
file: &mut FileNode,
is_update: bool,
properties: Vec<DavPropertyValue>,
items: &mut Vec<PropStat>,
) -> bool;
}
impl FilePropPatchRequestHandler for Server {
@ -24,6 +50,211 @@ impl FilePropPatchRequestHandler for Server {
headers: RequestHeaders<'_>,
request: PropertyUpdate,
) -> crate::Result<HttpResponse> {
todo!()
// Validate URI
let resource = self.validate_uri(access_token, headers.uri).await?;
let uri = headers.uri;
let account_id = resource.account_id()?;
let files = self
.fetch_file_hierarchy(account_id)
.await
.caused_by(trc::location!())?;
let resource = files.map_resource(resource)?;
// Fetch node
let node_ = self
.get_property::<HashedValue<Archive>>(
account_id,
Collection::FileNode,
resource.resource,
Property::Value,
)
.await
.caused_by(trc::location!())?
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?;
let node = node_
.to_unarchived::<FileNode>()
.caused_by(trc::location!())?;
// Validate ACL
self.validate_file_acl(
access_token,
account_id,
node.inner,
Acl::Modify,
Acl::ModifyItems,
)
.await?;
let node = node.into_deserialized().caused_by(trc::location!())?;
let mut new_node = node.inner.clone();
// Remove properties
let mut items = Vec::with_capacity(request.remove.len() + request.set.len());
for property in request.remove {
match property {
DavProperty::WebDav(WebDavProperty::DisplayName) => {
new_node.display_name = None;
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::DisplayName))
.with_status(StatusCode::OK),
);
}
DavProperty::WebDav(WebDavProperty::GetContentType) if new_node.file.is_some() => {
new_node.file.as_mut().unwrap().media_type = None;
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::GetContentType))
.with_status(StatusCode::OK),
);
}
DavProperty::DeadProperty(dead) => {
new_node.dead_properties.remove_element(&dead);
items.push(
PropStat::new(DavProperty::DeadProperty(dead)).with_status(StatusCode::OK),
);
}
property => {
items.push(
PropStat::new(property)
.with_status(StatusCode::CONFLICT)
.with_response_description("Property cannot be modified"),
);
}
}
}
// Set properties
self.apply_file_properties(&mut new_node, true, request.set, &mut items);
if new_node != node.inner {
// Build node
new_node.modified = now() as i64;
new_node.change_id = self.generate_snowflake_id().caused_by(trc::location!())?;
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_change_id(new_node.change_id)
.with_account_id(account_id)
.with_collection(Collection::FileNode)
.update_document(resource.resource)
.log(Changes::update([resource.resource]))
.custom(
ObjectIndexBuilder::new()
.with_current(node)
.with_changes(new_node)
.with_tenant_id(access_token),
)
.caused_by(trc::location!())?;
self.store()
.write(batch)
.await
.caused_by(trc::location!())?;
}
Ok(HttpResponse::new(StatusCode::MULTI_STATUS)
.with_xml_body(MultiStatus::new(vec![Response::new_propstat(uri, items)]).to_string()))
}
fn apply_file_properties(
&self,
file: &mut FileNode,
is_update: bool,
properties: Vec<DavPropertyValue>,
items: &mut Vec<PropStat>,
) -> bool {
let mut has_errors = false;
for property in properties {
match (property.property, property.value) {
(DavProperty::WebDav(WebDavProperty::DisplayName), DavValue::String(name)) => {
if name.len() <= self.core.dav.live_property_size {
file.display_name = Some(name);
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::DisplayName))
.with_status(StatusCode::OK),
);
} else {
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::DisplayName))
.with_status(StatusCode::INSUFFICIENT_STORAGE)
.with_response_description("Display name too long"),
);
has_errors = true;
}
}
(DavProperty::WebDav(WebDavProperty::CreationDate), DavValue::Timestamp(dt)) => {
file.created = dt;
}
(DavProperty::WebDav(WebDavProperty::GetContentType), DavValue::String(name))
if file.file.is_some() =>
{
if name.len() <= self.core.dav.live_property_size {
file.file.as_mut().unwrap().media_type = Some(name);
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::GetContentType))
.with_status(StatusCode::OK),
);
} else {
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::GetContentType))
.with_status(StatusCode::INSUFFICIENT_STORAGE)
.with_response_description("Content-type is too long"),
);
has_errors = true;
}
}
(
DavProperty::WebDav(WebDavProperty::ResourceType),
DavValue::ResourceTypes(types),
) if file.file.is_none() => {
if types.0.len() != 1 || types.0.first() != Some(&ResourceType::Collection) {
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::ResourceType))
.with_status(StatusCode::FORBIDDEN)
.with_error(BaseCondition::ValidResourceType),
);
has_errors = true;
} else {
items.push(
PropStat::new(DavProperty::WebDav(WebDavProperty::ResourceType))
.with_status(StatusCode::OK),
);
}
}
(DavProperty::DeadProperty(dead), DavValue::DeadProperty(values))
if self.core.dav.dead_property_size.is_some() =>
{
if is_update {
file.dead_properties.remove_element(&dead);
}
if file.dead_properties.size() + values.size() + dead.size()
< self.core.dav.dead_property_size.unwrap()
{
file.dead_properties.add_element(dead.clone(), values.0);
items.push(
PropStat::new(DavProperty::DeadProperty(dead))
.with_status(StatusCode::OK),
);
} else {
items.push(
PropStat::new(DavProperty::DeadProperty(dead))
.with_status(StatusCode::INSUFFICIENT_STORAGE)
.with_response_description("Dead property is too large."),
);
has_errors = true;
}
}
(property, _) => {
items.push(
PropStat::new(property)
.with_status(StatusCode::CONFLICT)
.with_response_description("Property cannot be modified"),
);
has_errors = true;
}
}
}
!has_errors
}
}

View file

@ -4,18 +4,36 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{Server, auth::AccessToken};
use dav_proto::{RequestHeaders, schema::request::SyncCollection};
use common::{Server, auth::AccessToken, storage::index::ObjectIndexBuilder};
use dav_proto::RequestHeaders;
use groupware::file::{FileNode, FileProperties, hierarchy::FileHierarchy};
use http_proto::HttpResponse;
use hyper::StatusCode;
use jmap_proto::types::{acl::Acl, collection::Collection, property::Property};
use store::write::{
Archive, BatchBuilder,
assert::HashedValue,
log::{Changes, LogInsert},
now,
};
use trc::AddContext;
use utils::BlobHash;
use super::UpdateType;
use crate::{
DavError,
common::{acl::DavAclHandler, uri::DavUriResource},
file::DavFileResource,
};
use super::acl::FileAclRequestHandler;
pub(crate) trait FileUpdateRequestHandler: Sync + Send {
fn handle_file_update_request(
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: UpdateType,
bytes: Vec<u8>,
is_patch: bool,
) -> impl Future<Output = crate::Result<HttpResponse>> + Send;
}
@ -24,8 +42,193 @@ impl FileUpdateRequestHandler for Server {
&self,
access_token: &AccessToken,
headers: RequestHeaders<'_>,
request: UpdateType,
bytes: Vec<u8>,
_is_patch: bool,
) -> crate::Result<HttpResponse> {
todo!()
// Validate URI
let resource = self.validate_uri(access_token, headers.uri).await?;
let account_id = resource.account_id()?;
let files = self
.fetch_file_hierarchy(account_id)
.await
.caused_by(trc::location!())?;
let resource_name = resource
.resource
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?;
if let Some(document_id) = files.files.by_name(resource_name) {
// Update
let node_archive_ = self
.get_property::<HashedValue<Archive>>(
account_id,
Collection::FileNode,
document_id,
Property::Value,
)
.await
.caused_by(trc::location!())?
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?;
let node_archive = node_archive_
.to_unarchived::<FileNode>()
.caused_by(trc::location!())?;
let node = node_archive.inner;
// Validate ACL
self.validate_file_acl(
access_token,
account_id,
node,
Acl::Modify,
Acl::ModifyItems,
)
.await?;
// Verify that the node is a file
if let Some(file) = node.file.as_ref() {
if BlobHash::generate(&bytes).as_slice() == file.blob_hash.0.as_slice() {
return Ok(HttpResponse::new(StatusCode::OK));
}
} else {
return Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED));
}
// Validate quota
let extra_bytes = (bytes.len() as u64)
.saturating_sub(u32::from(node.file.as_ref().unwrap().size) as u64);
if extra_bytes > 0 {
self.has_available_quota(&access_token.as_resource_token(), extra_bytes)
.await?;
}
// Write blob
let blob_hash = self
.put_blob(account_id, &bytes, false)
.await
.caused_by(trc::location!())?
.hash;
// Build node
let change_id = self.generate_snowflake_id().caused_by(trc::location!())?;
let node = node_archive
.into_deserialized()
.caused_by(trc::location!())?;
let mut new_node = node.inner.clone();
let new_file = new_node.file.as_mut().unwrap();
new_file.blob_hash = blob_hash;
new_file.media_type = headers.content_type.map(|v| v.to_string());
new_file.size = bytes.len() as u32;
new_node.modified = now() as i64;
new_node.change_id = change_id;
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_change_id(change_id)
.with_account_id(account_id)
.with_collection(Collection::FileNode)
.update_document(document_id)
.log(Changes::update([document_id]))
.custom(
ObjectIndexBuilder::new()
.with_current(node)
.with_changes(new_node)
.with_tenant_id(access_token),
)
.caused_by(trc::location!())?;
self.store()
.write(batch)
.await
.caused_by(trc::location!())?;
Ok(HttpResponse::new(StatusCode::OK))
} else {
// Insert
let (parent_id, resource_name) = files.map_parent(resource_name)?;
// Validate ACL
let parent_id = self
.validate_and_map_parent_acl(
access_token,
account_id,
Collection::FileNode,
parent_id,
Acl::AddItems,
)
.await?;
// Verify that parent is a collection
if parent_id > 0
&& self
.get_property::<Archive>(
account_id,
Collection::FileNode,
parent_id - 1,
Property::Value,
)
.await
.caused_by(trc::location!())?
.ok_or(DavError::Code(StatusCode::NOT_FOUND))?
.unarchive::<FileNode>()
.caused_by(trc::location!())?
.file
.is_some()
{
return Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED));
}
// Validate quota
if !bytes.is_empty() {
self.has_available_quota(&access_token.as_resource_token(), bytes.len() as u64)
.await?;
}
// Write blob
let blob_hash = self
.put_blob(account_id, &bytes, false)
.await
.caused_by(trc::location!())?
.hash;
// Build node
let change_id = self.generate_snowflake_id().caused_by(trc::location!())?;
let now = now();
let node = FileNode {
parent_id,
name: resource_name.into_owned(),
display_name: None,
file: Some(FileProperties {
blob_hash,
size: bytes.len() as u32,
media_type: headers.content_type.map(|v| v.to_string()),
executable: false,
}),
created: now as i64,
modified: now as i64,
change_id,
dead_properties: Default::default(),
acls: Default::default(),
};
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_change_id(change_id)
.with_account_id(account_id)
.with_collection(Collection::FileNode)
.create_document()
.log(LogInsert())
.custom(
ObjectIndexBuilder::new()
.with_changes(node)
.with_tenant_id(access_token),
)
.caused_by(trc::location!())?;
self.store()
.write(batch)
.await
.caused_by(trc::location!())?;
Ok(HttpResponse::new(StatusCode::CREATED))
}
}
}

View file

@ -6,13 +6,15 @@
pub mod calendar;
pub mod card;
pub mod common;
pub mod file;
pub mod principal;
pub mod request;
use dav_proto::schema::request::Report;
use dav_proto::schema::{request::Report, response::Condition};
use http_proto::HttpResponse;
use hyper::{Method, StatusCode};
use jmap_proto::types::collection::Collection;
pub(crate) type Result<T> = std::result::Result<T, DavError>;
@ -47,7 +49,19 @@ pub enum DavMethod {
pub(crate) enum DavError {
Parse(dav_proto::parser::Error),
Internal(trc::Error),
UnsupportedReport(Report),
Condition(Condition),
Code(StatusCode),
}
/// Maps each DAV URI resource type to its backing JMAP collection.
impl From<DavResource> for Collection {
    fn from(value: DavResource) -> Self {
        match value {
            DavResource::Card => Collection::AddressBook,
            DavResource::Cal => Collection::Calendar,
            DavResource::File => Collection::FileNode,
            DavResource::Principal => Collection::Principal,
        }
    }
}
impl DavResource {
@ -56,7 +70,7 @@ impl DavResource {
"card" => DavResource::Card,
"cal" => DavResource::Cal,
"file" => DavResource::File,
"pri" => DavResource::Principal,
"pal" => DavResource::Principal,
)
}

View file

@ -10,7 +10,11 @@ use common::{Server, auth::AccessToken};
use dav_proto::{
RequestHeaders,
parser::{DavParser, tokenizer::Tokenizer},
schema::request::{Acl, LockInfo, MkCol, PropFind, PropertyUpdate, Report},
schema::{
Namespace,
request::{Acl, LockInfo, MkCol, PropFind, PropertyUpdate, Report},
response::{BaseCondition, ErrorResponse},
},
};
use directory::Permission;
use http_proto::{HttpRequest, HttpResponse, HttpSessionData, request::fetch_body};
@ -19,7 +23,7 @@ use hyper::{StatusCode, header};
use crate::{
DavError, DavMethod, DavResource,
file::{
UpdateType, acl::FileAclRequestHandler, changes::FileChangesRequestHandler,
acl::FileAclRequestHandler, changes::FileChangesRequestHandler,
copy_move::FileCopyMoveRequestHandler, delete::FileDeleteRequestHandler,
get::FileGetRequestHandler, lock::FileLockRequestHandler, mkcol::FileMkColRequestHandler,
propfind::FilePropFindRequestHandler, proppatch::FilePropPatchRequestHandler,
@ -65,6 +69,7 @@ impl DavRequestDispatcher for Server {
}
// Dispatch
let todo = "lock tokens, headers, etc";
match resource {
DavResource::Card => {
todo!()
@ -116,16 +121,12 @@ impl DavRequestDispatcher for Server {
self.handle_file_delete_request(&access_token, headers)
.await
}
DavMethod::PUT => {
self.handle_file_update_request(&access_token, headers, UpdateType::Put(body))
.await
}
DavMethod::POST => {
self.handle_file_update_request(&access_token, headers, UpdateType::Post(body))
DavMethod::PUT | DavMethod::POST => {
self.handle_file_update_request(&access_token, headers, body, false)
.await
}
DavMethod::PATCH => {
self.handle_file_update_request(&access_token, headers, UpdateType::Patch(body))
self.handle_file_update_request(&access_token, headers, body, true)
.await
}
DavMethod::COPY => {
@ -161,7 +162,7 @@ impl DavRequestDispatcher for Server {
self.handle_file_changes_request(&access_token, headers, sync_collection)
.await
}
report => Err(DavError::UnsupportedReport(report)),
_ => Err(DavError::Code(StatusCode::METHOD_NOT_ALLOWED)),
},
DavMethod::OPTIONS => unreachable!(),
},
@ -217,12 +218,36 @@ impl DavRequestHandler for Server {
{
Ok(response) => response,
Err(DavError::Internal(err)) => {
let is_quota_error = matches!(
err.event_type(),
trc::EventType::Limit(trc::LimitEvent::Quota | trc::LimitEvent::TenantQuota)
);
trc::error!(err.span_id(session.session_id));
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR)
if is_quota_error {
HttpResponse::new(StatusCode::PRECONDITION_FAILED)
.with_xml_body(
ErrorResponse::new(BaseCondition::QuotaNotExceeded)
.with_namespace(resource)
.to_string(),
)
.with_no_cache()
} else {
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR)
}
}
Err(DavError::UnsupportedReport(report)) => HttpResponse::new(StatusCode::BAD_REQUEST),
Err(DavError::Parse(err)) => HttpResponse::new(StatusCode::BAD_REQUEST),
Err(DavError::Condition(condition)) => {
HttpResponse::new(StatusCode::PRECONDITION_FAILED)
.with_xml_body(
ErrorResponse::new(condition)
.with_namespace(resource)
.to_string(),
)
.with_no_cache()
}
Err(DavError::Code(code)) => HttpResponse::new(code),
}
}
}
@ -238,3 +263,13 @@ impl From<trc::Error> for DavError {
DavError::Internal(err)
}
}
/// Selects the XML namespace used when serializing error/multistatus
/// responses for a given DAV resource type (CardDAV, CalDAV, or plain
/// WebDAV for files and principals).
impl From<DavResource> for Namespace {
    fn from(value: DavResource) -> Self {
        match value {
            DavResource::Card => Namespace::CardDav,
            DavResource::Cal => Namespace::CalDav,
            DavResource::File | DavResource::Principal => Namespace::Dav,
        }
    }
}

View file

@ -129,6 +129,7 @@ impl ManageDirectory for Store {
self.get_principal_info(name).await.map(|v| v.map(|v| v.id))
}
async fn get_principal_info(&self, name: &str) -> trc::Result<Option<PrincipalInfo>> {
let todo = "cache";
self.get_value::<PrincipalInfo>(ValueKey::from(ValueClass::Directory(
DirectoryClass::NameToId(name.as_bytes().to_vec()),
)))

View file

@ -6,7 +6,7 @@
use common::{Server, auth::ResourceToken, storage::index::ObjectIndexBuilder};
use jmap_proto::types::{collection::Collection, property::Property};
use store::write::{Archive, BatchBuilder, BlobOp, assert::HashedValue};
use store::write::{Archive, BatchBuilder, assert::HashedValue};
use trc::AddContext;
use super::SieveScript;
@ -51,16 +51,6 @@ impl SieveScriptDelete for Server {
return Ok(false);
}
let blob_hash = obj.inner.blob_hash.clone();
let mut builder = ObjectIndexBuilder::new().with_current(obj);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
builder.set_tenant_id(tenant.id);
}
}
// Delete record
let mut batch = BatchBuilder::new();
batch
@ -68,8 +58,11 @@ impl SieveScriptDelete for Server {
.with_collection(Collection::SieveScript)
.delete_document(document_id)
.clear(Property::EmailIds)
.clear(BlobOp::Link { hash: blob_hash })
.custom(builder)
.custom(
ObjectIndexBuilder::new()
.with_current(obj)
.with_tenant_id(resource_token),
)
.caused_by(trc::location!())?;
self.store()

View file

@ -20,6 +20,9 @@ impl IndexableObject for SieveScript {
field: Property::IsActive.into(),
value: Some(self.is_active as u32),
},
IndexValue::Blob {
value: self.blob_hash.clone(),
},
IndexValue::Quota { used: self.size },
]
.into_iter()

View file

@ -8,10 +8,13 @@ resolver = "2"
utils = { path = "../utils" }
common = { path = "../common" }
jmap_proto = { path = "../jmap-proto" }
trc = { path = "../trc" }
directory = { path = "../directory" }
dav-proto = { path = "/Users/me/code/dav-proto" }
calcard = { path = "/Users/me/code/calcard" }
hashify = "0.2"
rkyv = { version = "0.8.10", features = ["little_endian"] }
percent-encoding = "2.3.1"
[features]
test_mode = []

View file

@ -0,0 +1,72 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use common::{Files, Server};
use jmap_proto::types::collection::Collection;
use percent_encoding::NON_ALPHANUMERIC;
use trc::AddContext;
use utils::bimap::IdBimap;
use crate::file::FileNode;
/// Server extension for retrieving an account's file (WebDAV) hierarchy.
pub trait FileHierarchy: Sync + Send {
    /// Returns the hierarchy for `account_id`, served from the in-memory
    /// cache when it is still current (see the `Server` impl below).
    fn fetch_file_hierarchy(
        &self,
        account_id: u32,
    ) -> impl Future<Output = trc::Result<Arc<Files>>> + Send;
}
impl FileHierarchy for Server {
    /// Fetches the file hierarchy for an account, using the last change id of
    /// the `FileNode` collection as a cache-freshness token.
    async fn fetch_file_hierarchy(&self, account_id: u32) -> trc::Result<Arc<Files>> {
        // Latest change id recorded for this account's FileNode collection;
        // compared against the cached entry's `modseq` below.
        let change_id = self
            .store()
            .get_last_change_id(account_id, Collection::FileNode)
            .await
            .caused_by(trc::location!())?;
        // Serve the cached hierarchy only when its modseq matches the latest
        // change id; otherwise rebuild it and replace the cache entry.
        // NOTE(review): two concurrent stale requests may both rebuild and
        // insert — presumably acceptable (last write wins); confirm.
        if let Some(files) = self
            .inner
            .cache
            .files
            .get(&account_id)
            .filter(|x| x.modseq == change_id)
        {
            Ok(files)
        } else {
            let mut files = build_file_hierarchy(self, account_id).await?;
            files.modseq = change_id;
            let files = Arc::new(files);
            self.inner.cache.files.insert(account_id, files.clone());
            Ok(files)
        }
    }
}
/// Builds a fresh id ↔ URI bimap of the account's file folders, with each
/// path segment percent-encoded for use in DAV URLs.
///
/// The returned `Files` has `modseq: None`; the caller stamps it with the
/// current change id before caching.
async fn build_file_hierarchy(server: &Server, account_id: u32) -> trc::Result<Files> {
    let list = server
        .fetch_folders::<FileNode>(account_id, Collection::FileNode)
        .await
        .caused_by(trc::location!())?
        // Percent-encode every name so the stored URIs are safe to emit
        // verbatim in hrefs (NON_ALPHANUMERIC escapes all reserved chars).
        .format(|_, name| {
            percent_encoding::utf8_percent_encode(name, NON_ALPHANUMERIC)
                .to_string()
                .into()
        });
    let mut files = Files {
        files: IdBimap::with_capacity(list.len()),
        // Start the memory estimate at the struct's own size; per-entry
        // costs are added in the loop below.
        size: std::mem::size_of::<Files>() as u64,
        modseq: None,
    };
    for (id, name) in list.into_iterator() {
        // Approximate per-entry footprint: the u32 id plus one String header
        // and its character data (the bimap shares one allocation per name).
        files.size +=
            (std::mem::size_of::<u32>() + std::mem::size_of::<String>() + name.len()) as u64;
        files.files.insert(id, name);
    }
    Ok(files)
}

View file

@ -14,8 +14,13 @@ use super::{ArchivedFileNode, FileNode};
impl IndexableObject for FileNode {
fn index_values(&self) -> impl Iterator<Item = IndexValue<'_>> {
let mut filters = Vec::with_capacity(5);
filters.extend([
let size = self.dead_properties.size() as u32
+ self.display_name.as_ref().map_or(0, |n| n.len() as u32)
+ self.name.len() as u32;
let mut values = Vec::with_capacity(6);
values.extend([
IndexValue::Text {
field: Property::Name.into(),
value: self.name.to_lowercase().into(),
@ -28,16 +33,22 @@ impl IndexableObject for FileNode {
]);
if let Some(file) = &self.file {
filters.extend([
let size = size + file.size;
values.extend([
IndexValue::Blob {
value: file.blob_hash.clone(),
},
IndexValue::U32 {
field: Property::Size.into(),
value: file.size.into(),
value: size.into(),
},
IndexValue::Quota { used: file.size },
IndexValue::Quota { used: size },
]);
} else {
values.push(IndexValue::Quota { used: size });
}
filters.into_iter()
values.into_iter()
}
}

View file

@ -4,8 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod hierarchy;
pub mod index;
use dav_proto::schema::request::DeadProperty;
use jmap_proto::types::value::AclGrant;
use utils::BlobHash;
@ -17,8 +19,10 @@ pub struct FileNode {
pub name: String,
pub display_name: Option<String>,
pub file: Option<FileProperties>,
pub created: u64,
pub modified: u64,
pub created: i64,
pub modified: i64,
pub change_id: u64,
pub dead_properties: DeadProperty,
pub acls: Vec<AclGrant>,
}

View file

@ -13,6 +13,7 @@ hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
http-body-util = "0.1.0"
form_urlencoded = "1.1.0"
percent-encoding = "2.3.1"
[dev-dependencies]

View file

@ -10,13 +10,11 @@ use http_body_util::BodyExt;
use crate::HttpRequest;
#[inline]
pub fn decode_path_element(item: &str) -> Cow<'_, str> {
// Bit hackish but avoids an extra dependency
form_urlencoded::parse(item.as_bytes())
.into_iter()
.next()
.map(|(k, _)| k)
.unwrap_or_else(|| item.into())
percent_encoding::percent_decode_str(item)
.decode_utf8()
.unwrap_or_else(|_| item.into())
}
pub async fn fetch_body(

View file

@ -36,6 +36,21 @@ impl HttpResponse {
self
}
pub fn with_content_length(mut self, content_length: usize) -> Self {
self.builder = self.builder.header(header::CONTENT_LENGTH, content_length);
self
}
pub fn with_etag(mut self, etag: u64) -> Self {
self.builder = self.builder.header(header::ETAG, format!("\"{etag}\""));
self
}
pub fn with_last_modified(mut self, last_modified: String) -> Self {
self.builder = self.builder.header(header::LAST_MODIFIED, last_modified);
self
}
pub fn with_header<K, V>(mut self, name: K, value: V) -> Self
where
K: TryInto<HeaderName>,
@ -47,18 +62,23 @@ impl HttpResponse {
self
}
pub fn with_xml_body(self, body: impl Into<String>) -> Self {
self.with_text_body(body)
.with_content_type("application/xml; charset=utf-8")
}
pub fn with_text_body(mut self, body: impl Into<String>) -> Self {
let body = body.into();
let body_len = body.len();
self.body = HttpResponseBody::Text(body);
self.with_header(header::CONTENT_LENGTH, body_len)
self.with_content_length(body_len)
}
pub fn with_binary_body(mut self, body: impl Into<Vec<u8>>) -> Self {
let body = body.into();
let body_len = body.len();
self.body = HttpResponseBody::Binary(body);
self.with_header(header::CONTENT_LENGTH, body_len)
self.with_content_length(body_len)
}
pub fn with_stream_body(
@ -94,13 +114,18 @@ impl HttpResponse {
self
}
pub fn with_no_cache(mut self) -> Self {
pub fn with_no_store(mut self) -> Self {
self.builder = self
.builder
.header(header::CACHE_CONTROL, "no-store, no-cache, must-revalidate");
self
}
pub fn with_no_cache(mut self) -> Self {
self.builder = self.builder.header(header::CACHE_CONTROL, "no-cache");
self
}
pub fn with_location<V>(mut self, location: V) -> Self
where
V: TryInto<HeaderValue>,
@ -170,7 +195,7 @@ impl<T: serde::Serialize> ToHttpResponse for JsonResponse<T> {
.with_text_body(serde_json::to_string(&self.inner).unwrap_or_default());
if self.no_cache {
response.with_no_cache()
response.with_no_store()
} else {
response
}

View file

@ -31,7 +31,7 @@ use store::{
BlobClass,
query::Filter,
rand::{Rng, rng},
write::{Archive, BatchBuilder, BlobOp, assert::HashedValue, log::ChangeLogBuilder},
write::{Archive, BatchBuilder, assert::HashedValue, log::ChangeLogBuilder},
};
use trc::AddContext;
@ -100,27 +100,13 @@ impl SieveScriptSet for Server {
let blob_size = sieve.size as usize;
let blob_hash = sieve.blob_hash.clone();
// Increment tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = ctx.resource_token.tenant {
builder.set_tenant_id(tenant.id);
}
}
// Write record
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
.create_document()
.set(
BlobOp::Link {
hash: blob_hash.clone(),
},
Vec::new(),
)
.custom(builder)
.custom(builder.with_tenant_id(&ctx.resource_token))
.caused_by(trc::location!())?;
let document_id = self
@ -194,7 +180,6 @@ impl SieveScriptSet for Server {
let sieve = sieve
.into_deserialized::<SieveScript>()
.caused_by(trc::location!())?;
let prev_blob_hash = sieve.inner.blob_hash.clone();
match self
.sieve_set_item(
@ -217,38 +202,16 @@ impl SieveScriptSet for Server {
// Store blob
let sieve = &mut builder.changes_mut().unwrap();
sieve.blob_hash = self.put_blob(account_id, &blob, false).await?.hash;
let blob_hash = sieve.blob_hash.clone();
let blob_size = sieve.size as usize;
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = ctx.resource_token.tenant {
builder.set_tenant_id(tenant.id);
}
}
// Update blobId
batch
.clear(BlobOp::Link {
hash: prev_blob_hash,
})
.set(
BlobOp::Link {
hash: blob_hash.clone(),
},
Vec::new(),
);
BlobId {
hash: blob_hash,
hash: sieve.blob_hash.clone(),
class: BlobClass::Linked {
account_id,
collection: Collection::SieveScript.into(),
document_id,
},
section: BlobSection {
size: blob_size,
size: sieve.size as usize,
..Default::default()
}
.into(),
@ -259,7 +222,9 @@ impl SieveScriptSet for Server {
};
// Write record
batch.custom(builder).caused_by(trc::location!())?;
batch
.custom(builder.with_tenant_id(&ctx.resource_token))
.caused_by(trc::location!())?;
if !batch.is_empty() {
changes.log_update(Collection::SieveScript, document_id);

View file

@ -26,7 +26,7 @@ use mail_builder::MessageBuilder;
use mail_parser::decoders::html::html_to_text;
use std::future::Future;
use store::write::{
Archive, BatchBuilder, BlobOp,
Archive, BatchBuilder,
assert::HashedValue,
log::{Changes, LogInsert},
};
@ -255,7 +255,8 @@ impl VacationResponseSet for Server {
let mut obj = ObjectIndexBuilder::new()
.with_current_opt(prev_sieve)
.with_changes(sieve);
.with_changes(sieve)
.with_tenant_id(&resource_token);
// Update id
if let Some(document_id) = document_id {
@ -270,7 +271,7 @@ impl VacationResponseSet for Server {
// Create sieve script only if there are changes
if build_script {
// Upload new blob
let hash = self
obj.changes_mut().unwrap().blob_hash = self
.put_blob(
account_id,
&self.build_script(obj.changes_mut().unwrap())?,
@ -278,31 +279,6 @@ impl VacationResponseSet for Server {
)
.await?
.hash;
let sieve = &mut obj.changes_mut().unwrap();
sieve.blob_hash = hash;
// Link blob
batch.set(
BlobOp::Link {
hash: sieve.blob_hash.clone(),
},
Vec::new(),
);
// Unlink previous blob
if let Some(current) = obj.current() {
batch.clear(BlobOp::Link {
hash: current.inner.blob_hash.clone(),
});
}
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
obj.set_tenant_id(tenant.id);
}
}
};
// Write changes

View file

@ -10,12 +10,11 @@ use common::{listener::SessionStream, storage::index::ObjectIndexBuilder};
use directory::Permission;
use email::sieve::SieveScript;
use imap_proto::receiver::Request;
use jmap_proto::types::{blob::BlobId, collection::Collection, property::Property};
use jmap_proto::types::{collection::Collection, property::Property};
use sieve::compiler::ErrorType;
use store::{
BlobClass,
query::Filter,
write::{Archive, BatchBuilder, BlobOp, assert::HashedValue, log::LogInsert},
write::{Archive, BatchBuilder, assert::HashedValue, log::LogInsert},
};
use trc::AddContext;
@ -119,51 +118,31 @@ impl<T: SessionStream> Session<T> {
.caused_by(trc::location!())?;
// Write script blob
let blob_id = BlobId::new(
self.server
.put_blob(account_id, &script_bytes, false)
.await
.caused_by(trc::location!())?
.hash,
BlobClass::Linked {
account_id,
collection: Collection::SieveScript.into(),
document_id,
},
)
.with_section_size(script_size as usize);
let prev_blob_hash = script.inner.blob_hash.clone();
let blob_hash = blob_id.hash.clone();
let blob_hash = self
.server
.put_blob(account_id, &script_bytes, false)
.await
.caused_by(trc::location!())?
.hash;
// Write record
let mut obj = ObjectIndexBuilder::new()
.with_changes(
script
.inner
.clone()
.with_size(script_size as u32)
.with_blob_hash(blob_hash.clone()),
)
.with_current(script);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.server.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
obj.set_tenant_id(tenant.id);
}
}
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
.update_document(document_id)
.clear(BlobOp::Link {
hash: prev_blob_hash,
})
.set(BlobOp::Link { hash: blob_hash }, Vec::new())
.custom(obj)
.custom(
ObjectIndexBuilder::new()
.with_changes(
script
.inner
.clone()
.with_size(script_size as u32)
.with_blob_hash(blob_hash.clone()),
)
.with_current(script)
.with_tenant_id(&resource_token),
)
.caused_by(trc::location!())?;
self.server
@ -189,28 +168,21 @@ impl<T: SessionStream> Session<T> {
.hash;
// Write record
let mut obj = ObjectIndexBuilder::new().with_changes(
SieveScript::new(name.clone(), blob_hash.clone())
.with_is_active(false)
.with_size(script_size as u32),
);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.server.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
obj.set_tenant_id(tenant.id);
}
}
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
.create_document()
.log(LogInsert())
.set(BlobOp::Link { hash: blob_hash }, Vec::new())
.custom(obj)
.custom(
ObjectIndexBuilder::new()
.with_changes(
SieveScript::new(name.clone(), blob_hash.clone())
.with_is_active(false)
.with_size(script_size as u32),
)
.with_tenant_id(&resource_token),
)
.caused_by(trc::location!())?;
let assigned_ids = self

View file

@ -223,6 +223,30 @@ impl HashedValue<Archive> {
}
}
/// Deserialization for a `HashedValue` that borrows an archived (rkyv)
/// value: produces an owned `HashedValue<V>` carrying the same hash.
impl<T> HashedValue<&T>
where
    T: rkyv::Portable
        + for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, rkyv::rancor::Error>>
        + Sync
        + Send,
{
    /// Deserializes the archived value into `V`, preserving `self.hash` so
    /// the result can still be used for assert-unchanged writes.
    ///
    /// Returns a `StoreEvent::DeserializeError` wrapping the rkyv error on
    /// failure.
    ///
    /// NOTE(review): takes `&self` despite the `into_` prefix — presumably
    /// named for parity with `HashedValue<Archive>::into_deserialized`;
    /// confirm before renaming.
    pub fn into_deserialized<V>(&self) -> trc::Result<HashedValue<V>>
    where
        T: rkyv::Deserialize<V, rkyv::api::high::HighDeserializer<rkyv::rancor::Error>>,
    {
        rkyv::deserialize::<V, rkyv::rancor::Error>(self.inner)
            .map_err(|err| {
                trc::StoreEvent::DeserializeError
                    .caused_by(trc::location!())
                    .reason(err)
            })
            .map(|inner| HashedValue {
                // The hash identifies the stored bytes, not the Rust
                // representation, so it carries over unchanged.
                hash: self.hash,
                inner,
            })
    }
}
#[inline]
pub fn rkyv_deserialize<T, V>(input: &T) -> trc::Result<V>
where

52
crates/utils/src/bimap.rs Normal file
View file

@ -0,0 +1,52 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{borrow::Borrow, rc::Rc};
use ahash::AHashMap;
/// Interned key wrapper so `uri_to_id` can be probed with a plain `&str`
/// (via the `Borrow<str>` impl below) without allocating a lookup key.
#[derive(Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[repr(transparent)]
struct StringRef(std::sync::Arc<String>);

/// Bidirectional map between numeric ids and URI strings.
///
/// Each URI is allocated once behind an `Arc<String>`; both directions
/// reference the same allocation. Using `Arc` (rather than `Rc` plus
/// hand-written `unsafe impl Send/Sync`) makes the type thread-safe by
/// construction: the previous `Rc`-based version was only sound as long as
/// no `&self` method ever touched a refcount, an invariant a future edit
/// could silently break. The atomic refcount is only exercised on `insert`,
/// so lookups pay no extra cost.
#[derive(Debug, Default)]
pub struct IdBimap {
    id_to_uri: AHashMap<u32, std::sync::Arc<String>>,
    uri_to_id: AHashMap<StringRef, u32>,
}

impl IdBimap {
    /// Creates an empty bimap with room for `capacity` entries in each
    /// direction.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            id_to_uri: AHashMap::with_capacity(capacity),
            uri_to_id: AHashMap::with_capacity(capacity),
        }
    }

    /// Inserts an id ↔ URI pair. The string is stored once and shared
    /// between both maps.
    pub fn insert(&mut self, id: u32, uri: impl Into<String>) {
        let uri = std::sync::Arc::new(uri.into());
        self.id_to_uri.insert(id, uri.clone());
        self.uri_to_id.insert(StringRef(uri), id);
    }

    /// Returns the id mapped to `uri`, if any.
    pub fn by_name(&self, uri: &str) -> Option<u32> {
        self.uri_to_id.get(uri).copied()
    }

    /// Returns the URI mapped to `id`, if any.
    pub fn by_id(&self, id: u32) -> Option<&str> {
        self.id_to_uri.get(&id).map(|x| x.as_str())
    }
}

// Lets `uri_to_id.get(&str)` hash/compare the interned key against a bare
// string slice. Must hash identically to `StringRef`'s derived `Hash`
// (both delegate to the underlying `str`).
impl Borrow<str> for StringRef {
    fn borrow(&self) -> &str {
        &self.0
    }
}

View file

@ -6,6 +6,7 @@
use std::{fmt::Display, sync::Arc};
pub mod bimap;
pub mod cache;
pub mod codec;
pub mod config;