Multi-tenancy (closes #212 closes #346 closes #582)

This commit is contained in:
mdecimus 2024-09-13 18:10:56 +02:00
parent d214468c54
commit ab77a0aca8
41 changed files with 1709 additions and 529 deletions

View file

@ -4,7 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use directory::{backend::internal::PrincipalField, Permission, Principal, QueryBy};
use directory::{
backend::internal::{lookup::DirectoryStore, PrincipalField},
Permission, Principal, QueryBy,
};
use jmap_proto::{
request::RequestMethod,
types::{acl::Acl, collection::Collection, id::Id},
@ -24,7 +27,7 @@ use utils::map::{
use crate::Core;
use super::{roles::RolePermissions, AccessToken};
use super::{roles::RolePermissions, AccessToken, ResourceToken, TenantInfo};
impl Core {
pub async fn build_access_token(&self, mut principal: Principal) -> trc::Result<AccessToken> {
@ -57,13 +60,32 @@ impl Core {
// Apply principal permissions
let mut permissions = role_permissions.finalize();
// Limit tenant permissions
let mut tenant_id = None;
let mut tenant = None;
#[cfg(feature = "enterprise")]
if self.is_enterprise_edition() {
tenant_id = principal.get_int(PrincipalField::Tenant).map(|v| v as u32);
if let Some(tenant_id) = tenant_id {
if let Some(tenant_id) = principal.get_int(PrincipalField::Tenant).map(|v| v as u32) {
// Limit tenant permissions
permissions.intersection(&self.get_role_permissions(tenant_id).await?.enabled);
// Obtain tenant quota
tenant = Some(TenantInfo {
id: tenant_id,
quota: self
.storage
.data
.query(QueryBy::Id(tenant_id), false)
.await
.caused_by(trc::location!())?
.ok_or_else(|| {
trc::SecurityEvent::Unauthorized
.into_err()
.details("Tenant not found")
.id(tenant_id)
.caused_by(trc::location!())
})?
.get_int(PrincipalField::Quota)
.unwrap_or_default(),
});
}
}
@ -74,7 +96,7 @@ impl Core {
.map(|v| v as u32)
.collect(),
access_to: VecMap::new(),
tenant_id,
tenant,
name: principal.take_str(PrincipalField::Name).unwrap_or_default(),
description: principal.take_str(PrincipalField::Description),
quota: principal.quota(),
@ -430,4 +452,12 @@ impl AccessToken {
.details("You are not authorized to perform this action"))
}
}
pub fn as_resource_token(&self) -> ResourceToken {
ResourceToken {
account_id: self.primary_id,
quota: self.quota,
tenant: self.tenant,
}
}
}

View file

@ -14,11 +14,24 @@ pub mod roles;
#[derive(Debug, Clone, Default)]
pub struct AccessToken {
pub primary_id: u32,
pub tenant_id: Option<u32>,
pub member_of: Vec<u32>,
pub access_to: VecMap<u32, Bitmap<Collection>>,
pub name: String,
pub description: Option<String>,
pub quota: u64,
pub permissions: Permissions,
pub tenant: Option<TenantInfo>,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct TenantInfo {
pub id: u32,
pub quota: u64,
}
#[derive(Debug, Clone, Default)]
pub struct ResourceToken {
pub account_id: u32,
pub quota: u64,
pub tenant: Option<TenantInfo>,
}

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use ahash::AHashSet;
use directory::{
@ -12,7 +12,6 @@ use directory::{
Permission, Permissions, QueryBy, ROLE_ADMIN, ROLE_TENANT_ADMIN, ROLE_USER,
};
use trc::AddContext;
use utils::map::ttl_dashmap::TtlMap;
use crate::Core;
@ -22,18 +21,17 @@ pub struct RolePermissions {
pub disabled: Permissions,
}
const USER_PERMISSIONS: RolePermissions = user_permissions();
const ADMIN_PERMISSIONS: RolePermissions = admin_permissions();
const TENANT_ADMIN_PERMISSIONS: RolePermissions = tenant_admin_permissions();
static USER_PERMISSIONS: LazyLock<Arc<RolePermissions>> = LazyLock::new(user_permissions);
static ADMIN_PERMISSIONS: LazyLock<Arc<RolePermissions>> = LazyLock::new(admin_permissions);
static TENANT_ADMIN_PERMISSIONS: LazyLock<Arc<RolePermissions>> =
LazyLock::new(tenant_admin_permissions);
impl Core {
pub async fn get_role_permissions(&self, role_id: u32) -> trc::Result<Arc<RolePermissions>> {
let todo = "create default permissions";
match role_id {
ROLE_USER => Ok(Arc::new(USER_PERMISSIONS.clone())),
ROLE_ADMIN => Ok(Arc::new(ADMIN_PERMISSIONS.clone())),
ROLE_TENANT_ADMIN => Ok(Arc::new(TENANT_ADMIN_PERMISSIONS.clone())),
ROLE_USER => Ok(USER_PERMISSIONS.clone()),
ROLE_ADMIN => Ok(ADMIN_PERMISSIONS.clone()),
ROLE_TENANT_ADMIN => Ok(TENANT_ADMIN_PERMISSIONS.clone()),
role_id => {
if let Some(role_permissions) = self.security.permissions.get(&role_id) {
Ok(role_permissions.clone())
@ -170,23 +168,41 @@ impl RolePermissions {
}
}
const fn admin_permissions() -> RolePermissions {
RolePermissions {
enabled: Permissions::all(),
disabled: Permissions::new(),
}
}
const fn tenant_admin_permissions() -> RolePermissions {
RolePermissions {
enabled: Permissions::all(),
disabled: Permissions::new(),
}
}
const fn user_permissions() -> RolePermissions {
RolePermissions {
fn tenant_admin_permissions() -> Arc<RolePermissions> {
let mut permissions = RolePermissions {
enabled: Permissions::new(),
disabled: Permissions::all(),
disabled: Permissions::new(),
};
for permission_id in 0..Permission::COUNT {
let permission = Permission::from_id(permission_id).unwrap();
if permission.is_tenant_admin_permission() {
permissions.enabled.set(permission_id);
}
}
Arc::new(permissions)
}
fn user_permissions() -> Arc<RolePermissions> {
let mut permissions = RolePermissions {
enabled: Permissions::new(),
disabled: Permissions::new(),
};
for permission_id in 0..Permission::COUNT {
let permission = Permission::from_id(permission_id).unwrap();
if permission.is_user_permission() {
permissions.enabled.set(permission_id);
}
}
Arc::new(permissions)
}
fn admin_permissions() -> Arc<RolePermissions> {
Arc::new(RolePermissions {
enabled: Permissions::all(),
disabled: Permissions::new(),
})
}

View file

@ -10,7 +10,7 @@
use std::time::Duration;
use directory::Type;
use directory::{backend::internal::manage::ManageDirectory, Type};
use store::{Store, Stores};
use trc::{EventType, MetricType, TOTAL_EVENT_COUNT};
use utils::config::{
@ -19,10 +19,7 @@ use utils::config::{
Config,
};
use crate::{
expr::{tokenizer::TokenMap, Expression},
total_principals,
};
use crate::expr::{tokenizer::TokenMap, Expression};
use super::{
license::LicenseValidator, AlertContent, AlertContentToken, AlertMethod, Enterprise,
@ -43,7 +40,10 @@ impl Enterprise {
}
};
match total_principals(data, Type::Individual).await {
match data
.count_principals(None, Type::Individual.into(), None)
.await
{
Ok(total) if total > license.accounts as u64 => {
config.new_build_warning(
"enterprise.license-key",

View file

@ -25,8 +25,8 @@ use config::{
telemetry::Metrics,
};
use directory::{
backend::internal::PrincipalInfo, core::secret::verify_secret_hash, Directory, Principal,
QueryBy, Type,
backend::internal::manage::ManageDirectory, core::secret::verify_secret_hash, Directory,
Principal, QueryBy, Type,
};
use expr::if_block::IfBlock;
use listener::{
@ -37,8 +37,8 @@ use mail_send::Credentials;
use sieve::Sieve;
use store::{
write::{DirectoryClass, QueueClass, ValueClass},
Deserialize, IterateParams, LookupStore, ValueKey,
write::{QueueClass, ValueClass},
IterateParams, LookupStore, ValueKey,
};
use tokio::sync::{mpsc, oneshot};
use trc::AddContext;
@ -149,11 +149,13 @@ impl Core {
pub fn get_directory_or_default(&self, name: &str, session_id: u64) -> &Arc<Directory> {
self.storage.directories.get(name).unwrap_or_else(|| {
trc::event!(
Eval(trc::EvalEvent::DirectoryNotFound),
Id = name.to_string(),
SpanId = session_id,
);
if !name.is_empty() {
trc::event!(
Eval(trc::EvalEvent::DirectoryNotFound),
Id = name.to_string(),
SpanId = session_id,
);
}
&self.storage.directory
})
@ -161,11 +163,13 @@ impl Core {
pub fn get_lookup_store(&self, name: &str, session_id: u64) -> &LookupStore {
self.storage.lookups.get(name).unwrap_or_else(|| {
trc::event!(
Eval(trc::EvalEvent::StoreNotFound),
Id = name.to_string(),
SpanId = session_id,
);
if !name.is_empty() {
trc::event!(
Eval(trc::EvalEvent::StoreNotFound),
Id = name.to_string(),
SpanId = session_id,
);
}
&self.storage.lookup
})
@ -362,43 +366,22 @@ impl Core {
}
pub async fn total_accounts(&self) -> trc::Result<u64> {
total_principals(&self.storage.data, Type::Individual).await
self.storage
.data
.count_principals(None, Type::Individual.into(), None)
.await
.caused_by(trc::location!())
}
pub async fn total_domains(&self) -> trc::Result<u64> {
total_principals(&self.storage.data, Type::Domain).await
self.storage
.data
.count_principals(None, Type::Domain.into(), None)
.await
.caused_by(trc::location!())
}
}
pub(crate) async fn total_principals(store: &store::Store, typ: Type) -> trc::Result<u64> {
let mut total = 0;
store
.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![]))),
ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![
u8::MAX;
10
]))),
)
.ascending(),
|_, value| {
if PrincipalInfo::deserialize(value)
.caused_by(trc::location!())?
.typ
== typ
{
total += 1;
}
Ok(true)
},
)
.await
.caused_by(trc::location!())
.map(|_| total)
}
trait CredentialsUsername {
fn login(&self) -> &str;
}

View file

@ -4,6 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashSet;
use jmap_proto::types::collection::Collection;
use store::{
write::{
@ -43,6 +44,7 @@ pub trait ManageDirectory: Sized {
&self,
by: QueryBy<'_>,
changes: Vec<PrincipalUpdate>,
tenant_id: Option<u32>,
) -> trc::Result<()>;
async fn delete_principal(&self, by: QueryBy<'_>) -> trc::Result<()>;
async fn list_principals(
@ -51,6 +53,12 @@ pub trait ManageDirectory: Sized {
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<Vec<String>>;
async fn count_principals(
&self,
filter: Option<&str>,
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<u64>;
}
impl ManageDirectory for Store {
@ -140,8 +148,8 @@ impl ManageDirectory for Store {
}
// Tenants must provide principal names including a valid domain
let mut valid_domains = AHashSet::new();
if tenant_id.is_some() {
let mut name_is_valid = false;
if let Some(domain) = name.split('@').nth(1) {
if self
.get_principal_info(domain)
@ -150,11 +158,11 @@ impl ManageDirectory for Store {
.filter(|v| v.typ == Type::Domain && v.has_tenant_access(tenant_id))
.is_some()
{
name_is_valid = true;
valid_domains.insert(domain.to_string());
}
}
if !name_is_valid {
if valid_domains.is_empty() {
return Err(error(
"Invalid principal name",
"Principal name must include a valid domain".into(),
@ -190,28 +198,17 @@ impl ManageDirectory for Store {
};
for name in names {
let id = match name.strip_prefix("_") {
Some("admin") if field == PrincipalField::Roles && tenant_id.is_none() => {
PrincipalInfo::new(ROLE_ADMIN, Type::Role, None)
}
Some("tenant_admin") if field == PrincipalField::Roles => {
PrincipalInfo::new(ROLE_TENANT_ADMIN, Type::Role, None)
}
Some("user") if field == PrincipalField::Roles => {
PrincipalInfo::new(ROLE_USER, Type::Role, None)
}
_ => self
.get_principal_info(&name)
list.push(
self.get_principal_info(&name)
.await
.caused_by(trc::location!())?
.filter(|v| {
expected_type.map_or(true, |t| v.typ == t)
&& v.has_tenant_access(tenant_id)
})
.or_else(|| field.map_internal_roles(&name))
.ok_or_else(|| not_found(name))?,
};
list.push(id);
);
}
}
}
@ -222,14 +219,24 @@ impl ManageDirectory for Store {
PrincipalField::DisabledPermissions,
] {
if let Some(names) = principal.take_str_array(field) {
let mut permissions = Vec::with_capacity(names.len());
for name in names {
let permission = Permission::from_name(&name).ok_or_else(|| {
error(
"Invalid permission",
format!("Permission {name:?} is invalid").into(),
)
})?;
principal.append_int(field, permission.id() as u64);
let permission = Permission::from_name(&name)
.ok_or_else(|| {
error(
format!("Invalid {} value", field.as_str()),
format!("Permission {name:?} is invalid").into(),
)
})?
.id() as u64;
if !permissions.contains(&permission) {
permissions.push(permission);
}
}
if !permissions.is_empty() {
principal.set(field, permissions);
}
}
}
@ -241,11 +248,13 @@ impl ManageDirectory for Store {
return Err(err_exists(PrincipalField::Emails, email.to_string()));
}
if let Some(domain) = email.split('@').nth(1) {
self.get_principal_info(domain)
.await
.caused_by(trc::location!())?
.filter(|v| v.typ == Type::Domain && v.has_tenant_access(tenant_id))
.ok_or_else(|| not_found(domain.to_string()))?;
if valid_domains.insert(domain.to_string()) {
self.get_principal_info(domain)
.await
.caused_by(trc::location!())?
.filter(|v| v.typ == Type::Domain && v.has_tenant_access(tenant_id))
.ok_or_else(|| not_found(domain.to_string()))?;
}
}
}
@ -344,8 +353,7 @@ impl ManageDirectory for Store {
}
async fn delete_principal(&self, by: QueryBy<'_>) -> trc::Result<()> {
let todo = "do not delete tenants with children";
// Obtain principal
let principal_id = match by {
QueryBy::Name(name) => self
.get_principal_id(name)
@ -355,7 +363,6 @@ impl ManageDirectory for Store {
QueryBy::Id(principal_id) => principal_id,
QueryBy::Credentials(_) => unreachable!(),
};
let mut principal = self
.get_value::<Principal>(ValueKey::from(ValueClass::Directory(
DirectoryClass::Principal(principal_id),
@ -364,6 +371,50 @@ impl ManageDirectory for Store {
.caused_by(trc::location!())?
.ok_or_else(|| not_found(principal_id.to_string()))?;
// Make sure tenant has no data
let mut batch = BatchBuilder::new();
match principal.typ {
Type::Individual | Type::Group => {
// Update tenant quota
if let Some(tenant_id) = principal.tenant() {
let quota = self
.get_counter(DirectoryClass::UsedQuota(principal_id))
.await
.caused_by(trc::location!())?;
if quota > 0 {
batch.add(DirectoryClass::UsedQuota(tenant_id), -quota);
}
}
}
Type::Tenant => {
let tenant_members = self
.list_principals(None, None, principal.id().into())
.await
.caused_by(trc::location!())?;
if !tenant_members.is_empty() {
let tenant_members = if tenant_members.len() > 5 {
tenant_members[..5].join(", ")
+ " and "
+ &(&tenant_members.len() - 5).to_string()
+ " others"
} else {
tenant_members.join(", ")
};
return Err(error(
"Tenant has members",
format!(
"Tenant must have no members to be deleted: Found: {tenant_members}"
)
.into(),
));
}
}
_ => {}
}
// Unlink all principal's blobs
self.blob_hash_unlink_account(principal_id)
.await
@ -380,7 +431,6 @@ impl ManageDirectory for Store {
.caused_by(trc::location!())?;
// Delete principal
let mut batch = BatchBuilder::new();
batch
.with_account_id(principal_id)
.clear(DirectoryClass::NameToId(
@ -441,6 +491,7 @@ impl ManageDirectory for Store {
&self,
by: QueryBy<'_>,
changes: Vec<PrincipalUpdate>,
tenant_id: Option<u32>,
) -> trc::Result<()> {
let principal_id = match by {
QueryBy::Name(name) => self
@ -474,16 +525,22 @@ impl ManageDirectory for Store {
.await
.caused_by(trc::location!())?;
// Apply changes
// Prepare changes
let mut batch = BatchBuilder::new();
let mut pinfo_name =
PrincipalInfo::new(principal_id, principal.inner.typ, principal.inner.tenant())
.serialize();
let pinfo_email = PrincipalInfo::new(principal_id, principal.inner.typ, None).serialize();
let update_principal = !changes.is_empty()
&& !changes
.iter()
.all(|c| matches!(c.field, PrincipalField::MemberOf | PrincipalField::Members));
&& !changes.iter().all(|c| {
matches!(
c.field,
PrincipalField::MemberOf
| PrincipalField::Members
| PrincipalField::Lists
| PrincipalField::Roles
)
});
if update_principal {
batch.assert_value(
@ -493,12 +550,70 @@ impl ManageDirectory for Store {
&principal,
);
}
// Obtain used quota
let mut used_quota = None;
if tenant_id.is_none()
&& changes
.iter()
.any(|c| matches!(c.field, PrincipalField::Tenant))
{
let quota = self
.get_counter(DirectoryClass::UsedQuota(principal_id))
.await
.caused_by(trc::location!())?;
if quota > 0 {
used_quota = Some(quota);
}
}
// Allowed principal types for Member fields
let allowed_member_types = match principal.inner.typ() {
Type::Group => &[Type::Individual, Type::Group][..],
Type::Resource => &[Type::Resource][..],
Type::Location => &[
Type::Location,
Type::Resource,
Type::Individual,
Type::Group,
Type::Other,
][..],
Type::List => &[Type::Individual, Type::Group][..],
Type::Other | Type::Domain | Type::Tenant | Type::Individual => &[][..],
Type::Role => &[Type::Role][..],
};
let mut valid_domains = AHashSet::new();
// Process changes
for change in changes {
match (change.action, change.field, change.value) {
(PrincipalAction::Set, PrincipalField::Name, PrincipalValue::String(new_name)) => {
// Make sure new name is not taken
let new_name = new_name.to_lowercase();
if principal.inner.name() != new_name {
if tenant_id.is_some() {
if let Some(domain) = new_name.split('@').nth(1) {
if self
.get_principal_info(domain)
.await
.caused_by(trc::location!())?
.filter(|v| {
v.typ == Type::Domain && v.has_tenant_access(tenant_id)
})
.is_some()
{
valid_domains.insert(domain.to_string());
}
}
if valid_domains.is_empty() {
return Err(error(
"Invalid principal name",
"Principal name must include a valid domain".into(),
));
}
}
if self
.get_principal_id(&new_name)
.await
@ -524,13 +639,14 @@ impl ManageDirectory for Store {
PrincipalAction::Set,
PrincipalField::Tenant,
PrincipalValue::String(tenant_name),
) => {
) if tenant_id.is_none() => {
if !tenant_name.is_empty() {
let tenant_info = self
.get_principal_info(&tenant_name)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(tenant_name.clone()))?;
if tenant_info.typ != Type::Tenant {
return Err(error(
"Not a tenant",
@ -538,18 +654,31 @@ impl ManageDirectory for Store {
));
}
if principal.inner.tenant() != Some(tenant_info.id) {
principal.inner.set(PrincipalField::Tenant, tenant_info.id);
pinfo_name = PrincipalInfo::new(
principal_id,
principal.inner.typ,
tenant_info.id.into(),
)
.serialize();
} else {
continue;
match principal.inner.tenant() {
Some(old_tenant_id) if old_tenant_id != tenant_info.id => {
// Update quota
if let Some(used_quota) = used_quota {
batch
.add(DirectoryClass::UsedQuota(old_tenant_id), -used_quota)
.add(DirectoryClass::UsedQuota(tenant_info.id), used_quota);
}
principal.inner.set(PrincipalField::Tenant, tenant_info.id);
pinfo_name = PrincipalInfo::new(
principal_id,
principal.inner.typ,
tenant_info.id.into(),
)
.serialize();
}
_ => continue,
}
} else if principal.inner.tenant().is_some() {
} else if let Some(tenant_id) = principal.inner.tenant() {
// Update quota
if let Some(used_quota) = used_quota {
batch.add(DirectoryClass::UsedQuota(tenant_id), -used_quota);
}
principal.inner.remove(PrincipalField::Tenant);
pinfo_name =
PrincipalInfo::new(principal_id, principal.inner.typ, None).serialize();
@ -620,9 +749,31 @@ impl ManageDirectory for Store {
principal.inner.remove(PrincipalField::Description);
}
}
(PrincipalAction::Set, PrincipalField::Quota, PrincipalValue::Integer(quota)) => {
(PrincipalAction::Set, PrincipalField::Quota, PrincipalValue::Integer(quota))
if matches!(
principal.inner.typ,
Type::Individual | Type::Group | Type::Tenant
) =>
{
principal.inner.set(PrincipalField::Quota, quota);
}
(PrincipalAction::Set, PrincipalField::Quota, PrincipalValue::String(quota))
if matches!(
principal.inner.typ,
Type::Individual | Type::Group | Type::Tenant
) && quota.is_empty() =>
{
principal.inner.remove(PrincipalField::Quota);
}
(
PrincipalAction::Set,
PrincipalField::Quota,
PrincipalValue::IntegerList(quotas),
) if matches!(principal.inner.typ, Type::Tenant)
&& quotas.len() <= (Type::Other as usize + 1) =>
{
principal.inner.set(PrincipalField::Quota, quotas);
}
// Emails
(
@ -721,7 +872,7 @@ impl ManageDirectory for Store {
// MemberOf
(
PrincipalAction::Set,
PrincipalField::MemberOf,
PrincipalField::MemberOf | PrincipalField::Lists | PrincipalField::Roles,
PrincipalValue::StringList(members),
) => {
let mut new_member_of = Vec::new();
@ -730,7 +881,28 @@ impl ManageDirectory for Store {
.get_principal_info(&member)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(member))?;
.filter(|p| p.has_tenant_access(tenant_id))
.or_else(|| change.field.map_internal_roles(&member))
.ok_or_else(|| not_found(member.clone()))?;
let expected_type = match change.field {
PrincipalField::MemberOf => Type::Group,
PrincipalField::Lists => Type::List,
PrincipalField::Roles => Type::Role,
_ => unreachable!(),
};
if member_info.typ != expected_type {
return Err(error(
format!("Invalid {} value", change.field.as_str()),
format!(
"Principal {member:?} is not a {}.",
expected_type.as_str()
)
.into(),
));
}
if !member_of.contains(&member_info.id) {
batch.set(
ValueClass::Directory(DirectoryClass::MemberOf {
@ -768,15 +940,36 @@ impl ManageDirectory for Store {
}
(
PrincipalAction::AddItem,
PrincipalField::MemberOf,
PrincipalField::MemberOf | PrincipalField::Lists | PrincipalField::Roles,
PrincipalValue::String(member),
) => {
let member_info = self
.get_principal_info(&member)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(member))?;
.filter(|p| p.has_tenant_access(tenant_id))
.or_else(|| change.field.map_internal_roles(&member))
.ok_or_else(|| not_found(member.clone()))?;
if !member_of.contains(&member_info.id) {
let expected_type = match change.field {
PrincipalField::MemberOf => Type::Group,
PrincipalField::Lists => Type::List,
PrincipalField::Roles => Type::Role,
_ => unreachable!(),
};
if member_info.typ != expected_type {
return Err(error(
format!("Invalid {} value", change.field.as_str()),
format!(
"Principal {member:?} is not a {}.",
expected_type.as_str()
)
.into(),
));
}
batch.set(
ValueClass::Directory(DirectoryClass::MemberOf {
principal_id: MaybeDynamicId::Static(principal_id),
@ -784,6 +977,7 @@ impl ManageDirectory for Store {
}),
vec![member_info.typ as u8],
);
batch.set(
ValueClass::Directory(DirectoryClass::Members {
principal_id: MaybeDynamicId::Static(member_info.id),
@ -791,28 +985,32 @@ impl ManageDirectory for Store {
}),
vec![],
);
member_of.push(member_info.id);
}
}
(
PrincipalAction::RemoveItem,
PrincipalField::MemberOf,
PrincipalField::MemberOf | PrincipalField::Lists | PrincipalField::Roles,
PrincipalValue::String(member),
) => {
if let Some(member_id) = self
.get_principal_id(&member)
.await
.caused_by(trc::location!())?
.or_else(|| change.field.map_internal_role_name(&member))
{
if let Some(pos) = member_of.iter().position(|v| *v == member_id) {
batch.clear(ValueClass::Directory(DirectoryClass::MemberOf {
principal_id: MaybeDynamicId::Static(principal_id),
member_of: MaybeDynamicId::Static(member_id),
}));
batch.clear(ValueClass::Directory(DirectoryClass::Members {
principal_id: MaybeDynamicId::Static(member_id),
has_member: MaybeDynamicId::Static(principal_id),
}));
member_of.remove(pos);
}
}
@ -824,12 +1022,30 @@ impl ManageDirectory for Store {
PrincipalValue::StringList(members_),
) => {
let mut new_members = Vec::new();
for member in members_ {
let member_info = self
.get_principal_info(&member)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(member))?;
.filter(|p| p.has_tenant_access(tenant_id))
.ok_or_else(|| not_found(member.clone()))?;
if !allowed_member_types.contains(&member_info.typ) {
return Err(error(
"Invalid members value",
format!(
"Principal {member:?} is not one of {}.",
allowed_member_types
.iter()
.map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", ")
)
.into(),
));
}
if !members.contains(&member_info.id) {
batch.set(
ValueClass::Directory(DirectoryClass::MemberOf {
@ -874,8 +1090,25 @@ impl ManageDirectory for Store {
.get_principal_info(&member)
.await
.caused_by(trc::location!())?
.ok_or_else(|| not_found(member))?;
.filter(|p| p.has_tenant_access(tenant_id))
.ok_or_else(|| not_found(member.clone()))?;
if !members.contains(&member_info.id) {
if !allowed_member_types.contains(&member_info.typ) {
return Err(error(
"Invalid members value",
format!(
"Principal {member:?} is not one of {}.",
allowed_member_types
.iter()
.map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", ")
)
.into(),
));
}
batch.set(
ValueClass::Directory(DirectoryClass::MemberOf {
principal_id: MaybeDynamicId::Static(member_info.id),
@ -917,6 +1150,68 @@ impl ManageDirectory for Store {
}
}
(
PrincipalAction::Set,
PrincipalField::EnabledPermissions | PrincipalField::DisabledPermissions,
PrincipalValue::StringList(names),
) => {
let mut permissions = Vec::with_capacity(names.len());
for name in names {
let permission = Permission::from_name(&name)
.ok_or_else(|| {
error(
format!("Invalid {} value", change.field.as_str()),
format!("Permission {name:?} is invalid").into(),
)
})?
.id() as u64;
if !permissions.contains(&permission) {
permissions.push(permission);
}
}
if !permissions.is_empty() {
principal.inner.set(change.field, permissions);
} else {
principal.inner.remove(change.field);
}
}
(
PrincipalAction::AddItem,
PrincipalField::EnabledPermissions | PrincipalField::DisabledPermissions,
PrincipalValue::String(name),
) => {
let permission = Permission::from_name(&name)
.ok_or_else(|| {
error(
format!("Invalid {} value", change.field.as_str()),
format!("Permission {name:?} is invalid").into(),
)
})?
.id() as u64;
principal.inner.append_int(change.field, permission);
}
(
PrincipalAction::RemoveItem,
PrincipalField::EnabledPermissions | PrincipalField::DisabledPermissions,
PrincipalValue::String(name),
) => {
let permission = Permission::from_name(&name)
.ok_or_else(|| {
error(
format!("Invalid {} value", change.field.as_str()),
format!("Permission {name:?} is invalid").into(),
)
})?
.id() as u64;
principal
.inner
.retain_int(change.field, |v| *v != permission);
}
_ => {
return Err(trc::StoreEvent::NotSupported.caused_by(trc::location!()));
}
@ -996,6 +1291,41 @@ impl ManageDirectory for Store {
}
}
async fn count_principals(
&self,
filter: Option<&str>,
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<u64> {
let from_key = ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![])));
let to_key = ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![
u8::MAX;
10
])));
let mut count = 0;
self.iterate(
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let pt = PrincipalInfo::deserialize(value).caused_by(trc::location!())?;
let name =
std::str::from_utf8(key.get(1..).unwrap_or_default()).unwrap_or_default();
if typ.map_or(true, |t| pt.typ == t)
&& pt.has_tenant_access(tenant_id)
&& filter.map_or(true, |f| name.contains(f))
{
count += 1;
}
Ok(true)
},
)
.await
.caused_by(trc::location!())
.map(|_| count)
}
async fn get_member_of(&self, principal_id: u32) -> trc::Result<Vec<MemberOf>> {
let from_key = ValueKey::from(ValueClass::Directory(DirectoryClass::MemberOf {
principal_id,
@ -1044,6 +1374,22 @@ impl ManageDirectory for Store {
}
}
impl PrincipalField {
pub fn map_internal_role_name(&self, name: &str) -> Option<u32> {
match (self, name) {
(PrincipalField::Roles, "admin") => Some(ROLE_ADMIN),
(PrincipalField::Roles, "tenant-admin") => Some(ROLE_TENANT_ADMIN),
(PrincipalField::Roles, "user") => Some(ROLE_USER),
_ => None,
}
}
pub fn map_internal_roles(&self, name: &str) -> Option<PrincipalInfo> {
self.map_internal_role_name(name)
.map(|role_id| PrincipalInfo::new(role_id, Type::Role, None))
}
}
impl SerializeWithId for Principal {
fn serialize_with_id(&self, ids: &AssignedIds) -> trc::Result<Vec<u8>> {
let mut principal = self.clone();

View file

@ -244,6 +244,7 @@ pub enum PrincipalField {
Lists,
EnabledPermissions,
DisabledPermissions,
Picture,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@ -263,7 +264,7 @@ pub enum PrincipalAction {
RemoveItem,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
#[serde(untagged)]
pub enum PrincipalValue {
String(String),
@ -321,6 +322,7 @@ impl PrincipalField {
PrincipalField::EnabledPermissions => 11,
PrincipalField::DisabledPermissions => 12,
PrincipalField::UsedQuota => 13,
PrincipalField::Picture => 14,
}
}
@ -340,6 +342,7 @@ impl PrincipalField {
11 => Some(PrincipalField::EnabledPermissions),
12 => Some(PrincipalField::DisabledPermissions),
13 => Some(PrincipalField::UsedQuota),
14 => Some(PrincipalField::Picture),
_ => None,
}
}
@ -360,6 +363,7 @@ impl PrincipalField {
PrincipalField::Lists => "lists",
PrincipalField::EnabledPermissions => "enabledPermissions",
PrincipalField::DisabledPermissions => "disabledPermissions",
PrincipalField::Picture => "picture",
}
}
@ -379,6 +383,7 @@ impl PrincipalField {
"lists" => Some(PrincipalField::Lists),
"enabledPermissions" => Some(PrincipalField::EnabledPermissions),
"disabledPermissions" => Some(PrincipalField::DisabledPermissions),
"picture" => Some(PrincipalField::Picture),
_ => None,
}
}

View file

@ -4,14 +4,18 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{collections::hash_map::Entry, str::FromStr};
use std::{collections::hash_map::Entry, fmt, str::FromStr};
use serde::{ser::SerializeMap, Serializer};
use serde::{
de::{self, IgnoredAny, Visitor},
ser::SerializeMap,
Deserializer, Serializer,
};
use store::U64_LEN;
use crate::{
backend::internal::{PrincipalField, PrincipalValue},
Principal, Type, ROLE_ADMIN,
Permission, Principal, Type, ROLE_ADMIN,
};
impl Principal {
@ -59,6 +63,22 @@ impl Principal {
self.fields.get(&key).and_then(|v| v.as_int())
}
pub fn get_str_array(&self, key: PrincipalField) -> Option<&[String]> {
self.fields.get(&key).and_then(|v| match v {
PrincipalValue::StringList(v) => Some(v.as_slice()),
PrincipalValue::String(v) => Some(std::slice::from_ref(v)),
PrincipalValue::Integer(_) | PrincipalValue::IntegerList(_) => None,
})
}
pub fn get_int_array(&self, key: PrincipalField) -> Option<&[u64]> {
self.fields.get(&key).and_then(|v| match v {
PrincipalValue::IntegerList(v) => Some(v.as_slice()),
PrincipalValue::Integer(v) => Some(std::slice::from_ref(v)),
PrincipalValue::String(_) | PrincipalValue::StringList(_) => None,
})
}
pub fn take(&mut self, key: PrincipalField) -> Option<PrincipalValue> {
self.fields.remove(&key)
}
@ -128,10 +148,14 @@ impl Principal {
match v {
PrincipalValue::IntegerList(v) => {
v.push(value);
if !v.contains(&value) {
v.push(value);
}
}
PrincipalValue::Integer(i) => {
*v = PrincipalValue::IntegerList(vec![*i, value]);
if value != *i {
*v = PrincipalValue::IntegerList(vec![*i, value]);
}
}
PrincipalValue::String(s) => {
*v =
@ -163,10 +187,14 @@ impl Principal {
match v {
PrincipalValue::StringList(v) => {
v.push(value);
if !v.contains(&value) {
v.push(value);
}
}
PrincipalValue::String(s) => {
*v = PrincipalValue::StringList(vec![std::mem::take(s), value]);
if s != &value {
*v = PrincipalValue::StringList(vec![std::mem::take(s), value]);
}
}
PrincipalValue::Integer(i) => {
*v = PrincipalValue::StringList(vec![i.to_string(), value]);
@ -196,10 +224,14 @@ impl Principal {
match v {
PrincipalValue::StringList(v) => {
v.insert(0, value);
if !v.contains(&value) {
v.insert(0, value);
}
}
PrincipalValue::String(s) => {
*v = PrincipalValue::StringList(vec![value, std::mem::take(s)]);
if s != &value {
*v = PrincipalValue::StringList(vec![value, std::mem::take(s)]);
}
}
PrincipalValue::Integer(i) => {
*v = PrincipalValue::StringList(vec![value, i.to_string()]);
@ -567,6 +599,82 @@ impl serde::Serialize for Principal {
}
}
impl<'de> serde::Deserialize<'de> for PrincipalValue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PrincipalValueVisitor;
impl<'de> Visitor<'de> for PrincipalValueVisitor {
type Value = PrincipalValue;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an optional u64 or a vector of u64")
}
fn visit_none<E>(self) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(PrincipalValue::String(String::new()))
}
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(self)
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(PrincipalValue::Integer(value))
}
fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(PrincipalValue::String(value))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(PrincipalValue::String(v.to_string()))
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: de::SeqAccess<'de>,
{
let mut vec_u64 = Vec::new();
let mut vec_string = Vec::new();
while let Some(value) = seq.next_element::<StringOrU64>()? {
match value {
StringOrU64::String(s) => vec_string.push(s),
StringOrU64::U64(u) => vec_u64.push(u),
}
}
match (vec_u64.is_empty(), vec_string.is_empty()) {
(true, false) => Ok(PrincipalValue::StringList(vec_string)),
(false, true) => Ok(PrincipalValue::IntegerList(vec_u64)),
(true, true) => Ok(PrincipalValue::StringList(vec_string)),
_ => Err(serde::de::Error::custom("invalid principal value")),
}
}
}
deserializer.deserialize_map(PrincipalValueVisitor)
}
}
impl<'de> serde::Deserialize<'de> for Principal {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
@ -575,16 +683,16 @@ impl<'de> serde::Deserialize<'de> for Principal {
struct PrincipalVisitor;
// Deserialize the principal
impl<'de> serde::de::Visitor<'de> for PrincipalVisitor {
impl<'de> Visitor<'de> for PrincipalVisitor {
type Value = Principal;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid principal")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
A: de::MapAccess<'de>,
{
let mut principal = Principal::default();
@ -594,7 +702,9 @@ impl<'de> serde::Deserialize<'de> for Principal {
})?;
let value = match key {
PrincipalField::Name => PrincipalValue::String(map.next_value()?),
PrincipalField::Description | PrincipalField::Tenant => {
PrincipalField::Description
| PrincipalField::Tenant
| PrincipalField::Picture => {
if let Some(v) = map.next_value::<Option<String>>()? {
PrincipalValue::String(v)
} else {
@ -608,9 +718,7 @@ impl<'de> serde::Deserialize<'de> for Principal {
})?;
continue;
}
PrincipalField::Quota => PrincipalValue::Integer(
map.next_value::<Option<u64>>()?.unwrap_or_default(),
),
PrincipalField::Quota => map.next_value::<PrincipalValue>()?,
PrincipalField::Secrets
| PrincipalField::Emails
@ -624,7 +732,7 @@ impl<'de> serde::Deserialize<'de> for Principal {
}
PrincipalField::UsedQuota => {
// consume and ignore
let _ = map.next_value::<Option<u64>>()?;
map.next_value::<IgnoredAny>()?;
continue;
}
};
@ -639,3 +747,193 @@ impl<'de> serde::Deserialize<'de> for Principal {
deserializer.deserialize_map(PrincipalVisitor)
}
}
impl Permission {
pub const fn is_user_permission(&self) -> bool {
matches!(
self,
Permission::Authenticate
| Permission::AuthenticateOauth
| Permission::EmailSend
| Permission::EmailReceive
| Permission::ManageEncryption
| Permission::ManagePasswords
| Permission::JmapEmailGet
| Permission::JmapMailboxGet
| Permission::JmapThreadGet
| Permission::JmapIdentityGet
| Permission::JmapEmailSubmissionGet
| Permission::JmapPushSubscriptionGet
| Permission::JmapSieveScriptGet
| Permission::JmapVacationResponseGet
| Permission::JmapQuotaGet
| Permission::JmapBlobGet
| Permission::JmapEmailSet
| Permission::JmapMailboxSet
| Permission::JmapIdentitySet
| Permission::JmapEmailSubmissionSet
| Permission::JmapPushSubscriptionSet
| Permission::JmapSieveScriptSet
| Permission::JmapVacationResponseSet
| Permission::JmapEmailChanges
| Permission::JmapMailboxChanges
| Permission::JmapThreadChanges
| Permission::JmapIdentityChanges
| Permission::JmapEmailSubmissionChanges
| Permission::JmapQuotaChanges
| Permission::JmapEmailCopy
| Permission::JmapBlobCopy
| Permission::JmapEmailImport
| Permission::JmapEmailParse
| Permission::JmapEmailQueryChanges
| Permission::JmapMailboxQueryChanges
| Permission::JmapEmailSubmissionQueryChanges
| Permission::JmapSieveScriptQueryChanges
| Permission::JmapQuotaQueryChanges
| Permission::JmapEmailQuery
| Permission::JmapMailboxQuery
| Permission::JmapEmailSubmissionQuery
| Permission::JmapSieveScriptQuery
| Permission::JmapQuotaQuery
| Permission::JmapSearchSnippet
| Permission::JmapSieveScriptValidate
| Permission::JmapBlobLookup
| Permission::JmapBlobUpload
| Permission::JmapEcho
| Permission::ImapAuthenticate
| Permission::ImapAclGet
| Permission::ImapAclSet
| Permission::ImapMyRights
| Permission::ImapListRights
| Permission::ImapAppend
| Permission::ImapCapability
| Permission::ImapId
| Permission::ImapCopy
| Permission::ImapMove
| Permission::ImapCreate
| Permission::ImapDelete
| Permission::ImapEnable
| Permission::ImapExpunge
| Permission::ImapFetch
| Permission::ImapIdle
| Permission::ImapList
| Permission::ImapLsub
| Permission::ImapNamespace
| Permission::ImapRename
| Permission::ImapSearch
| Permission::ImapSort
| Permission::ImapSelect
| Permission::ImapExamine
| Permission::ImapStatus
| Permission::ImapStore
| Permission::ImapSubscribe
| Permission::ImapThread
| Permission::Pop3Authenticate
| Permission::Pop3List
| Permission::Pop3Uidl
| Permission::Pop3Stat
| Permission::Pop3Retr
| Permission::Pop3Dele
| Permission::SieveAuthenticate
| Permission::SieveListScripts
| Permission::SieveSetActive
| Permission::SieveGetScript
| Permission::SievePutScript
| Permission::SieveDeleteScript
| Permission::SieveRenameScript
| Permission::SieveCheckScript
| Permission::SieveHaveSpace
)
}
pub const fn is_tenant_admin_permission(&self) -> bool {
matches!(
self,
Permission::MessageQueueList
| Permission::MessageQueueGet
| Permission::MessageQueueUpdate
| Permission::MessageQueueDelete
| Permission::OutgoingReportList
| Permission::OutgoingReportGet
| Permission::OutgoingReportDelete
| Permission::IncomingReportList
| Permission::IncomingReportGet
| Permission::IncomingReportDelete
| Permission::IndividualList
| Permission::IndividualGet
| Permission::IndividualUpdate
| Permission::IndividualDelete
| Permission::IndividualCreate
| Permission::GroupList
| Permission::GroupGet
| Permission::GroupUpdate
| Permission::GroupDelete
| Permission::GroupCreate
| Permission::DomainList
| Permission::DomainGet
| Permission::DomainCreate
| Permission::DomainUpdate
| Permission::DomainDelete
| Permission::MailingListList
| Permission::MailingListGet
| Permission::MailingListCreate
| Permission::MailingListUpdate
| Permission::MailingListDelete
| Permission::RoleList
| Permission::RoleGet
| Permission::RoleCreate
| Permission::RoleUpdate
| Permission::RoleDelete
| Permission::PrincipalList
| Permission::PrincipalGet
| Permission::PrincipalCreate
| Permission::PrincipalUpdate
| Permission::PrincipalDelete
| Permission::Undelete
| Permission::DkimSignatureCreate
| Permission::DkimSignatureGet
| Permission::JmapPrincipalGet
| Permission::JmapPrincipalQueryChanges
| Permission::JmapPrincipalQuery
) || self.is_user_permission()
}
}
#[derive(Debug)]
enum StringOrU64 {
String(String),
U64(u64),
}
impl<'de> serde::Deserialize<'de> for StringOrU64 {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct StringOrU64Visitor;
impl<'de> Visitor<'de> for StringOrU64Visitor {
type Value = StringOrU64;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string or u64")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(StringOrU64::String(value.to_string()))
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(StringOrU64::U64(value))
}
}
deserializer.deserialize_any(StringOrU64Visitor)
}
}

View file

@ -88,11 +88,11 @@ impl<T: SessionStream> SessionData<T> {
}
// Obtain quota
let account_quota = self
let resource_token = self
.get_access_token()
.await
.imap_ctx(&arguments.tag, trc::location!())?
.quota as i64;
.as_resource_token();
// Append messages
let mut response = StatusResponse::completed(Command::Append);
@ -104,8 +104,7 @@ impl<T: SessionStream> SessionData<T> {
.email_ingest(IngestEmail {
raw_message: &message.message,
message: MessageParser::new().parse(&message.message),
account_id,
account_quota,
resource: resource_token.clone(),
mailbox_ids: vec![mailbox_id],
keywords: message.flags.into_iter().map(Keyword::from).collect(),
received_at: message.received_at.map(|d| d as u64),
@ -127,6 +126,9 @@ impl<T: SessionStream> SessionData<T> {
if err.matches(trc::EventType::Limit(trc::LimitEvent::Quota)) {
err.details("Disk quota exceeded.")
.code(ResponseCode::OverQuota)
} else if err.matches(trc::EventType::Limit(trc::LimitEvent::TenantQuota)) {
err.details("Organization disk quota exceeded.")
.code(ResponseCode::OverQuota)
} else {
err
}

View file

@ -241,13 +241,13 @@ impl<T: SessionStream> SessionData<T> {
let src_account_id = src_mailbox.id.account_id;
let mut dest_change_id = None;
let dest_account_id = dest_mailbox.account_id;
let dest_quota = self
let resource_token = self
.jmap
.core
.get_cached_access_token(dest_account_id)
.await
.imap_ctx(&arguments.tag, trc::location!())?
.quota as i64;
.as_resource_token();
let mut destroy_ids = RoaringBitmap::new();
for (id, imap_id) in ids {
match self
@ -255,8 +255,7 @@ impl<T: SessionStream> SessionData<T> {
.copy_message(
src_account_id,
id,
dest_account_id,
dest_quota,
&resource_token,
vec![dest_mailbox_id],
Vec::new(),
None,

View file

@ -118,6 +118,14 @@ impl<'x> RequestError<'x> {
)
}
pub fn tenant_over_quota() -> Self {
RequestError::blank(
403,
"Tenant quota exceeded",
"Your organization has exceeded its quota.",
)
}
pub fn too_many_requests() -> Self {
RequestError::blank(
429,

View file

@ -880,6 +880,7 @@ impl ToRequestError for trc::Error {
RequestError::limit(RequestLimitError::ConcurrentUpload)
}
trc::LimitEvent::Quota => RequestError::over_quota(),
trc::LimitEvent::TenantQuota => RequestError::tenant_over_quota(),
trc::LimitEvent::BlobQuota => RequestError::over_blob_quota(
self.value(trc::Key::Total)
.and_then(|v| v.to_uint())

View file

@ -11,7 +11,7 @@
use std::str::FromStr;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use common::enterprise::undelete::DeletedBlob;
use common::{auth::AccessToken, enterprise::undelete::DeletedBlob};
use directory::backend::internal::manage::ManageDirectory;
use hyper::Method;
use jmap_proto::types::collection::Collection;
@ -168,8 +168,13 @@ impl JMAP {
.email_ingest(IngestEmail {
raw_message: &bytes,
message: MessageParser::new().parse(&bytes),
account_id,
account_quota: 0,
resource: self
.get_resource_token(
&AccessToken::from_id(u32::MAX),
account_id,
)
.await
.caused_by(trc::location!())?,
mailbox_ids: vec![INBOX_ID],
keywords: vec![],
received_at: (request.time as u64).into(),

View file

@ -4,16 +4,16 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
use common::auth::AccessToken;
use directory::{
backend::internal::{
lookup::DirectoryStore,
manage::{self, ManageDirectory},
manage::{self, not_found, ManageDirectory},
PrincipalAction, PrincipalField, PrincipalUpdate, PrincipalValue, SpecialSecrets,
},
DirectoryInner, Permission, Principal, QueryBy, Type,
DirectoryInner, Permission, Principal, QueryBy, Type, ROLE_ADMIN, ROLE_TENANT_ADMIN, ROLE_USER,
};
use hyper::{header, Method};
@ -57,8 +57,6 @@ impl JMAP {
) -> trc::Result<HttpResponse> {
match (path.get(1), req.method()) {
(None, &Method::POST) => {
let todo = "increment role list version + implement gossip";
// Parse principal
let principal =
serde_json::from_slice::<Principal>(body.as_deref().unwrap_or_default())
@ -78,6 +76,14 @@ impl JMAP {
Type::Resource | Type::Location | Type::Other => Permission::PrincipalCreate,
})?;
#[cfg(feature = "enterprise")]
if (matches!(principal.typ(), Type::Tenant)
|| principal.has_field(PrincipalField::Tenant))
&& !self.core.is_enterprise_edition()
{
return Err(manage::enterprise());
}
// Make sure the current directory supports updates
if matches!(principal.typ(), Type::Individual | Type::Group | Type::List) {
self.assert_supported_directory()?;
@ -86,12 +92,12 @@ impl JMAP {
// Validate tenant limits
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant_id) = access_token.tenant_id {
if let Some(tenant_info) = access_token.tenant {
let tenant = self
.core
.storage
.data
.query(QueryBy::Id(tenant_id), false)
.query(QueryBy::Id(tenant_info.id), false)
.await?
.ok_or_else(|| {
trc::ManageEvent::NotFound
@ -99,7 +105,35 @@ impl JMAP {
.caused_by(trc::location!())
})?;
let todo = "check limits";
// Enforce tenant quotas
if let Some(limit) = tenant
.get_int_array(PrincipalField::Quota)
.and_then(|quotas| quotas.get(principal.typ() as usize + 1))
.copied()
.filter(|q| *q > 0)
{
// Obtain number of principals
let total = self
.core
.storage
.data
.count_principals(
None,
principal.typ().into(),
tenant_info.id.into(),
)
.await
.caused_by(trc::location!())?;
if total >= limit {
trc::bail!(trc::LimitEvent::TenantQuota
.into_err()
.details("Tenant principal quota exceeded")
.ctx(trc::Key::Details, principal.typ().as_str())
.ctx(trc::Key::Limit, limit)
.ctx(trc::Key::Total, total));
}
}
}
}
@ -108,7 +142,7 @@ impl JMAP {
.core
.storage
.data
.create_principal(principal, access_token.tenant_id)
.create_principal(principal, access_token.tenant.map(|t| t.id))
.await?;
Ok(JsonResponse::new(json!({
@ -117,38 +151,52 @@ impl JMAP {
.into_http_response())
}
(None, &Method::GET) => {
// Validate the access token
access_token.assert_has_permission(Permission::PrincipalList)?;
// List principal ids
let params = UrlParams::new(req.uri().query());
let filter = params.get("filter");
let typ = params.parse("type");
let typ = params.parse("type").unwrap_or(Type::Individual);
let page: usize = params.parse("page").unwrap_or(0);
let limit: usize = params.parse("limit").unwrap_or(0);
let mut tenant_id = access_token.tenant_id;
// Validate the access token
access_token.assert_has_permission(match typ {
Type::Individual => Permission::IndividualList,
Type::Group => Permission::GroupList,
Type::List => Permission::MailingListList,
Type::Domain => Permission::DomainList,
Type::Tenant => Permission::TenantList,
Type::Role => Permission::RoleList,
Type::Resource | Type::Location | Type::Other => Permission::PrincipalList,
})?;
let mut tenant = access_token.tenant.map(|t| t.id);
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() && tenant_id.is_none() {
if let Some(tenant_name) = params.get("tenant") {
tenant_id = self
.core
.storage
.data
.get_principal_info(tenant_name)
.await?
.filter(|p| p.typ == Type::Tenant)
.map(|p| p.id);
if self.core.is_enterprise_edition() {
if tenant.is_none() {
// Limit search to current tenant
if let Some(tenant_name) = params.get("tenant") {
tenant = self
.core
.storage
.data
.get_principal_info(tenant_name)
.await?
.filter(|p| p.typ == Type::Tenant)
.map(|p| p.id);
}
}
} else if matches!(typ, Type::Tenant) {
return Err(manage::enterprise());
}
let accounts = self
.core
.storage
.data
.list_principals(filter, typ, tenant_id)
.list_principals(filter, typ.into(), tenant)
.await?;
let (total, accounts) = if limit > 0 {
let offset = page.saturating_sub(1) * limit;
(
@ -168,32 +216,38 @@ impl JMAP {
.into_http_response())
}
(Some(name), method) => {
// Validate the access token
match *method {
Method::GET => {
access_token.assert_has_permission(Permission::PrincipalGet)?;
}
Method::DELETE => {
access_token.assert_has_permission(Permission::PrincipalDelete)?;
}
Method::PATCH => {
access_token.assert_has_permission(Permission::PrincipalUpdate)?;
}
_ => {}
}
// Fetch, update or delete principal
let name = decode_path_element(name);
let account_id = self
let (account_id, typ) = self
.core
.storage
.data
.get_principal_id(name.as_ref())
.get_principal_info(name.as_ref())
.await?
.ok_or_else(|| trc::ManageEvent::NotFound.into_err())?;
.filter(|p| p.has_tenant_access(access_token.tenant.map(|t| t.id)))
.map(|p| (p.id, p.typ))
.ok_or_else(|| not_found(name.to_string()))?;
#[cfg(feature = "enterprise")]
if matches!(typ, Type::Tenant) && !self.core.is_enterprise_edition() {
return Err(manage::enterprise());
}
match *method {
Method::GET => {
// Validate the access token
access_token.assert_has_permission(match typ {
Type::Individual => Permission::IndividualGet,
Type::Group => Permission::GroupGet,
Type::List => Permission::MailingListGet,
Type::Domain => Permission::DomainGet,
Type::Tenant => Permission::TenantGet,
Type::Role => Permission::RoleGet,
Type::Resource | Type::Location | Type::Other => {
Permission::PrincipalGet
}
})?;
let mut principal = self
.core
.storage
@ -203,27 +257,47 @@ impl JMAP {
.ok_or_else(|| trc::ManageEvent::NotFound.into_err())?;
// Map groups
if let Some(member_of) = principal.take_int_array(PrincipalField::MemberOf)
{
for principal_id in member_of {
if let Some(name) = self
.core
.storage
.data
.get_principal_name(principal_id as u32)
.await
.caused_by(trc::location!())?
{
principal.append_str(PrincipalField::MemberOf, name);
for field in [
PrincipalField::MemberOf,
PrincipalField::Lists,
PrincipalField::Roles,
] {
if let Some(member_of) = principal.take_int_array(field) {
for principal_id in member_of {
match principal_id as u32 {
ROLE_ADMIN if field == PrincipalField::Roles => {
principal.append_str(field, "admin");
}
ROLE_TENANT_ADMIN if field == PrincipalField::Roles => {
principal.append_str(field, "tenant-admin");
}
ROLE_USER if field == PrincipalField::Roles => {
principal.append_str(field, "user");
}
principal_id => {
if let Some(name) = self
.core
.storage
.data
.get_principal_name(principal_id)
.await
.caused_by(trc::location!())?
{
principal.append_str(field, name);
}
}
}
}
}
}
// Obtain quota usage
principal.set(
PrincipalField::UsedQuota,
self.get_used_quota(account_id).await? as u64,
);
if matches!(typ, Type::Individual | Type::Group | Type::Tenant) {
principal.set(
PrincipalField::UsedQuota,
self.get_used_quota(account_id).await? as u64,
);
}
// Obtain member names
for member_id in self.core.storage.data.get_members(account_id).await? {
@ -247,6 +321,19 @@ impl JMAP {
.into_http_response())
}
Method::DELETE => {
// Validate the access token
access_token.assert_has_permission(match typ {
Type::Individual => Permission::IndividualDelete,
Type::Group => Permission::GroupDelete,
Type::List => Permission::MailingListDelete,
Type::Domain => Permission::DomainDelete,
Type::Tenant => Permission::TenantDelete,
Type::Role => Permission::RoleDelete,
Type::Resource | Type::Location | Type::Other => {
Permission::PrincipalDelete
}
})?;
// Remove FTS index
self.core.storage.fts.remove_all(account_id).await?;
@ -256,15 +343,39 @@ impl JMAP {
.data
.delete_principal(QueryBy::Id(account_id))
.await?;
// Remove entries from cache
self.inner.sessions.retain(|_, id| id.item != account_id);
if matches!(typ, Type::Role | Type::Tenant) {
// Update permissions cache
self.core.security.permissions.clear();
self.core
.security
.permissions_version
.fetch_add(1, Ordering::Relaxed);
}
Ok(JsonResponse::new(json!({
"data": (),
}))
.into_http_response())
}
Method::PATCH => {
// Validate the access token
let permission_needed = match typ {
Type::Individual => Permission::IndividualUpdate,
Type::Group => Permission::GroupUpdate,
Type::List => Permission::MailingListUpdate,
Type::Domain => Permission::DomainUpdate,
Type::Tenant => Permission::TenantUpdate,
Type::Role => Permission::RoleUpdate,
Type::Resource | Type::Location | Type::Other => {
Permission::PrincipalUpdate
}
};
access_token.assert_has_permission(permission_needed)?;
let changes = serde_json::from_slice::<Vec<PrincipalUpdate>>(
body.as_deref().unwrap_or_default(),
)
@ -273,30 +384,83 @@ impl JMAP {
.from_json_error(err)
})?;
// Make sure the current directory supports updates
if changes.iter().any(|change| {
!matches!(
change.field,
PrincipalField::Quota | PrincipalField::Description
)
}) {
// Validate changes
let mut needs_assert = false;
let mut is_password_change = false;
let mut is_role_change = false;
for change in &changes {
match change.field {
PrincipalField::Name
| PrincipalField::Emails
| PrincipalField::MemberOf
| PrincipalField::Members
| PrincipalField::Lists => {
needs_assert = true;
}
PrincipalField::Quota
| PrincipalField::UsedQuota
| PrincipalField::Description
| PrincipalField::Type
| PrincipalField::Picture => (),
PrincipalField::Secrets => {
is_password_change = true;
needs_assert = true;
}
PrincipalField::Tenant => {
// Tenants are not allowed to change their tenantId
if access_token.tenant.is_some() {
trc::bail!(trc::SecurityEvent::Unauthorized
.into_err()
.details(permission_needed.name())
.ctx(
trc::Key::Reason,
"Tenants cannot change their tenantId"
));
}
}
PrincipalField::Roles
| PrincipalField::EnabledPermissions
| PrincipalField::DisabledPermissions => {
if matches!(typ, Type::Role | Type::Tenant) {
is_role_change = true;
}
if change.field == PrincipalField::Roles {
needs_assert = true;
}
}
}
}
if needs_assert {
self.assert_supported_directory()?;
}
let is_password_change = changes
.iter()
.any(|change| matches!(change.field, PrincipalField::Secrets));
// Update principal
self.core
.storage
.data
.update_principal(QueryBy::Id(account_id), changes)
.update_principal(
QueryBy::Id(account_id),
changes,
access_token.tenant.map(|t| t.id),
)
.await?;
if is_password_change {
// Remove entries from cache
self.inner.sessions.retain(|_, id| id.item != account_id);
}
if is_role_change {
// Update permissions cache
self.core.security.permissions.clear();
self.core
.security
.permissions_version
.fetch_add(1, Ordering::Relaxed);
}
Ok(JsonResponse::new(json!({
"data": (),
}))
@ -451,7 +615,11 @@ impl JMAP {
self.core
.storage
.data
.update_principal(QueryBy::Id(access_token.primary_id()), actions)
.update_principal(
QueryBy::Id(access_token.primary_id()),
actions,
access_token.tenant.map(|t| t.id),
)
.await?;
// Remove entries from cache

View file

@ -6,7 +6,7 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use common::auth::AccessToken;
use directory::Permission;
use directory::{backend::internal::manage::ManageDirectory, Permission, Type};
use hyper::Method;
use mail_auth::{
dmarc::URI,
@ -21,6 +21,7 @@ use store::{
write::{key::DeserializeBigEndian, now, Bincode, QueueClass, ReportEvent, ValueClass},
Deserialize, IterateParams, ValueKey,
};
use trc::AddContext;
use utils::url_params::UrlParams;
use crate::{
@ -111,6 +112,21 @@ impl JMAP {
) -> trc::Result<HttpResponse> {
let params = UrlParams::new(req.uri().query());
// Limit to tenant domains
let mut tenant_domains = None;
if self.core.is_enterprise_edition() {
if let Some(tenant) = access_token.tenant {
tenant_domains = self
.core
.storage
.data
.list_principals(None, Type::Domain.into(), tenant.id.into())
.await
.caused_by(trc::location!())?
.into();
}
}
match (
path.get(1).copied().unwrap_or_default(),
path.get(2).copied().map(decode_path_element),
@ -156,32 +172,35 @@ impl JMAP {
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let message = Bincode::<queue::Message>::deserialize(value)?.inner;
let matches = !has_filters
|| (text
.as_ref()
.map(|text| {
message.return_path.contains(text)
|| message
.recipients
.iter()
.any(|r| r.address_lcase.contains(text))
})
.unwrap_or_else(|| {
from.as_ref()
.map_or(true, |from| message.return_path.contains(from))
&& to.as_ref().map_or(true, |to| {
let matches = tenant_domains
.as_ref()
.map_or(true, |domains| message.has_domain(domains))
&& (!has_filters
|| (text
.as_ref()
.map(|text| {
message.return_path.contains(text)
|| message
.recipients
.iter()
.any(|r| r.address_lcase.contains(text))
})
.unwrap_or_else(|| {
from.as_ref().map_or(true, |from| {
message.return_path.contains(from)
}) && to.as_ref().map_or(true, |to| {
message
.recipients
.iter()
.any(|r| r.address_lcase.contains(to))
})
})
&& before.as_ref().map_or(true, |before| {
message.next_delivery_event() < *before
})
&& after.as_ref().map_or(true, |after| {
message.next_delivery_event() > *after
}));
})
&& before.as_ref().map_or(true, |before| {
message.next_delivery_event() < *before
})
&& after.as_ref().map_or(true, |after| {
message.next_delivery_event() > *after
})));
if matches {
if offset == 0 {
@ -230,6 +249,11 @@ impl JMAP {
.smtp
.read_message(queue_id.parse().unwrap_or_default())
.await
.filter(|message| {
tenant_domains
.as_ref()
.map_or(true, |domains| message.has_domain(domains))
})
{
Ok(JsonResponse::new(json!({
"data": Message::from(&message),
@ -253,6 +277,11 @@ impl JMAP {
.smtp
.read_message(queue_id.parse().unwrap_or_default())
.await
.filter(|message| {
tenant_domains
.as_ref()
.map_or(true, |domains| message.has_domain(domains))
})
{
let prev_event = message.next_event().unwrap_or_default();
let mut found = false;
@ -297,6 +326,11 @@ impl JMAP {
.smtp
.read_message(queue_id.parse().unwrap_or_default())
.await
.filter(|message| {
tenant_domains
.as_ref()
.map_or(true, |domains| message.has_domain(domains))
})
{
let mut found = false;
let prev_event = message.next_event().unwrap_or_default();
@ -417,7 +451,10 @@ impl JMAP {
|key, _| {
if type_.map_or(true, |t| t == *key.last().unwrap()) {
let event = ReportEvent::deserialize(key)?;
if event.seq_id != 0
if tenant_domains
.as_ref()
.map_or(true, |domains| domains.contains(&event.domain))
&& event.seq_id != 0
&& domain.as_ref().map_or(true, |d| event.domain.contains(d))
{
if offset == 0 {
@ -460,7 +497,11 @@ impl JMAP {
let mut result = None;
if let Some(report_id) = parse_queued_report_id(report_id.as_ref()) {
match report_id {
QueueClass::DmarcReportHeader(event) => {
QueueClass::DmarcReportHeader(event)
if tenant_domains
.as_ref()
.map_or(true, |domains| domains.contains(&event.domain)) =>
{
let mut rua = Vec::new();
if let Some(report) = self
.smtp
@ -470,7 +511,11 @@ impl JMAP {
result = Report::dmarc(event, report, rua).into();
}
}
QueueClass::TlsReportHeader(event) => {
QueueClass::TlsReportHeader(event)
if tenant_domains
.as_ref()
.map_or(true, |domains| domains.contains(&event.domain)) =>
{
let mut rua = Vec::new();
if let Some(report) = self
.smtp
@ -498,18 +543,28 @@ impl JMAP {
access_token.assert_has_permission(Permission::OutgoingReportDelete)?;
if let Some(report_id) = parse_queued_report_id(report_id.as_ref()) {
match report_id {
QueueClass::DmarcReportHeader(event) => {
let result = match report_id {
QueueClass::DmarcReportHeader(event)
if tenant_domains
.as_ref()
.map_or(true, |domains| domains.contains(&event.domain)) =>
{
self.smtp.delete_dmarc_report(event).await;
true
}
QueueClass::TlsReportHeader(event) => {
QueueClass::TlsReportHeader(event)
if tenant_domains
.as_ref()
.map_or(true, |domains| domains.contains(&event.domain)) =>
{
self.smtp.delete_tls_report(vec![event]).await;
true
}
_ => (),
}
_ => false,
};
Ok(JsonResponse::new(json!({
"data": true,
"data": result,
}))
.into_http_response())
} else {

View file

@ -5,7 +5,7 @@
*/
use common::auth::AccessToken;
use directory::Permission;
use directory::{backend::internal::manage::ManageDirectory, Permission, Type};
use hyper::Method;
use mail_auth::report::{
tlsrpt::{FailureDetails, Policy, TlsReport},
@ -17,6 +17,7 @@ use store::{
write::{key::DeserializeBigEndian, BatchBuilder, Bincode, ReportClass, ValueClass},
Deserialize, IterateParams, ValueKey, U64_LEN,
};
use trc::AddContext;
use utils::url_params::UrlParams;
use crate::{
@ -39,6 +40,21 @@ impl JMAP {
path: Vec<&str>,
access_token: &AccessToken,
) -> trc::Result<HttpResponse> {
// Limit to tenant domains
let mut tenant_domains = None;
if self.core.is_enterprise_edition() {
if let Some(tenant) = access_token.tenant {
tenant_domains = self
.core
.storage
.data
.list_principals(None, Type::Domain.into(), tenant.id.into())
.await
.caused_by(trc::location!())?
.into();
}
}
match (
path.get(1).copied().unwrap_or_default(),
path.get(2).copied().map(decode_path_element),
@ -98,12 +114,13 @@ impl JMAP {
let mut offset = page.saturating_sub(1) * limit;
let mut total = 0;
let mut last_id = 0;
let has_filters = filter.is_some() || tenant_domains.is_some();
self.core
.storage
.data
.iterate(
IterateParams::new(from_key, to_key)
.set_values(filter.is_some())
.set_values(has_filters)
.descending(),
|key, value| {
// Skip chunked records
@ -114,22 +131,51 @@ impl JMAP {
last_id = id;
// TODO: Support filtering chunked records (over 10MB) on FDB
let matches = filter.map_or(true, |filter| match typ {
ReportType::Dmarc => Bincode::<
IncomingReport<mail_auth::report::Report>,
>::deserialize(
value
)
.map_or(false, |v| v.inner.contains(filter)),
ReportType::Tls => {
Bincode::<IncomingReport<TlsReport>>::deserialize(value)
.map_or(false, |v| v.inner.contains(filter))
let matches = if has_filters {
match typ {
ReportType::Dmarc => {
let report = Bincode::<
IncomingReport<mail_auth::report::Report>,
>::deserialize(
value
)
.caused_by(trc::location!())?
.inner;
filter.map_or(true, |f| report.contains(f))
&& tenant_domains
.as_ref()
.map_or(true, |domains| report.has_domain(domains))
}
ReportType::Tls => {
let report =
Bincode::<IncomingReport<TlsReport>>::deserialize(
value,
)
.caused_by(trc::location!())?
.inner;
filter.map_or(true, |f| report.contains(f))
&& tenant_domains
.as_ref()
.map_or(true, |domains| report.has_domain(domains))
}
ReportType::Arf => {
let report =
Bincode::<IncomingReport<Feedback>>::deserialize(value)
.caused_by(trc::location!())?
.inner;
filter.map_or(true, |f| report.contains(f))
&& tenant_domains
.as_ref()
.map_or(true, |domains| report.has_domain(domains))
}
}
ReportType::Arf => {
Bincode::<IncomingReport<Feedback>>::deserialize(value)
.map_or(false, |v| v.inner.contains(filter))
}
});
} else {
true
};
if matches {
if offset == 0 {
if limit == 0 || results.len() < limit {
@ -174,11 +220,17 @@ impl JMAP {
))
.await?
{
Some(report) => Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response()),
None => Err(trc::ResourceEvent::NotFound.into_err()),
Some(report)
if tenant_domains
.as_ref()
.map_or(true, |domains| report.inner.has_domain(domains)) =>
{
Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response())
}
_ => Err(trc::ResourceEvent::NotFound.into_err()),
},
ReportClass::Dmarc { .. } => match self
.core
@ -189,11 +241,17 @@ impl JMAP {
)
.await?
{
Some(report) => Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response()),
None => Err(trc::ResourceEvent::NotFound.into_err()),
Some(report)
if tenant_domains
.as_ref()
.map_or(true, |domains| report.inner.has_domain(domains)) =>
{
Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response())
}
_ => Err(trc::ResourceEvent::NotFound.into_err()),
},
ReportClass::Arf { .. } => match self
.core
@ -204,11 +262,17 @@ impl JMAP {
))
.await?
{
Some(report) => Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response()),
None => Err(trc::ResourceEvent::NotFound.into_err()),
Some(report)
if tenant_domains
.as_ref()
.map_or(true, |domains| report.inner.has_domain(domains)) =>
{
Ok(JsonResponse::new(json!({
"data": report.inner,
}))
.into_http_response())
}
_ => Err(trc::ResourceEvent::NotFound.into_err()),
},
}
} else {
@ -220,6 +284,43 @@ impl JMAP {
access_token.assert_has_permission(Permission::IncomingReportDelete)?;
if let Some(report_id) = parse_incoming_report_id(class, report_id.as_ref()) {
if let Some(domains) = &tenant_domains {
let is_tenant_report = match &report_id {
ReportClass::Tls { .. } => self
.core
.storage
.data
.get_value::<Bincode<IncomingReport<TlsReport>>>(ValueKey::from(
ValueClass::Report(report_id.clone()),
))
.await?
.map_or(true, |report| report.inner.has_domain(domains)),
ReportClass::Dmarc { .. } => self
.core
.storage
.data
.get_value::<Bincode<IncomingReport<mail_auth::report::Report>>>(
ValueKey::from(ValueClass::Report(report_id.clone())),
)
.await?
.map_or(true, |report| report.inner.has_domain(domains)),
ReportClass::Arf { .. } => self
.core
.storage
.data
.get_value::<Bincode<IncomingReport<Feedback>>>(ValueKey::from(
ValueClass::Report(report_id.clone()),
))
.await?
.map_or(true, |report| report.inner.has_domain(domains)),
};
if !is_tenant_report {
return Err(trc::ResourceEvent::NotFound.into_err());
}
}
let mut batch = BatchBuilder::new();
batch.clear(ValueClass::Report(report_id));
self.core.storage.data.write(batch.build()).await?;

View file

@ -273,7 +273,7 @@ impl JMAP {
set::RequestArguments::VacationResponse => {
access_token.assert_is_member(req.account_id)?;
self.vacation_response_set(req).await?.into()
self.vacation_response_set(req, access_token).await?.into()
}
},
RequestMethod::Changes(req) => self.changes(req, access_token).await?.into(),

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::auth::AccessToken;
use common::auth::{AccessToken, ResourceToken};
use jmap_proto::{
error::set::SetError,
method::{
@ -94,7 +94,7 @@ impl JMAP {
let mut destroy_ids = Vec::new();
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
let resource_token = self.get_resource_token(access_token, account_id).await?;
'create: for (id, create) in request.create {
let id = id.unwrap();
@ -216,8 +216,7 @@ impl JMAP {
.copy_message(
from_account_id,
from_message_id,
account_id,
account_quota,
&resource_token,
mailboxes,
keywords,
received_at,
@ -276,14 +275,14 @@ impl JMAP {
&self,
from_account_id: u32,
from_message_id: u32,
account_id: u32,
account_quota: i64,
resource_token: &ResourceToken,
mailboxes: Vec<u32>,
keywords: Vec<Keyword>,
received_at: Option<UTCDate>,
session_id: u64,
) -> trc::Result<Result<IngestedEmail, SetError>> {
// Obtain metadata
let account_id = resource_token.account_id;
let mut metadata = if let Some(metadata) = self
.get_property::<Bincode<MessageMetadata>>(
from_account_id,
@ -303,12 +302,14 @@ impl JMAP {
// Check quota
match self
.has_available_quota(account_id, account_quota, metadata.size as i64)
.has_available_quota(resource_token, metadata.size as u64)
.await
{
Ok(_) => (),
Err(err) => {
if err.matches(trc::EventType::Limit(trc::LimitEvent::Quota)) {
if err.matches(trc::EventType::Limit(trc::LimitEvent::Quota))
|| err.matches(trc::EventType::Limit(trc::LimitEvent::TenantQuota))
{
trc::error!(err.account_id(account_id).span_id(session_id));
return Ok(Err(SetError::over_quota()));
} else {
@ -412,8 +413,12 @@ impl JMAP {
hash: metadata.blob_hash.clone(),
}),
0u64.serialize(),
)
.custom(EmailIndexBuilder::set(metadata));
);
EmailIndexBuilder::set(metadata).build(
&mut batch,
account_id,
resource_token.tenant.map(|t| t.id),
);
// Insert and obtain ids
let ids = self

View file

@ -399,6 +399,15 @@ impl JMAP {
.remove(account_id, Collection::Email.into(), &tombstoned_ids)
.await?;
// Obtain tenant id
let tenant_id = self
.core
.get_cached_access_token(account_id)
.await
.caused_by(trc::location!())?
.tenant
.map(|t| t.id);
// Delete messages
for document_id in tombstoned_ids {
let mut batch = BatchBuilder::new();
@ -466,7 +475,7 @@ impl JMAP {
// SPDX-SnippetEnd
// Delete message
batch.custom(EmailIndexBuilder::clear(metadata.inner));
EmailIndexBuilder::clear(metadata.inner).build(&mut batch, account_id, tenant_id);
// Commit batch
self.core.storage.data.write(batch.build()).await?;

View file

@ -47,7 +47,7 @@ impl JMAP {
};
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
let resource_token = self.get_resource_token(access_token, account_id).await?;
let mut response = ImportEmailResponse {
account_id: request.account_id,
@ -117,8 +117,7 @@ impl JMAP {
.email_ingest(IngestEmail {
raw_message: &raw_message,
message: MessageParser::new().parse(&raw_message),
account_id,
account_quota,
resource: resource_token.clone(),
mailbox_ids,
keywords: email.keywords,
received_at: email.received_at.map(|r| r.into()),

View file

@ -17,10 +17,7 @@ use nlp::language::Language;
use store::{
backend::MAX_TOKEN_LENGTH,
fts::{index::FtsDocument, Field},
write::{
BatchBuilder, Bincode, BlobOp, DirectoryClass, IntoOperations, F_BITMAP, F_CLEAR, F_INDEX,
F_VALUE,
},
write::{BatchBuilder, Bincode, BlobOp, DirectoryClass, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE},
};
use utils::BlobHash;
@ -41,8 +38,11 @@ pub struct SortedAddressBuilder {
}
pub(super) trait IndexMessage {
#[allow(clippy::too_many_arguments)]
fn index_message(
&mut self,
account_id: u32,
tenant_id: Option<u32>,
message: Message,
blob_hash: BlobHash,
keywords: Vec<Keyword>,
@ -60,6 +60,8 @@ pub trait IndexMessageText<'x>: Sized {
impl IndexMessage for BatchBuilder {
fn index_message(
&mut self,
account_id: u32,
tenant_id: Option<u32>,
message: Message,
blob_hash: BlobHash,
keywords: Vec<Keyword>,
@ -73,12 +75,17 @@ impl IndexMessage for BatchBuilder {
self.value(Property::MailboxIds, mailbox_ids, F_VALUE | F_BITMAP);
// Index size
let account_id = self.last_account_id().unwrap();
self.value(Property::Size, message.raw_message.len() as u32, F_INDEX)
.add(
DirectoryClass::UsedQuota(account_id),
message.raw_message.len() as i64,
);
if let Some(tenant_id) = tenant_id {
self.add(
DirectoryClass::UsedQuota(tenant_id),
message.raw_message.len() as i64,
);
}
// Index receivedAt
self.value(Property::ReceivedAt, received_at, F_INDEX);
@ -399,8 +406,8 @@ impl<'x> EmailIndexBuilder<'x> {
}
}
impl<'x> IntoOperations for EmailIndexBuilder<'x> {
fn build(self, batch: &mut BatchBuilder) {
impl<'x> EmailIndexBuilder<'x> {
pub fn build(self, batch: &mut BatchBuilder, account_id: u32, tenant_id: Option<u32>) {
let options = if self.set {
// Serialize metadata
batch.value(Property::BodyStructure, &self.inner, F_VALUE);
@ -413,17 +420,18 @@ impl<'x> IntoOperations for EmailIndexBuilder<'x> {
let metadata = &self.inner.inner;
// Index properties
let account_id = batch.last_account_id().unwrap();
let quota = if self.set {
metadata.size as i64
} else {
-(metadata.size as i64)
};
batch
.value(Property::Size, metadata.size as u32, F_INDEX | options)
.add(
DirectoryClass::UsedQuota(account_id),
if self.set {
metadata.size as i64
} else {
-(metadata.size as i64)
},
);
.add(DirectoryClass::UsedQuota(account_id), quota);
if let Some(tenant_id) = tenant_id {
batch.add(DirectoryClass::UsedQuota(tenant_id), quota);
}
batch.value(
Property::ReceivedAt,
metadata.received_at,

View file

@ -9,6 +9,7 @@ use std::{
time::{Duration, Instant},
};
use common::auth::ResourceToken;
use jmap_proto::{
object::Object,
types::{
@ -57,8 +58,7 @@ pub struct IngestedEmail {
pub struct IngestEmail<'x> {
pub raw_message: &'x [u8],
pub message: Option<Message<'x>>,
pub account_id: u32,
pub account_quota: i64,
pub resource: ResourceToken,
pub mailbox_ids: Vec<u32>,
pub keywords: Vec<Keyword>,
pub received_at: Option<u64>,
@ -81,8 +81,10 @@ impl JMAP {
pub async fn email_ingest(&self, mut params: IngestEmail<'_>) -> trc::Result<IngestedEmail> {
// Check quota
let start_time = Instant::now();
let mut raw_message_len = params.raw_message.len() as i64;
self.has_available_quota(params.account_id, params.account_quota, raw_message_len)
let account_id = params.resource.account_id;
let tenant_id = params.resource.tenant.map(|t| t.id);
let mut raw_message_len = params.raw_message.len() as u64;
self.has_available_quota(&params.resource, raw_message_len)
.await
.caused_by(trc::location!())?;
@ -157,7 +159,7 @@ impl JMAP {
.storage
.data
.filter(
params.account_id,
account_id,
Collection::Email,
vec![
Filter::eq(Property::MessageId, &message_id),
@ -175,7 +177,7 @@ impl JMAP {
trc::event!(
MessageIngest(MessageIngestEvent::Duplicate),
SpanId = params.session_id,
AccountId = params.account_id,
AccountId = account_id,
MessageId = message_id,
);
@ -189,7 +191,7 @@ impl JMAP {
}
if !references.is_empty() {
self.find_or_merge_thread(params.account_id, subject, &references)
self.find_or_merge_thread(account_id, subject, &references)
.await?
} else {
None
@ -200,7 +202,7 @@ impl JMAP {
if params.encrypt && !message.is_encrypted() {
if let Some(encrypt_params) = self
.get_property::<EncryptionParams>(
params.account_id,
account_id,
Collection::Principal,
0,
Property::Parameters,
@ -211,7 +213,7 @@ impl JMAP {
match message.encrypt(&encrypt_params).await {
Ok(new_raw_message) => {
raw_message = Cow::from(new_raw_message);
raw_message_len = raw_message.len() as i64;
raw_message_len = raw_message.len() as u64;
message = MessageParser::default()
.parse(raw_message.as_ref())
.ok_or_else(|| {
@ -252,13 +254,13 @@ impl JMAP {
// Obtain a documentId and changeId
let change_id = self
.assign_change_id(params.account_id)
.assign_change_id(account_id)
.await
.caused_by(trc::location!())?;
// Store blob
let blob_id = self
.put_blob(params.account_id, raw_message.as_ref(), false)
.put_blob(account_id, raw_message.as_ref(), false)
.await
.caused_by(trc::location!())?;
@ -267,7 +269,7 @@ impl JMAP {
let mut imap_uids = Vec::with_capacity(params.mailbox_ids.len());
for mailbox_id in &params.mailbox_ids {
let uid = self
.assign_imap_uid(params.account_id, *mailbox_id)
.assign_imap_uid(account_id, *mailbox_id)
.await
.caused_by(trc::location!())?;
mailbox_ids.push(UidMailbox::new(*mailbox_id, uid));
@ -278,7 +280,7 @@ impl JMAP {
let mut batch = BatchBuilder::new();
batch
.with_change_id(change_id)
.with_account_id(params.account_id)
.with_account_id(account_id)
.with_collection(Collection::Thread);
if let Some(thread_id) = thread_id {
batch.log(Changes::update([thread_id]));
@ -301,6 +303,8 @@ impl JMAP {
.create_document()
.log(LogEmailInsert(thread_id))
.index_message(
account_id,
tenant_id,
message,
blob_id.hash.clone(),
params.keywords,
@ -348,13 +352,13 @@ impl JMAP {
IngestSource::Imap => MessageIngestEvent::ImapAppend,
}),
SpanId = params.session_id,
AccountId = params.account_id,
AccountId = account_id,
DocumentId = document_id,
MailboxId = mailbox_ids_event,
BlobId = blob_id.hash.to_hex(),
ChangeId = change_id,
MessageId = message_id,
Size = raw_message_len as u64,
Size = raw_message_len,
Elapsed = start_time.elapsed(),
);
@ -364,7 +368,7 @@ impl JMAP {
blob_id: BlobId {
hash: blob_id.hash,
class: BlobClass::Linked {
account_id: params.account_id,
account_id,
collection: Collection::Email.into(),
document_id,
},

View file

@ -89,7 +89,7 @@ impl JMAP {
let will_destroy = request.unwrap_destroy();
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
let resource_token = self.get_resource_token(access_token, account_id).await?;
// Process creates
'create: for (id, mut object) in request.unwrap_create() {
@ -715,8 +715,7 @@ impl JMAP {
.email_ingest(IngestEmail {
raw_message: &raw_message,
message: MessageParser::new().parse(&raw_message),
account_id,
account_quota,
resource: resource_token.clone(),
mailbox_ids: mailboxes,
keywords,
received_at,

View file

@ -13,7 +13,9 @@ use std::{
use auth::rate_limit::ConcurrencyLimiters;
use common::{
auth::AccessToken, manager::webadmin::WebAdminManager, Core, DeliveryEvent, SharedCore,
auth::{AccessToken, ResourceToken, TenantInfo},
manager::webadmin::WebAdminManager,
Core, DeliveryEvent, SharedCore,
};
use dashmap::DashMap;
use directory::QueryBy;
@ -319,18 +321,56 @@ impl JMAP {
)
}
pub async fn get_quota(&self, access_token: &AccessToken, account_id: u32) -> trc::Result<i64> {
pub async fn get_resource_token(
&self,
access_token: &AccessToken,
account_id: u32,
) -> trc::Result<ResourceToken> {
Ok(if access_token.primary_id == account_id {
access_token.quota as i64
ResourceToken {
account_id,
quota: access_token.quota,
tenant: access_token.tenant,
}
} else {
self.core
let mut quotas = ResourceToken {
account_id,
..Default::default()
};
if let Some(principal) = self
.core
.storage
.directory
.query(QueryBy::Id(account_id), false)
.await
.add_context(|err| err.caused_by(trc::location!()).account_id(account_id))?
.map(|p| p.quota() as i64)
.unwrap_or_default()
{
quotas.quota = principal.quota();
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant_id) = principal.tenant() {
quotas.tenant = TenantInfo {
id: tenant_id,
quota: self
.core
.storage
.directory
.query(QueryBy::Id(tenant_id), false)
.await
.add_context(|err| {
err.caused_by(trc::location!()).account_id(tenant_id)
})?
.map(|tenant| tenant.quota())
.unwrap_or_default(),
}
.into();
}
}
}
quotas
})
}
@ -345,25 +385,35 @@ impl JMAP {
pub async fn has_available_quota(
&self,
account_id: u32,
account_quota: i64,
item_size: i64,
quotas: &ResourceToken,
item_size: u64,
) -> trc::Result<()> {
if account_quota == 0 {
return Ok(());
if quotas.quota != 0 {
let used_quota = self.get_used_quota(quotas.account_id).await? as u64;
if used_quota + item_size > quotas.quota {
return Err(trc::LimitEvent::Quota
.into_err()
.ctx(trc::Key::Limit, quotas.quota)
.ctx(trc::Key::Size, used_quota));
}
}
self.get_used_quota(account_id)
.await
.and_then(|used_quota| {
if used_quota + item_size <= account_quota {
Ok(())
} else {
Err(trc::LimitEvent::Quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = quotas.tenant {
let used_quota = self.get_used_quota(tenant.id).await? as u64;
if used_quota + item_size > tenant.quota {
return Err(trc::LimitEvent::TenantQuota
.into_err()
.ctx(trc::Key::Limit, account_quota as u64)
.ctx(trc::Key::Size, used_quota as u64))
.ctx(trc::Key::Limit, tenant.quota)
.ctx(trc::Key::Size, used_quota));
}
})
}
}
Ok(())
}
pub async fn filter(

View file

@ -66,6 +66,7 @@ pub struct Peer {
pub epoch: EpochId,
pub gen_config: GenerationId,
pub gen_lists: GenerationId,
pub gen_permissions: GenerationId,
pub state: State,
// Heartbeat state
@ -83,6 +84,7 @@ pub struct PeerStatus {
pub epoch: EpochId,
pub gen_config: GenerationId,
pub gen_lists: GenerationId,
pub gen_permissions: GenerationId,
}
impl From<&Peer> for PeerStatus {
@ -92,12 +94,14 @@ impl From<&Peer> for PeerStatus {
epoch: peer.epoch,
gen_config: peer.gen_config,
gen_lists: peer.gen_lists,
gen_permissions: peer.gen_permissions,
}
}
}
impl From<&Gossiper> for PeerStatus {
fn from(cluster: &Gossiper) -> Self {
let core = cluster.core.core.load();
PeerStatus {
addr: cluster.addr,
epoch: cluster.epoch,
@ -106,14 +110,8 @@ impl From<&Gossiper> for PeerStatus {
.jmap_inner
.config_version
.load(Ordering::Relaxed),
gen_lists: cluster
.core
.core
.load()
.network
.blocked_ips
.version
.load(Ordering::Relaxed),
gen_lists: core.network.blocked_ips.version.load(Ordering::Relaxed),
gen_permissions: core.security.permissions_version.load(Ordering::Relaxed),
}
}
}

View file

@ -14,6 +14,7 @@ impl Peer {
epoch: 0,
gen_config: 0,
gen_lists: 0,
gen_permissions: 0,
addr,
state: State::Seed,
last_heartbeat: Instant::now(),
@ -80,6 +81,7 @@ impl From<PeerStatus> for Peer {
epoch: value.epoch,
gen_config: value.gen_config,
gen_lists: value.gen_lists,
gen_permissions: value.gen_permissions,
state: State::Alive,
last_heartbeat: Instant::now(),
hb_window: vec![0; HEARTBEAT_WINDOW],

View file

@ -99,6 +99,7 @@ impl Gossiper {
let mut remove_seeds = false;
let mut update_config = false;
let mut update_lists = false;
let mut update_permissions = false;
'outer: for (pos, peer) in peers.into_iter().enumerate() {
if peer.addr == self.addr {
@ -116,8 +117,9 @@ impl Gossiper {
local_peer.gen_config = peer.gen_config;
if local_peer.hb_sum > 0 {
trc::event!(
Cluster(ClusterEvent::PeerHasConfigChanges),
RemoteIp = peer.addr
Cluster(ClusterEvent::PeerHasChanges),
RemoteIp = peer.addr,
Details = "settings"
);
update_config = true;
@ -127,13 +129,26 @@ impl Gossiper {
local_peer.gen_lists = peer.gen_lists;
if local_peer.hb_sum > 0 {
trc::event!(
Cluster(ClusterEvent::PeerHasListChanges),
RemoteIp = peer.addr
Cluster(ClusterEvent::PeerHasChanges),
RemoteIp = peer.addr,
Details = "blocked_ips"
);
update_lists = true;
}
}
if local_peer.gen_permissions != peer.gen_permissions {
local_peer.gen_permissions = peer.gen_permissions;
if local_peer.hb_sum > 0 {
trc::event!(
Cluster(ClusterEvent::PeerHasChanges),
RemoteIp = peer.addr,
Details = "permissions"
);
update_permissions = true;
}
}
}
continue 'outer;
@ -158,6 +173,10 @@ impl Gossiper {
}
// Reload settings
if update_permissions {
self.core.core.load().security.permissions.clear();
}
if update_config || update_lists {
let core = self.core.core.clone();
let inner = self.core.jmap_inner.clone();

View file

@ -57,6 +57,7 @@ impl Request {
epoch: EpochId::from_leb128_it(&mut it)?,
gen_config: it.next().copied()?,
gen_lists: it.next().copied()?,
gen_permissions: it.next().copied()?,
});
}
match flags & !(1 << 7) {
@ -108,6 +109,7 @@ impl Request {
peer.epoch.to_leb128_bytes(&mut bytes);
bytes.push(peer.gen_config);
bytes.push(peer.gen_lists);
bytes.push(peer.gen_permissions);
}
bytes

View file

@ -5,7 +5,7 @@
*/
use common::{DeliveryResult, IngestMessage};
use directory::QueryBy;
use directory::Permission;
use jmap_proto::types::{state::StateChange, type_state::DataType};
use mail_parser::MessageParser;
use store::ahash::AHashMap;
@ -83,68 +83,50 @@ impl JMAP {
// Deliver to each recipient
for (uid, (status, rcpt)) in &mut deliver_names {
// Check if there is an active sieve script
let result = match self.sieve_script_get_active(*uid).await {
Ok(Some(active_script)) => {
self.sieve_script_ingest(
&raw_message,
&message.sender_address,
rcpt,
*uid,
message.session_id,
active_script,
)
.await
}
Ok(None) => {
let account_quota = match self
.core
.storage
.directory
.query(QueryBy::Id(*uid), false)
.await
{
Ok(Some(p)) => p.quota() as i64,
Ok(None) => 0,
Err(err) => {
trc::error!(err
.details("Failed to obtain account quota.")
.ctx(trc::Key::To, rcpt.to_string())
.span_id(message.session_id)
.caused_by(trc::location!()));
*status = DeliveryResult::TemporaryFailure {
reason: "Transient server failure.".into(),
};
continue;
// Obtain access token
let result = match self
.core
.get_cached_access_token(*uid)
.await
.and_then(|token| {
token
.assert_has_permission(Permission::EmailReceive)
.map(|_| token)
}) {
Ok(access_token) => {
// Check if there is an active sieve script
match self.sieve_script_get_active(*uid).await {
Ok(Some(active_script)) => {
self.sieve_script_ingest(
&access_token,
&raw_message,
&message.sender_address,
rcpt,
message.session_id,
active_script,
)
.await
}
};
self.email_ingest(IngestEmail {
raw_message: &raw_message,
message: MessageParser::new().parse(&raw_message),
account_id: *uid,
account_quota,
mailbox_ids: vec![INBOX_ID],
keywords: vec![],
received_at: None,
source: IngestSource::Smtp,
encrypt: self.core.jmap.encrypt,
session_id: message.session_id,
})
.await
Ok(None) => {
// Ingest message
self.email_ingest(IngestEmail {
raw_message: &raw_message,
message: MessageParser::new().parse(&raw_message),
resource: access_token.as_resource_token(),
mailbox_ids: vec![INBOX_ID],
keywords: vec![],
received_at: None,
source: IngestSource::Smtp,
encrypt: self.core.jmap.encrypt,
session_id: message.session_id,
})
.await
}
Err(err) => Err(err),
}
}
Err(err) => {
trc::error!(err
.details("Failed to ingest message.")
.ctx(trc::Key::To, rcpt.to_string())
.span_id(message.session_id));
*status = DeliveryResult::TemporaryFailure {
reason: "Transient server failure.".into(),
};
continue;
}
Err(err) => Err(err),
};
match result {
@ -168,6 +150,17 @@ impl JMAP {
reason: "Mailbox over quota.".into(),
}
}
trc::EventType::Limit(trc::LimitEvent::TenantQuota) => {
*status = DeliveryResult::TemporaryFailure {
reason: "Organization over quota.".into(),
}
}
trc::EventType::Security(trc::SecurityEvent::Unauthorized) => {
*status = DeliveryResult::PermanentFailure {
code: [5, 5, 0],
reason: "This account is not authorized to receive email.".into(),
}
}
trc::EventType::MessageIngest(trc::MessageIngestEvent::Error) => {
*status = DeliveryResult::PermanentFailure {
code: err

View file

@ -6,7 +6,7 @@
use std::borrow::Cow;
use common::listener::stream::NullIo;
use common::{auth::AccessToken, listener::stream::NullIo};
use directory::{backend::internal::PrincipalField, QueryBy};
use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, property::Property};
use mail_parser::MessageParser;
@ -37,10 +37,10 @@ impl JMAP {
#[allow(clippy::blocks_in_conditions)]
pub async fn sieve_script_ingest(
&self,
access_token: &AccessToken,
raw_message: &[u8],
envelope_from: &str,
envelope_to: &str,
account_id: u32,
session_id: u64,
mut active_script: ActiveScript,
) -> trc::Result<IngestedEmail> {
@ -56,6 +56,7 @@ impl JMAP {
};
// Obtain mailboxIds
let account_id = access_token.primary_id;
let mailbox_ids = self
.mailbox_get_or_create(account_id)
.await
@ -64,29 +65,21 @@ impl JMAP {
// Create Sieve instance
let mut instance = self.core.sieve.untrusted_runtime.filter_parsed(message);
// Set account name and obtain quota
let (account_quota, mail_from) = match self
// Set account name and email
let mail_from = self
.core
.storage
.directory
.query(QueryBy::Id(account_id), false)
.await
{
Ok(Some(mut p)) => {
.caused_by(trc::location!())?
.and_then(|mut p| {
instance.set_user_full_name(p.description().unwrap_or_else(|| p.name()));
(
p.quota() as i64,
p.take_str_array(PrincipalField::Emails)
.unwrap_or_default()
.into_iter()
.next(),
)
}
Ok(None) => (0, None),
Err(err) => {
return Err(err.caused_by(trc::location!()));
}
};
p.take_str_array(PrincipalField::Emails)
.unwrap_or_default()
.into_iter()
.next()
});
// Set account address
let mail_from = mail_from.unwrap_or_else(|| envelope_to.to_string());
@ -458,8 +451,7 @@ impl JMAP {
.email_ingest(IngestEmail {
raw_message: &sieve_message.raw_message,
message: message.into(),
account_id,
account_quota,
resource: access_token.as_resource_token(),
mailbox_ids: sieve_message.file_into,
keywords: sieve_message.flags,
received_at: None,

View file

@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::auth::AccessToken;
use common::auth::{AccessToken, ResourceToken};
use jmap_proto::{
error::set::{SetError, SetErrorType},
method::set::{SetRequest, SetResponse},
@ -37,8 +37,7 @@ use store::{
use crate::{api::http::HttpSessionData, JMAP};
struct SetContext<'x> {
account_id: u32,
account_quota: i64,
resource_token: ResourceToken,
access_token: &'x AccessToken,
response: SetResponse,
}
@ -67,8 +66,7 @@ impl JMAP {
.await?
.unwrap_or_default();
let mut ctx = SetContext {
account_id,
account_quota: self.get_quota(access_token, account_id).await?,
resource_token: self.get_resource_token(access_token, account_id).await?,
access_token,
response: self
.prepare_set_response(&request, Collection::SieveScript)
@ -106,6 +104,14 @@ impl JMAP {
)
.custom(builder);
// Increment tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = ctx.resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), script_size as i64);
}
}
let document_id = self.write_batch_expect_id(batch).await?;
sieve_ids.insert(document_id);
changes.log_insert(Collection::SieveScript, document_id);
@ -192,11 +198,6 @@ impl JMAP {
// Store blob
let blob_id = builder.changes_mut().unwrap().blob_id_mut().unwrap();
blob_id.hash = self.put_blob(account_id, &blob, false).await?.hash;
/*blob_id.class = BlobClass::Linked {
account_id,
collection: Collection::SieveScript.into(),
document_id,
};*/
let script_size = blob_id.section.as_ref().unwrap().size as i64;
let prev_script_size =
prev_blob_id.section.as_ref().unwrap().size as i64;
@ -210,6 +211,17 @@ impl JMAP {
};
if update_quota != 0 {
batch.add(DirectoryClass::UsedQuota(account_id), update_quota);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = ctx.resource_token.tenant {
batch.add(
DirectoryClass::UsedQuota(tenant.id),
update_quota,
);
}
}
}
// Update blobId
@ -271,7 +283,7 @@ impl JMAP {
let document_id = id.document_id();
if sieve_ids.contains(document_id) {
if self
.sieve_script_delete(account_id, document_id, true)
.sieve_script_delete(&ctx.resource_token, document_id, true)
.await?
{
changes.log_delete(Collection::SieveScript, document_id);
@ -333,11 +345,12 @@ impl JMAP {
pub async fn sieve_script_delete(
&self,
account_id: u32,
resource_token: &ResourceToken,
document_id: u32,
fail_if_active: bool,
) -> trc::Result<bool> {
// Fetch record
let account_id = resource_token.account_id;
let obj = self
.get_property::<HashedValue<Object<Value>>>(
account_id,
@ -371,6 +384,7 @@ impl JMAP {
.caused_by(trc::location!())
.document_id(document_id)
})?;
let updated_quota = -(blob_id.section.as_ref().unwrap().size as i64);
batch
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
@ -379,11 +393,17 @@ impl JMAP {
.clear(BlobOp::Link {
hash: blob_id.hash.clone(),
})
.add(
DirectoryClass::UsedQuota(account_id),
-(blob_id.section.as_ref().unwrap().size as i64),
)
.add(DirectoryClass::UsedQuota(account_id), updated_quota)
.custom(ObjectIndexBuilder::new(SCHEMA).with_current(obj));
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), updated_quota);
}
}
self.write_batch(batch).await?;
Ok(true)
}
@ -437,7 +457,7 @@ impl JMAP {
{
if let Some(id) = self
.filter(
ctx.account_id,
ctx.resource_token.account_id,
Collection::SieveScript,
vec![Filter::eq(Property::Name, &value)],
)
@ -494,29 +514,35 @@ impl JMAP {
let blob_update = if let Some(blob_id) = blob_id {
if update.as_ref().map_or(true, |(document_id, _)| {
!matches!(blob_id.class, BlobClass::Linked { account_id, collection, document_id: d } if account_id == ctx.account_id && collection == u8::from(Collection::SieveScript) && *document_id == d)
!matches!(blob_id.class, BlobClass::Linked { account_id, collection, document_id: d } if account_id == ctx.resource_token.account_id && collection == u8::from(Collection::SieveScript) && *document_id == d)
}) {
// Check access
if let Some(mut bytes) = self.blob_download(&blob_id, ctx.access_token).await? {
// Check quota
match self
.has_available_quota(ctx.account_id, ctx.account_quota, bytes.len() as i64)
.await {
Ok(_) => (),
Err(err) => {
if err.matches(trc::EventType::Limit(trc::LimitEvent::Quota)) {
trc::error!(err.account_id(ctx.account_id).span_id(session_id));
return Ok(Err(SetError::over_quota()));
} else {
return Err(err);
}
},
.has_available_quota(&ctx.resource_token, bytes.len() as u64)
.await
{
Ok(_) => (),
Err(err) => {
if err.matches(trc::EventType::Limit(trc::LimitEvent::Quota))
|| err.matches(trc::EventType::Limit(trc::LimitEvent::TenantQuota))
{
trc::error!(err.account_id(ctx.resource_token.account_id).span_id(session_id));
return Ok(Err(SetError::over_quota()));
} else {
return Err(err);
}
}
}
// Compile script
match self.core.sieve.untrusted_compiler.compile(&bytes) {
Ok(script) => {
changes.set(Property::BlobId, BlobId::default().with_section_size(bytes.len()));
changes.set(
Property::BlobId,
BlobId::default().with_section_size(bytes.len()),
);
bytes.extend(bincode::serialize(&script).unwrap_or_default());
bytes.into()
}

View file

@ -6,6 +6,7 @@
use std::borrow::Cow;
use common::auth::AccessToken;
use jmap_proto::{
error::set::{SetError, SetErrorType},
method::set::{RequestArguments, SetRequest, SetResponse},
@ -36,12 +37,14 @@ impl JMAP {
pub async fn vacation_response_set(
&self,
mut request: SetRequest<RequestArguments>,
access_token: &AccessToken,
) -> trc::Result<SetResponse> {
let account_id = request.account_id.document_id();
let mut response = self
.prepare_set_response(&request, Collection::SieveScript)
.await?;
let will_destroy = request.unwrap_destroy();
let resource_token = self.get_resource_token(access_token, account_id).await?;
// Process set or update requests
let mut create_id = None;
@ -232,11 +235,6 @@ impl JMAP {
.hash;
let blob_id = obj.changes_mut().unwrap().blob_id_mut().unwrap();
blob_id.hash = hash;
/*blob_id.class = BlobClass::Linked {
account_id,
collection: Collection::SieveScript.into(),
document_id: u32::MAX,
};*/
// Link blob
batch.set(
@ -270,9 +268,25 @@ impl JMAP {
};
if quota != 0 {
batch.add(DirectoryClass::UsedQuota(account_id), quota);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), quota);
}
}
}
} else {
batch.add(DirectoryClass::UsedQuota(account_id), script_size);
// Update tenant quota
#[cfg(feature = "enterprise")]
if self.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), script_size);
}
}
}
};
@ -309,7 +323,7 @@ impl JMAP {
if id.is_singleton() {
if let Some(document_id) = self.get_vacation_sieve_script_id(account_id).await?
{
self.sieve_script_delete(account_id, document_id, false)
self.sieve_script_delete(&resource_token, document_id, false)
.await?;
batch.log(Changes::delete([document_id]));
response.destroyed.push(id);

View file

@ -33,11 +33,12 @@ impl<T: SessionStream> Session<T> {
.details("Expected script name as a parameter.")
})?;
let account_id = self.state.access_token().primary_id();
let access_token = self.state.access_token();
let account_id = access_token.primary_id();
let document_id = self.get_script_id(account_id, &name).await?;
if self
.jmap
.sieve_script_delete(account_id, document_id, true)
.sieve_script_delete(&access_token.as_resource_token(), document_id, true)
.await
.caused_by(trc::location!())?
{

View file

@ -52,14 +52,10 @@ impl<T: SessionStream> Session<T> {
let script_size = script_bytes.len() as i64;
// Check quota
let access_token = self.state.access_token();
let account_id = access_token.primary_id();
let resource_token = self.state.access_token().as_resource_token();
let account_id = resource_token.account_id;
self.jmap
.has_available_quota(
account_id,
access_token.quota as i64,
script_bytes.len() as i64,
)
.has_available_quota(&resource_token, script_bytes.len() as u64)
.await
.caused_by(trc::location!())?;
@ -169,6 +165,13 @@ impl<T: SessionStream> Session<T> {
};
if update_quota != 0 {
batch.add(DirectoryClass::UsedQuota(account_id), update_quota);
// Update tenant quota
if self.jmap.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), update_quota);
}
}
}
batch.custom(
@ -229,6 +232,14 @@ impl<T: SessionStream> Session<T> {
.with_property(Property::BlobId, Value::BlobId(blob_id)),
),
);
// Update tenant quota
if self.jmap.core.is_enterprise_edition() {
if let Some(tenant) = resource_token.tenant {
batch.add(DirectoryClass::UsedQuota(tenant.id), script_size);
}
}
let assigned_ids = self
.jmap
.write_batch(batch)

View file

@ -464,4 +464,12 @@ impl Message {
true
}
}
pub fn has_domain(&self, domains: &[String]) -> bool {
self.domains.iter().any(|d| domains.contains(&d.domain))
|| self
.return_path
.rsplit_once('@')
.map_or(false, |(_, domain)| domains.contains(&domain.to_string()))
}
}

View file

@ -500,3 +500,12 @@ impl LogReport for Feedback<'_> {
);
}
}
impl<T> IncomingReport<T> {
pub fn has_domain(&self, domain: &[String]) -> bool {
self.to
.iter()
.any(|to| domain.iter().any(|d| to.ends_with(d)))
|| domain.iter().any(|d| self.from.ends_with(d))
}
}

View file

@ -138,8 +138,7 @@ impl ClusterEvent {
ClusterEvent::PeerSuspectedIsAlive => "A suspected peer is actually alive",
ClusterEvent::PeerBackOnline => "A peer came back online",
ClusterEvent::PeerLeaving => "A peer is leaving the cluster",
ClusterEvent::PeerHasConfigChanges => "A peer has configuration changes",
ClusterEvent::PeerHasListChanges => "A peer has list changes",
ClusterEvent::PeerHasChanges => "A peer has reported changes",
ClusterEvent::OneOrMorePeersOffline => "One or more peers are offline",
ClusterEvent::EmptyPacket => "Received an empty gossip packet",
ClusterEvent::InvalidPacket => "Received an invalid gossip packet",
@ -157,8 +156,7 @@ impl ClusterEvent {
ClusterEvent::PeerSuspectedIsAlive => "A suspected peer is actually alive",
ClusterEvent::PeerBackOnline => "A peer came back online",
ClusterEvent::PeerLeaving => "A peer is leaving the cluster",
ClusterEvent::PeerHasConfigChanges => "A peer has configuration changes",
ClusterEvent::PeerHasListChanges => "A peer has list changes",
ClusterEvent::PeerHasChanges => "A peer has reported changes",
ClusterEvent::OneOrMorePeersOffline => "One or more peers are offline",
ClusterEvent::EmptyPacket => "Received an empty gossip packet",
ClusterEvent::InvalidPacket => "Received an invalid gossip packet",
@ -1691,6 +1689,7 @@ impl LimitEvent {
LimitEvent::Quota => "Quota limit reached",
LimitEvent::BlobQuota => "Blob quota limit reached",
LimitEvent::TooManyRequests => "Too many requests",
LimitEvent::TenantQuota => "Tenant quota limit reached",
}
}
@ -1705,6 +1704,7 @@ impl LimitEvent {
LimitEvent::Quota => "The quota limit has been reached",
LimitEvent::BlobQuota => "The blob quota limit has been reached",
LimitEvent::TooManyRequests => "Too many requests have been made",
LimitEvent::TenantQuota => "One of the tenant quota limits has been reached",
}
}
}

View file

@ -221,6 +221,7 @@ impl EventType {
LimitEvent::Quota => Level::Debug,
LimitEvent::BlobQuota => Level::Debug,
LimitEvent::TooManyRequests => Level::Warn,
LimitEvent::TenantQuota => Level::Info,
},
EventType::Manage(_) => Level::Debug,
EventType::Auth(cause) => match cause {
@ -361,9 +362,7 @@ impl EventType {
| ClusterEvent::PeerSuspectedIsAlive
| ClusterEvent::PeerBackOnline
| ClusterEvent::PeerLeaving => Level::Info,
ClusterEvent::PeerHasConfigChanges
| ClusterEvent::PeerHasListChanges
| ClusterEvent::OneOrMorePeersOffline => Level::Debug,
ClusterEvent::PeerHasChanges | ClusterEvent::OneOrMorePeersOffline => Level::Debug,
ClusterEvent::EmptyPacket
| ClusterEvent::Error
| ClusterEvent::DecryptionError

View file

@ -468,6 +468,7 @@ impl LimitEvent {
Self::Quota => "Quota exceeded",
Self::BlobQuota => "Blob quota exceeded",
Self::TooManyRequests => "Too many requests",
Self::TenantQuota => "Tenant quota exceeded",
}
}
}

View file

@ -214,8 +214,7 @@ pub enum ClusterEvent {
PeerSuspectedIsAlive,
PeerBackOnline,
PeerLeaving,
PeerHasConfigChanges,
PeerHasListChanges,
PeerHasChanges,
OneOrMorePeersOffline,
EmptyPacket,
InvalidPacket,
@ -906,6 +905,7 @@ pub enum LimitEvent {
ConcurrentConnection, // Used by listener
Quota,
BlobQuota,
TenantQuota,
TooManyRequests,
}

View file

@ -353,8 +353,7 @@ impl EventType {
EventType::Cluster(ClusterEvent::PeerAlive) => 44,
EventType::Cluster(ClusterEvent::PeerBackOnline) => 45,
EventType::Cluster(ClusterEvent::PeerDiscovered) => 46,
EventType::Cluster(ClusterEvent::PeerHasConfigChanges) => 47,
EventType::Cluster(ClusterEvent::PeerHasListChanges) => 48,
EventType::Cluster(ClusterEvent::PeerHasChanges) => 47,
EventType::Cluster(ClusterEvent::PeerLeaving) => 49,
EventType::Cluster(ClusterEvent::PeerOffline) => 50,
EventType::Cluster(ClusterEvent::PeerSuspected) => 51,
@ -859,6 +858,7 @@ impl EventType {
EventType::Security(SecurityEvent::LoiterBan) => 550,
EventType::Smtp(SmtpEvent::MailFromNotAllowed) => 551,
EventType::Security(SecurityEvent::Unauthorized) => 552,
EventType::Limit(LimitEvent::TenantQuota) => 553,
}
}
@ -911,8 +911,8 @@ impl EventType {
44 => Some(EventType::Cluster(ClusterEvent::PeerAlive)),
45 => Some(EventType::Cluster(ClusterEvent::PeerBackOnline)),
46 => Some(EventType::Cluster(ClusterEvent::PeerDiscovered)),
47 => Some(EventType::Cluster(ClusterEvent::PeerHasConfigChanges)),
48 => Some(EventType::Cluster(ClusterEvent::PeerHasListChanges)),
47 => Some(EventType::Cluster(ClusterEvent::PeerHasChanges)),
48 => Some(EventType::Cluster(ClusterEvent::PeerHasChanges)), // TODO: recycle
49 => Some(EventType::Cluster(ClusterEvent::PeerLeaving)),
50 => Some(EventType::Cluster(ClusterEvent::PeerOffline)),
51 => Some(EventType::Cluster(ClusterEvent::PeerSuspected)),
@ -1457,6 +1457,7 @@ impl EventType {
550 => Some(EventType::Security(SecurityEvent::LoiterBan)),
551 => Some(EventType::Smtp(SmtpEvent::MailFromNotAllowed)),
552 => Some(EventType::Security(SecurityEvent::Unauthorized)),
553 => Some(EventType::Limit(LimitEvent::TenantQuota)),
_ => None,
}
}